implement evalLast queue

This commit is contained in:
Vaci Koblizek 2020-11-17 16:18:23 +00:00
parent 054e4efdb1
commit 13dec22063

View file

@ -238,11 +238,11 @@ final class RpcState<VatId> {
private final VatNetwork.Connection<VatId> connection;
private final CompletableFuture<DisconnectInfo> disconnectFulfiller;
private Throwable disconnected = null;
private CompletableFuture<java.lang.Void> messageReady = CompletableFuture.completedFuture(null);
private final CompletableFuture<java.lang.Void> messageLoop = new CompletableFuture<>();
// completes when the message loop exits
private final ReferenceQueue<QuestionRef> questionRefs = new ReferenceQueue<>();
private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
private final Queue<Callable<java.lang.Void>> lastEvals = new ArrayDeque<>();
RpcState(BootstrapFactory<VatId> bootstrapFactory,
VatNetwork.Connection<VatId> connection,
@ -360,16 +360,8 @@ final class RpcState<VatId> {
}
// Run func() before the next IO event.
private <T> void evalLast(Callable<T> func) {
this.messageReady = this.messageReady.thenCompose(x -> {
try {
func.call();
}
catch (java.lang.Exception exc) {
return CompletableFuture.failedFuture(exc);
}
return CompletableFuture.completedFuture(null);
});
private void evalLast(Callable<java.lang.Void> func) {
this.lastEvals.add(func);
}
ClientHook restore() {
@ -402,7 +394,13 @@ final class RpcState<VatId> {
}
try {
this.handleMessage(message);
} catch (Exception rpcExc) {
while (!this.lastEvals.isEmpty()) {
this.lastEvals.remove().call();
}
}
catch (Throwable rpcExc) {
// either we received an Abort message from peer
// or internal RpcState is bad.
this.disconnect(rpcExc);
@ -410,8 +408,7 @@ final class RpcState<VatId> {
});
messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose(exc -> {
//System.out.println("Exception in startMessageLoop!");
//exc.printStackTrace();
//System.out.println("Exception in startMessageLoop!" + exc);
return CompletableFuture.failedFuture(exc);
});
}
@ -790,7 +787,6 @@ final class RpcState<VatId> {
// It appears this is a valid entry on the import table, but was not expected to be a
// promise.
assert imp.importClient == null : "Import already resolved.";
}
private void handleRelease(RpcProtocol.Release.Reader release) {
@ -1053,9 +1049,11 @@ final class RpcState<VatId> {
var exp = exports.find(descriptor.getReceiverHosted());
if (exp == null) {
return Capability.newBrokenCap("invalid 'receiverHosted' export ID");
} else if (exp.clientHook.getBrand() == this) {
}
else if (exp.clientHook.getBrand() == this) {
return new TribbleRaceBlocker(exp.clientHook);
} else {
}
else {
return exp.clientHook;
}
}
@ -1513,9 +1511,9 @@ final class RpcState<VatId> {
private RpcResponse resolved;
private Throwable broken;
final HashMap<List<PipelineOp>, ClientHook> clientMap = new HashMap<>();
final CompletableFuture<RpcResponse> redirectLater;
final CompletableFuture<java.lang.Void> resolveSelf;
private final HashMap<List<PipelineOp>, ClientHook> clientMap = new HashMap<>();
private final CompletableFuture<RpcResponse> redirectLater;
private final CompletableFuture<RpcResponse> resolveSelf;
RpcPipeline(QuestionRef questionRef,
CompletableFuture<RpcResponse> redirectLater) {
@ -1523,9 +1521,10 @@ final class RpcState<VatId> {
assert redirectLater != null;
this.redirectLater = redirectLater;
this.resolveSelf = this.redirectLater
.thenAccept(response -> {
.thenApply(response -> {
this.state = PipelineState.RESOLVED;
this.resolved = response;
return response;
})
.exceptionally(exc -> {
this.state = PipelineState.BROKEN;
@ -1534,8 +1533,12 @@ final class RpcState<VatId> {
});
}
/**
* Construct a new RpcPipeline that is never expected to resolve.
*/
RpcPipeline(QuestionRef questionRef) {
this(questionRef, null);
// TODO implement tail calls...
}
@Override
@ -1552,13 +1555,15 @@ final class RpcState<VatId> {
return pipelineClient;
}
var resolutionPromise = this.redirectLater.thenApply(
assert this.resolveSelf != null;
var resolutionPromise = this.resolveSelf.thenApply(
response -> response.getResults().getPipelinedCap(ops));
return new PromiseClient(pipelineClient, resolutionPromise, null);
}
case RESOLVED:
return resolved.getResults().getPipelinedCap(ops);
assert this.resolved != null;
return this.resolved.getResults().getPipelinedCap(ops);
default:
return Capability.newBrokenCap(broken);
@ -1807,7 +1812,7 @@ final class RpcState<VatId> {
private class PromiseClient extends RpcClient {
private final ClientHook cap;
private ClientHook cap;
private final Integer importId;
private boolean receivedCall = false;
private ResolutionType resolutionType = ResolutionType.UNRESOLVED;
@ -1940,7 +1945,7 @@ final class RpcState<VatId> {
var message = connection.newOutgoingMessage(sizeHint);
var disembargo = message.getBody().initAs(RpcProtocol.Message.factory).initDisembargo();
var redirect = RpcState.this.writeTarget(cap, disembargo.initTarget());
assert redirect == null;
assert redirect == null: "Original promise target should always be from this RPC connection.";
var embargo = embargos.next();
disembargo.getContext().setSenderLoopback(embargo.id);
@ -1952,6 +1957,7 @@ final class RpcState<VatId> {
message.send();
}
this.cap = replacement;
return replacement;
}
}