From 1cadca604efaa38dd22ac06c4738b2ce9828e2a4 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Fri, 20 Nov 2020 17:04:50 +0000 Subject: [PATCH] add releaseCall to delay call execution --- .../java/org/capnproto/CallContextHook.java | 4 + .../main/java/org/capnproto/Capability.java | 90 +++++++++++++------ .../main/java/org/capnproto/ClientHook.java | 1 - 3 files changed, 69 insertions(+), 26 deletions(-) diff --git a/runtime/src/main/java/org/capnproto/CallContextHook.java b/runtime/src/main/java/org/capnproto/CallContextHook.java index 9b00255..38ffe49 100644 --- a/runtime/src/main/java/org/capnproto/CallContextHook.java +++ b/runtime/src/main/java/org/capnproto/CallContextHook.java @@ -21,4 +21,8 @@ public interface CallContextHook { CompletableFuture onTailCall(); ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request); + + default CompletableFuture releaseCall() { + return CompletableFuture.completedFuture(null); + } } diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index f5738d2..dfe48d6 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -171,7 +171,7 @@ public final class Capability { } @Override - public AnyPointer.Request newCall(long interfaceId, short methodId) { + public Request newCall(long interfaceId, short methodId) { var hook = new LocalRequest(interfaceId, methodId, this); var root = hook.message.getRoot(AnyPointer.factory); return new AnyPointer.Request(root, hook); @@ -185,16 +185,20 @@ public final class Capability { return null; } - // We don't want to actually dispatch the call synchronously, because we don't want the callee - // to have any side effects before the promise is returned to the caller. This helps avoid - // race conditions. + // Note this comment from the C++ source: // - // So, we do an evalLater() here. + // "We don't want to actually dispatch the call synchronously, because we don't want the callee + // to have any side effects before the promise is returned to the caller. This helps avoid + // race conditions. // - // Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't - // complete before 'whenMoreResolved()' promises resolve. - // TODO fix the above comment! we don't have the option of evalLater (yes) - var promise = this.whenResolved().thenCompose( + // So, we do an evalLater() here. + // + // Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't + // complete before 'whenMoreResolved()' promises resolve." + // + // As the Java implementation doesn't (currently) have an evalLater() call, we obtain a promise + // from the CallContextHook that will be completed by QueuedClient when appropriate. + var promise = ctx.releaseCall().thenCompose( void_ -> this.callInternal(interfaceId, methodId, ctx)); var pipelinePromise = promise.thenApply(x -> { @@ -344,6 +348,7 @@ public final class Capability { final long interfaceId; final short methodId; final ClientHook client; + private final CompletableFuture callRelease = new CompletableFuture<>(); LocalRequest(long interfaceId, short methodId, ClientHook client) { this.interfaceId = interfaceId; @@ -354,7 +359,7 @@ public final class Capability { @Override public RemotePromise send() { var cancel = new CompletableFuture(); - var context = new LocalCallContext(message, client, cancel); + var context = new LocalCallContext(message, client, cancel, this.callRelease); var promiseAndPipeline = client.call(interfaceId, methodId, context); var promise = promiseAndPipeline.promise.thenApply(x -> { context.getResults(); // force allocation @@ -365,6 +370,7 @@ public final class Capability { promiseAndPipeline.promise.cancel(false); }); + this.callRelease.complete(null); assert promiseAndPipeline.pipeline != null; return new RemotePromise<>(promise, new AnyPointer.Pipeline(promiseAndPipeline.pipeline)); } @@ -380,6 +386,10 @@ public final class Capability { public Object getBrand() { return null; } + + void releaseCall() { + this.callRelease.complete(null); + } } private static final class LocalPipeline implements PipelineHook { @@ -410,6 +420,7 @@ public final class Capability { private static class LocalCallContext implements CallContextHook { final CompletableFuture cancelAllowed; + private final CompletableFuture callRelease; CompletableFuture tailCallPipeline; MessageBuilder request; Response response; @@ -418,10 +429,12 @@ public final class Capability { LocalCallContext(MessageBuilder request, ClientHook clientRef, - CompletableFuture cancelAllowed) { + CompletableFuture cancelAllowed, + CompletableFuture callRelease) { this.request = request; this.clientRef = clientRef; this.cancelAllowed = cancelAllowed; + this.callRelease = callRelease; } @Override @@ -441,6 +454,7 @@ public final class Capability { this.responseBuilder = localResponse.message.getRoot(AnyPointer.factory); this.response = new Response<>(this.responseBuilder.asReader(), localResponse); } + assert this.response != null; return this.responseBuilder; } @@ -466,13 +480,18 @@ public final class Capability { @Override public ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request) { - assert this.response == null: "Can't call tailCall() after initializing the results struct."; + assert this.response == null : "Can't call tailCall() after initializing the results struct."; var promise = request.send(); var voidPromise = promise._getResponse().thenAccept(tailResponse -> { this.response = tailResponse; }); return new ClientHook.VoidPromiseAndPipeline(voidPromise, promise.pipeline().hook); } + + @Override + public CompletableFuture releaseCall() { + return this.callRelease; + } } public static ClientHook newBrokenCap(String reason) { @@ -494,7 +513,7 @@ public final class Capability { static private ClientHook newBrokenClient(Throwable exc, boolean resolved, Object brand) { return new ClientHook() { @Override - public AnyPointer.Request newCall(long interfaceId, short methodId) { + public Request newCall(long interfaceId, short methodId) { var broken = Request.newBrokenRequest(AnyPointer.factory, exc); return new AnyPointer.Request(broken.getParams(), broken.getHook()); } @@ -561,28 +580,49 @@ public final class Capability { // A ClientHook which simply queues calls while waiting for a ClientHook to which to forward them. private static class QueuedClient implements ClientHook { - private final CompletableFuture promise; - private final CompletableFuture promiseForCallForwarding; - private final CompletableFuture promiseForClientResolution; private final CompletableFuture setResolutionOp; + // Represents the operation which will set `redirect` when possible. + + private final CompletableFuture promiseForCallForwarding = new CompletableFuture<>(); + // When this promise resolves, each queued call will be forwarded to the real client. This needs + // to occur *before* any 'whenMoreResolved()' promises resolve, because we want to make sure + // previously-queued calls are delivered before any new calls made in response to the resolution. + + private final CompletableFuture promiseForClientResolution = new CompletableFuture<>(); + // whenMoreResolved() returns forks of this promise. These must resolve *after* queued calls + // have been initiated (so that any calls made in the whenMoreResolved() handler are correctly + // delivered after calls made earlier), but *before* any queued calls return (because it might + // confuse the application if a queued call returns before the capability on which it was made + // resolves). + + private final CompletableFuture promiseForCallRelease = new CompletableFuture<>(); + // When this promise resolves, all pending calls will return. + private ClientHook redirect; QueuedClient(CompletableFuture promise) { - // TODO revisit futures - this.promise = promise; - this.promiseForCallForwarding = promise.toCompletableFuture().copy(); - this.promiseForClientResolution = promise.toCompletableFuture().copy(); - this.setResolutionOp = promise.thenAccept(inner -> { - this.redirect = inner; - }).exceptionally(exc -> { - this.redirect = newBrokenCap(exc); + this.setResolutionOp = promise.handle((inner, exc) -> { + this.redirect = exc == null + ? inner + : newBrokenCap(exc); + + // Resolve the promise for call forwarding. + this.promiseForCallForwarding.complete(this.redirect); + + // Now resolve the promise for client resolution + this.promiseForClientResolution.complete(this.redirect); + + // Finally, execute any pending calls. + this.promiseForCallRelease.complete(null); + return null; }); } @Override - public AnyPointer.Request newCall(long interfaceId, short methodId) { + public Request newCall(long interfaceId, short methodId) { var hook = new LocalRequest(interfaceId, methodId, this); + this.promiseForCallRelease.thenRun(hook::releaseCall); var root = hook.message.getRoot(AnyPointer.factory); return new AnyPointer.Request(root, hook); } diff --git a/runtime/src/main/java/org/capnproto/ClientHook.java b/runtime/src/main/java/org/capnproto/ClientHook.java index 7dd34de..2d4842d 100644 --- a/runtime/src/main/java/org/capnproto/ClientHook.java +++ b/runtime/src/main/java/org/capnproto/ClientHook.java @@ -1,7 +1,6 @@ package org.capnproto; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; public interface ClientHook {