run the message loop asynchronously

This commit is contained in:
Vaci Koblizek 2020-11-12 21:23:08 +00:00
parent af47f1a825
commit 2ddc8e1d79
3 changed files with 24 additions and 22 deletions

View file

@ -260,7 +260,8 @@ final class RpcState<VatId> {
private final CompletableFuture<java.lang.Void> onDisconnect; private final CompletableFuture<java.lang.Void> onDisconnect;
private Throwable disconnected = null; private Throwable disconnected = null;
private CompletableFuture<java.lang.Void> messageReady = CompletableFuture.completedFuture(null); private CompletableFuture<java.lang.Void> messageReady = CompletableFuture.completedFuture(null);
private final CompletableFuture<java.lang.Void> messageLoop; private final CompletableFuture<java.lang.Void> messageLoop = new CompletableFuture<>();
// completes when the message loop exits
private final ReferenceQueue<Question> questionRefs = new ReferenceQueue<>(); private final ReferenceQueue<Question> questionRefs = new ReferenceQueue<>();
private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>(); private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
@ -270,7 +271,7 @@ final class RpcState<VatId> {
this.bootstrapFactory = bootstrapFactory; this.bootstrapFactory = bootstrapFactory;
this.connection = connection; this.connection = connection;
this.onDisconnect = onDisconnect; this.onDisconnect = onDisconnect;
this.messageLoop = this.doMessageLoop(); startMessageLoop();
} }
public CompletableFuture<java.lang.Void> getMessageLoop() { public CompletableFuture<java.lang.Void> getMessageLoop() {
@ -397,24 +398,30 @@ final class RpcState<VatId> {
return pipeline.getPipelinedCap(new PipelineOp[0]); return pipeline.getPipelinedCap(new PipelineOp[0]);
} }
private CompletableFuture<java.lang.Void> doMessageLoop() { private void startMessageLoop() {
if (isDisconnected()) { if (isDisconnected()) {
return CompletableFuture.failedFuture(this.disconnected); this.messageLoop.completeExceptionally(this.disconnected);
return;
} }
return connection.receiveIncomingMessage().thenCompose(message -> { var messageReader = this.connection.receiveIncomingMessage()
try { .thenAccept(message -> {
this.handleMessage(message); if (message == null) {
} catch (Exception rpcExc) { this.messageLoop.complete(null);
// either we received an Abort message from peer return;
// or internal RpcState is bad. }
return this.disconnect(rpcExc); try {
} this.handleMessage(message);
this.cleanupImports(); } catch (Exception rpcExc) {
this.cleanupQuestions(); // either we received an Abort message from peer
return this.doMessageLoop(); // or internal RpcState is bad.
this.disconnect(rpcExc);
}
this.cleanupImports();
this.cleanupQuestions();
});
}).exceptionallyCompose(exc -> this.disconnect(exc)); messageReader.thenRunAsync(this::startMessageLoop);
} }
private void handleMessage(IncomingRpcMessage message) throws RpcException { private void handleMessage(IncomingRpcMessage message) throws RpcException {

View file

@ -77,11 +77,7 @@ public class TwoPartyVatNetwork
public CompletableFuture<IncomingRpcMessage> receiveIncomingMessage() { public CompletableFuture<IncomingRpcMessage> receiveIncomingMessage() {
var message = Serialize.readAsync(channel) var message = Serialize.readAsync(channel)
.thenApply(reader -> (IncomingRpcMessage) new IncomingMessage(reader)) .thenApply(reader -> (IncomingRpcMessage) new IncomingMessage(reader))
.whenComplete((msg, exc) -> { .exceptionally(exc -> null);
if (exc != null) {
this.peerDisconnected.complete(null);
}
});
// send to message tap // send to message tap
if (this.tap != null) { if (this.tap != null) {

View file

@ -110,7 +110,6 @@ public class RpcStateTest {
var msg = new TestMessage(); var msg = new TestMessage();
msg.builder.getRoot(RpcProtocol.Message.factory).initUnimplemented(); msg.builder.getRoot(RpcProtocol.Message.factory).initUnimplemented();
this.connection.setNextIncomingMessage(msg); this.connection.setNextIncomingMessage(msg);
Assert.assertFalse(sent.isEmpty());
} }
@Test @Test