resolve PromiseClient requests in order
This commit is contained in:
parent
cd8e096f3f
commit
d526eca4b9
1 changed files with 16 additions and 37 deletions
|
@ -1500,63 +1500,42 @@ final class RpcState<VatId> {
|
|||
private class RpcPipeline implements PipelineHook {
|
||||
|
||||
private final Question question;
|
||||
private PipelineState state = PipelineState.WAITING;
|
||||
private RpcResponse resolved;
|
||||
private Throwable broken;
|
||||
|
||||
final HashMap<List<PipelineOp>, ClientHook> clientMap = new HashMap<>();
|
||||
final CompletableFuture<RpcResponse> redirectLater;
|
||||
final CompletableFuture<java.lang.Void> resolveSelf;
|
||||
|
||||
RpcPipeline(Question question,
|
||||
CompletableFuture<RpcResponse> redirectLater) {
|
||||
this.question = question;
|
||||
this.redirectLater = redirectLater;
|
||||
this.resolveSelf = this.redirectLater
|
||||
.thenAccept(response -> {
|
||||
this.state = PipelineState.RESOLVED;
|
||||
this.resolved = response;
|
||||
})
|
||||
.exceptionally(exc -> {
|
||||
this.state = PipelineState.BROKEN;
|
||||
this.broken = exc;
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
RpcPipeline(Question question) {
|
||||
this(question, null);
|
||||
// never resolves
|
||||
this.question = question;
|
||||
this.redirectLater = null;
|
||||
this.resolveSelf = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientHook getPipelinedCap(PipelineOp[] ops) {
|
||||
// We differ from the C++ implementation here.
|
||||
// Previously, we would just store and return the resolved client, but this
|
||||
// could cause tail calls to execute out of order.
|
||||
// So instead we always chain resolution on the redirectLater promise, which
|
||||
// ensures that each call initiated from this PromiseClient is executed in order.
|
||||
|
||||
// TODO avoid conversion to/from ArrayList?
|
||||
var key = new ArrayList<>(Arrays.asList(ops));
|
||||
var hook = this.clientMap.computeIfAbsent(key, k -> {
|
||||
switch (state) {
|
||||
case WAITING: {
|
||||
var pipelineClient = new PipelineClient(this.question, ops);
|
||||
if (this.redirectLater == null) {
|
||||
// This pipeline will never get redirected, so just return the PipelineClient.
|
||||
return pipelineClient;
|
||||
}
|
||||
|
||||
var resolutionPromise = this.redirectLater.thenApply(
|
||||
response -> response.getResults().getPipelinedCap(ops));
|
||||
return new PromiseClient(pipelineClient, resolutionPromise, null);
|
||||
}
|
||||
|
||||
case RESOLVED:
|
||||
return resolved.getResults().getPipelinedCap(ops);
|
||||
|
||||
default:
|
||||
return Capability.newBrokenCap(broken);
|
||||
return this.clientMap.computeIfAbsent(key, k -> {
|
||||
var pipelineClient = new PipelineClient(this.question, ops);
|
||||
if (this.redirectLater == null) {
|
||||
// This pipeline will never get redirected, so just return the PipelineClient.
|
||||
return pipelineClient;
|
||||
}
|
||||
|
||||
var resolutionPromise = this.redirectLater.thenApply(
|
||||
response -> response.getResults().getPipelinedCap(ops));
|
||||
return new PromiseClient(pipelineClient, resolutionPromise, null);
|
||||
});
|
||||
return hook;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue