move cleanup to end of message loop
This commit is contained in:
parent
94ca2a04e6
commit
86ccdd5a55
1 changed files with 10 additions and 7 deletions
|
@ -282,9 +282,9 @@ final class RpcState {
|
||||||
private final ReferenceQueue<Question> questionRefs = new ReferenceQueue<>();
|
private final ReferenceQueue<Question> questionRefs = new ReferenceQueue<>();
|
||||||
private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
|
private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
|
||||||
|
|
||||||
RpcState( Capability.Client bootstrapInterface,
|
RpcState(Capability.Client bootstrapInterface,
|
||||||
VatNetwork.Connection connection,
|
VatNetwork.Connection connection,
|
||||||
CompletableFuture<java.lang.Void> onDisconnect) {
|
CompletableFuture<java.lang.Void> onDisconnect) {
|
||||||
this.bootstrapInterface = bootstrapInterface;
|
this.bootstrapInterface = bootstrapInterface;
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.onDisconnect = onDisconnect;
|
this.onDisconnect = onDisconnect;
|
||||||
|
@ -295,6 +295,10 @@ final class RpcState {
|
||||||
return this.messageLoop;
|
return this.messageLoop;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<java.lang.Void> onDisconnect() {
|
||||||
|
return this.messageLoop;
|
||||||
|
}
|
||||||
|
|
||||||
CompletableFuture<java.lang.Void> disconnect(Throwable exc) {
|
CompletableFuture<java.lang.Void> disconnect(Throwable exc) {
|
||||||
if (isDisconnected()) {
|
if (isDisconnected()) {
|
||||||
return CompletableFuture.failedFuture(this.disconnected);
|
return CompletableFuture.failedFuture(this.disconnected);
|
||||||
|
@ -420,21 +424,20 @@ final class RpcState {
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<java.lang.Void> doMessageLoop() {
|
private CompletableFuture<java.lang.Void> doMessageLoop() {
|
||||||
this.cleanupImports();
|
|
||||||
this.cleanupQuestions();
|
|
||||||
|
|
||||||
if (isDisconnected()) {
|
if (isDisconnected()) {
|
||||||
return CompletableFuture.failedFuture(this.disconnected);
|
return CompletableFuture.failedFuture(this.disconnected);
|
||||||
}
|
}
|
||||||
|
|
||||||
return connection.receiveIncomingMessage().thenCompose(message -> {
|
return connection.receiveIncomingMessage().thenCompose(message -> {
|
||||||
try {
|
try {
|
||||||
handleMessage(message);
|
this.handleMessage(message);
|
||||||
} catch (Exception rpcExc) {
|
} catch (Exception rpcExc) {
|
||||||
// either we received an Abort message from peer
|
// either we received an Abort message from peer
|
||||||
// or internal RpcState is bad.
|
// or internal RpcState is bad.
|
||||||
return this.disconnect(rpcExc);
|
return this.disconnect(rpcExc);
|
||||||
}
|
}
|
||||||
|
this.cleanupImports();
|
||||||
|
this.cleanupQuestions();
|
||||||
return this.doMessageLoop();
|
return this.doMessageLoop();
|
||||||
|
|
||||||
}).exceptionallyCompose(exc -> this.disconnect(exc));
|
}).exceptionallyCompose(exc -> this.disconnect(exc));
|
||||||
|
|
Loading…
Reference in a new issue