memoise queuespipeline caps. use queues to order queuedclient resolution
This commit is contained in:
parent
52892478ef
commit
119a682d4d
1 changed files with 29 additions and 21 deletions
|
@ -1,5 +1,6 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
|
|
||||||
|
@ -546,6 +547,7 @@ public final class Capability {
|
||||||
private final CompletableFuture<PipelineHook> promise;
|
private final CompletableFuture<PipelineHook> promise;
|
||||||
private final CompletableFuture<Void> selfResolutionOp;
|
private final CompletableFuture<Void> selfResolutionOp;
|
||||||
PipelineHook redirect;
|
PipelineHook redirect;
|
||||||
|
private final Map<List<PipelineOp>, ClientHook> clientMap = new HashMap<>();
|
||||||
|
|
||||||
QueuedPipeline(CompletableFuture<PipelineHook> promiseParam) {
|
QueuedPipeline(CompletableFuture<PipelineHook> promiseParam) {
|
||||||
this.promise = promiseParam;
|
this.promise = promiseParam;
|
||||||
|
@ -559,10 +561,14 @@ public final class Capability {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final ClientHook getPipelinedCap(PipelineOp[] ops) {
|
public final ClientHook getPipelinedCap(PipelineOp[] ops) {
|
||||||
return redirect != null
|
if (redirect != null) {
|
||||||
? redirect.getPipelinedCap(ops)
|
return redirect.getPipelinedCap(ops);
|
||||||
: new QueuedClient(this.promise.thenApply(
|
}
|
||||||
pipeline -> pipeline.getPipelinedCap(ops)));
|
|
||||||
|
var key = new ArrayList<>(Arrays.asList(ops));
|
||||||
|
return this.clientMap.computeIfAbsent(key,
|
||||||
|
k -> new QueuedClient(this.promise.thenApply(
|
||||||
|
pipeline -> pipeline.getPipelinedCap(ops))));
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
@Override
|
@Override
|
||||||
|
@ -580,14 +586,9 @@ public final class Capability {
|
||||||
// A ClientHook which simply queues calls while waiting for a ClientHook to which to forward them.
|
// A ClientHook which simply queues calls while waiting for a ClientHook to which to forward them.
|
||||||
private static class QueuedClient implements ClientHook {
|
private static class QueuedClient implements ClientHook {
|
||||||
|
|
||||||
private final CompletableFuture<java.lang.Void> setResolutionOp;
|
private final CompletableFuture<java.lang.Void> selfResolutionOp;
|
||||||
// Represents the operation which will set `redirect` when possible.
|
// Represents the operation which will set `redirect` when possible.
|
||||||
|
|
||||||
private final CompletableFuture<ClientHook> promiseForCallForwarding = new CompletableFuture<>();
|
|
||||||
// When this promise resolves, each queued call will be forwarded to the real client. This needs
|
|
||||||
// to occur *before* any 'whenMoreResolved()' promises resolve, because we want to make sure
|
|
||||||
// previously-queued calls are delivered before any new calls made in response to the resolution.
|
|
||||||
|
|
||||||
private final CompletableFuture<ClientHook> promiseForClientResolution = new CompletableFuture<>();
|
private final CompletableFuture<ClientHook> promiseForClientResolution = new CompletableFuture<>();
|
||||||
// whenMoreResolved() returns forks of this promise. These must resolve *after* queued calls
|
// whenMoreResolved() returns forks of this promise. These must resolve *after* queued calls
|
||||||
// have been initiated (so that any calls made in the whenMoreResolved() handler are correctly
|
// have been initiated (so that any calls made in the whenMoreResolved() handler are correctly
|
||||||
|
@ -595,25 +596,28 @@ public final class Capability {
|
||||||
// confuse the application if a queued call returns before the capability on which it was made
|
// confuse the application if a queued call returns before the capability on which it was made
|
||||||
// resolves).
|
// resolves).
|
||||||
|
|
||||||
private final CompletableFuture<java.lang.Void> promiseForCallRelease = new CompletableFuture<>();
|
|
||||||
// When this promise resolves, all pending calls will return.
|
|
||||||
|
|
||||||
private ClientHook redirect;
|
private ClientHook redirect;
|
||||||
|
private final Queue<CompletableFuture<ClientHook>> queuedCalls = new ArrayDeque<>();
|
||||||
|
private final Queue<LocalRequest> pendingCalls = new ArrayDeque<>();
|
||||||
|
|
||||||
QueuedClient(CompletableFuture<ClientHook> promise) {
|
QueuedClient(CompletableFuture<ClientHook> promise) {
|
||||||
this.setResolutionOp = promise.handle((inner, exc) -> {
|
this.selfResolutionOp = promise.handle((inner, exc) -> {
|
||||||
this.redirect = exc == null
|
this.redirect = exc == null
|
||||||
? inner
|
? inner
|
||||||
: newBrokenCap(exc);
|
: newBrokenCap(exc);
|
||||||
|
|
||||||
// Resolve the promise for call forwarding.
|
// Resolve promises for call forwarding.
|
||||||
this.promiseForCallForwarding.complete(this.redirect);
|
for (var call: this.queuedCalls) {
|
||||||
|
call.complete(this.redirect);
|
||||||
|
}
|
||||||
|
|
||||||
// Now resolve the promise for client resolution
|
// Now resolve the promise for client resolution
|
||||||
this.promiseForClientResolution.complete(this.redirect);
|
this.promiseForClientResolution.complete(this.redirect);
|
||||||
|
|
||||||
// Finally, execute any pending calls.
|
// Finally, execute any pending calls.
|
||||||
this.promiseForCallRelease.complete(null);
|
for (var hook: this.pendingCalls) {
|
||||||
|
hook.releaseCall();
|
||||||
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
@ -622,17 +626,21 @@ public final class Capability {
|
||||||
@Override
|
@Override
|
||||||
public Request<AnyPointer.Builder> newCall(long interfaceId, short methodId) {
|
public Request<AnyPointer.Builder> newCall(long interfaceId, short methodId) {
|
||||||
var hook = new LocalRequest(interfaceId, methodId, this);
|
var hook = new LocalRequest(interfaceId, methodId, this);
|
||||||
this.promiseForCallRelease.thenRun(hook::releaseCall);
|
this.pendingCalls.add(hook);
|
||||||
var root = hook.message.getRoot(AnyPointer.factory);
|
var root = hook.message.getRoot(AnyPointer.factory);
|
||||||
return new AnyPointer.Request(root, hook);
|
return new AnyPointer.Request(root, hook);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) {
|
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) {
|
||||||
var callResult = this.promiseForCallForwarding.thenApply(
|
var promise = new CompletableFuture<ClientHook>();
|
||||||
|
this.queuedCalls.add(promise);
|
||||||
|
|
||||||
|
var callResult = promise.thenApply(
|
||||||
client -> client.call(interfaceId, methodId, ctx));
|
client -> client.call(interfaceId, methodId, ctx));
|
||||||
var pipeline = new QueuedPipeline(callResult.thenApply(result -> result.pipeline));
|
var pipelineResult = callResult.thenApply(result -> result.pipeline);
|
||||||
return new VoidPromiseAndPipeline(callResult.thenRun(() -> {}), pipeline);
|
var pipeline = new QueuedPipeline(pipelineResult);
|
||||||
|
return new VoidPromiseAndPipeline(pipelineResult.thenRun(() -> {}), pipeline);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in a new issue