diff --git a/runtime-rpc/src/main/java/org/capnproto/EzRpcClient.java b/runtime-rpc/src/main/java/org/capnproto/EzRpcClient.java index cb72660..64ba830 100644 --- a/runtime-rpc/src/main/java/org/capnproto/EzRpcClient.java +++ b/runtime-rpc/src/main/java/org/capnproto/EzRpcClient.java @@ -1,6 +1,6 @@ package org.capnproto; -import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.AsynchronousByteChannel; import java.util.concurrent.CompletableFuture; public class EzRpcClient { @@ -8,7 +8,7 @@ public class EzRpcClient { private final TwoPartyClient twoPartyRpc; private final Capability.Client client; - public EzRpcClient(AsynchronousSocketChannel socket) { + public EzRpcClient(AsynchronousByteChannel socket) { this.twoPartyRpc = new TwoPartyClient(socket); this.client = this.twoPartyRpc.bootstrap(); } diff --git a/runtime-rpc/src/main/java/org/capnproto/EzRpcServer.java b/runtime-rpc/src/main/java/org/capnproto/EzRpcServer.java index ccd313f..21001bd 100644 --- a/runtime-rpc/src/main/java/org/capnproto/EzRpcServer.java +++ b/runtime-rpc/src/main/java/org/capnproto/EzRpcServer.java @@ -2,8 +2,7 @@ package org.capnproto; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -32,6 +31,21 @@ public class EzRpcServer { } public CompletableFuture start() { - return this.twoPartyRpc.listen(this.serverAcceptSocket); + return this.twoPartyRpc.listen(new AsynchronousByteListenChannel() { + @Override + public void accept(A attachment, CompletionHandler handler) { + serverAcceptSocket.accept(attachment, new CompletionHandler<>() { + @Override + public void completed(AsynchronousSocketChannel result, A attachment) { + handler.completed(result, attachment); + } + + @Override + public void failed(Throwable exc, A attachment) { + handler.failed(exc, attachment); + } + }); + } + }); } } diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java index 5c21a48..f1296cb 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java @@ -1,6 +1,6 @@ package org.capnproto; -import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.AsynchronousByteChannel; import java.util.concurrent.CompletableFuture; public class TwoPartyClient { @@ -8,15 +8,15 @@ public class TwoPartyClient { private final TwoPartyVatNetwork network; private final RpcSystem rpcSystem; - public TwoPartyClient(AsynchronousSocketChannel channel) { + public TwoPartyClient(AsynchronousByteChannel channel) { this(channel, null); } - public TwoPartyClient(AsynchronousSocketChannel channel, Capability.Client bootstrapInterface) { + public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface) { this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT); } - public TwoPartyClient(AsynchronousSocketChannel channel, + public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface, RpcTwoPartyProtocol.Side side) { this.network = new TwoPartyVatNetwork(channel, side); diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java index 7f39f16..aaf59c0 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java @@ -8,11 +8,11 @@ import java.util.concurrent.CompletableFuture; public class TwoPartyServer { private class AcceptedConnection { - private final AsynchronousSocketChannel connection; + private final AsynchronousByteChannel connection; private final TwoPartyVatNetwork network; private final RpcSystem rpcSystem; - AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousSocketChannel connection) { + AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousByteChannel connection) { this.connection = connection; this.network = new TwoPartyVatNetwork(this.connection, RpcTwoPartyProtocol.Side.SERVER); this.rpcSystem = new RpcSystem<>(network, bootstrapInterface); @@ -31,7 +31,7 @@ public class TwoPartyServer { this(new Capability.Client(bootstrapServer)); } - public void accept(AsynchronousSocketChannel channel) { + public void accept(AsynchronousByteChannel channel) { var connection = new AcceptedConnection(this.bootstrapInterface, channel); this.connections.add(connection); connection.network.onDisconnect().whenComplete((x, exc) -> { @@ -39,11 +39,11 @@ public class TwoPartyServer { }); } - public CompletableFuture listen(AsynchronousServerSocketChannel listener) { - var result = new CompletableFuture(); + public CompletableFuture listen(AsynchronousByteListenChannel listener) { + var result = new CompletableFuture(); listener.accept(null, new CompletionHandler<>() { @Override - public void completed(AsynchronousSocketChannel channel, Object attachment) { + public void completed(AsynchronousByteChannel channel, Object attachment) { accept(channel); result.complete(null); } diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java index 70ee22a..3344147 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -1,7 +1,7 @@ package org.capnproto; import java.io.FileDescriptor; -import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.AsynchronousByteChannel; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -11,12 +11,12 @@ public class TwoPartyVatNetwork private CompletableFuture previousWrite = CompletableFuture.completedFuture(null); private final CompletableFuture disconnectPromise = new CompletableFuture<>(); - private final AsynchronousSocketChannel channel; + private final AsynchronousByteChannel channel; private final RpcTwoPartyProtocol.Side side; private final MessageBuilder peerVatId = new MessageBuilder(4); private boolean accepted; - public TwoPartyVatNetwork(AsynchronousSocketChannel channel, RpcTwoPartyProtocol.Side side) { + public TwoPartyVatNetwork(AsynchronousByteChannel channel, RpcTwoPartyProtocol.Side side) { this.channel = channel; this.side = side; this.peerVatId.initRoot(RpcTwoPartyProtocol.VatId.factory).setSide( diff --git a/runtime/src/main/java/org/capnproto/AsynchronousByteListenChannel.java b/runtime/src/main/java/org/capnproto/AsynchronousByteListenChannel.java new file mode 100644 index 0000000..99161ac --- /dev/null +++ b/runtime/src/main/java/org/capnproto/AsynchronousByteListenChannel.java @@ -0,0 +1,8 @@ +package org.capnproto; + +import java.nio.channels.AsynchronousByteChannel; +import java.nio.channels.CompletionHandler; + +public interface AsynchronousByteListenChannel { + public abstract void accept(A attachment, CompletionHandler handler); +} diff --git a/runtime/src/main/java/org/capnproto/Serialize.java b/runtime/src/main/java/org/capnproto/Serialize.java index 5b8ae75..bcd3a77 100644 --- a/runtime/src/main/java/org/capnproto/Serialize.java +++ b/runtime/src/main/java/org/capnproto/Serialize.java @@ -338,51 +338,10 @@ public final class Serialize { abstract void read(int bytes, Consumer consumer); } - static class AsyncSocketReader extends AsyncMessageReader { - private final AsynchronousSocketChannel channel; - private final long timeout; - private final TimeUnit timeUnit; - - AsyncSocketReader(AsynchronousSocketChannel channel, ReaderOptions options, long timeout, TimeUnit timeUnit) { - super(options); - this.channel = channel; - this.timeout = timeout; - this.timeUnit = timeUnit; - } - - void read(int bytes, Consumer consumer) { - var buffer = Serialize.makeByteBuffer(bytes); - var handler = new CompletionHandler() { - @Override - public void completed(Integer result, Object attachment) { - //System.out.println(channel.toString() + ": read " + result + " bytes"); - if (result <= 0) { - var text = result == 0 - ? "Read zero bytes. Is the channel in non-blocking mode?" - : "Premature EOF"; - readCompleted.completeExceptionally(new IOException(text)); - } else if (buffer.hasRemaining()) { - // partial read - channel.read(buffer, timeout, timeUnit, null, this); - } else { - consumer.accept(buffer); - } - } - - @Override - public void failed(Throwable exc, Object attachment) { - readCompleted.completeExceptionally(exc); - } - }; - - this.channel.read(buffer, this.timeout, this.timeUnit, null, handler); - } - } - - static class AsyncByteChannelReader extends AsyncMessageReader { + static class AsynchronousByteChannelReader extends AsyncMessageReader { private final AsynchronousByteChannel channel; - AsyncByteChannelReader(AsynchronousByteChannel channel, ReaderOptions options) { + AsynchronousByteChannelReader(AsynchronousByteChannel channel, ReaderOptions options) { super(options); this.channel = channel; } @@ -421,23 +380,7 @@ public final class Serialize { } public static CompletableFuture readAsync(AsynchronousByteChannel channel, ReaderOptions options) { - return new AsyncByteChannelReader(channel, options).getMessage(); - } - - public static CompletableFuture readAsync(AsynchronousSocketChannel channel) { - return readAsync(channel, ReaderOptions.DEFAULT_READER_OPTIONS, Long.MAX_VALUE, TimeUnit.SECONDS); - } - - public static CompletableFuture readAsync(AsynchronousSocketChannel channel, ReaderOptions options) { - return readAsync(channel, options, Long.MAX_VALUE, TimeUnit.SECONDS); - } - - public static CompletableFuture readAsync(AsynchronousSocketChannel channel, long timeout, TimeUnit timeUnit) { - return readAsync(channel, ReaderOptions.DEFAULT_READER_OPTIONS, timeout, timeUnit); - } - - public static CompletableFuture readAsync(AsynchronousSocketChannel channel, ReaderOptions options, long timeout, TimeUnit timeUnit) { - return new AsyncSocketReader(channel, options, timeout, timeUnit).getMessage(); + return new AsynchronousByteChannelReader(channel, options).getMessage(); } public static CompletableFuture writeAsync(AsynchronousByteChannel outputChannel, MessageBuilder message) { @@ -477,50 +420,6 @@ public final class Serialize { return writeCompleted; } - public static CompletableFuture writeAsync(AsynchronousSocketChannel outputChannel, MessageBuilder message) { - return writeAsync(outputChannel, message, Long.MAX_VALUE, TimeUnit.SECONDS); - } - - public static CompletableFuture writeAsync(AsynchronousSocketChannel outputChannel, MessageBuilder message, long timeout, TimeUnit timeUnit) { - var writeCompleted = new CompletableFuture(); - var segments = message.getSegmentsForOutput(); - var header = getHeaderForOutput(segments); - long totalBytes = header.remaining(); - - // TODO avoid this copy? - var allSegments = new ByteBuffer[segments.length+1]; - allSegments[0] = header; - for (int ii = 0; ii < segments.length; ++ii) { - var segment = segments[ii]; - allSegments[ii+1] = segment; - totalBytes += segment.remaining(); - } - - outputChannel.write(allSegments, 0, allSegments.length, timeout, timeUnit, totalBytes, new CompletionHandler<>() { - @Override - public void completed(Long result, Long totalBytes) { - //System.out.println(outputChannel.toString() + ": Wrote " + result + "/" + totalBytes + " bytes"); - if (result < 0) { - writeCompleted.completeExceptionally(new IOException("Write failed")); - } - else if (result < totalBytes) { - // partial write - outputChannel.write(allSegments, 0, allSegments.length, timeout, timeUnit, totalBytes - result, this); - } - else { - writeCompleted.complete(null); - } - } - - @Override - public void failed(Throwable exc, Long attachment) { - writeCompleted.completeExceptionally(exc); - } - }); - - return writeCompleted; - } - private static ByteBuffer getHeaderForOutput(ByteBuffer[] segments) { assert segments.length > 0: "Empty message"; int tableSize = (segments.length + 2) & (~1);