diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcSystem.java b/runtime-rpc/src/main/java/org/capnproto/RpcSystem.java index 2dda98d..fba8bf4 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcSystem.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcSystem.java @@ -82,7 +82,7 @@ public class RpcSystem { } private void startAcceptLoop() { - this.network.baseAccept() + this.network.accept() .thenAccept(this::accept) .thenRunAsync(this::startAcceptLoop); } diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java index 84a758f..1e82da3 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -1,6 +1,5 @@ package org.capnproto; -import java.io.IOException; import java.nio.channels.AsynchronousByteChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.List; @@ -10,11 +9,6 @@ public class TwoPartyVatNetwork implements VatNetwork, VatNetwork.Connection { - @Override - public CompletableFuture> baseAccept() { - return this.accept(); - } - public interface MessageTap { void incoming(IncomingRpcMessage message, RpcTwoPartyProtocol.Side side); } @@ -49,23 +43,7 @@ public class TwoPartyVatNetwork @Override public String toString() { - return this.getSide().toString(); - } - - public RpcTwoPartyProtocol.Side getSide() { - return side; - } - - public void setTap(MessageTap tap) { - this.tap = tap; - } - - public Connection asConnection() { - return this; - } - - public CompletableFuture onDisconnect() { - return this.disconnectPromise.copy(); + return this.side.toString(); } @Override @@ -75,17 +53,7 @@ public class TwoPartyVatNetwork : null; } - public CompletableFuture> accept() { - if (side == RpcTwoPartyProtocol.Side.SERVER & !accepted) { - accepted = true; - return CompletableFuture.completedFuture(this.asConnection()); - } - else { - // never completes - return new CompletableFuture<>(); - } - } - + @Override public RpcTwoPartyProtocol.VatId.Reader getPeerVatId() { return this.peerVatId.getRoot(RpcTwoPartyProtocol.VatId.factory).asReader(); } @@ -108,7 +76,7 @@ public class TwoPartyVatNetwork return; } - var side = this.getSide() == RpcTwoPartyProtocol.Side.CLIENT + var side = this.side == RpcTwoPartyProtocol.Side.CLIENT ? RpcTwoPartyProtocol.Side.SERVER : RpcTwoPartyProtocol.Side.CLIENT; @@ -137,6 +105,34 @@ public class TwoPartyVatNetwork return result; } + public RpcTwoPartyProtocol.Side getSide() { + return side; + } + + public void setTap(MessageTap tap) { + this.tap = tap; + } + + public Connection asConnection() { + return this; + } + + public CompletableFuture onDisconnect() { + return this.disconnectPromise.copy(); + } + + + public CompletableFuture> accept() { + if (side == RpcTwoPartyProtocol.Side.SERVER & !accepted) { + accepted = true; + return CompletableFuture.completedFuture(this.asConnection()); + } + else { + // never completes + return new CompletableFuture<>(); + } + } + final class OutgoingMessage implements OutgoingRpcMessage { private final MessageBuilder message; @@ -160,7 +156,7 @@ public class TwoPartyVatNetwork @Override public void send() { - previousWrite = previousWrite.thenCompose(x -> Serialize.writeAsync(channel, message)); + previousWrite = previousWrite.thenRun(() -> Serialize.writeAsync(channel, message)); } @Override @@ -173,7 +169,7 @@ public class TwoPartyVatNetwork } } - final class IncomingMessage implements IncomingRpcMessage { + static final class IncomingMessage implements IncomingRpcMessage { private final MessageReader message; private final List fds; diff --git a/runtime-rpc/src/main/java/org/capnproto/VatNetwork.java b/runtime-rpc/src/main/java/org/capnproto/VatNetwork.java index 3ee8726..eb8fc21 100644 --- a/runtime-rpc/src/main/java/org/capnproto/VatNetwork.java +++ b/runtime-rpc/src/main/java/org/capnproto/VatNetwork.java @@ -15,7 +15,6 @@ public interface VatNetwork void close(); } - CompletableFuture> baseAccept(); + CompletableFuture> accept(); Connection connect(VatId hostId); } - diff --git a/runtime-rpc/src/test/java/org/capnproto/RpcTest.java b/runtime-rpc/src/test/java/org/capnproto/RpcTest.java index 93523e4..66e8f92 100644 --- a/runtime-rpc/src/test/java/org/capnproto/RpcTest.java +++ b/runtime-rpc/src/test/java/org/capnproto/RpcTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import static org.capnproto.RpcState.FromException; public class RpcTest { @@ -187,7 +186,7 @@ public class RpcTest { int sent = 0; int received = 0; Map connections = new HashMap<>(); - Queue> fulfillerQueue = new ArrayDeque<>(); + Queue>> fulfillerQueue = new ArrayDeque<>(); Queue connectionQueue = new ArrayDeque<>(); TestNetworkAdapter(TestNetwork network, String self) { @@ -199,10 +198,6 @@ public class RpcTest { return new Connection(isClient, peerId); } - public CompletableFuture> baseAccept() { - return this.accept().thenApply(conn -> conn); - } - @Override public void close() { var exc = RpcException.failed("Network was destroyed"); @@ -241,11 +236,11 @@ public class RpcTest { return local; } - public CompletableFuture accept() { + public CompletableFuture> accept() { if (this.connections.isEmpty()) { - var promise = new CompletableFuture(); + var promise = new CompletableFuture>(); this.fulfillerQueue.add(promise); - return promise.thenApply(conn -> conn); + return promise; } else { return CompletableFuture.completedFuture(this.connectionQueue.remove());