From 054e4efdb15205d179e3ebc2acc6b046b279ed00 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Tue, 17 Nov 2020 14:32:37 +0000 Subject: [PATCH] Revert "resolve PromiseClient requests in order" This reverts commit d526eca4b9175ec1e2b89746b995b56b13723722. --- .../src/main/java/org/capnproto/RpcState.java | 51 +++++++++++++------ 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index 94322ff..d6894e6 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -1509,43 +1509,62 @@ final class RpcState { private class RpcPipeline implements PipelineHook { private final QuestionRef questionRef; + private PipelineState state = PipelineState.WAITING; + private RpcResponse resolved; + private Throwable broken; final HashMap, ClientHook> clientMap = new HashMap<>(); final CompletableFuture redirectLater; + final CompletableFuture resolveSelf; RpcPipeline(QuestionRef questionRef, CompletableFuture redirectLater) { this.questionRef = questionRef; assert redirectLater != null; this.redirectLater = redirectLater; + this.resolveSelf = this.redirectLater + .thenAccept(response -> { + this.state = PipelineState.RESOLVED; + this.resolved = response; + }) + .exceptionally(exc -> { + this.state = PipelineState.BROKEN; + this.broken = exc; + return null; + }); } RpcPipeline(QuestionRef questionRef) { this(questionRef, null); - // never resolves } @Override public ClientHook getPipelinedCap(PipelineOp[] ops) { - // We differ from the C++ implementation here. - // Previously, we would just store and return the resolved client, but this - // could cause tail calls to execute out of order. - // So instead we always chain resolution on the redirectLater promise, which - // ensures that each call initiated from this PromiseClient is executed in order. - // TODO avoid conversion to/from ArrayList? var key = new ArrayList<>(Arrays.asList(ops)); - return this.clientMap.computeIfAbsent(key, k -> { - var pipelineClient = new PipelineClient(this.questionRef, ops); - if (this.redirectLater == null) { - // This pipeline will never get redirected, so just return the PipelineClient. - return pipelineClient; - } - var resolutionPromise = this.redirectLater.thenApply( - response -> response.getResults().getPipelinedCap(ops)); - return new PromiseClient(pipelineClient, resolutionPromise, null); + var hook = this.clientMap.computeIfAbsent(key, k -> { + switch (state) { + case WAITING: { + var pipelineClient = new PipelineClient(this.questionRef, ops); + if (this.redirectLater == null) { + // This pipeline will never get redirected, so just return the PipelineClient. + return pipelineClient; + } + + var resolutionPromise = this.redirectLater.thenApply( + response -> response.getResults().getPipelinedCap(ops)); + return new PromiseClient(pipelineClient, resolutionPromise, null); + } + + case RESOLVED: + return resolved.getResults().getPipelinedCap(ops); + + default: + return Capability.newBrokenCap(broken); + } }); + return hook; } @Override