diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index dabc299..b7c5e96 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -1500,63 +1500,42 @@ final class RpcState { private class RpcPipeline implements PipelineHook { private final Question question; - private PipelineState state = PipelineState.WAITING; - private RpcResponse resolved; - private Throwable broken; final HashMap, ClientHook> clientMap = new HashMap<>(); final CompletableFuture redirectLater; - final CompletableFuture resolveSelf; RpcPipeline(Question question, CompletableFuture redirectLater) { this.question = question; this.redirectLater = redirectLater; - this.resolveSelf = this.redirectLater - .thenAccept(response -> { - this.state = PipelineState.RESOLVED; - this.resolved = response; - }) - .exceptionally(exc -> { - this.state = PipelineState.BROKEN; - this.broken = exc; - return null; - }); } RpcPipeline(Question question) { + this(question, null); // never resolves - this.question = question; - this.redirectLater = null; - this.resolveSelf = null; } @Override public ClientHook getPipelinedCap(PipelineOp[] ops) { + // We differ from the C++ implementation here. + // Previously, we would just store and return the resolved client, but this + // could cause tail calls to execute out of order. + // So instead we always chain resolution on the redirectLater promise, which + // ensures that each call initiated from this PromiseClient is executed in order. + // TODO avoid conversion to/from ArrayList? var key = new ArrayList<>(Arrays.asList(ops)); - var hook = this.clientMap.computeIfAbsent(key, k -> { - switch (state) { - case WAITING: { - var pipelineClient = new PipelineClient(this.question, ops); - if (this.redirectLater == null) { - // This pipeline will never get redirected, so just return the PipelineClient. - return pipelineClient; - } - - var resolutionPromise = this.redirectLater.thenApply( - response -> response.getResults().getPipelinedCap(ops)); - return new PromiseClient(pipelineClient, resolutionPromise, null); - } - - case RESOLVED: - return resolved.getResults().getPipelinedCap(ops); - - default: - return Capability.newBrokenCap(broken); + return this.clientMap.computeIfAbsent(key, k -> { + var pipelineClient = new PipelineClient(this.question, ops); + if (this.redirectLater == null) { + // This pipeline will never get redirected, so just return the PipelineClient. + return pipelineClient; } + + var resolutionPromise = this.redirectLater.thenApply( + response -> response.getResults().getPipelinedCap(ops)); + return new PromiseClient(pipelineClient, resolutionPromise, null); }); - return hook; } }