diff --git a/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java index 9e5d0af..d9f67d8 100644 --- a/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -12,7 +12,6 @@ public class TwoPartyVatNetwork void incoming(IncomingRpcMessage message, RpcTwoPartyProtocol.Side side); } - private static BuilderArena.AllocationStrategy allocationStrategy = BuilderArena.SUGGESTED_ALLOCATION_STRATEGY; private CompletableFuture previousWrite = CompletableFuture.completedFuture(null); private final CompletableFuture peerDisconnected = new CompletableFuture<>(); private final AsynchronousSocketChannel channel; @@ -68,22 +67,30 @@ public class TwoPartyVatNetwork @Override public CompletableFuture receiveIncomingMessage() { - return Serialize.readAsync(channel) + var message = Serialize.readAsync(channel) .thenApply(reader -> (IncomingRpcMessage) new IncomingMessage(reader)) .whenComplete((msg, exc) -> { if (exc != null) { this.peerDisconnected.complete(null); } - }) - .whenComplete((msg, exc) -> { - if (this.tap != null && msg != null) { - this.tap.incoming( - msg, - this.getSide() == RpcTwoPartyProtocol.Side.CLIENT - ? RpcTwoPartyProtocol.Side.SERVER - : RpcTwoPartyProtocol.Side.CLIENT); - } }); + + // send to message tap + if (this.tap != null) { + message = message.whenComplete((msg, exc) -> { + if (this.tap == null || msg == null) { + return; + } + + var side = this.getSide() == RpcTwoPartyProtocol.Side.CLIENT + ? RpcTwoPartyProtocol.Side.SERVER + : RpcTwoPartyProtocol.Side.CLIENT; + + this.tap.incoming(msg, side); + }); + } + + return message; } @Override @@ -114,11 +121,13 @@ public class TwoPartyVatNetwork final class OutgoingMessage implements OutgoingRpcMessage { - final MessageBuilder message; - List fds = List.of(); + private final MessageBuilder message; + private List fds = List.of(); OutgoingMessage(int firstSegmentWordSize) { - this.message = new MessageBuilder(firstSegmentWordSize, allocationStrategy); + this.message = new MessageBuilder(firstSegmentWordSize == 0 + ? BuilderArena.SUGGESTED_FIRST_SEGMENT_WORDS + : firstSegmentWordSize); } @Override @@ -148,8 +157,8 @@ public class TwoPartyVatNetwork final class IncomingMessage implements IncomingRpcMessage { - final MessageReader message; - final List fds; + private final MessageReader message; + private final List fds; IncomingMessage(MessageReader message) { this(message, List.of()); @@ -162,12 +171,12 @@ public class TwoPartyVatNetwork @Override public AnyPointer.Reader getBody() { - return message.getRoot(AnyPointer.factory); + return this.message.getRoot(AnyPointer.factory); } @Override public List getAttachedFds() { - return fds; + return this.fds; } } }