use java nio AsynchronousByteChannel instead for Serialize
This commit is contained in:
parent
ee0d727ade
commit
e3f447d4c7
7 changed files with 43 additions and 122 deletions
|
@ -1,6 +1,6 @@
|
|||
package org.capnproto;
|
||||
|
||||
import java.nio.channels.AsynchronousSocketChannel;
|
||||
import java.nio.channels.AsynchronousByteChannel;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class EzRpcClient {
|
||||
|
@ -8,7 +8,7 @@ public class EzRpcClient {
|
|||
private final TwoPartyClient twoPartyRpc;
|
||||
private final Capability.Client client;
|
||||
|
||||
public EzRpcClient(AsynchronousSocketChannel socket) {
|
||||
public EzRpcClient(AsynchronousByteChannel socket) {
|
||||
this.twoPartyRpc = new TwoPartyClient(socket);
|
||||
this.client = this.twoPartyRpc.bootstrap();
|
||||
}
|
||||
|
|
|
@ -2,8 +2,7 @@ package org.capnproto;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.AsynchronousChannelGroup;
|
||||
import java.nio.channels.AsynchronousServerSocketChannel;
|
||||
import java.nio.channels.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
|
@ -32,6 +31,21 @@ public class EzRpcServer {
|
|||
}
|
||||
|
||||
public CompletableFuture<java.lang.Void> start() {
|
||||
return this.twoPartyRpc.listen(this.serverAcceptSocket);
|
||||
return this.twoPartyRpc.listen(new AsynchronousByteListenChannel() {
|
||||
@Override
|
||||
public <A> void accept(A attachment, CompletionHandler<AsynchronousByteChannel, ? super A> handler) {
|
||||
serverAcceptSocket.accept(attachment, new CompletionHandler<>() {
|
||||
@Override
|
||||
public void completed(AsynchronousSocketChannel result, A attachment) {
|
||||
handler.completed(result, attachment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable exc, A attachment) {
|
||||
handler.failed(exc, attachment);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package org.capnproto;
|
||||
|
||||
import java.nio.channels.AsynchronousSocketChannel;
|
||||
import java.nio.channels.AsynchronousByteChannel;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class TwoPartyClient {
|
||||
|
@ -8,15 +8,15 @@ public class TwoPartyClient {
|
|||
private final TwoPartyVatNetwork network;
|
||||
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
|
||||
|
||||
public TwoPartyClient(AsynchronousSocketChannel channel) {
|
||||
public TwoPartyClient(AsynchronousByteChannel channel) {
|
||||
this(channel, null);
|
||||
}
|
||||
|
||||
public TwoPartyClient(AsynchronousSocketChannel channel, Capability.Client bootstrapInterface) {
|
||||
public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface) {
|
||||
this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT);
|
||||
}
|
||||
|
||||
public TwoPartyClient(AsynchronousSocketChannel channel,
|
||||
public TwoPartyClient(AsynchronousByteChannel channel,
|
||||
Capability.Client bootstrapInterface,
|
||||
RpcTwoPartyProtocol.Side side) {
|
||||
this.network = new TwoPartyVatNetwork(channel, side);
|
||||
|
|
|
@ -8,11 +8,11 @@ import java.util.concurrent.CompletableFuture;
|
|||
public class TwoPartyServer {
|
||||
|
||||
private class AcceptedConnection {
|
||||
private final AsynchronousSocketChannel connection;
|
||||
private final AsynchronousByteChannel connection;
|
||||
private final TwoPartyVatNetwork network;
|
||||
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
|
||||
|
||||
AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousSocketChannel connection) {
|
||||
AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousByteChannel connection) {
|
||||
this.connection = connection;
|
||||
this.network = new TwoPartyVatNetwork(this.connection, RpcTwoPartyProtocol.Side.SERVER);
|
||||
this.rpcSystem = new RpcSystem<>(network, bootstrapInterface);
|
||||
|
@ -31,7 +31,7 @@ public class TwoPartyServer {
|
|||
this(new Capability.Client(bootstrapServer));
|
||||
}
|
||||
|
||||
public void accept(AsynchronousSocketChannel channel) {
|
||||
public void accept(AsynchronousByteChannel channel) {
|
||||
var connection = new AcceptedConnection(this.bootstrapInterface, channel);
|
||||
this.connections.add(connection);
|
||||
connection.network.onDisconnect().whenComplete((x, exc) -> {
|
||||
|
@ -39,11 +39,11 @@ public class TwoPartyServer {
|
|||
});
|
||||
}
|
||||
|
||||
public CompletableFuture<java.lang.Void> listen(AsynchronousServerSocketChannel listener) {
|
||||
var result = new CompletableFuture<AsynchronousSocketChannel>();
|
||||
public CompletableFuture<java.lang.Void> listen(AsynchronousByteListenChannel listener) {
|
||||
var result = new CompletableFuture<AsynchronousByteChannel>();
|
||||
listener.accept(null, new CompletionHandler<>() {
|
||||
@Override
|
||||
public void completed(AsynchronousSocketChannel channel, Object attachment) {
|
||||
public void completed(AsynchronousByteChannel channel, Object attachment) {
|
||||
accept(channel);
|
||||
result.complete(null);
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package org.capnproto;
|
||||
|
||||
import java.io.FileDescriptor;
|
||||
import java.nio.channels.AsynchronousSocketChannel;
|
||||
import java.nio.channels.AsynchronousByteChannel;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
|
@ -11,12 +11,12 @@ public class TwoPartyVatNetwork
|
|||
|
||||
private CompletableFuture<java.lang.Void> previousWrite = CompletableFuture.completedFuture(null);
|
||||
private final CompletableFuture<java.lang.Void> disconnectPromise = new CompletableFuture<>();
|
||||
private final AsynchronousSocketChannel channel;
|
||||
private final AsynchronousByteChannel channel;
|
||||
private final RpcTwoPartyProtocol.Side side;
|
||||
private final MessageBuilder peerVatId = new MessageBuilder(4);
|
||||
private boolean accepted;
|
||||
|
||||
public TwoPartyVatNetwork(AsynchronousSocketChannel channel, RpcTwoPartyProtocol.Side side) {
|
||||
public TwoPartyVatNetwork(AsynchronousByteChannel channel, RpcTwoPartyProtocol.Side side) {
|
||||
this.channel = channel;
|
||||
this.side = side;
|
||||
this.peerVatId.initRoot(RpcTwoPartyProtocol.VatId.factory).setSide(
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
package org.capnproto;
|
||||
|
||||
import java.nio.channels.AsynchronousByteChannel;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
|
||||
public interface AsynchronousByteListenChannel {
|
||||
public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousByteChannel,? super A> handler);
|
||||
}
|
|
@ -338,51 +338,10 @@ public final class Serialize {
|
|||
abstract void read(int bytes, Consumer<? super ByteBuffer> consumer);
|
||||
}
|
||||
|
||||
static class AsyncSocketReader extends AsyncMessageReader {
|
||||
private final AsynchronousSocketChannel channel;
|
||||
private final long timeout;
|
||||
private final TimeUnit timeUnit;
|
||||
|
||||
AsyncSocketReader(AsynchronousSocketChannel channel, ReaderOptions options, long timeout, TimeUnit timeUnit) {
|
||||
super(options);
|
||||
this.channel = channel;
|
||||
this.timeout = timeout;
|
||||
this.timeUnit = timeUnit;
|
||||
}
|
||||
|
||||
void read(int bytes, Consumer<? super ByteBuffer> consumer) {
|
||||
var buffer = Serialize.makeByteBuffer(bytes);
|
||||
var handler = new CompletionHandler<Integer, Object>() {
|
||||
@Override
|
||||
public void completed(Integer result, Object attachment) {
|
||||
//System.out.println(channel.toString() + ": read " + result + " bytes");
|
||||
if (result <= 0) {
|
||||
var text = result == 0
|
||||
? "Read zero bytes. Is the channel in non-blocking mode?"
|
||||
: "Premature EOF";
|
||||
readCompleted.completeExceptionally(new IOException(text));
|
||||
} else if (buffer.hasRemaining()) {
|
||||
// partial read
|
||||
channel.read(buffer, timeout, timeUnit, null, this);
|
||||
} else {
|
||||
consumer.accept(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable exc, Object attachment) {
|
||||
readCompleted.completeExceptionally(exc);
|
||||
}
|
||||
};
|
||||
|
||||
this.channel.read(buffer, this.timeout, this.timeUnit, null, handler);
|
||||
}
|
||||
}
|
||||
|
||||
static class AsyncByteChannelReader extends AsyncMessageReader {
|
||||
static class AsynchronousByteChannelReader extends AsyncMessageReader {
|
||||
private final AsynchronousByteChannel channel;
|
||||
|
||||
AsyncByteChannelReader(AsynchronousByteChannel channel, ReaderOptions options) {
|
||||
AsynchronousByteChannelReader(AsynchronousByteChannel channel, ReaderOptions options) {
|
||||
super(options);
|
||||
this.channel = channel;
|
||||
}
|
||||
|
@ -421,23 +380,7 @@ public final class Serialize {
|
|||
}
|
||||
|
||||
public static CompletableFuture<MessageReader> readAsync(AsynchronousByteChannel channel, ReaderOptions options) {
|
||||
return new AsyncByteChannelReader(channel, options).getMessage();
|
||||
}
|
||||
|
||||
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel) {
|
||||
return readAsync(channel, ReaderOptions.DEFAULT_READER_OPTIONS, Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, ReaderOptions options) {
|
||||
return readAsync(channel, options, Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, long timeout, TimeUnit timeUnit) {
|
||||
return readAsync(channel, ReaderOptions.DEFAULT_READER_OPTIONS, timeout, timeUnit);
|
||||
}
|
||||
|
||||
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, ReaderOptions options, long timeout, TimeUnit timeUnit) {
|
||||
return new AsyncSocketReader(channel, options, timeout, timeUnit).getMessage();
|
||||
return new AsynchronousByteChannelReader(channel, options).getMessage();
|
||||
}
|
||||
|
||||
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousByteChannel outputChannel, MessageBuilder message) {
|
||||
|
@ -477,50 +420,6 @@ public final class Serialize {
|
|||
return writeCompleted;
|
||||
}
|
||||
|
||||
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousSocketChannel outputChannel, MessageBuilder message) {
|
||||
return writeAsync(outputChannel, message, Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousSocketChannel outputChannel, MessageBuilder message, long timeout, TimeUnit timeUnit) {
|
||||
var writeCompleted = new CompletableFuture<java.lang.Void>();
|
||||
var segments = message.getSegmentsForOutput();
|
||||
var header = getHeaderForOutput(segments);
|
||||
long totalBytes = header.remaining();
|
||||
|
||||
// TODO avoid this copy?
|
||||
var allSegments = new ByteBuffer[segments.length+1];
|
||||
allSegments[0] = header;
|
||||
for (int ii = 0; ii < segments.length; ++ii) {
|
||||
var segment = segments[ii];
|
||||
allSegments[ii+1] = segment;
|
||||
totalBytes += segment.remaining();
|
||||
}
|
||||
|
||||
outputChannel.write(allSegments, 0, allSegments.length, timeout, timeUnit, totalBytes, new CompletionHandler<>() {
|
||||
@Override
|
||||
public void completed(Long result, Long totalBytes) {
|
||||
//System.out.println(outputChannel.toString() + ": Wrote " + result + "/" + totalBytes + " bytes");
|
||||
if (result < 0) {
|
||||
writeCompleted.completeExceptionally(new IOException("Write failed"));
|
||||
}
|
||||
else if (result < totalBytes) {
|
||||
// partial write
|
||||
outputChannel.write(allSegments, 0, allSegments.length, timeout, timeUnit, totalBytes - result, this);
|
||||
}
|
||||
else {
|
||||
writeCompleted.complete(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable exc, Long attachment) {
|
||||
writeCompleted.completeExceptionally(exc);
|
||||
}
|
||||
});
|
||||
|
||||
return writeCompleted;
|
||||
}
|
||||
|
||||
private static ByteBuffer getHeaderForOutput(ByteBuffer[] segments) {
|
||||
assert segments.length > 0: "Empty message";
|
||||
int tableSize = (segments.length + 2) & (~1);
|
||||
|
|
Loading…
Reference in a new issue