From 2d072a6b12fb1e892914343eafaa9283980a7929 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Wed, 25 Nov 2020 15:47:02 +0000 Subject: [PATCH] implement streaming requests --- compiler/src/main/cpp/capnpc-java.c++ | 5 +- .../src/main/java/org/capnproto/RpcState.java | 4 +- .../java/org/capnproto/CapabilityTest.java | 14 +++++- .../test/java/org/capnproto/RpcTestUtil.java | 44 +++++++++++++++-- runtime-rpc/src/test/schema/test.capnp | 7 +++ .../main/java/org/capnproto/AnyPointer.java | 47 +++++++++++++++++++ .../main/java/org/capnproto/Capability.java | 42 +++++++++++++---- .../src/main/java/org/capnproto/Request.java | 34 ++++++++------ .../main/java/org/capnproto/RequestBase.java | 20 ++++++++ .../main/java/org/capnproto/RequestHook.java | 4 +- .../org/capnproto/StreamingCallContext.java | 16 +++++-- .../java/org/capnproto/StreamingRequest.java | 27 +++++------ 12 files changed, 208 insertions(+), 56 deletions(-) create mode 100644 runtime/src/main/java/org/capnproto/RequestBase.java diff --git a/compiler/src/main/cpp/capnpc-java.c++ b/compiler/src/main/cpp/capnpc-java.c++ index 76ffe25..49bc4a9 100644 --- a/compiler/src/main/cpp/capnpc-java.c++ +++ b/compiler/src/main/cpp/capnpc-java.c++ @@ -2027,9 +2027,6 @@ private: kj::strTree( sp, " public static final class ", methodName, " {\n", sp, " public interface Request extends org.capnproto.Request<", paramBuilder, "> {\n", - sp, " default org.capnproto.FromPointerBuilder<", paramBuilder, "> getParamsFactory() {\n", - sp, " return ", paramFactory, ";\n", - sp, " }\n", sp, " default Response send() {\n", sp, " return new Response(this.sendInternal());\n", sp, " }\n", @@ -2050,7 +2047,7 @@ private: // client call kj::strTree( sp, "public Methods.", methodName, ".Request ", methodName, "Request() {\n", - sp, " var result = newCall(0x", interfaceIdHex, "L, (short)", methodId, ");\n", + sp, " var result = newCall(", paramFactory, ", 0x", interfaceIdHex, "L, (short)", methodId, ");\n", sp, " return () -> result;\n", sp, "}\n" ), diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index 1753374..852c3cd 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -1698,9 +1698,9 @@ final class RpcState { } @Override - public CompletionStage sendStreaming() { + public CompletableFuture sendStreaming() { // TODO falling back to regular send for now... - return send(); + return send().thenApply(results -> null); } QuestionRef sendInternal(boolean isTailCall) { diff --git a/runtime-rpc/src/test/java/org/capnproto/CapabilityTest.java b/runtime-rpc/src/test/java/org/capnproto/CapabilityTest.java index 23e005f..4843392 100644 --- a/runtime-rpc/src/test/java/org/capnproto/CapabilityTest.java +++ b/runtime-rpc/src/test/java/org/capnproto/CapabilityTest.java @@ -215,8 +215,6 @@ public final class CapabilityTest { var factory = Test.TestGenerics.newFactory(Test.TestAllTypes.factory, AnyPointer.factory); } - - @org.junit.Test public void thisCap() { var callCount = new Counter(); @@ -234,4 +232,16 @@ public final class CapabilityTest { client2.barRequest().send().join(); Assert.assertEquals(3, callCount.value()); } + + @org.junit.Test + public void testStreamingCallsBlockSubsequentCalls() { + var server = new RpcTestUtil.TestStreamingImpl(); + var cap = new Test.TestStreaming.Client(server); + + { + var req = cap.doStreamIRequest(); + } + + + } } diff --git a/runtime-rpc/src/test/java/org/capnproto/RpcTestUtil.java b/runtime-rpc/src/test/java/org/capnproto/RpcTestUtil.java index 319037e..1de666f 100644 --- a/runtime-rpc/src/test/java/org/capnproto/RpcTestUtil.java +++ b/runtime-rpc/src/test/java/org/capnproto/RpcTestUtil.java @@ -114,7 +114,7 @@ class RpcTestUtil { this.callCount = callCount; this.handleCount = handleCount; } - + @Override protected CompletableFuture echo(CallContext context) { this.callCount.inc(); @@ -129,9 +129,9 @@ class RpcTestUtil { var cap = context.getParams().getCap(); context.allowCancellation(); return new CompletableFuture().whenComplete((void_, exc) -> { - if (exc != null) { - System.out.println("expectCancel completed exceptionally: " + exc.getMessage()); - } + if (exc != null) { + System.out.println("expectCancel completed exceptionally: " + exc.getMessage()); + } }); // never completes, just await doom... } @@ -230,6 +230,7 @@ class RpcTestUtil { public TestTailCalleeImpl(Counter count) { this(count, READY_NOW); } + public TestTailCalleeImpl(Counter count, CompletableFuture releaseMe) { this.count = count; this.releaseMe = releaseMe; @@ -328,5 +329,40 @@ class RpcTestUtil { return this.impl.foo(context); } } + + static class TestStreamingImpl + extends Test.TestStreaming.Server { + + public int iSum = 0; + public int jSum = 0; + CompletableFuture fulfiller; + boolean jShouldThrow = false; + + @Override + protected CompletableFuture doStreamI(StreamingCallContext context) { + iSum += context.getParams().getI(); + fulfiller = new CompletableFuture<>(); + return fulfiller; + } + + @Override + protected CompletableFuture doStreamJ(StreamingCallContext context) { + context.allowCancellation(); + jSum += context.getParams().getJ(); + if (jShouldThrow) { + return CompletableFuture.failedFuture(RpcException.failed("throw requested")); + } + fulfiller = new CompletableFuture<>(); + return fulfiller; + } + + @Override + protected CompletableFuture finishStream(CallContext context) { + var results = context.getResults(); + results.setTotalI(iSum); + results.setTotalJ(jSum); + return READY_NOW; + } + } } diff --git a/runtime-rpc/src/test/schema/test.capnp b/runtime-rpc/src/test/schema/test.capnp index fff9a0c..8c6c8e2 100644 --- a/runtime-rpc/src/test/schema/test.capnp +++ b/runtime-rpc/src/test/schema/test.capnp @@ -95,6 +95,13 @@ interface TestTailCaller { foo @0 (i :Int32, callee :TestTailCallee) -> TestTailCallee.TailResult; } +interface TestStreaming { + doStreamI @0 (i: UInt32) -> stream; + doStreamJ @1 (j: UInt32) -> stream; + finishStream @2 () -> (totalI :UInt32, totalJ: UInt32); + # Test streaming. finishStream() returns the totals of the values streamed to the other calls. +} + interface TestHandle {} interface TestMoreStuff extends(TestCallOrder) { diff --git a/runtime/src/main/java/org/capnproto/AnyPointer.java b/runtime/src/main/java/org/capnproto/AnyPointer.java index 838207f..5fba51d 100644 --- a/runtime/src/main/java/org/capnproto/AnyPointer.java +++ b/runtime/src/main/java/org/capnproto/AnyPointer.java @@ -21,6 +21,8 @@ package org.capnproto; +import java.util.concurrent.CompletableFuture; + public final class AnyPointer { public static final class Factory implements PointerFactory, @@ -209,6 +211,11 @@ public final class AnyPointer { return this; } + @Override + public org.capnproto.Request getBaseRequest() { + return this; + } + @Override public RequestHook getHook() { return this.requestHook; @@ -228,4 +235,44 @@ public final class AnyPointer { return this.requestHook.send(); } } + + public static final class StreamingRequest + implements org.capnproto.StreamingRequest { + + private final Builder params; + private final RequestHook requestHook; + + StreamingRequest(AnyPointer.Request request) { + this(request.params, request.requestHook); + } + + StreamingRequest(Builder params, RequestHook requestHook) { + this.params = params; + this.requestHook = requestHook; + } + + @Override + public Builder getParams() { + return this.params; + } + + @Override + public org.capnproto.StreamingRequest getTypelessRequest() { + return this; + } + + @Override + public RequestHook getHook() { + return this.requestHook; + } + + @Override + public FromPointerBuilder getParamsFactory() { + return AnyPointer.factory; + } + + public CompletableFuture send() { + return this.requestHook.sendStreaming(); + } + } } diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index ebe8916..8c720fc 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -87,14 +87,40 @@ public final class Capability { return CompletableFuture.completedFuture(null); } - default Request newCall(long interfaceId, short methodId) { - return this.getHook().newCall(interfaceId, methodId); + default Request newCall(FromPointerBuilder paramsFactory, long interfaceId, short methodId) { + var request = this.getHook().newCall(interfaceId, methodId); + return new Request<>() { + @Override + public FromPointerBuilder getParamsFactory() { + return paramsFactory; + } + + @Override + public Request getTypelessRequest() { + return request; + } + + @Override + public Request getBaseRequest() { + return this; + } + }; } - default StreamingRequest newStreamingCall(FromPointerBuilder paramsBuilder, - long interfaceId, short methodId) { - var request = getHook().newCall(interfaceId, methodId); - return new StreamingRequest<> (paramsBuilder, request.getParams(), request.getHook()); + default StreamingRequest newStreamingCall(FromPointerBuilder paramsFactory, long interfaceId, short methodId) { + var request = this.getHook().newCall(interfaceId, methodId); + var streamingRequest = new AnyPointer.StreamingRequest(request.getParams(), request.getHook()); + return new StreamingRequest<>() { + @Override + public FromPointerBuilder getParamsFactory() { + return paramsFactory; + } + + @Override + public StreamingRequest getTypelessRequest() { + return streamingRequest; + } + }; } } @@ -377,10 +403,10 @@ public final class Capability { } @Override - public CompletableFuture sendStreaming() { + public CompletableFuture sendStreaming() { // We don't do any special handling of streaming in RequestHook for local requests, because // there is no latency to compensate for between the client and server in this case. - return send(); + return send().thenApply(results -> null); } @Override diff --git a/runtime/src/main/java/org/capnproto/Request.java b/runtime/src/main/java/org/capnproto/Request.java index 29988a1..5e24a82 100644 --- a/runtime/src/main/java/org/capnproto/Request.java +++ b/runtime/src/main/java/org/capnproto/Request.java @@ -2,23 +2,17 @@ package org.capnproto; import java.util.concurrent.CompletableFuture; -public interface Request { +public interface Request + extends RequestBase { - FromPointerBuilder getParamsFactory(); + RequestBase getBaseRequest(); - Request getTypelessRequest(); - - default Params getParams() { - return this.getTypelessRequest().getParams().getAs(this.getParamsFactory()); + default FromPointerBuilder getParamsFactory() { + return getBaseRequest().getParamsFactory(); } - default RequestHook getHook() { - return this.getTypelessRequest().getHook(); - } - - - default RemotePromise sendInternal() { - return this.getTypelessRequest().sendInternal(); + default RequestBase getTypelessRequest() { + return getBaseRequest().getTypelessRequest(); } static Request newBrokenRequest(FromPointerBuilder paramsFactory, @@ -34,7 +28,7 @@ public interface Request { } @Override - public CompletableFuture sendStreaming() { + public CompletableFuture sendStreaming() { return CompletableFuture.failedFuture(exc); } }; @@ -46,9 +40,14 @@ public interface Request { } @Override - public Request getTypelessRequest() { + public RequestBase getTypelessRequest() { return new AnyPointer.Request(message.getRoot(AnyPointer.factory), hook); } + + @Override + public Request getBaseRequest() { + return this; + } }; } @@ -65,6 +64,11 @@ public interface Request { public Request getTypelessRequest() { return typeless; } + + @Override + public Request getBaseRequest() { + return this; + } }; } } diff --git a/runtime/src/main/java/org/capnproto/RequestBase.java b/runtime/src/main/java/org/capnproto/RequestBase.java new file mode 100644 index 0000000..fb3b5f5 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/RequestBase.java @@ -0,0 +1,20 @@ +package org.capnproto; + +public interface RequestBase { + + FromPointerBuilder getParamsFactory(); + + RequestBase getTypelessRequest(); + + default Params getParams() { + return this.getTypelessRequest().getParams().getAs(this.getParamsFactory()); + } + + default RequestHook getHook() { + return this.getTypelessRequest().getHook(); + } + + default RemotePromise sendInternal() { + return this.getTypelessRequest().sendInternal(); + } +} diff --git a/runtime/src/main/java/org/capnproto/RequestHook.java b/runtime/src/main/java/org/capnproto/RequestHook.java index 7e90f3e..08607a4 100644 --- a/runtime/src/main/java/org/capnproto/RequestHook.java +++ b/runtime/src/main/java/org/capnproto/RequestHook.java @@ -1,12 +1,12 @@ package org.capnproto; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.CompletableFuture; public interface RequestHook { RemotePromise send(); - CompletionStage sendStreaming(); + CompletableFuture sendStreaming(); default Object getBrand() { return null; diff --git a/runtime/src/main/java/org/capnproto/StreamingCallContext.java b/runtime/src/main/java/org/capnproto/StreamingCallContext.java index 750d8dc..a4128c0 100644 --- a/runtime/src/main/java/org/capnproto/StreamingCallContext.java +++ b/runtime/src/main/java/org/capnproto/StreamingCallContext.java @@ -2,12 +2,20 @@ package org.capnproto; public class StreamingCallContext { - private final FromPointerReader params; - final CallContextHook hook; + private final FromPointerReader paramsFactory; + private final CallContextHook hook; - public StreamingCallContext(FromPointerReader params, + public StreamingCallContext(FromPointerReader paramsFactory, CallContextHook hook) { - this.params = params; + this.paramsFactory = paramsFactory; this.hook = hook; } + + public final Params getParams() { + return this.hook.getParams().getAs(paramsFactory); + } + + public final void allowCancellation() { + this.hook.allowCancellation(); + } } diff --git a/runtime/src/main/java/org/capnproto/StreamingRequest.java b/runtime/src/main/java/org/capnproto/StreamingRequest.java index a66182b..1cf6385 100644 --- a/runtime/src/main/java/org/capnproto/StreamingRequest.java +++ b/runtime/src/main/java/org/capnproto/StreamingRequest.java @@ -1,26 +1,23 @@ package org.capnproto; - import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -public class StreamingRequest { +public interface StreamingRequest { - private final FromPointerBuilder paramsBuilder; - AnyPointer.Builder params; - RequestHook hook; + FromPointerBuilder getParamsFactory(); - StreamingRequest(FromPointerBuilder paramsBuilder, - AnyPointer.Builder params, RequestHook hook) { - this.paramsBuilder = paramsBuilder; - this.params = params; - this.hook = hook; + StreamingRequest getTypelessRequest(); + + default Params getParams() { + return this.getTypelessRequest().getParams().getAs(this.getParamsFactory()); } - CompletionStage send() { - var promise = hook.sendStreaming(); - hook = null; // prevent reuse - return promise; + default RequestHook getHook() { + return this.getTypelessRequest().getHook(); + } + + default CompletableFuture send() { + return this.getHook().sendStreaming(); } }