From 119a682d4dd6be5d40e048c5b8ab62f09f807899 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Mon, 23 Nov 2020 19:53:35 +0000 Subject: [PATCH] memoise queuespipeline caps. use queues to order queuedclient resolution --- .../main/java/org/capnproto/Capability.java | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index dfe48d6..7b85d26 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -1,5 +1,6 @@ package org.capnproto; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -546,6 +547,7 @@ public final class Capability { private final CompletableFuture promise; private final CompletableFuture selfResolutionOp; PipelineHook redirect; + private final Map, ClientHook> clientMap = new HashMap<>(); QueuedPipeline(CompletableFuture promiseParam) { this.promise = promiseParam; @@ -559,10 +561,14 @@ public final class Capability { @Override public final ClientHook getPipelinedCap(PipelineOp[] ops) { - return redirect != null - ? redirect.getPipelinedCap(ops) - : new QueuedClient(this.promise.thenApply( - pipeline -> pipeline.getPipelinedCap(ops))); + if (redirect != null) { + return redirect.getPipelinedCap(ops); + } + + var key = new ArrayList<>(Arrays.asList(ops)); + return this.clientMap.computeIfAbsent(key, + k -> new QueuedClient(this.promise.thenApply( + pipeline -> pipeline.getPipelinedCap(ops)))); } /* @Override @@ -580,14 +586,9 @@ public final class Capability { // A ClientHook which simply queues calls while waiting for a ClientHook to which to forward them. private static class QueuedClient implements ClientHook { - private final CompletableFuture setResolutionOp; + private final CompletableFuture selfResolutionOp; // Represents the operation which will set `redirect` when possible. - private final CompletableFuture promiseForCallForwarding = new CompletableFuture<>(); - // When this promise resolves, each queued call will be forwarded to the real client. This needs - // to occur *before* any 'whenMoreResolved()' promises resolve, because we want to make sure - // previously-queued calls are delivered before any new calls made in response to the resolution. - private final CompletableFuture promiseForClientResolution = new CompletableFuture<>(); // whenMoreResolved() returns forks of this promise. These must resolve *after* queued calls // have been initiated (so that any calls made in the whenMoreResolved() handler are correctly @@ -595,25 +596,28 @@ public final class Capability { // confuse the application if a queued call returns before the capability on which it was made // resolves). - private final CompletableFuture promiseForCallRelease = new CompletableFuture<>(); - // When this promise resolves, all pending calls will return. - private ClientHook redirect; + private final Queue> queuedCalls = new ArrayDeque<>(); + private final Queue pendingCalls = new ArrayDeque<>(); QueuedClient(CompletableFuture promise) { - this.setResolutionOp = promise.handle((inner, exc) -> { + this.selfResolutionOp = promise.handle((inner, exc) -> { this.redirect = exc == null ? inner : newBrokenCap(exc); - // Resolve the promise for call forwarding. - this.promiseForCallForwarding.complete(this.redirect); + // Resolve promises for call forwarding. + for (var call: this.queuedCalls) { + call.complete(this.redirect); + } // Now resolve the promise for client resolution this.promiseForClientResolution.complete(this.redirect); // Finally, execute any pending calls. - this.promiseForCallRelease.complete(null); + for (var hook: this.pendingCalls) { + hook.releaseCall(); + } return null; }); @@ -622,17 +626,21 @@ public final class Capability { @Override public Request newCall(long interfaceId, short methodId) { var hook = new LocalRequest(interfaceId, methodId, this); - this.promiseForCallRelease.thenRun(hook::releaseCall); + this.pendingCalls.add(hook); var root = hook.message.getRoot(AnyPointer.factory); return new AnyPointer.Request(root, hook); } @Override public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) { - var callResult = this.promiseForCallForwarding.thenApply( + var promise = new CompletableFuture(); + this.queuedCalls.add(promise); + + var callResult = promise.thenApply( client -> client.call(interfaceId, methodId, ctx)); - var pipeline = new QueuedPipeline(callResult.thenApply(result -> result.pipeline)); - return new VoidPromiseAndPipeline(callResult.thenRun(() -> {}), pipeline); + var pipelineResult = callResult.thenApply(result -> result.pipeline); + var pipeline = new QueuedPipeline(pipelineResult); + return new VoidPromiseAndPipeline(pipelineResult.thenRun(() -> {}), pipeline); } @Override