From 07f8f22acde4067752d40f6fbe54b8908229b3d0 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Wed, 25 Nov 2020 17:21:47 +0000 Subject: [PATCH] implement call blocking stack --- .../java/org/capnproto/CapabilityTest.java | 47 +++++++++++++ .../main/java/org/capnproto/Capability.java | 69 +++++++++++++++---- .../org/capnproto/DispatchCallResult.java | 6 +- 3 files changed, 102 insertions(+), 20 deletions(-) diff --git a/runtime-rpc/src/test/java/org/capnproto/CapabilityTest.java b/runtime-rpc/src/test/java/org/capnproto/CapabilityTest.java index 4843392..7354a10 100644 --- a/runtime-rpc/src/test/java/org/capnproto/CapabilityTest.java +++ b/runtime-rpc/src/test/java/org/capnproto/CapabilityTest.java @@ -238,10 +238,57 @@ public final class CapabilityTest { var server = new RpcTestUtil.TestStreamingImpl(); var cap = new Test.TestStreaming.Client(server); + CompletableFuture promise1 = null; + CompletableFuture promise2 = null; + CompletableFuture promise3 = null; + { var req = cap.doStreamIRequest(); + req.getParams().setI(123); + promise1 = req.send(); } + { + var req = cap.doStreamJRequest(); + req.getParams().setJ(321); + promise2 = req.send(); + } + { + var req = cap.doStreamIRequest(); + req.getParams().setI(456); + promise3 = req.send(); + } + + var promise4 = cap.finishStreamRequest().send(); + + // Only the first streaming call has executed + Assert.assertEquals(123, server.iSum); + Assert.assertEquals(0, server.jSum); + + // Complete first streaming call + Assert.assertNotNull(server.fulfiller); + server.fulfiller.complete(null); + + // second streaming call unblocked + Assert.assertEquals(123, server.iSum); + Assert.assertEquals(321, server.jSum); + + // complete second streaming call + Assert.assertNotNull(server.fulfiller); + server.fulfiller.complete(null); + + // third streaming call unblocked + Assert.assertEquals(579, server.iSum); + Assert.assertEquals(321, server.jSum); + + // complete third streaming call + Assert.assertNotNull(server.fulfiller); + server.fulfiller.complete(null); + + // last call is unblocked + var result = promise4.join(); + Assert.assertEquals(579, result.getTotalI()); + Assert.assertEquals(321, result.getTotalJ()); } } diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index 181e89e..71bcbd1 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -177,9 +177,12 @@ public final class Capability { } private final class LocalClient implements ClientHook { + private CompletableFuture resolveTask; private ClientHook resolved; private boolean blocked = false; + private Throwable brokenException; + private final Queue blockedCalls = new ArrayDeque<>(); private final CapabilityServerSetBase capServerSet; LocalClient() { @@ -206,12 +209,6 @@ public final class Capability { @Override public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) { - assert !blocked: "Blocked condition not implemented"; - if (blocked) { - // TODO implement blocked behaviour - return null; - } - // Note this comment from the C++ source: // // "We don't want to actually dispatch the call synchronously, because we don't want the callee @@ -225,12 +222,27 @@ public final class Capability { // // As the Java implementation doesn't (currently) have an evalLater() call, we obtain a promise // from the CallContextHook that will be completed by QueuedClient when appropriate. - var promise = ctx.releaseCall().thenCompose( - void_ -> this.callInternal(interfaceId, methodId, ctx)); + var promise = ctx.releaseCall().thenCompose(void_ -> { + if (blocked) { + var blockedCall = new CompletableFuture(); + this.blockedCalls.add(() -> callInternal(interfaceId, methodId, ctx).whenComplete((result, exc) -> { + if (exc == null) { + blockedCall.complete(result); + } + else { + blockedCall.completeExceptionally(exc); + } + })); + return blockedCall; + } + else { + return this.callInternal(interfaceId, methodId, ctx); + } + }); var pipelinePromise = promise.thenApply(x -> { ctx.releaseParams(); - return (PipelineHook)new LocalPipeline(ctx); + return (PipelineHook) new LocalPipeline(ctx); }); var tailCall = ctx.onTailCall().thenApply(pipeline -> pipeline.hook); @@ -264,31 +276,58 @@ public final class Capability { return BRAND; } + void unblock() { + this.blocked = false; + while (!this.blocked) { + if (this.blockedCalls.isEmpty()) { + break; + } + var call = this.blockedCalls.remove(); + call.run(); + } + } + CompletableFuture callInternal(long interfaceId, short methodId, CallContextHook ctx) { + assert !this.blocked; + + if (this.brokenException != null) { + return CompletableFuture.failedFuture(this.brokenException); + } + var result = dispatchCall( interfaceId, methodId, new CallContext<>(AnyPointer.factory, AnyPointer.factory, ctx)); - if (result.isStreaming()) { - // TODO streaming - return null; + if (!result.isStreaming()) { + return result.promise; } else { - return result.getPromise(); + this.blocked = true; + return result.promise.exceptionallyCompose(exc -> { + this.brokenException = exc; + return CompletableFuture.failedFuture(exc); + }).whenComplete((void_, exc) -> { + this.unblock(); + }); } } public CompletableFuture getLocalServer(CapabilityServerSetBase capServerSet) { if (this.capServerSet == capServerSet) { if (this.blocked) { - assert false: "Blocked local server not implemented"; + var promise = new CompletableFuture(); + this.blockedCalls.add(() -> promise.complete(Server.this)); + return promise; } - return CompletableFuture.completedFuture(Server.this); } return null; } } + public Integer getFd() { + return null; + } + /** * If this returns non-null, then it is a promise which, when resolved, points to a new * capability to which future calls can be sent. Use this in cases where an object implementation diff --git a/runtime/src/main/java/org/capnproto/DispatchCallResult.java b/runtime/src/main/java/org/capnproto/DispatchCallResult.java index 536a2a0..1c7c86c 100644 --- a/runtime/src/main/java/org/capnproto/DispatchCallResult.java +++ b/runtime/src/main/java/org/capnproto/DispatchCallResult.java @@ -4,7 +4,7 @@ import java.util.concurrent.CompletableFuture; public final class DispatchCallResult { - private final CompletableFuture promise; + final CompletableFuture promise; private final boolean streaming; public DispatchCallResult(CompletableFuture promise, boolean isStreaming) { @@ -16,10 +16,6 @@ public final class DispatchCallResult { this(CompletableFuture.failedFuture(exc), false); } - public CompletableFuture getPromise() { - return promise.copy(); - } - public boolean isStreaming() { return streaming; }