diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index c364b34..d95e5cf 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -260,7 +260,8 @@ final class RpcState { private final CompletableFuture onDisconnect; private Throwable disconnected = null; private CompletableFuture messageReady = CompletableFuture.completedFuture(null); - private final CompletableFuture messageLoop; + private final CompletableFuture messageLoop = new CompletableFuture<>(); + // completes when the message loop exits private final ReferenceQueue questionRefs = new ReferenceQueue<>(); private final ReferenceQueue importRefs = new ReferenceQueue<>(); @@ -270,7 +271,7 @@ final class RpcState { this.bootstrapFactory = bootstrapFactory; this.connection = connection; this.onDisconnect = onDisconnect; - this.messageLoop = this.doMessageLoop(); + startMessageLoop(); } public CompletableFuture getMessageLoop() { @@ -397,24 +398,30 @@ final class RpcState { return pipeline.getPipelinedCap(new PipelineOp[0]); } - private CompletableFuture doMessageLoop() { + private void startMessageLoop() { if (isDisconnected()) { - return CompletableFuture.failedFuture(this.disconnected); + this.messageLoop.completeExceptionally(this.disconnected); + return; } - return connection.receiveIncomingMessage().thenCompose(message -> { - try { - this.handleMessage(message); - } catch (Exception rpcExc) { - // either we received an Abort message from peer - // or internal RpcState is bad. - return this.disconnect(rpcExc); - } - this.cleanupImports(); - this.cleanupQuestions(); - return this.doMessageLoop(); + var messageReader = this.connection.receiveIncomingMessage() + .thenAccept(message -> { + if (message == null) { + this.messageLoop.complete(null); + return; + } + try { + this.handleMessage(message); + } catch (Exception rpcExc) { + // either we received an Abort message from peer + // or internal RpcState is bad. + this.disconnect(rpcExc); + } + this.cleanupImports(); + this.cleanupQuestions(); + }); - }).exceptionallyCompose(exc -> this.disconnect(exc)); + messageReader.thenRunAsync(this::startMessageLoop); } private void handleMessage(IncomingRpcMessage message) throws RpcException { diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java index 2c86606..1576aa6 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -77,11 +77,7 @@ public class TwoPartyVatNetwork public CompletableFuture receiveIncomingMessage() { var message = Serialize.readAsync(channel) .thenApply(reader -> (IncomingRpcMessage) new IncomingMessage(reader)) - .whenComplete((msg, exc) -> { - if (exc != null) { - this.peerDisconnected.complete(null); - } - }); + .exceptionally(exc -> null); // send to message tap if (this.tap != null) { diff --git a/runtime-rpc/src/test/java/org/capnproto/RpcStateTest.java b/runtime-rpc/src/test/java/org/capnproto/RpcStateTest.java index 35636d1..7cb4959 100644 --- a/runtime-rpc/src/test/java/org/capnproto/RpcStateTest.java +++ b/runtime-rpc/src/test/java/org/capnproto/RpcStateTest.java @@ -110,7 +110,6 @@ public class RpcStateTest { var msg = new TestMessage(); msg.builder.getRoot(RpcProtocol.Message.factory).initUnimplemented(); this.connection.setNextIncomingMessage(msg); - Assert.assertFalse(sent.isEmpty()); } @Test