From ad17a4c14816c0134cdf2270facd3bbb1468c8fc Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Fri, 13 Nov 2020 17:57:49 +0000 Subject: [PATCH] refactor connection and disconnection --- .../src/main/java/org/capnproto/RpcState.java | 18 ++- .../main/java/org/capnproto/RpcSystem.java | 70 +++------- .../java/org/capnproto/TwoPartyClient.java | 12 +- .../java/org/capnproto/TwoPartyServer.java | 111 ++++------------ .../org/capnproto/TwoPartyVatNetwork.java | 15 ++- .../main/java/org/capnproto/VatNetwork.java | 2 +- .../test/java/org/capnproto/TwoPartyTest.java | 125 +++++++++--------- .../main/java/org/capnproto/AnyPointer.java | 5 + .../main/java/org/capnproto/Capability.java | 8 +- .../src/main/java/org/capnproto/Pipeline.java | 5 + .../main/java/org/capnproto/PipelineHook.java | 9 +- .../java/org/capnproto/RemotePromise.java | 4 +- 12 files changed, 161 insertions(+), 223 deletions(-) diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index 537328f..aa221a4 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -5,9 +5,11 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; +import java.nio.channels.ClosedChannelException; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; @@ -288,7 +290,7 @@ final class RpcState { startMessageLoop(); } - public CompletableFuture getMessageLoop() { + CompletableFuture onDisconnection() { return this.messageLoop; } @@ -363,6 +365,12 @@ final class RpcState { return CompletableFuture.completedFuture(null); } } + else if (ioExc instanceof CompletionException) { + var compExc = (CompletionException)ioExc; + if (compExc.getCause() instanceof ClosedChannelException) { + return CompletableFuture.completedFuture(null); + } + } return CompletableFuture.failedFuture(ioExc); }); @@ -371,9 +379,7 @@ final class RpcState { this.disconnectFulfiller.complete(new DisconnectInfo(shutdownPromise)); for (var pipeline: pipelinesToRelease) { - if (pipeline instanceof RpcState.RpcPipeline) { - ((RpcPipeline) pipeline).redirectLater.completeExceptionally(networkExc); - } + pipeline.cancel(networkExc); } } @@ -1556,8 +1562,8 @@ final class RpcState { } @Override - public void close() { - this.question.finish(); + public void cancel(Throwable exc) { + this.question.reject(exc); } } diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcSystem.java b/runtime-rpc/src/main/java/org/capnproto/RpcSystem.java index e7c5b0c..2dda98d 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcSystem.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcSystem.java @@ -1,27 +1,21 @@ package org.capnproto; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; public class RpcSystem { private final VatNetwork network; private final BootstrapFactory bootstrapFactory; - private final Map, RpcState> connections = new HashMap<>(); - private final CompletableFuture messageLoop; - private final CompletableFuture acceptLoop; + private final Map, RpcState> connections = new ConcurrentHashMap<>(); public RpcSystem(VatNetwork network) { - this.network = network; - this.bootstrapFactory = null; - this.acceptLoop = new CompletableFuture<>(); - this.messageLoop = doMessageLoop(); - } - - public VatNetwork getNetwork() { - return this.network; + this(network, (BootstrapFactory)null); } public RpcSystem(VatNetwork network, @@ -49,8 +43,7 @@ public class RpcSystem { BootstrapFactory bootstrapFactory) { this.network = network; this.bootstrapFactory = bootstrapFactory; - this.acceptLoop = doAcceptLoop(); - this.messageLoop = doMessageLoop(); + this.startAcceptLoop(); } public Capability.Client bootstrap(VatId vatId) { @@ -68,21 +61,19 @@ public class RpcSystem { } } - RpcState getConnectionState(VatNetwork.Connection connection) { - var state = this.connections.get(connection); - if (state == null) { - var onDisconnect = new CompletableFuture() - .whenComplete((info, exc) -> { - this.connections.remove(connection); - try { - connection.close(); - } catch (IOException ignored) { - } - }); + public VatNetwork getNetwork() { + return this.network; + } - state = new RpcState<>(this.bootstrapFactory, connection, onDisconnect); - this.connections.put(connection, state); - } + RpcState getConnectionState(VatNetwork.Connection connection) { + var state = this.connections.computeIfAbsent(connection, conn -> { + var onDisconnect = new CompletableFuture(); + onDisconnect.thenCompose(info -> { + this.connections.remove(connection); + return info.shutdownPromise.thenRun(() -> connection.close()); + }); + return new RpcState<>(this.bootstrapFactory, conn, onDisconnect); + }); return state; } @@ -90,27 +81,10 @@ public class RpcSystem { getConnectionState(connection); } - private CompletableFuture doAcceptLoop() { - return this.network.baseAccept().thenCompose(connection -> { - this.accept(connection); - return this.doAcceptLoop(); - }); - } - - private CompletableFuture doMessageLoop() { - var accept = this.getAcceptLoop(); - for (var conn: this.connections.values()) { - accept = accept.acceptEither(conn.getMessageLoop(), x -> {}); - } - return accept.thenCompose(x -> this.doMessageLoop()); - } - - public CompletableFuture getMessageLoop() { - return this.messageLoop; - } - - private CompletableFuture getAcceptLoop() { - return this.acceptLoop; + private void startAcceptLoop() { + this.network.baseAccept() + .thenAccept(this::accept) + .thenRunAsync(this::startAcceptLoop); } public static diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java index d6056c6..f42f5b2 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java @@ -1,6 +1,6 @@ package org.capnproto; -import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.AsynchronousByteChannel; import java.util.concurrent.CompletableFuture; public class TwoPartyClient { @@ -8,15 +8,15 @@ public class TwoPartyClient { private final TwoPartyVatNetwork network; private final RpcSystem rpcSystem; - public TwoPartyClient(AsynchronousSocketChannel channel) { + public TwoPartyClient(AsynchronousByteChannel channel) { this(channel, null); } - public TwoPartyClient(AsynchronousSocketChannel channel, Capability.Client bootstrapInterface) { + public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface) { this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT); } - public TwoPartyClient(AsynchronousSocketChannel channel, + public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface, RpcTwoPartyProtocol.Side side) { this.network = new TwoPartyVatNetwork(channel, side); @@ -31,4 +31,8 @@ public class TwoPartyClient { : RpcTwoPartyProtocol.Side.CLIENT); return rpcSystem.bootstrap(vatId.asReader()); } + + CompletableFuture onDisconnect() { + return this.network.onDisconnect(); + } } diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java index 5503b0a..b51b752 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java @@ -10,135 +10,80 @@ import java.util.concurrent.CompletableFuture; public class TwoPartyServer { private class AcceptedConnection { - final AsynchronousSocketChannel channel; + final AsynchronousSocketChannel connection; final TwoPartyVatNetwork network; final RpcSystem rpcSystem; - private final CompletableFuture messageLoop; - AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousSocketChannel channel) { - this.channel = channel; - this.network = new TwoPartyVatNetwork(channel, RpcTwoPartyProtocol.Side.SERVER); + AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousSocketChannel connection) { + this.connection = connection; + this.network = new TwoPartyVatNetwork(this.connection, RpcTwoPartyProtocol.Side.SERVER); this.rpcSystem = new RpcSystem<>(network, bootstrapInterface); - this.messageLoop = this.rpcSystem.getMessageLoop().exceptionally(exc -> { - connections.remove(this); - return null; - }); - } - - public CompletableFuture getMessageLoop() { - return this.messageLoop; } } class ConnectionReceiver { - AsynchronousServerSocketChannel listener; - final CompletableFuture messageLoop; - public ConnectionReceiver(AsynchronousServerSocketChannel listener) { + final AsynchronousServerSocketChannel listener; + + ConnectionReceiver(AsynchronousServerSocketChannel listener) { this.listener = listener; - this.messageLoop = doMessageLoop(); } - public CompletableFuture getMessageLoop() { - return this.messageLoop; - } - - private CompletableFuture doMessageLoop() { - final var accepted = new CompletableFuture(); - listener.accept(null, new CompletionHandler<>() { - + CompletableFuture accept() { + CompletableFuture result = new CompletableFuture<>(); + this.listener.accept(null, new CompletionHandler<>() { @Override public void completed(AsynchronousSocketChannel channel, Object attachment) { - accepted.complete(channel); + result.complete(channel); } @Override public void failed(Throwable exc, Object attachment) { - accepted.completeExceptionally(exc); + result.completeExceptionally(exc); } }); - return accepted.thenCompose(channel -> CompletableFuture.allOf( - accept(channel), - doMessageLoop())); + return result.copy(); } } private final Capability.Client bootstrapInterface; private final List connections = new ArrayList<>(); - private final List listeners = new ArrayList<>(); - private final CompletableFuture messageLoop; public TwoPartyServer(Capability.Client bootstrapInterface) { this.bootstrapInterface = bootstrapInterface; - this.messageLoop = doMessageLoop(); } public TwoPartyServer(Capability.Server bootstrapServer) { this(new Capability.Client(bootstrapServer)); } - private CompletableFuture getMessageLoop() { - return this.messageLoop; - } - - public CompletableFuture drain() { - CompletableFuture done = new CompletableFuture<>(); - for (var conn: this.connections) { - done = CompletableFuture.allOf(done, conn.getMessageLoop()); - } - return done; - } - - private CompletableFuture accept(AsynchronousSocketChannel channel) { + public void accept(AsynchronousSocketChannel channel) { var connection = new AcceptedConnection(this.bootstrapInterface, channel); this.connections.add(connection); - return connection.network.onDisconnect().whenComplete((x, exc) -> { + connection.network.onDisconnect().whenComplete((x, exc) -> { this.connections.remove(connection); }); } -/* - private final CompletableFuture acceptLoop(AsynchronousServerSocketChannel listener) { - final var accepted = new CompletableFuture(); - listener.accept(null, new CompletionHandler<>() { - @Override - public void completed(AsynchronousSocketChannel channel, Object attachment) { - accepted.complete(channel); - } + public CompletableFuture listen(AsynchronousServerSocketChannel listener) { + return this.listen(wrapListenSocket(listener)); + } - @Override - public void failed(Throwable exc, Object attachment) { - accepted.completeExceptionally(exc); - } + CompletableFuture listen(ConnectionReceiver listener) { + return listener.accept().thenCompose(channel -> { + this.accept(channel); + return this.listen(listener); }); - return accepted.thenCompose(channel -> CompletableFuture.anyOf( - accept(channel), - acceptLoop(listener))); - } -*/ - public CompletableFuture listen(AsynchronousServerSocketChannel listener) { - var receiver = new ConnectionReceiver(listener); - this.listeners.add(receiver); - return receiver.getMessageLoop(); } - private CompletableFuture doMessageLoop() { - var done = new CompletableFuture<>(); + CompletableFuture drain() { + CompletableFuture loop = CompletableFuture.completedFuture(null); for (var conn: this.connections) { - done = CompletableFuture.anyOf(done, conn.getMessageLoop()); + loop = CompletableFuture.allOf(loop, conn.network.onDisconnect()); } - for (var listener: this.listeners) { - done = CompletableFuture.anyOf(done, listener.getMessageLoop()); - } - return done.thenCompose(x -> doMessageLoop()); + return loop; } - /* - public CompletableFuture runOnce() { - var done = new CompletableFuture<>(); - for (var conn: connections) { - done = CompletableFuture.anyOf(done, conn.runOnce()); - } - return done; + ConnectionReceiver wrapListenSocket(AsynchronousServerSocketChannel channel) { + return new ConnectionReceiver(channel); } - */ } diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java index b8a2c12..40b4ba1 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -37,9 +37,14 @@ public class TwoPartyVatNetwork } @Override - public void close() throws IOException { - this.channel.close(); - this.disconnectPromise.complete(null); + public void close() { + try { + this.channel.close(); + this.disconnectPromise.complete(null); + } + catch (Exception exc) { + this.disconnectPromise.completeExceptionally(exc); + } } public RpcTwoPartyProtocol.Side getSide() { @@ -113,13 +118,13 @@ public class TwoPartyVatNetwork public CompletableFuture shutdown() { assert this.previousWrite != null: "Already shut down"; - var result = this.previousWrite.thenRun(() -> { + var result = this.previousWrite.whenComplete((void_, exc) -> { try { if (this.channel instanceof AsynchronousSocketChannel) { ((AsynchronousSocketChannel)this.channel).shutdownOutput(); } } - catch (Exception ioExc) { + catch (Exception ignored) { } }); diff --git a/runtime-rpc/src/main/java/org/capnproto/VatNetwork.java b/runtime-rpc/src/main/java/org/capnproto/VatNetwork.java index 5660d6c..4204b78 100644 --- a/runtime-rpc/src/main/java/org/capnproto/VatNetwork.java +++ b/runtime-rpc/src/main/java/org/capnproto/VatNetwork.java @@ -13,7 +13,7 @@ public interface VatNetwork CompletableFuture receiveIncomingMessage(); CompletableFuture shutdown(); VatId getPeerVatId(); - void close() throws IOException; + void close(); } CompletableFuture> baseAccept(); diff --git a/runtime-rpc/src/test/java/org/capnproto/TwoPartyTest.java b/runtime-rpc/src/test/java/org/capnproto/TwoPartyTest.java index 0102c0a..fa92cd5 100644 --- a/runtime-rpc/src/test/java/org/capnproto/TwoPartyTest.java +++ b/runtime-rpc/src/test/java/org/capnproto/TwoPartyTest.java @@ -4,78 +4,78 @@ import org.capnproto.rpctest.*; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.function.ThrowingRunnable; import java.io.IOException; +import java.nio.channels.AsynchronousByteChannel; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; public class TwoPartyTest { - private Thread runServer(org.capnproto.TwoPartyVatNetwork network) { - var thread = new Thread(() -> { - try { - network.onDisconnect().get(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - }, "Server"); + static final class PipeThread { + Thread thread; + AsynchronousByteChannel channel; - thread.start(); - return thread; + static PipeThread newPipeThread(Consumer startFunc) throws Exception { + var pipeThread = new PipeThread(); + var serverAcceptSocket = AsynchronousServerSocketChannel.open(); + serverAcceptSocket.bind(null); + var clientSocket = AsynchronousSocketChannel.open(); + + pipeThread.thread = new Thread(() -> { + try { + var serverSocket = serverAcceptSocket.accept().get(); + startFunc.accept(serverSocket); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + pipeThread.thread.start(); + pipeThread.thread.setName("TwoPartyTest server"); + + clientSocket.connect(serverAcceptSocket.getLocalAddress()).get(); + pipeThread.channel = clientSocket; + return pipeThread; + } } - AsynchronousServerSocketChannel serverAcceptSocket; - AsynchronousSocketChannel serverSocket; - AsynchronousSocketChannel clientSocket; - TwoPartyClient client; - org.capnproto.TwoPartyVatNetwork serverNetwork; - Thread serverThread; + PipeThread runServer(Capability.Server bootstrapInterface) throws Exception { + return runServer(new Capability.Client(bootstrapInterface)); + } + + PipeThread runServer(Capability.Client bootstrapInterface) throws Exception { + return PipeThread.newPipeThread(channel -> { + var network = new TwoPartyVatNetwork(channel, RpcTwoPartyProtocol.Side.SERVER); + var system = new RpcSystem<>(network, bootstrapInterface); + network.onDisconnect().join(); + }); + } @Before - public void setUp() throws Exception { - this.serverAcceptSocket = AsynchronousServerSocketChannel.open(); - this.serverAcceptSocket.bind(null); - - this.clientSocket = AsynchronousSocketChannel.open(); - this.clientSocket.connect(this.serverAcceptSocket.getLocalAddress()).get(); - this.client = new TwoPartyClient(clientSocket); - //this.client.getNetwork().setTap(new Tap()); - - this.serverSocket = serverAcceptSocket.accept().get(); - this.serverNetwork = new org.capnproto.TwoPartyVatNetwork(this.serverSocket, RpcTwoPartyProtocol.Side.SERVER); - //this.serverNetwork.setTap(new Tap()); - //this.serverNetwork.dumper.addSchema(Demo.TestCap1); - this.serverThread = runServer(this.serverNetwork); + public void setUp() { } @After - public void tearDown() throws Exception { - this.clientSocket.close(); - this.serverSocket.close(); - this.serverAcceptSocket.close(); - this.serverThread.join(); - this.client = null; + public void tearDown() { } @org.junit.Test - public void testNullCap() throws ExecutionException, InterruptedException { - var server = new RpcSystem<>(this.serverNetwork, new Capability.Client()); - var cap = this.client.bootstrap(); - var resolved = cap.whenResolved(); + public void testNullCap() throws Exception { + var pipe = runServer(new Capability.Client()); + var rpcClient = new TwoPartyClient(pipe.channel); + var client = rpcClient.bootstrap(); + var resolved = client.whenResolved(); resolved.get(); } @org.junit.Test - public void testBasic() throws InterruptedException, IOException { - + public void testBasic() throws Exception { var callCount = new Counter(); - var server = new RpcSystem<>(this.serverNetwork, new RpcTestUtil.TestInterfaceImpl(callCount)); - - var client = new Test.TestInterface.Client(this.client.bootstrap()); + var pipe = runServer(new RpcTestUtil.TestInterfaceImpl(callCount)); + var rpcClient = new TwoPartyClient(pipe.channel); + var client = new Test.TestInterface.Client(rpcClient.bootstrap()); var request1 = client.fooRequest(); request1.getParams().setI(123); request1.getParams().setJ(true); @@ -99,24 +99,22 @@ public class TwoPartyTest { promise3.join(); Assert.assertEquals(2, callCount.value()); - this.clientSocket.shutdownOutput(); - serverThread.join(); } @org.junit.Test public void testDisconnect() throws IOException { - this.serverSocket.shutdownOutput(); - this.serverNetwork.close(); - this.serverNetwork.onDisconnect().join(); + //this.serverSocket.shutdownOutput(); + //this.serverNetwork.close(); + //this.serverNetwork.onDisconnect().join(); } @org.junit.Test - public void testPipelining() throws IOException { + public void testPipelining() throws Exception { var callCount = new Counter(); var chainedCallCount = new Counter(); - - var server = new RpcSystem<>(this.serverNetwork, new RpcTestUtil.TestPipelineImpl(callCount)); - var client = new Test.TestPipeline.Client(this.client.bootstrap()); + var pipe = runServer(new RpcTestUtil.TestPipelineImpl(callCount)); + var rpcClient = new TwoPartyClient(pipe.channel); + var client = new Test.TestPipeline.Client(rpcClient.bootstrap()); { var request = client.getCapRequest(); @@ -146,11 +144,9 @@ public class TwoPartyTest { Assert.assertEquals(1, chainedCallCount.value()); } - /* - // disconnect the server - //this.serverSocket.shutdownOutput(); - this.serverNetwork.close(); - this.serverNetwork.onDisconnect().join(); + // disconnect the client + ((AsynchronousSocketChannel)pipe.channel).shutdownOutput(); + rpcClient.onDisconnect().join(); { // Use the now-broken capability. @@ -173,8 +169,11 @@ public class TwoPartyTest { Assert.assertEquals(3, callCount.value()); Assert.assertEquals(1, chainedCallCount.value()); } + } + + @org.junit.Test + public void testAbort() { - */ } /* @org.junit.Test diff --git a/runtime/src/main/java/org/capnproto/AnyPointer.java b/runtime/src/main/java/org/capnproto/AnyPointer.java index a15f68c..63d5ca4 100644 --- a/runtime/src/main/java/org/capnproto/AnyPointer.java +++ b/runtime/src/main/java/org/capnproto/AnyPointer.java @@ -167,6 +167,11 @@ public final class AnyPointer { return this; } + @Override + public void cancel(Throwable exc) { + this.hook.cancel(exc); + } + public Pipeline noop() { return new Pipeline(this.hook, this.ops.clone()); } diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index ae2c57f..61bbaa7 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -388,11 +388,6 @@ public final class Capability { public final ClientHook getPipelinedCap(PipelineOp[] ops) { return this.results.getPipelinedCap(ops); } - - @Override - public void close() { - this.ctx.allowCancellation(); - } } private static final class LocalResponse implements ResponseHook { @@ -542,7 +537,7 @@ public final class Capability { : new QueuedClient(this.promise.thenApply( pipeline -> pipeline.getPipelinedCap(ops))); } - +/* @Override public void close() { if (this.redirect != null) { @@ -552,6 +547,7 @@ public final class Capability { this.promise.cancel(false); } } + */ } // A ClientHook which simply queues calls while waiting for a ClientHook to which to forward them. diff --git a/runtime/src/main/java/org/capnproto/Pipeline.java b/runtime/src/main/java/org/capnproto/Pipeline.java index 1cfe25e..df9ad68 100644 --- a/runtime/src/main/java/org/capnproto/Pipeline.java +++ b/runtime/src/main/java/org/capnproto/Pipeline.java @@ -1,5 +1,10 @@ package org.capnproto; public interface Pipeline { + AnyPointer.Pipeline typelessPipeline(); + + default void cancel(Throwable exc) { + this.typelessPipeline().cancel(exc); + } } diff --git a/runtime/src/main/java/org/capnproto/PipelineHook.java b/runtime/src/main/java/org/capnproto/PipelineHook.java index 32ff23f..1298ab1 100644 --- a/runtime/src/main/java/org/capnproto/PipelineHook.java +++ b/runtime/src/main/java/org/capnproto/PipelineHook.java @@ -1,14 +1,13 @@ package org.capnproto; -public interface PipelineHook extends AutoCloseable { +public interface PipelineHook { ClientHook getPipelinedCap(PipelineOp[] ops); + default void cancel(Throwable exc) { + } + static PipelineHook newBrokenPipeline(Throwable exc) { return ops -> Capability.newBrokenCap(exc); } - - @Override - default void close() { - } } diff --git a/runtime/src/main/java/org/capnproto/RemotePromise.java b/runtime/src/main/java/org/capnproto/RemotePromise.java index 86bd8ef..6ea4503 100644 --- a/runtime/src/main/java/org/capnproto/RemotePromise.java +++ b/runtime/src/main/java/org/capnproto/RemotePromise.java @@ -27,8 +27,8 @@ public class RemotePromise } @Override - public void close() throws Exception { - this.pipeline.hook.close(); + public void close() { + this.pipeline.cancel(RpcException.failed("Cancelled")); this.join(); }