diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index 0211aa0..7cc1e02 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -1177,7 +1177,7 @@ final class RpcState { } var pipeline = base.pipeline; if (pipeline == null) { - pipeline = PipelineHook.newBrokenPipeline( + pipeline = Capability.newBrokenPipeline( RpcException.failed("Pipeline call on a request that returned no capabilities or was already closed.")); } var ops = ToPipelineOps(promisedAnswer); @@ -1681,7 +1681,8 @@ final class RpcState { @Override public RemotePromise send() { if (isDisconnected()) { - return new RemotePromise<>(CompletableFuture.failedFuture(disconnected), null); + return new RemotePromise<>(CompletableFuture.failedFuture(disconnected), + Capability.newBrokenPipeline(disconnected)); } var redirect = this.target.writeTarget(this.callBuilder.getTarget()); @@ -1700,7 +1701,7 @@ final class RpcState { var appPromise = questionRef.response.thenApply( hook -> new Response<>(hook.getResults(), hook)); - return new RemotePromise<>(appPromise, new AnyPointer.Pipeline(pipeline)); + return new RemotePromise<>(appPromise, pipeline); } QuestionRef sendInternal(boolean isTailCall) { diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index 43eeaf6..ee02617 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -442,7 +442,7 @@ public final class Capability { this.callRelease.complete(null); assert promiseAndPipeline.pipeline != null; - return new RemotePromise<>(promise, new AnyPointer.Pipeline(promiseAndPipeline.pipeline)); + return new RemotePromise<>(promise, promiseAndPipeline.pipeline); } @Override @@ -545,7 +545,7 @@ public final class Capability { public ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request) { assert this.response == null : "Can't call tailCall() after initializing the results struct."; var promise = request.send(); - var voidPromise = promise._getResponse().thenAccept(tailResponse -> { + var voidPromise = promise.response.thenAccept(tailResponse -> { this.response = tailResponse; }); return new ClientHook.VoidPromiseAndPipeline(voidPromise, promise.pipeline().hook); @@ -619,6 +619,10 @@ public final class Capability { }; } + static PipelineHook newBrokenPipeline(Throwable exc) { + return ops -> newBrokenCap(exc); + } + static Request newBrokenRequest(Throwable exc) { var message = new MessageBuilder(); @@ -628,7 +632,7 @@ public final class Capability { @Override public RemotePromise send() { return new RemotePromise<>(CompletableFuture.failedFuture(exc), - new AnyPointer.Pipeline(PipelineHook.newBrokenPipeline(exc))); + newBrokenPipeline(exc)); } @Override @@ -665,7 +669,7 @@ public final class Capability { @Override public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { - return new VoidPromiseAndPipeline(CompletableFuture.failedFuture(exc), PipelineHook.newBrokenPipeline(exc)); + return new VoidPromiseAndPipeline(CompletableFuture.failedFuture(exc), newBrokenPipeline(exc)); } @Override @@ -696,7 +700,7 @@ public final class Capability { this.promise = promise.whenComplete((pipeline, exc) -> { this.redirect = exc == null ? pipeline - : PipelineHook.newBrokenPipeline(exc); + : newBrokenPipeline(exc); }); } diff --git a/runtime/src/main/java/org/capnproto/PipelineHook.java b/runtime/src/main/java/org/capnproto/PipelineHook.java index 2a2adad..0b691e0 100644 --- a/runtime/src/main/java/org/capnproto/PipelineHook.java +++ b/runtime/src/main/java/org/capnproto/PipelineHook.java @@ -6,8 +6,4 @@ public interface PipelineHook { default void cancel(Throwable exc) { } - - static PipelineHook newBrokenPipeline(Throwable exc) { - return ops -> Capability.newBrokenCap(exc); - } } diff --git a/runtime/src/main/java/org/capnproto/RemotePromise.java b/runtime/src/main/java/org/capnproto/RemotePromise.java index 6ea4503..21d9c7d 100644 --- a/runtime/src/main/java/org/capnproto/RemotePromise.java +++ b/runtime/src/main/java/org/capnproto/RemotePromise.java @@ -6,7 +6,7 @@ public class RemotePromise extends CompletableFutureWrapper implements AutoCloseable { - private final CompletableFuture> response; + final CompletableFuture> response; private final AnyPointer.Pipeline pipeline; public RemotePromise(FromPointerReader factory, @@ -19,6 +19,11 @@ public class RemotePromise this.pipeline = other.pipeline; } + public RemotePromise(CompletableFuture> promise, + PipelineHook pipeline) { + this(promise, new AnyPointer.Pipeline(pipeline)); + } + public RemotePromise(CompletableFuture> promise, AnyPointer.Pipeline pipeline) { super(promise.thenApply(Response::getResults)); @@ -32,10 +37,6 @@ public class RemotePromise this.join(); } - CompletableFuture> _getResponse() { - return this.response; - } - public AnyPointer.Pipeline pipeline() { return this.pipeline; }