use gather writes for AsynchronousSocketChannels
This commit is contained in:
parent
de85613570
commit
e3d52a0bbd
5 changed files with 181 additions and 71 deletions
|
@ -1,7 +1,6 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.nio.channels.AsynchronousSocketChannel;
|
||||||
import java.nio.channels.AsynchronousByteChannel;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
public class TwoPartyClient {
|
public class TwoPartyClient {
|
||||||
|
@ -9,15 +8,15 @@ public class TwoPartyClient {
|
||||||
private final TwoPartyVatNetwork network;
|
private final TwoPartyVatNetwork network;
|
||||||
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
|
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
|
||||||
|
|
||||||
public TwoPartyClient(AsynchronousByteChannel channel) {
|
public TwoPartyClient(AsynchronousSocketChannel channel) {
|
||||||
this(channel, null);
|
this(channel, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface) {
|
public TwoPartyClient(AsynchronousSocketChannel channel, Capability.Client bootstrapInterface) {
|
||||||
this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT);
|
this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TwoPartyClient(AsynchronousByteChannel channel,
|
public TwoPartyClient(AsynchronousSocketChannel channel,
|
||||||
Capability.Client bootstrapInterface,
|
Capability.Client bootstrapInterface,
|
||||||
RpcTwoPartyProtocol.Side side) {
|
RpcTwoPartyProtocol.Side side) {
|
||||||
this.network = new TwoPartyVatNetwork(channel, side);
|
this.network = new TwoPartyVatNetwork(channel, side);
|
||||||
|
|
|
@ -8,11 +8,11 @@ import java.util.concurrent.CompletableFuture;
|
||||||
public class TwoPartyServer {
|
public class TwoPartyServer {
|
||||||
|
|
||||||
private class AcceptedConnection {
|
private class AcceptedConnection {
|
||||||
private final AsynchronousByteChannel connection;
|
private final AsynchronousSocketChannel connection;
|
||||||
private final TwoPartyVatNetwork network;
|
private final TwoPartyVatNetwork network;
|
||||||
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
|
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
|
||||||
|
|
||||||
AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousByteChannel connection) {
|
AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousSocketChannel connection) {
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.network = new TwoPartyVatNetwork(this.connection, RpcTwoPartyProtocol.Side.SERVER);
|
this.network = new TwoPartyVatNetwork(this.connection, RpcTwoPartyProtocol.Side.SERVER);
|
||||||
this.rpcSystem = new RpcSystem<>(network, bootstrapInterface);
|
this.rpcSystem = new RpcSystem<>(network, bootstrapInterface);
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
import java.nio.channels.AsynchronousByteChannel;
|
|
||||||
import java.nio.channels.AsynchronousSocketChannel;
|
import java.nio.channels.AsynchronousSocketChannel;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
@ -11,12 +10,12 @@ public class TwoPartyVatNetwork
|
||||||
|
|
||||||
private CompletableFuture<java.lang.Void> previousWrite = CompletableFuture.completedFuture(null);
|
private CompletableFuture<java.lang.Void> previousWrite = CompletableFuture.completedFuture(null);
|
||||||
private final CompletableFuture<java.lang.Void> disconnectPromise = new CompletableFuture<>();
|
private final CompletableFuture<java.lang.Void> disconnectPromise = new CompletableFuture<>();
|
||||||
private final AsynchronousByteChannel channel;
|
private final AsynchronousSocketChannel channel;
|
||||||
private final RpcTwoPartyProtocol.Side side;
|
private final RpcTwoPartyProtocol.Side side;
|
||||||
private final MessageBuilder peerVatId = new MessageBuilder(4);
|
private final MessageBuilder peerVatId = new MessageBuilder(4);
|
||||||
private boolean accepted;
|
private boolean accepted;
|
||||||
|
|
||||||
public TwoPartyVatNetwork(AsynchronousByteChannel channel, RpcTwoPartyProtocol.Side side) {
|
public TwoPartyVatNetwork(AsynchronousSocketChannel channel, RpcTwoPartyProtocol.Side side) {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
this.side = side;
|
this.side = side;
|
||||||
this.peerVatId.initRoot(RpcTwoPartyProtocol.VatId.factory).setSide(
|
this.peerVatId.initRoot(RpcTwoPartyProtocol.VatId.factory).setSide(
|
||||||
|
|
|
@ -24,14 +24,10 @@ package org.capnproto;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.nio.channels.AsynchronousByteChannel;
|
import java.nio.channels.*;
|
||||||
import java.nio.channels.CompletionHandler;
|
|
||||||
import java.nio.channels.ReadableByteChannel;
|
|
||||||
import java.nio.channels.WritableByteChannel;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
public final class Serialize {
|
public final class Serialize {
|
||||||
|
@ -208,14 +204,11 @@ public final class Serialize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class AsyncMessageReader {
|
static abstract class AsyncMessageReader {
|
||||||
|
|
||||||
private final AsynchronousByteChannel channel;
|
|
||||||
private final ReaderOptions options;
|
private final ReaderOptions options;
|
||||||
private final CompletableFuture<MessageReader> readCompleted = new CompletableFuture<>();
|
protected final CompletableFuture<MessageReader> readCompleted = new CompletableFuture<>();
|
||||||
|
|
||||||
public AsyncMessageReader(AsynchronousByteChannel channel, ReaderOptions options) {
|
AsyncMessageReader(ReaderOptions options) {
|
||||||
this.channel = channel;
|
|
||||||
this.options = options;
|
this.options = options;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,8 +219,8 @@ public final class Serialize {
|
||||||
|
|
||||||
private void readHeader() {
|
private void readHeader() {
|
||||||
read(Constants.BYTES_PER_WORD, firstWord -> {
|
read(Constants.BYTES_PER_WORD, firstWord -> {
|
||||||
final var segmentCount = 1 + firstWord.getInt(0);
|
var segmentCount = 1 + firstWord.getInt(0);
|
||||||
final var segment0Size = firstWord.getInt(4);
|
var segment0Size = firstWord.getInt(4);
|
||||||
|
|
||||||
if (segmentCount == 1) {
|
if (segmentCount == 1) {
|
||||||
readSegments(segment0Size, segmentCount, segment0Size, null);
|
readSegments(segment0Size, segmentCount, segment0Size, null);
|
||||||
|
@ -241,7 +234,7 @@ public final class Serialize {
|
||||||
}
|
}
|
||||||
|
|
||||||
read(4 * (segmentCount & ~1), moreSizesRaw -> {
|
read(4 * (segmentCount & ~1), moreSizesRaw -> {
|
||||||
final var moreSizes = new int[segmentCount - 1];
|
var moreSizes = new int[segmentCount - 1];
|
||||||
var totalWords = segment0Size;
|
var totalWords = segment0Size;
|
||||||
|
|
||||||
for (int ii = 0; ii < segmentCount - 1; ++ii) {
|
for (int ii = 0; ii < segmentCount - 1; ++ii) {
|
||||||
|
@ -262,7 +255,7 @@ public final class Serialize {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final var segmentSlices = new ByteBuffer[segmentCount];
|
var segmentSlices = new ByteBuffer[segmentCount];
|
||||||
if (totalWords == 0) {
|
if (totalWords == 0) {
|
||||||
for (int ii = 0; ii < segmentCount; ++ii) {
|
for (int ii = 0; ii < segmentCount; ++ii) {
|
||||||
segmentSlices[ii] = ByteBuffer.allocate(0);
|
segmentSlices[ii] = ByteBuffer.allocate(0);
|
||||||
|
@ -273,17 +266,19 @@ public final class Serialize {
|
||||||
|
|
||||||
read(totalWords * Constants.BYTES_PER_WORD, allSegments -> {
|
read(totalWords * Constants.BYTES_PER_WORD, allSegments -> {
|
||||||
allSegments.rewind();
|
allSegments.rewind();
|
||||||
segmentSlices[0] = allSegments.slice();
|
var segment0 = allSegments.slice();
|
||||||
segmentSlices[0].limit(segment0Size * Constants.BYTES_PER_WORD);
|
segment0.limit(segment0Size * Constants.BYTES_PER_WORD);
|
||||||
segmentSlices[0].order(ByteOrder.LITTLE_ENDIAN);
|
segment0.order(ByteOrder.LITTLE_ENDIAN);
|
||||||
|
segmentSlices[0] = segment0;
|
||||||
|
|
||||||
int offset = segment0Size;
|
int offset = segment0Size;
|
||||||
for (int ii = 1; ii < segmentCount; ++ii) {
|
for (int ii = 1; ii < segmentCount; ++ii) {
|
||||||
allSegments.position(offset * Constants.BYTES_PER_WORD);
|
allSegments.position(offset * Constants.BYTES_PER_WORD);
|
||||||
var segmentSize = moreSizes[ii-1];
|
var segmentSize = moreSizes[ii-1];
|
||||||
segmentSlices[ii] = allSegments.slice();
|
var segment = allSegments.slice();
|
||||||
segmentSlices[ii].limit(segmentSize * Constants.BYTES_PER_WORD);
|
segment.limit(segmentSize * Constants.BYTES_PER_WORD);
|
||||||
segmentSlices[ii].order(ByteOrder.LITTLE_ENDIAN);
|
segment.order(ByteOrder.LITTLE_ENDIAN);
|
||||||
|
segmentSlices[ii] = segment;
|
||||||
offset += segmentSize;
|
offset += segmentSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,19 +286,71 @@ public final class Serialize {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void read(int bytes, Consumer<ByteBuffer> consumer) {
|
abstract void read(int bytes, Consumer<? super ByteBuffer> consumer);
|
||||||
final var buffer = Serialize.makeByteBuffer(bytes);
|
}
|
||||||
final var handler = new CompletionHandler<Integer, Object>() {
|
|
||||||
|
static class AsyncSocketReader extends AsyncMessageReader {
|
||||||
|
private final AsynchronousSocketChannel channel;
|
||||||
|
private final long timeout;
|
||||||
|
private final TimeUnit timeUnit;
|
||||||
|
|
||||||
|
AsyncSocketReader(AsynchronousSocketChannel channel, ReaderOptions options, long timeout, TimeUnit timeUnit) {
|
||||||
|
super(options);
|
||||||
|
this.channel = channel;
|
||||||
|
this.timeout = timeout;
|
||||||
|
this.timeUnit = timeUnit;
|
||||||
|
}
|
||||||
|
|
||||||
|
void read(int bytes, Consumer<? super ByteBuffer> consumer) {
|
||||||
|
var buffer = Serialize.makeByteBuffer(bytes);
|
||||||
|
var handler = new CompletionHandler<Integer, Object>() {
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer result, Object attachment) {
|
public void completed(Integer result, Object attachment) {
|
||||||
// System.out.println("read " + result + " bytes");
|
//System.out.println(channel.toString() + ": read " + result + " bytes");
|
||||||
if (result <= 0) {
|
if (result <= 0) {
|
||||||
var text = result == 0
|
var text = result == 0
|
||||||
? "Read zero bytes. Is the channel in non-blocking mode?"
|
? "Read zero bytes. Is the channel in non-blocking mode?"
|
||||||
: "Premature EOF";
|
: "Premature EOF";
|
||||||
readCompleted.completeExceptionally(new IOException(text));
|
readCompleted.completeExceptionally(new IOException(text));
|
||||||
} else if (buffer.hasRemaining()) {
|
} else if (buffer.hasRemaining()) {
|
||||||
// retry
|
// partial read
|
||||||
|
channel.read(buffer, timeout, timeUnit, null, this);
|
||||||
|
} else {
|
||||||
|
consumer.accept(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failed(Throwable exc, Object attachment) {
|
||||||
|
readCompleted.completeExceptionally(exc);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
this.channel.read(buffer, this.timeout, this.timeUnit, null, handler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class AsyncByteChannelReader extends AsyncMessageReader {
|
||||||
|
private final AsynchronousByteChannel channel;
|
||||||
|
|
||||||
|
AsyncByteChannelReader(AsynchronousByteChannel channel, ReaderOptions options) {
|
||||||
|
super(options);
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
void read(int bytes, Consumer<? super ByteBuffer> consumer) {
|
||||||
|
var buffer = Serialize.makeByteBuffer(bytes);
|
||||||
|
var handler = new CompletionHandler<Integer, Object>() {
|
||||||
|
@Override
|
||||||
|
public void completed(Integer result, Object attachment) {
|
||||||
|
//System.out.println(channel.toString() + ": read " + result + " bytes");
|
||||||
|
if (result <= 0) {
|
||||||
|
var text = result == 0
|
||||||
|
? "Read zero bytes. Is the channel in non-blocking mode?"
|
||||||
|
: "Premature EOF";
|
||||||
|
readCompleted.completeExceptionally(new IOException(text));
|
||||||
|
} else if (buffer.hasRemaining()) {
|
||||||
|
// partial read
|
||||||
channel.read(buffer, null, this);
|
channel.read(buffer, null, this);
|
||||||
} else {
|
} else {
|
||||||
consumer.accept(buffer);
|
consumer.accept(buffer);
|
||||||
|
@ -325,39 +372,51 @@ public final class Serialize {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static CompletableFuture<MessageReader> readAsync(AsynchronousByteChannel channel, ReaderOptions options) {
|
public static CompletableFuture<MessageReader> readAsync(AsynchronousByteChannel channel, ReaderOptions options) {
|
||||||
return new AsyncMessageReader(channel, options).getMessage();
|
return new AsyncByteChannelReader(channel, options).getMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel) {
|
||||||
|
return readAsync(channel, ReaderOptions.DEFAULT_READER_OPTIONS, Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, ReaderOptions options) {
|
||||||
|
return readAsync(channel, options, Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, long timeout, TimeUnit timeUnit) {
|
||||||
|
return readAsync(channel, ReaderOptions.DEFAULT_READER_OPTIONS, timeout, timeUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, ReaderOptions options, long timeout, TimeUnit timeUnit) {
|
||||||
|
return new AsyncSocketReader(channel, options, timeout, timeUnit).getMessage();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousByteChannel outputChannel, MessageBuilder message) {
|
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousByteChannel outputChannel, MessageBuilder message) {
|
||||||
final var writeCompleted = new CompletableFuture<java.lang.Void>();
|
var writeCompleted = new CompletableFuture<java.lang.Void>();
|
||||||
final var segments = message.getSegmentsForOutput();
|
var segments = message.getSegmentsForOutput();
|
||||||
assert segments.length > 0: "Empty message";
|
var header = getHeaderForOutput(segments);
|
||||||
final int tableSize = (segments.length + 2) & (~1);
|
|
||||||
final var table = ByteBuffer.allocate(4 * tableSize);
|
|
||||||
|
|
||||||
table.order(ByteOrder.LITTLE_ENDIAN);
|
|
||||||
table.putInt(0, segments.length - 1);
|
|
||||||
|
|
||||||
for (int ii = 0; ii < segments.length; ++ii) {
|
|
||||||
table.putInt(4 * (ii + 1), segments[ii].limit() / 8);
|
|
||||||
}
|
|
||||||
|
|
||||||
outputChannel.write(table, 0, new CompletionHandler<>() {
|
|
||||||
|
|
||||||
|
outputChannel.write(header, -1, new CompletionHandler<>() {
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer result, Integer attachment) {
|
public void completed(Integer result, Integer index) {
|
||||||
//System.out.println("Wrote " + result + " bytes");
|
var currentSegment = index < 0 ? header : segments[index];
|
||||||
if (writeCompleted.isCancelled()) {
|
|
||||||
// TODO do we really want to interrupt here?
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (attachment == segments.length) {
|
if (result < 0) {
|
||||||
|
writeCompleted.completeExceptionally(new IOException("Write failed"));
|
||||||
|
}
|
||||||
|
else if (currentSegment.hasRemaining()) {
|
||||||
|
// partial write
|
||||||
|
outputChannel.write(currentSegment, index, this);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
index++;
|
||||||
|
if (index == segments.length) {
|
||||||
writeCompleted.complete(null);
|
writeCompleted.complete(null);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
outputChannel.write(segments[attachment], attachment + 1, this);
|
outputChannel.write(segments[index], index, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -365,6 +424,63 @@ public final class Serialize {
|
||||||
writeCompleted.completeExceptionally(exc);
|
writeCompleted.completeExceptionally(exc);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return writeCompleted.copy();
|
|
||||||
|
return writeCompleted;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousSocketChannel outputChannel, MessageBuilder message) {
|
||||||
|
return writeAsync(outputChannel, message, Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousSocketChannel outputChannel, MessageBuilder message, long timeout, TimeUnit timeUnit) {
|
||||||
|
var writeCompleted = new CompletableFuture<java.lang.Void>();
|
||||||
|
var segments = message.getSegmentsForOutput();
|
||||||
|
var header = getHeaderForOutput(segments);
|
||||||
|
long totalBytes = header.remaining();
|
||||||
|
|
||||||
|
// TODO avoid this copy?
|
||||||
|
var allSegments = new ByteBuffer[segments.length+1];
|
||||||
|
allSegments[0] = header;
|
||||||
|
for (int ii = 0; ii < segments.length; ++ii) {
|
||||||
|
var segment = segments[ii];
|
||||||
|
allSegments[ii+1] = segment;
|
||||||
|
totalBytes += segment.remaining();
|
||||||
|
}
|
||||||
|
|
||||||
|
outputChannel.write(allSegments, 0, allSegments.length, timeout, timeUnit, totalBytes, new CompletionHandler<>() {
|
||||||
|
@Override
|
||||||
|
public void completed(Long result, Long totalBytes) {
|
||||||
|
//System.out.println(outputChannel.toString() + ": Wrote " + result + "/" + totalBytes + " bytes");
|
||||||
|
if (result < 0) {
|
||||||
|
writeCompleted.completeExceptionally(new IOException("Write failed"));
|
||||||
|
}
|
||||||
|
else if (result < totalBytes) {
|
||||||
|
// partial write
|
||||||
|
outputChannel.write(allSegments, 0, allSegments.length, timeout, timeUnit, totalBytes - result, this);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
writeCompleted.complete(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failed(Throwable exc, Long attachment) {
|
||||||
|
writeCompleted.completeExceptionally(exc);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return writeCompleted;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ByteBuffer getHeaderForOutput(ByteBuffer[] segments) {
|
||||||
|
assert segments.length > 0: "Empty message";
|
||||||
|
int tableSize = (segments.length + 2) & (~1);
|
||||||
|
var table = ByteBuffer.allocate(4 * tableSize);
|
||||||
|
table.order(ByteOrder.LITTLE_ENDIAN);
|
||||||
|
table.putInt(0, segments.length - 1);
|
||||||
|
for (int ii = 0; ii < segments.length; ++ii) {
|
||||||
|
table.putInt(4 * (ii + 1), segments[ii].limit() / 8);
|
||||||
|
}
|
||||||
|
return table;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketOptions;
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.AsynchronousServerSocketChannel;
|
import java.nio.channels.AsynchronousServerSocketChannel;
|
||||||
import java.nio.channels.AsynchronousSocketChannel;
|
import java.nio.channels.AsynchronousSocketChannel;
|
||||||
|
@ -72,10 +71,10 @@ public class SerializeTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
// read via AsyncChannel
|
// read via AsyncChannel
|
||||||
expectSerializesToAsync(exampleSegmentCount, exampleBytes);
|
expectSerializesToAsyncSocket(exampleSegmentCount, exampleBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expectSerializesToAsync(int exampleSegmentCount, byte[] exampleBytes) throws IOException {
|
private void expectSerializesToAsyncSocket(int exampleSegmentCount, byte[] exampleBytes) throws IOException {
|
||||||
var done = new CompletableFuture<java.lang.Void>();
|
var done = new CompletableFuture<java.lang.Void>();
|
||||||
var server = AsynchronousServerSocketChannel.open();
|
var server = AsynchronousServerSocketChannel.open();
|
||||||
server.bind(null);
|
server.bind(null);
|
||||||
|
@ -108,10 +107,7 @@ public class SerializeTest {
|
||||||
checkSegmentContents(exampleSegmentCount, messageReader.arena);
|
checkSegmentContents(exampleSegmentCount, messageReader.arena);
|
||||||
done.get();
|
done.get();
|
||||||
}
|
}
|
||||||
catch (InterruptedException exc) {
|
catch (InterruptedException | ExecutionException exc) {
|
||||||
Assert.fail(exc.getMessage());
|
|
||||||
}
|
|
||||||
catch (ExecutionException exc) {
|
|
||||||
Assert.fail(exc.getMessage());
|
Assert.fail(exc.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue