diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index 6bf8301..77dc415 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -1637,7 +1637,7 @@ final class RpcState { private Request newCallNoIntercept(long interfaceId, short methodId) { if (isDisconnected()) { - return Request.newBrokenRequest(AnyPointer.factory, disconnected); + return Capability.newBrokenRequest(disconnected); } var request = new RpcRequest(this); @@ -1645,7 +1645,7 @@ final class RpcState { callBuilder.setInterfaceId(interfaceId); callBuilder.setMethodId(methodId); var root = request.getRoot(); - return new AnyPointer.Request(root, request); + return Capability.newTypelessRequest(root, request); } } @@ -1689,7 +1689,7 @@ final class RpcState { if (redirect != null) { var redirected = redirect.newCall( this.callBuilder.getInterfaceId(), this.callBuilder.getMethodId()); - var replacement = new AnyPointer.Request(paramsBuilder, redirected.getHook()); + var replacement = Capability.newTypelessRequest(paramsBuilder, redirected.getHook()); return replacement.sendInternal(); } diff --git a/runtime/src/main/java/org/capnproto/AnyPointer.java b/runtime/src/main/java/org/capnproto/AnyPointer.java index 9153edf..ca3f490 100644 --- a/runtime/src/main/java/org/capnproto/AnyPointer.java +++ b/runtime/src/main/java/org/capnproto/AnyPointer.java @@ -21,8 +21,6 @@ package org.capnproto; -import java.util.concurrent.CompletableFuture; - public final class AnyPointer { public static final class Factory implements PointerFactory, @@ -189,86 +187,4 @@ public final class AnyPointer { return new Pipeline(this.hook, newOps); } } - - public static final class Request - implements org.capnproto.Request { - - private final Builder params; - private final RequestHook requestHook; - - Request(Builder params, RequestHook requestHook) { - this.params = params; - this.requestHook = requestHook; - } - - @Override - public Builder getParams() { - return this.params; - } - - @Override - public org.capnproto.Request getTypelessRequest() { - return this; - } - - @Override - public org.capnproto.Request getBaseRequest() { - return this; - } - - @Override - public RequestHook getHook() { - return this.requestHook; - } - - @Override - public FromPointerBuilder getParamsFactory() { - return AnyPointer.factory; - } - - @Override - public RemotePromise sendInternal() { - return this.requestHook.send(); - } - } - - public static final class StreamingRequest - implements org.capnproto.StreamingRequest { - - private final Builder params; - private final RequestHook requestHook; - - StreamingRequest(AnyPointer.Request request) { - this(request.params, request.requestHook); - } - - StreamingRequest(Builder params, RequestHook requestHook) { - this.params = params; - this.requestHook = requestHook; - } - - @Override - public Builder getParams() { - return this.params; - } - - @Override - public org.capnproto.StreamingRequest getTypelessRequest() { - return this; - } - - @Override - public RequestHook getHook() { - return this.requestHook; - } - - @Override - public FromPointerBuilder getParamsFactory() { - return AnyPointer.factory; - } - - public CompletableFuture send() { - return this.requestHook.sendStreaming(); - } - } } diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index 927a8d3..a54bc57 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -109,7 +109,7 @@ public final class Capability { default StreamingRequest newStreamingCall(FromPointerBuilder paramsFactory, long interfaceId, short methodId) { var request = this.getHook().newCall(interfaceId, methodId); - var streamingRequest = new AnyPointer.StreamingRequest(request.getParams(), request.getHook()); + var streamingRequest = newTypelessStreamingRequest(request.getParams(), request.getHook()); return new StreamingRequest<>() { @Override public FromPointerBuilder getParamsFactory() { @@ -204,7 +204,7 @@ public final class Capability { public Request newCall(long interfaceId, short methodId) { var hook = new LocalRequest(interfaceId, methodId, this); var root = hook.message.getRoot(AnyPointer.factory); - return new AnyPointer.Request(root, hook); + return newTypelessRequest(root, hook); } @Override @@ -556,6 +556,89 @@ public final class Capability { } } + static Request newTypelessRequest(AnyPointer.Builder params, RequestHook requestHook) { + return new Request<>() { + @Override + public AnyPointer.Builder getParams() { + return params; + } + + @Override + public org.capnproto.Request getTypelessRequest() { + return this; + } + + @Override + public org.capnproto.Request getBaseRequest() { + return this; + } + + @Override + public RequestHook getHook() { + return requestHook; + } + + @Override + public FromPointerBuilder getParamsFactory() { + return AnyPointer.factory; + } + + @Override + public RemotePromise sendInternal() { + return requestHook.send(); + } + }; + } + + static StreamingRequest newTypelessStreamingRequest(AnyPointer.Builder params, RequestHook requestHook) { + return new StreamingRequest<>() { + @Override + public AnyPointer.Builder getParams() { + return params; + } + + @Override + public org.capnproto.StreamingRequest getTypelessRequest() { + return this; + } + + @Override + public RequestHook getHook() { + return requestHook; + } + + @Override + public FromPointerBuilder getParamsFactory() { + return AnyPointer.factory; + } + + public CompletableFuture send() { + return requestHook.sendStreaming(); + } + }; + } + + static Request newBrokenRequest(Throwable exc) { + + var message = new MessageBuilder(); + var params = message.getRoot(AnyPointer.factory); + + var hook = new RequestHook() { + @Override + public RemotePromise send() { + return new RemotePromise<>(CompletableFuture.failedFuture(exc), + new AnyPointer.Pipeline(PipelineHook.newBrokenPipeline(exc))); + } + + @Override + public CompletableFuture sendStreaming() { + return CompletableFuture.failedFuture(exc); + } + }; + + return Capability.newTypelessRequest(params, hook); + } + public static ClientHook newBrokenCap(String reason) { return newBrokenClient(reason, false, ClientHook.BROKEN_CAPABILITY_BRAND); } @@ -568,16 +651,15 @@ public final class Capability { return newBrokenClient(RpcException.failed("Called null capability"), true, ClientHook.NULL_CAPABILITY_BRAND); } - static private ClientHook newBrokenClient(String reason, boolean resolved, Object brand) { + private static ClientHook newBrokenClient(String reason, boolean resolved, Object brand) { return newBrokenClient(RpcException.failed(reason), resolved, brand); } - static private ClientHook newBrokenClient(Throwable exc, boolean resolved, Object brand) { + private static ClientHook newBrokenClient(Throwable exc, boolean resolved, Object brand) { return new ClientHook() { @Override public Request newCall(long interfaceId, short methodId) { - var broken = Request.newBrokenRequest(AnyPointer.factory, exc); - return new AnyPointer.Request(broken.getParams(), broken.getHook()); + return newBrokenRequest(exc); } @Override @@ -689,7 +771,7 @@ public final class Capability { var hook = new LocalRequest(interfaceId, methodId, this); this.pendingCalls.add(hook); var root = hook.message.getRoot(AnyPointer.factory); - return new AnyPointer.Request(root, hook); + return newTypelessRequest(root, hook); } @Override diff --git a/runtime/src/main/java/org/capnproto/Request.java b/runtime/src/main/java/org/capnproto/Request.java index 5e24a82..6f78237 100644 --- a/runtime/src/main/java/org/capnproto/Request.java +++ b/runtime/src/main/java/org/capnproto/Request.java @@ -14,61 +14,4 @@ public interface Request default RequestBase getTypelessRequest() { return getBaseRequest().getTypelessRequest(); } - - static Request newBrokenRequest(FromPointerBuilder paramsFactory, - Throwable exc) { - - var message = new MessageBuilder(); - - var hook = new RequestHook() { - @Override - public RemotePromise send() { - return new RemotePromise<>(CompletableFuture.failedFuture(exc), - new AnyPointer.Pipeline(PipelineHook.newBrokenPipeline(exc))); - } - - @Override - public CompletableFuture sendStreaming() { - return CompletableFuture.failedFuture(exc); - } - }; - - return new Request<>() { - @Override - public FromPointerBuilder getParamsFactory() { - return paramsFactory; - } - - @Override - public RequestBase getTypelessRequest() { - return new AnyPointer.Request(message.getRoot(AnyPointer.factory), hook); - } - - @Override - public Request getBaseRequest() { - return this; - } - }; - } - - static Request fromTypeless( - FromPointerBuilder paramsFactory, - Request typeless) { - return new Request<>() { - @Override - public FromPointerBuilder getParamsFactory() { - return paramsFactory; - } - - @Override - public Request getTypelessRequest() { - return typeless; - } - - @Override - public Request getBaseRequest() { - return this; - } - }; - } }