implement call blocking stack

This commit is contained in:
Vaci Koblizek 2020-11-25 17:21:47 +00:00
parent 941a254e41
commit 07f8f22acd
3 changed files with 102 additions and 20 deletions

View file

@ -238,10 +238,57 @@ public final class CapabilityTest {
var server = new RpcTestUtil.TestStreamingImpl();
var cap = new Test.TestStreaming.Client(server);
CompletableFuture<java.lang.Void> promise1 = null;
CompletableFuture<java.lang.Void> promise2 = null;
CompletableFuture<java.lang.Void> promise3 = null;
{
var req = cap.doStreamIRequest();
req.getParams().setI(123);
promise1 = req.send();
}
{
var req = cap.doStreamJRequest();
req.getParams().setJ(321);
promise2 = req.send();
}
{
var req = cap.doStreamIRequest();
req.getParams().setI(456);
promise3 = req.send();
}
var promise4 = cap.finishStreamRequest().send();
// Only the first streaming call has executed
Assert.assertEquals(123, server.iSum);
Assert.assertEquals(0, server.jSum);
// Complete first streaming call
Assert.assertNotNull(server.fulfiller);
server.fulfiller.complete(null);
// second streaming call unblocked
Assert.assertEquals(123, server.iSum);
Assert.assertEquals(321, server.jSum);
// complete second streaming call
Assert.assertNotNull(server.fulfiller);
server.fulfiller.complete(null);
// third streaming call unblocked
Assert.assertEquals(579, server.iSum);
Assert.assertEquals(321, server.jSum);
// complete third streaming call
Assert.assertNotNull(server.fulfiller);
server.fulfiller.complete(null);
// last call is unblocked
var result = promise4.join();
Assert.assertEquals(579, result.getTotalI());
Assert.assertEquals(321, result.getTotalJ());
}
}

View file

@ -177,9 +177,12 @@ public final class Capability {
}
private final class LocalClient implements ClientHook {
private CompletableFuture<java.lang.Void> resolveTask;
private ClientHook resolved;
private boolean blocked = false;
private Throwable brokenException;
private final Queue<Runnable> blockedCalls = new ArrayDeque<>();
private final CapabilityServerSetBase capServerSet;
LocalClient() {
@ -206,12 +209,6 @@ public final class Capability {
@Override
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) {
assert !blocked: "Blocked condition not implemented";
if (blocked) {
// TODO implement blocked behaviour
return null;
}
// Note this comment from the C++ source:
//
// "We don't want to actually dispatch the call synchronously, because we don't want the callee
@ -225,12 +222,27 @@ public final class Capability {
//
// As the Java implementation doesn't (currently) have an evalLater() call, we obtain a promise
// from the CallContextHook that will be completed by QueuedClient when appropriate.
var promise = ctx.releaseCall().thenCompose(
void_ -> this.callInternal(interfaceId, methodId, ctx));
var promise = ctx.releaseCall().thenCompose(void_ -> {
if (blocked) {
var blockedCall = new CompletableFuture<java.lang.Void>();
this.blockedCalls.add(() -> callInternal(interfaceId, methodId, ctx).whenComplete((result, exc) -> {
if (exc == null) {
blockedCall.complete(result);
}
else {
blockedCall.completeExceptionally(exc);
}
}));
return blockedCall;
}
else {
return this.callInternal(interfaceId, methodId, ctx);
}
});
var pipelinePromise = promise.thenApply(x -> {
ctx.releaseParams();
return (PipelineHook)new LocalPipeline(ctx);
return (PipelineHook) new LocalPipeline(ctx);
});
var tailCall = ctx.onTailCall().thenApply(pipeline -> pipeline.hook);
@ -264,31 +276,58 @@ public final class Capability {
return BRAND;
}
void unblock() {
this.blocked = false;
while (!this.blocked) {
if (this.blockedCalls.isEmpty()) {
break;
}
var call = this.blockedCalls.remove();
call.run();
}
}
CompletableFuture<java.lang.Void> callInternal(long interfaceId, short methodId, CallContextHook ctx) {
assert !this.blocked;
if (this.brokenException != null) {
return CompletableFuture.failedFuture(this.brokenException);
}
var result = dispatchCall(
interfaceId, methodId,
new CallContext<>(AnyPointer.factory, AnyPointer.factory, ctx));
if (result.isStreaming()) {
// TODO streaming
return null;
if (!result.isStreaming()) {
return result.promise;
}
else {
return result.getPromise();
this.blocked = true;
return result.promise.exceptionallyCompose(exc -> {
this.brokenException = exc;
return CompletableFuture.failedFuture(exc);
}).whenComplete((void_, exc) -> {
this.unblock();
});
}
}
public CompletableFuture<Server> getLocalServer(CapabilityServerSetBase capServerSet) {
if (this.capServerSet == capServerSet) {
if (this.blocked) {
assert false: "Blocked local server not implemented";
var promise = new CompletableFuture<Server>();
this.blockedCalls.add(() -> promise.complete(Server.this));
return promise;
}
return CompletableFuture.completedFuture(Server.this);
}
return null;
}
}
public Integer getFd() {
return null;
}
/**
* If this returns non-null, then it is a promise which, when resolved, points to a new
* capability to which future calls can be sent. Use this in cases where an object implementation

View file

@ -4,7 +4,7 @@ import java.util.concurrent.CompletableFuture;
public final class DispatchCallResult {
private final CompletableFuture<java.lang.Void> promise;
final CompletableFuture<java.lang.Void> promise;
private final boolean streaming;
public DispatchCallResult(CompletableFuture<java.lang.Void> promise, boolean isStreaming) {
@ -16,10 +16,6 @@ public final class DispatchCallResult {
this(CompletableFuture.failedFuture(exc), false);
}
public CompletableFuture<java.lang.Void> getPromise() {
return promise.copy();
}
public boolean isStreaming() {
return streaming;
}