implement streaming requests

This commit is contained in:
Vaci Koblizek 2020-11-25 15:47:02 +00:00
parent fb5f1bf2ba
commit 2d072a6b12
12 changed files with 208 additions and 56 deletions

View file

@ -2027,9 +2027,6 @@ private:
kj::strTree(
sp, " public static final class ", methodName, " {\n",
sp, " public interface Request extends org.capnproto.Request<", paramBuilder, "> {\n",
sp, " default org.capnproto.FromPointerBuilder<", paramBuilder, "> getParamsFactory() {\n",
sp, " return ", paramFactory, ";\n",
sp, " }\n",
sp, " default Response send() {\n",
sp, " return new Response(this.sendInternal());\n",
sp, " }\n",
@ -2050,7 +2047,7 @@ private:
// client call
kj::strTree(
sp, "public Methods.", methodName, ".Request ", methodName, "Request() {\n",
sp, " var result = newCall(0x", interfaceIdHex, "L, (short)", methodId, ");\n",
sp, " var result = newCall(", paramFactory, ", 0x", interfaceIdHex, "L, (short)", methodId, ");\n",
sp, " return () -> result;\n",
sp, "}\n"
),

View file

@ -1698,9 +1698,9 @@ final class RpcState<VatId> {
}
@Override
public CompletionStage<?> sendStreaming() {
public CompletableFuture<java.lang.Void> sendStreaming() {
// TODO falling back to regular send for now...
return send();
return send().thenApply(results -> null);
}
QuestionRef sendInternal(boolean isTailCall) {

View file

@ -215,8 +215,6 @@ public final class CapabilityTest {
var factory = Test.TestGenerics.newFactory(Test.TestAllTypes.factory, AnyPointer.factory);
}
@org.junit.Test
public void thisCap() {
var callCount = new Counter();
@ -234,4 +232,16 @@ public final class CapabilityTest {
client2.barRequest().send().join();
Assert.assertEquals(3, callCount.value());
}
@org.junit.Test
public void testStreamingCallsBlockSubsequentCalls() {
var server = new RpcTestUtil.TestStreamingImpl();
var cap = new Test.TestStreaming.Client(server);
{
var req = cap.doStreamIRequest();
}
}
}

View file

@ -114,7 +114,7 @@ class RpcTestUtil {
this.callCount = callCount;
this.handleCount = handleCount;
}
@Override
protected CompletableFuture<java.lang.Void> echo(CallContext<Test.TestMoreStuff.EchoParams.Reader, Test.TestMoreStuff.EchoResults.Builder> context) {
this.callCount.inc();
@ -129,9 +129,9 @@ class RpcTestUtil {
var cap = context.getParams().getCap();
context.allowCancellation();
return new CompletableFuture<java.lang.Void>().whenComplete((void_, exc) -> {
if (exc != null) {
System.out.println("expectCancel completed exceptionally: " + exc.getMessage());
}
if (exc != null) {
System.out.println("expectCancel completed exceptionally: " + exc.getMessage());
}
}); // never completes, just await doom...
}
@ -230,6 +230,7 @@ class RpcTestUtil {
public TestTailCalleeImpl(Counter count) {
this(count, READY_NOW);
}
public TestTailCalleeImpl(Counter count, CompletableFuture<java.lang.Void> releaseMe) {
this.count = count;
this.releaseMe = releaseMe;
@ -328,5 +329,40 @@ class RpcTestUtil {
return this.impl.foo(context);
}
}
static class TestStreamingImpl
extends Test.TestStreaming.Server {
public int iSum = 0;
public int jSum = 0;
CompletableFuture<java.lang.Void> fulfiller;
boolean jShouldThrow = false;
@Override
protected CompletableFuture<java.lang.Void> doStreamI(StreamingCallContext<Test.TestStreaming.DoStreamIParams.Reader> context) {
iSum += context.getParams().getI();
fulfiller = new CompletableFuture<>();
return fulfiller;
}
@Override
protected CompletableFuture<java.lang.Void> doStreamJ(StreamingCallContext<Test.TestStreaming.DoStreamJParams.Reader> context) {
context.allowCancellation();
jSum += context.getParams().getJ();
if (jShouldThrow) {
return CompletableFuture.failedFuture(RpcException.failed("throw requested"));
}
fulfiller = new CompletableFuture<>();
return fulfiller;
}
@Override
protected CompletableFuture<java.lang.Void> finishStream(CallContext<Test.TestStreaming.FinishStreamParams.Reader, Test.TestStreaming.FinishStreamResults.Builder> context) {
var results = context.getResults();
results.setTotalI(iSum);
results.setTotalJ(jSum);
return READY_NOW;
}
}
}

View file

@ -95,6 +95,13 @@ interface TestTailCaller {
foo @0 (i :Int32, callee :TestTailCallee) -> TestTailCallee.TailResult;
}
interface TestStreaming {
doStreamI @0 (i: UInt32) -> stream;
doStreamJ @1 (j: UInt32) -> stream;
finishStream @2 () -> (totalI :UInt32, totalJ: UInt32);
# Test streaming. finishStream() returns the totals of the values streamed to the other calls.
}
interface TestHandle {}
interface TestMoreStuff extends(TestCallOrder) {

View file

@ -21,6 +21,8 @@
package org.capnproto;
import java.util.concurrent.CompletableFuture;
public final class AnyPointer {
public static final class Factory
implements PointerFactory<Builder, Reader>,
@ -209,6 +211,11 @@ public final class AnyPointer {
return this;
}
@Override
public org.capnproto.Request<Builder> getBaseRequest() {
return this;
}
@Override
public RequestHook getHook() {
return this.requestHook;
@ -228,4 +235,44 @@ public final class AnyPointer {
return this.requestHook.send();
}
}
public static final class StreamingRequest
implements org.capnproto.StreamingRequest<Builder> {
private final Builder params;
private final RequestHook requestHook;
StreamingRequest(AnyPointer.Request request) {
this(request.params, request.requestHook);
}
StreamingRequest(Builder params, RequestHook requestHook) {
this.params = params;
this.requestHook = requestHook;
}
@Override
public Builder getParams() {
return this.params;
}
@Override
public org.capnproto.StreamingRequest<Builder> getTypelessRequest() {
return this;
}
@Override
public RequestHook getHook() {
return this.requestHook;
}
@Override
public FromPointerBuilder<Builder> getParamsFactory() {
return AnyPointer.factory;
}
public CompletableFuture<java.lang.Void> send() {
return this.requestHook.sendStreaming();
}
}
}

View file

@ -87,14 +87,40 @@ public final class Capability {
return CompletableFuture.completedFuture(null);
}
default Request<AnyPointer.Builder> newCall(long interfaceId, short methodId) {
return this.getHook().newCall(interfaceId, methodId);
default <T> Request<T> newCall(FromPointerBuilder<T> paramsFactory, long interfaceId, short methodId) {
var request = this.getHook().newCall(interfaceId, methodId);
return new Request<>() {
@Override
public FromPointerBuilder<T> getParamsFactory() {
return paramsFactory;
}
@Override
public Request<AnyPointer.Builder> getTypelessRequest() {
return request;
}
@Override
public Request<T> getBaseRequest() {
return this;
}
};
}
default <T> StreamingRequest<T> newStreamingCall(FromPointerBuilder<T> paramsBuilder,
long interfaceId, short methodId) {
var request = getHook().newCall(interfaceId, methodId);
return new StreamingRequest<> (paramsBuilder, request.getParams(), request.getHook());
default <T> StreamingRequest<T> newStreamingCall(FromPointerBuilder<T> paramsFactory, long interfaceId, short methodId) {
var request = this.getHook().newCall(interfaceId, methodId);
var streamingRequest = new AnyPointer.StreamingRequest(request.getParams(), request.getHook());
return new StreamingRequest<>() {
@Override
public FromPointerBuilder<T> getParamsFactory() {
return paramsFactory;
}
@Override
public StreamingRequest<AnyPointer.Builder> getTypelessRequest() {
return streamingRequest;
}
};
}
}
@ -377,10 +403,10 @@ public final class Capability {
}
@Override
public CompletableFuture<?> sendStreaming() {
public CompletableFuture<java.lang.Void> sendStreaming() {
// We don't do any special handling of streaming in RequestHook for local requests, because
// there is no latency to compensate for between the client and server in this case.
return send();
return send().thenApply(results -> null);
}
@Override

View file

@ -2,23 +2,17 @@ package org.capnproto;
import java.util.concurrent.CompletableFuture;
public interface Request<Params> {
public interface Request<Params>
extends RequestBase<Params> {
FromPointerBuilder<Params> getParamsFactory();
RequestBase<Params> getBaseRequest();
Request<AnyPointer.Builder> getTypelessRequest();
default Params getParams() {
return this.getTypelessRequest().getParams().getAs(this.getParamsFactory());
default FromPointerBuilder<Params> getParamsFactory() {
return getBaseRequest().getParamsFactory();
}
default RequestHook getHook() {
return this.getTypelessRequest().getHook();
}
default RemotePromise<AnyPointer.Reader> sendInternal() {
return this.getTypelessRequest().sendInternal();
default RequestBase<AnyPointer.Builder> getTypelessRequest() {
return getBaseRequest().getTypelessRequest();
}
static <Params> Request<Params> newBrokenRequest(FromPointerBuilder<Params> paramsFactory,
@ -34,7 +28,7 @@ public interface Request<Params> {
}
@Override
public CompletableFuture<?> sendStreaming() {
public CompletableFuture<java.lang.Void> sendStreaming() {
return CompletableFuture.failedFuture(exc);
}
};
@ -46,9 +40,14 @@ public interface Request<Params> {
}
@Override
public Request<AnyPointer.Builder> getTypelessRequest() {
public RequestBase<AnyPointer.Builder> getTypelessRequest() {
return new AnyPointer.Request(message.getRoot(AnyPointer.factory), hook);
}
@Override
public Request<Params> getBaseRequest() {
return this;
}
};
}
@ -65,6 +64,11 @@ public interface Request<Params> {
public Request<AnyPointer.Builder> getTypelessRequest() {
return typeless;
}
@Override
public Request<Params> getBaseRequest() {
return this;
}
};
}
}

View file

@ -0,0 +1,20 @@
package org.capnproto;
public interface RequestBase<Params> {
FromPointerBuilder<Params> getParamsFactory();
RequestBase<AnyPointer.Builder> getTypelessRequest();
default Params getParams() {
return this.getTypelessRequest().getParams().getAs(this.getParamsFactory());
}
default RequestHook getHook() {
return this.getTypelessRequest().getHook();
}
default RemotePromise<AnyPointer.Reader> sendInternal() {
return this.getTypelessRequest().sendInternal();
}
}

View file

@ -1,12 +1,12 @@
package org.capnproto;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;
public interface RequestHook {
RemotePromise<AnyPointer.Reader> send();
CompletionStage<?> sendStreaming();
CompletableFuture<java.lang.Void> sendStreaming();
default Object getBrand() {
return null;

View file

@ -2,12 +2,20 @@ package org.capnproto;
public class StreamingCallContext<Params> {
private final FromPointerReader<Params> params;
final CallContextHook hook;
private final FromPointerReader<Params> paramsFactory;
private final CallContextHook hook;
public StreamingCallContext(FromPointerReader<Params> params,
public StreamingCallContext(FromPointerReader<Params> paramsFactory,
CallContextHook hook) {
this.params = params;
this.paramsFactory = paramsFactory;
this.hook = hook;
}
public final Params getParams() {
return this.hook.getParams().getAs(paramsFactory);
}
public final void allowCancellation() {
this.hook.allowCancellation();
}
}

View file

@ -1,26 +1,23 @@
package org.capnproto;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class StreamingRequest<Params> {
public interface StreamingRequest<Params> {
private final FromPointerBuilder<Params> paramsBuilder;
AnyPointer.Builder params;
RequestHook hook;
FromPointerBuilder<Params> getParamsFactory();
StreamingRequest(FromPointerBuilder<Params> paramsBuilder,
AnyPointer.Builder params, RequestHook hook) {
this.paramsBuilder = paramsBuilder;
this.params = params;
this.hook = hook;
StreamingRequest<AnyPointer.Builder> getTypelessRequest();
default Params getParams() {
return this.getTypelessRequest().getParams().getAs(this.getParamsFactory());
}
CompletionStage<?> send() {
var promise = hook.sendStreaming();
hook = null; // prevent reuse
return promise;
default RequestHook getHook() {
return this.getTypelessRequest().getHook();
}
default CompletableFuture<java.lang.Void> send() {
return this.getHook().sendStreaming();
}
}