add releaseCall to delay call execution

This commit is contained in:
Vaci Koblizek 2020-11-20 17:04:50 +00:00
parent e314d26ab3
commit 1cadca604e
3 changed files with 69 additions and 26 deletions

View file

@ -21,4 +21,8 @@ public interface CallContextHook {
CompletableFuture<AnyPointer.Pipeline> onTailCall(); CompletableFuture<AnyPointer.Pipeline> onTailCall();
ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request); ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request);
default CompletableFuture<java.lang.Void> releaseCall() {
return CompletableFuture.completedFuture(null);
}
} }

View file

@ -171,7 +171,7 @@ public final class Capability {
} }
@Override @Override
public AnyPointer.Request newCall(long interfaceId, short methodId) { public Request<AnyPointer.Builder> newCall(long interfaceId, short methodId) {
var hook = new LocalRequest(interfaceId, methodId, this); var hook = new LocalRequest(interfaceId, methodId, this);
var root = hook.message.getRoot(AnyPointer.factory); var root = hook.message.getRoot(AnyPointer.factory);
return new AnyPointer.Request(root, hook); return new AnyPointer.Request(root, hook);
@ -185,16 +185,20 @@ public final class Capability {
return null; return null;
} }
// We don't want to actually dispatch the call synchronously, because we don't want the callee // Note this comment from the C++ source:
// to have any side effects before the promise is returned to the caller. This helps avoid
// race conditions.
// //
// So, we do an evalLater() here. // "We don't want to actually dispatch the call synchronously, because we don't want the callee
// to have any side effects before the promise is returned to the caller. This helps avoid
// race conditions.
// //
// Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't // So, we do an evalLater() here.
// complete before 'whenMoreResolved()' promises resolve. //
// TODO fix the above comment! we don't have the option of evalLater (yes) // Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't
var promise = this.whenResolved().thenCompose( // complete before 'whenMoreResolved()' promises resolve."
//
// As the Java implementation doesn't (currently) have an evalLater() call, we obtain a promise
// from the CallContextHook that will be completed by QueuedClient when appropriate.
var promise = ctx.releaseCall().thenCompose(
void_ -> this.callInternal(interfaceId, methodId, ctx)); void_ -> this.callInternal(interfaceId, methodId, ctx));
var pipelinePromise = promise.thenApply(x -> { var pipelinePromise = promise.thenApply(x -> {
@ -344,6 +348,7 @@ public final class Capability {
final long interfaceId; final long interfaceId;
final short methodId; final short methodId;
final ClientHook client; final ClientHook client;
private final CompletableFuture<java.lang.Void> callRelease = new CompletableFuture<>();
LocalRequest(long interfaceId, short methodId, ClientHook client) { LocalRequest(long interfaceId, short methodId, ClientHook client) {
this.interfaceId = interfaceId; this.interfaceId = interfaceId;
@ -354,7 +359,7 @@ public final class Capability {
@Override @Override
public RemotePromise<AnyPointer.Reader> send() { public RemotePromise<AnyPointer.Reader> send() {
var cancel = new CompletableFuture<java.lang.Void>(); var cancel = new CompletableFuture<java.lang.Void>();
var context = new LocalCallContext(message, client, cancel); var context = new LocalCallContext(message, client, cancel, this.callRelease);
var promiseAndPipeline = client.call(interfaceId, methodId, context); var promiseAndPipeline = client.call(interfaceId, methodId, context);
var promise = promiseAndPipeline.promise.thenApply(x -> { var promise = promiseAndPipeline.promise.thenApply(x -> {
context.getResults(); // force allocation context.getResults(); // force allocation
@ -365,6 +370,7 @@ public final class Capability {
promiseAndPipeline.promise.cancel(false); promiseAndPipeline.promise.cancel(false);
}); });
this.callRelease.complete(null);
assert promiseAndPipeline.pipeline != null; assert promiseAndPipeline.pipeline != null;
return new RemotePromise<>(promise, new AnyPointer.Pipeline(promiseAndPipeline.pipeline)); return new RemotePromise<>(promise, new AnyPointer.Pipeline(promiseAndPipeline.pipeline));
} }
@ -380,6 +386,10 @@ public final class Capability {
public Object getBrand() { public Object getBrand() {
return null; return null;
} }
void releaseCall() {
this.callRelease.complete(null);
}
} }
private static final class LocalPipeline implements PipelineHook { private static final class LocalPipeline implements PipelineHook {
@ -410,6 +420,7 @@ public final class Capability {
private static class LocalCallContext implements CallContextHook { private static class LocalCallContext implements CallContextHook {
final CompletableFuture<java.lang.Void> cancelAllowed; final CompletableFuture<java.lang.Void> cancelAllowed;
private final CompletableFuture<java.lang.Void> callRelease;
CompletableFuture<AnyPointer.Pipeline> tailCallPipeline; CompletableFuture<AnyPointer.Pipeline> tailCallPipeline;
MessageBuilder request; MessageBuilder request;
Response<AnyPointer.Reader> response; Response<AnyPointer.Reader> response;
@ -418,10 +429,12 @@ public final class Capability {
LocalCallContext(MessageBuilder request, LocalCallContext(MessageBuilder request,
ClientHook clientRef, ClientHook clientRef,
CompletableFuture<java.lang.Void> cancelAllowed) { CompletableFuture<java.lang.Void> cancelAllowed,
CompletableFuture<java.lang.Void> callRelease) {
this.request = request; this.request = request;
this.clientRef = clientRef; this.clientRef = clientRef;
this.cancelAllowed = cancelAllowed; this.cancelAllowed = cancelAllowed;
this.callRelease = callRelease;
} }
@Override @Override
@ -441,6 +454,7 @@ public final class Capability {
this.responseBuilder = localResponse.message.getRoot(AnyPointer.factory); this.responseBuilder = localResponse.message.getRoot(AnyPointer.factory);
this.response = new Response<>(this.responseBuilder.asReader(), localResponse); this.response = new Response<>(this.responseBuilder.asReader(), localResponse);
} }
assert this.response != null;
return this.responseBuilder; return this.responseBuilder;
} }
@ -466,13 +480,18 @@ public final class Capability {
@Override @Override
public ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request) { public ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request) {
assert this.response == null: "Can't call tailCall() after initializing the results struct."; assert this.response == null : "Can't call tailCall() after initializing the results struct.";
var promise = request.send(); var promise = request.send();
var voidPromise = promise._getResponse().thenAccept(tailResponse -> { var voidPromise = promise._getResponse().thenAccept(tailResponse -> {
this.response = tailResponse; this.response = tailResponse;
}); });
return new ClientHook.VoidPromiseAndPipeline(voidPromise, promise.pipeline().hook); return new ClientHook.VoidPromiseAndPipeline(voidPromise, promise.pipeline().hook);
} }
@Override
public CompletableFuture<java.lang.Void> releaseCall() {
return this.callRelease;
}
} }
public static ClientHook newBrokenCap(String reason) { public static ClientHook newBrokenCap(String reason) {
@ -494,7 +513,7 @@ public final class Capability {
static private ClientHook newBrokenClient(Throwable exc, boolean resolved, Object brand) { static private ClientHook newBrokenClient(Throwable exc, boolean resolved, Object brand) {
return new ClientHook() { return new ClientHook() {
@Override @Override
public AnyPointer.Request newCall(long interfaceId, short methodId) { public Request<AnyPointer.Builder> newCall(long interfaceId, short methodId) {
var broken = Request.newBrokenRequest(AnyPointer.factory, exc); var broken = Request.newBrokenRequest(AnyPointer.factory, exc);
return new AnyPointer.Request(broken.getParams(), broken.getHook()); return new AnyPointer.Request(broken.getParams(), broken.getHook());
} }
@ -561,28 +580,49 @@ public final class Capability {
// A ClientHook which simply queues calls while waiting for a ClientHook to which to forward them. // A ClientHook which simply queues calls while waiting for a ClientHook to which to forward them.
private static class QueuedClient implements ClientHook { private static class QueuedClient implements ClientHook {
private final CompletableFuture<ClientHook> promise;
private final CompletableFuture<ClientHook> promiseForCallForwarding;
private final CompletableFuture<ClientHook> promiseForClientResolution;
private final CompletableFuture<java.lang.Void> setResolutionOp; private final CompletableFuture<java.lang.Void> setResolutionOp;
// Represents the operation which will set `redirect` when possible.
private final CompletableFuture<ClientHook> promiseForCallForwarding = new CompletableFuture<>();
// When this promise resolves, each queued call will be forwarded to the real client. This needs
// to occur *before* any 'whenMoreResolved()' promises resolve, because we want to make sure
// previously-queued calls are delivered before any new calls made in response to the resolution.
private final CompletableFuture<ClientHook> promiseForClientResolution = new CompletableFuture<>();
// whenMoreResolved() returns forks of this promise. These must resolve *after* queued calls
// have been initiated (so that any calls made in the whenMoreResolved() handler are correctly
// delivered after calls made earlier), but *before* any queued calls return (because it might
// confuse the application if a queued call returns before the capability on which it was made
// resolves).
private final CompletableFuture<java.lang.Void> promiseForCallRelease = new CompletableFuture<>();
// When this promise resolves, all pending calls will return.
private ClientHook redirect; private ClientHook redirect;
QueuedClient(CompletableFuture<ClientHook> promise) { QueuedClient(CompletableFuture<ClientHook> promise) {
// TODO revisit futures this.setResolutionOp = promise.handle((inner, exc) -> {
this.promise = promise; this.redirect = exc == null
this.promiseForCallForwarding = promise.toCompletableFuture().copy(); ? inner
this.promiseForClientResolution = promise.toCompletableFuture().copy(); : newBrokenCap(exc);
this.setResolutionOp = promise.thenAccept(inner -> {
this.redirect = inner; // Resolve the promise for call forwarding.
}).exceptionally(exc -> { this.promiseForCallForwarding.complete(this.redirect);
this.redirect = newBrokenCap(exc);
// Now resolve the promise for client resolution
this.promiseForClientResolution.complete(this.redirect);
// Finally, execute any pending calls.
this.promiseForCallRelease.complete(null);
return null; return null;
}); });
} }
@Override @Override
public AnyPointer.Request newCall(long interfaceId, short methodId) { public Request<AnyPointer.Builder> newCall(long interfaceId, short methodId) {
var hook = new LocalRequest(interfaceId, methodId, this); var hook = new LocalRequest(interfaceId, methodId, this);
this.promiseForCallRelease.thenRun(hook::releaseCall);
var root = hook.message.getRoot(AnyPointer.factory); var root = hook.message.getRoot(AnyPointer.factory);
return new AnyPointer.Request(root, hook); return new AnyPointer.Request(root, hook);
} }

View file

@ -1,7 +1,6 @@
package org.capnproto; package org.capnproto;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public interface ClientHook { public interface ClientHook {