tidy up TwoPartyVatNetwork

This commit is contained in:
Vaci Koblizek 2020-10-22 13:27:36 +01:00
parent e21ba577b5
commit 3161e246ae

View file

@ -12,7 +12,6 @@ public class TwoPartyVatNetwork
void incoming(IncomingRpcMessage message, RpcTwoPartyProtocol.Side side);
}
private static BuilderArena.AllocationStrategy allocationStrategy = BuilderArena.SUGGESTED_ALLOCATION_STRATEGY;
private CompletableFuture<java.lang.Void> previousWrite = CompletableFuture.completedFuture(null);
private final CompletableFuture<java.lang.Void> peerDisconnected = new CompletableFuture<>();
private final AsynchronousSocketChannel channel;
@ -68,22 +67,30 @@ public class TwoPartyVatNetwork
@Override
public CompletableFuture<IncomingRpcMessage> receiveIncomingMessage() {
return Serialize.readAsync(channel)
var message = Serialize.readAsync(channel)
.thenApply(reader -> (IncomingRpcMessage) new IncomingMessage(reader))
.whenComplete((msg, exc) -> {
if (exc != null) {
this.peerDisconnected.complete(null);
}
})
.whenComplete((msg, exc) -> {
if (this.tap != null && msg != null) {
this.tap.incoming(
msg,
this.getSide() == RpcTwoPartyProtocol.Side.CLIENT
? RpcTwoPartyProtocol.Side.SERVER
: RpcTwoPartyProtocol.Side.CLIENT);
}
});
// send to message tap
if (this.tap != null) {
message = message.whenComplete((msg, exc) -> {
if (this.tap == null || msg == null) {
return;
}
var side = this.getSide() == RpcTwoPartyProtocol.Side.CLIENT
? RpcTwoPartyProtocol.Side.SERVER
: RpcTwoPartyProtocol.Side.CLIENT;
this.tap.incoming(msg, side);
});
}
return message;
}
@Override
@ -114,11 +121,13 @@ public class TwoPartyVatNetwork
final class OutgoingMessage implements OutgoingRpcMessage {
final MessageBuilder message;
List<Integer> fds = List.of();
private final MessageBuilder message;
private List<Integer> fds = List.of();
OutgoingMessage(int firstSegmentWordSize) {
this.message = new MessageBuilder(firstSegmentWordSize, allocationStrategy);
this.message = new MessageBuilder(firstSegmentWordSize == 0
? BuilderArena.SUGGESTED_FIRST_SEGMENT_WORDS
: firstSegmentWordSize);
}
@Override
@ -148,8 +157,8 @@ public class TwoPartyVatNetwork
final class IncomingMessage implements IncomingRpcMessage {
final MessageReader message;
final List<Integer> fds;
private final MessageReader message;
private final List<Integer> fds;
IncomingMessage(MessageReader message) {
this(message, List.of());
@ -162,12 +171,12 @@ public class TwoPartyVatNetwork
@Override
public AnyPointer.Reader getBody() {
return message.getRoot(AnyPointer.factory);
return this.message.getRoot(AnyPointer.factory);
}
@Override
public List<Integer> getAttachedFds() {
return fds;
return this.fds;
}
}
}