fix local resolver, null caps, and add some doc comments

This commit is contained in:
Vaci Koblizek 2020-10-22 13:30:21 +01:00
parent 8ccfdc1bf6
commit caa4441a65
3 changed files with 128 additions and 60 deletions

View file

@ -77,11 +77,36 @@ public final class Capability {
return this.hook; return this.hook;
} }
/**
* If the capability's server implemented {@link Server.getFd} returning non-null, and all
* RPC links between the client and server support FD passing, returns a file descriptor pointing
* to the same underlying file description as the server did. Returns null if the server provided
* no FD or if FD passing was unavailable at some intervening link.
* <p>
* This returns a Promise to handle the case of an unresolved promise capability, e.g. a
* pipelined capability. The promise resolves no later than when the capability settles, i.e.
* the same time `whenResolved()` would complete.
* <p>
* The file descriptor will remain open at least as long as the {@link Client} remains alive.
* If you need it to last longer, you will need to `dup()` it.
*/
public CompletableFuture<Integer> getFd() {
var fd = this.hook.getFd();
if (fd != null) {
return CompletableFuture.completedFuture(fd);
}
var promise = this.hook.whenMoreResolved();
if (promise != null) {
return promise.thenCompose(newHook -> new Client(newHook).getFd());
}
return CompletableFuture.completedFuture(null);
}
private static ClientHook makeLocalClient(Server server) { private static ClientHook makeLocalClient(Server server) {
return server.makeLocalClient(); return server.makeLocalClient();
} }
CompletionStage<?> whenResolved() { CompletionStage<java.lang.Void> whenResolved() {
return this.hook.whenResolved(); return this.hook.whenResolved();
} }
@ -109,14 +134,19 @@ public final class Capability {
private final class LocalClient implements ClientHook { private final class LocalClient implements ClientHook {
private CompletableFuture<java.lang.Void> resolveTask; private final CompletableFuture<java.lang.Void> resolveTask;
private ClientHook resolved; private ClientHook resolved;
private boolean blocked = false; private boolean blocked = false;
private Exception brokenException; private Exception brokenException;
LocalClient() { LocalClient() {
Server.this.hook = this; Server.this.hook = this;
startResolveTask(); var resolver = shortenPath();
this.resolveTask = resolver == null
? CompletableFuture.completedFuture(null)
: resolver.thenAccept(client -> {
this.resolved = client.hook;
});
} }
@Override @Override
@ -134,9 +164,7 @@ public final class Capability {
return null; return null;
} }
// TODO re-visit promises
var promise = callInternal(interfaceId, methodId, ctx); var promise = callInternal(interfaceId, methodId, ctx);
var forked = promise.copy();
CompletableFuture<PipelineHook> pipelinePromise = promise.thenApply(x -> { CompletableFuture<PipelineHook> pipelinePromise = promise.thenApply(x -> {
ctx.releaseParams(); ctx.releaseParams();
@ -144,17 +172,25 @@ public final class Capability {
}); });
var tailCall = ctx.onTailCall(); var tailCall = ctx.onTailCall();
// TODO implement tailCall
if (tailCall != null) { if (tailCall != null) {
pipelinePromise = tailCall.applyToEither(pipelinePromise, pipeline -> pipeline); pipelinePromise = tailCall.applyToEither(pipelinePromise, pipeline -> pipeline);
} }
return new VoidPromiseAndPipeline(forked, new QueuedPipeline(pipelinePromise)); return new VoidPromiseAndPipeline(
promise.copy(),
new QueuedPipeline(pipelinePromise));
} }
@Override @Override
public CompletableFuture<java.lang.Void> whenResolved() { public ClientHook getResolved() {
return null; return this.resolved;
}
@Override
public CompletableFuture<ClientHook> whenMoreResolved() {
return this.resolved != null
? CompletableFuture.completedFuture(this.resolved)
: this.resolveTask.thenApply(x -> this.resolved);
} }
@Override @Override
@ -175,16 +211,6 @@ public final class Capability {
return result.getPromise(); return result.getPromise();
} }
} }
void startResolveTask() {
var resolver = Server.this.shortenPath();
if (resolver == null) {
return;
}
this.resolveTask = resolver.thenAccept(client -> {
this.resolved = client.hook;
});
}
} }
public CompletableFuture<Client> shortenPath() { public CompletableFuture<Client> shortenPath() {
@ -403,7 +429,7 @@ public final class Capability {
} }
@Override @Override
public CompletionStage<ClientHook> whenMoreResolved() { public CompletableFuture<ClientHook> whenMoreResolved() {
return resolved ? null : CompletableFuture.failedFuture(exc); return resolved ? null : CompletableFuture.failedFuture(exc);
} }
@ -423,7 +449,7 @@ public final class Capability {
private static final class QueuedPipeline implements PipelineHook { private static final class QueuedPipeline implements PipelineHook {
private final CompletableFuture<PipelineHook> promise; private final CompletableFuture<PipelineHook> promise;
private final CompletionStage<Void> selfResolutionOp; private final CompletableFuture<Void> selfResolutionOp;
PipelineHook redirect; PipelineHook redirect;
QueuedPipeline(CompletableFuture<PipelineHook> promiseParam) { QueuedPipeline(CompletableFuture<PipelineHook> promiseParam) {
@ -476,10 +502,10 @@ public final class Capability {
@Override @Override
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) { public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) {
var callResultPromise = this.promiseForCallForwarding.thenApply(client -> client.call(interfaceId, methodId, ctx)); var callResult = this.promiseForCallForwarding.thenApply(
var pipelinePromise = callResultPromise.thenApply(callResult -> callResult.pipeline); client -> client.call(interfaceId, methodId, ctx));
var pipeline = new QueuedPipeline(pipelinePromise); var pipeline = new QueuedPipeline(callResult.thenApply(result -> result.pipeline));
return new VoidPromiseAndPipeline(callResultPromise.thenAccept(x -> {}), pipeline); return new VoidPromiseAndPipeline(callResult.thenAccept(x -> {}), pipeline);
} }
@Override @Override
@ -488,8 +514,8 @@ public final class Capability {
} }
@Override @Override
public CompletionStage<ClientHook> whenMoreResolved() { public CompletableFuture<ClientHook> whenMoreResolved() {
return promiseForClientResolution.copy(); return this.promiseForClientResolution.copy();
} }
} }
} }

View file

@ -12,33 +12,66 @@ public interface ClientHook {
VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context); VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context);
/**
If this ClientHook is a promise that has already resolved, returns the inner, resolved version
of the capability. The caller may permanently replace this client with the resolved one if
desired. Returns null if the client isn't a promise or hasn't resolved yet -- use
`whenMoreResolved()` to distinguish between them.
@return the resolved capability
*/
default ClientHook getResolved() { default ClientHook getResolved() {
return null; return null;
} }
default CompletionStage<ClientHook> whenMoreResolved() { /**
If this client is a settled reference (not a promise), return nullptr. Otherwise, return a
promise that eventually resolves to a new client that is closer to being the final, settled
client (i.e. the value eventually returned by `getResolved()`). Calling this repeatedly
should eventually produce a settled client.
*/
default CompletableFuture<ClientHook> whenMoreResolved() {
return null; return null;
} }
/**
Returns an opaque object that identifies who made this client. This can be used by an RPC adapter to
discover when a capability it needs to marshal is one that it created in the first place, and
therefore it can transfer the capability without proxying.
*/
default Object getBrand() { default Object getBrand() {
return NULL_CAPABILITY_BRAND; return NULL_CAPABILITY_BRAND;
} }
default CompletionStage<?> whenResolved() { /**
* Repeatedly calls whenMoreResolved() until it returns nullptr.
*/
default CompletionStage<java.lang.Void> whenResolved() {
var promise = whenMoreResolved(); var promise = whenMoreResolved();
return promise != null return promise != null
? promise.thenCompose(ClientHook::whenResolved) ? promise.thenCompose(ClientHook::whenResolved)
: CompletableFuture.completedFuture(null); : CompletableFuture.completedFuture(null);
} }
/**
* Returns true if the capability was created as a result of assigning a Client to null or by
* reading a null pointer out of a Cap'n Proto message.
*/
default boolean isNull() { default boolean isNull() {
return getBrand() == NULL_CAPABILITY_BRAND; return getBrand() == NULL_CAPABILITY_BRAND;
} }
/**
* Returns true if the capability was created by newBrokenCap().
*/
default boolean isError() { default boolean isError() {
return getBrand() == BROKEN_CAPABILITY_BRAND; return getBrand() == BROKEN_CAPABILITY_BRAND;
} }
/**
* Implements {@link Capability.Client.getFd}. If this returns null but whenMoreResolved() returns
* non-null, then Capability::Client::getFd() waits for resolution and tries again.
*/
default Integer getFd() { default Integer getFd() {
return null; return null;
} }

View file

@ -403,17 +403,23 @@ final class RpcState {
ClientHook restore() { ClientHook restore() {
var question = questions.next(); var question = questions.next();
question.setAwaitingReturn(true); question.setAwaitingReturn(true);
// Run the message loop until the boostrap promise is resolved.
var promise = new CompletableFuture<RpcResponse>();
var loop = CompletableFuture.anyOf(
getMessageLoop(), promise).thenCompose(x -> promise);
int sizeHint = messageSizeHint() int sizeHint = messageSizeHint()
+ RpcProtocol.Bootstrap.factory.structSize().total(); + RpcProtocol.Bootstrap.factory.structSize().total();
var message = connection.newOutgoingMessage(sizeHint); var message = connection.newOutgoingMessage(sizeHint);
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap(); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap();
builder.setQuestionId(question.getId()); builder.setQuestionId(question.getId());
message.send(); message.send();
var pipeline = new RpcPipeline(question); var pipeline = new RpcPipeline(question, promise);
return pipeline.getPipelinedCap(new PipelineOp[0]); return pipeline.getPipelinedCap(new PipelineOp[0]);
} }
private final CompletableFuture<java.lang.Void> doMessageLoop() { private CompletableFuture<java.lang.Void> doMessageLoop() {
this.cleanupImports(); this.cleanupImports();
this.cleanupQuestions(); this.cleanupQuestions();
@ -424,7 +430,7 @@ final class RpcState {
return connection.receiveIncomingMessage().thenCompose(message -> { return connection.receiveIncomingMessage().thenCompose(message -> {
try { try {
handleMessage(message); handleMessage(message);
} catch (Throwable rpcExc) { } catch (Exception rpcExc) {
// either we received an Abort message from peer // either we received an Abort message from peer
// or internal RpcState is bad. // or internal RpcState is bad.
return this.disconnect(rpcExc); return this.disconnect(rpcExc);
@ -467,7 +473,7 @@ final class RpcState {
default: default:
if (!isDisconnected()) { if (!isDisconnected()) {
// boomin' back atcha // boomin' back atcha
var msg = connection.newOutgoingMessage(BuilderArena.SUGGESTED_FIRST_SEGMENT_WORDS); var msg = connection.newOutgoingMessage();
msg.getBody().initAs(RpcProtocol.Message.factory).setUnimplemented(reader); msg.getBody().initAs(RpcProtocol.Message.factory).setUnimplemented(reader);
msg.send(); msg.send();
} }
@ -493,17 +499,20 @@ final class RpcState {
releaseExport(cap.getThirdPartyHosted().getVineId(), 1); releaseExport(cap.getThirdPartyHosted().getVineId(), 1);
break; break;
case NONE: case NONE:
// Should never happen.
case RECEIVER_ANSWER: case RECEIVER_ANSWER:
case RECEIVER_HOSTED: case RECEIVER_HOSTED:
// Nothing to do.
break; break;
} }
break; break;
case EXCEPTION: case EXCEPTION:
// Nothing to do
break; break;
} }
break; break;
default: default:
// Peer unimplemented assert false: "Peer did not implement required RPC message type. " + message.which().name();
break; break;
} }
} }
@ -537,17 +546,15 @@ final class RpcState {
var payload = ret.initResults(); var payload = ret.initResults();
var content = payload.getContent().imbue(capTable); var content = payload.getContent().imbue(capTable);
content.setAsCap(bootstrapInterface); content.setAsCap(bootstrapInterface);
var caps = capTable.getTable();
var capTableArray = capTable.getTable(); var capHook = caps.length != 0
assert capTableArray.length != 0; ? caps[0]
: Capability.newNullCap();
var capHook = capTableArray[0];
assert capHook != null;
var fds = List.<Integer>of(); var fds = List.<Integer>of();
response.setFds(List.of()); response.setFds(List.of());
answer.resultExports = writeDescriptors(capTableArray, payload, fds); answer.resultExports = writeDescriptors(caps, payload, fds);
answer.pipeline = ops -> ops.length == 0 answer.pipeline = ops -> ops.length == 0
? capHook ? capHook
: Capability.newBrokenCap("Invalid pipeline transform."); : Capability.newBrokenCap("Invalid pipeline transform.");
@ -749,11 +756,7 @@ final class RpcState {
return; return;
} }
if (imp.importClient != null) { assert imp.importClient == null : "Import already resolved.";
// It appears this is a valid entry on the import table, but was not expected to be a
// promise.
assert false: "Import already resolved.";
}
switch (resolve.which()) { switch (resolve.which()) {
case CAP: case CAP:
@ -918,7 +921,7 @@ final class RpcState {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
resolution = getInnermostClient(resolution); resolution = this.getInnermostClient(resolution);
var exp = exports.find(exportId); var exp = exports.find(exportId);
exportsByCap.remove(exp.clientHook); exportsByCap.remove(exp.clientHook);
@ -939,7 +942,7 @@ final class RpcState {
// The new promise was not already in the table, therefore the existing export table // The new promise was not already in the table, therefore the existing export table
// entry has now been repurposed to represent it. There is no need to send a resolve // entry has now been repurposed to represent it. There is no need to send a resolve
// message at all. We do, however, have to start resolving the next promise. // message at all. We do, however, have to start resolving the next promise.
return resolveExportedPromise(exportId, more); return this.resolveExportedPromise(exportId, more);
} }
} }
} }
@ -1381,7 +1384,7 @@ final class RpcState {
} }
if (isConnected()) { if (isConnected()) {
var message = connection.newOutgoingMessage(1024); var message = connection.newOutgoingMessage();
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initReturn(); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initReturn();
builder.setAnswerId(this.answerId); builder.setAnswerId(this.answerId);
builder.setReleaseParamCaps(false); builder.setReleaseParamCaps(false);
@ -1449,11 +1452,11 @@ final class RpcState {
private Throwable broken; private Throwable broken;
final HashMap<List<PipelineOp>, ClientHook> clientMap = new HashMap<>(); final HashMap<List<PipelineOp>, ClientHook> clientMap = new HashMap<>();
final CompletionStage<RpcResponse> redirectLater; final CompletableFuture<RpcResponse> redirectLater;
final CompletionStage<java.lang.Void> resolveSelf; final CompletableFuture<java.lang.Void> resolveSelf;
RpcPipeline(Question question, RpcPipeline(Question question,
CompletionStage<RpcResponse> redirectLater) { CompletableFuture<RpcResponse> redirectLater) {
this.question = question; this.question = question;
this.redirectLater = redirectLater; this.redirectLater = redirectLater;
this.resolveSelf = this.redirectLater this.resolveSelf = this.redirectLater
@ -1481,15 +1484,21 @@ final class RpcState {
var key = new ArrayList<>(Arrays.asList(ops)); var key = new ArrayList<>(Arrays.asList(ops));
var hook = this.clientMap.computeIfAbsent(key, k -> { var hook = this.clientMap.computeIfAbsent(key, k -> {
switch (state) { switch (state) {
case WAITING: case WAITING: {
if (redirectLater != null) { var pipelineClient = new PipelineClient(this.question, ops);
// TODO implement redirect if (this.redirectLater == null) {
assert false: "redirection not implemented"; // This pipeline will never get redirected, so just return the PipelineClient.
return null; return pipelineClient;
} }
return new PipelineClient(question, ops);
var resolutionPromise = this.redirectLater.thenApply(
response -> response.getResults().getPipelinedCap(ops));
return new PromiseClient(pipelineClient, resolutionPromise, null);
}
case RESOLVED: case RESOLVED:
return resolved.getResults().getPipelinedCap(ops); return resolved.getResults().getPipelinedCap(ops);
default: default:
return Capability.newBrokenCap(broken); return Capability.newBrokenCap(broken);
} }
@ -1700,7 +1709,7 @@ final class RpcState {
} }
@Override @Override
public CompletionStage<ClientHook> whenMoreResolved() { public CompletableFuture<ClientHook> whenMoreResolved() {
return null; return null;
} }
} }
@ -1868,7 +1877,7 @@ final class RpcState {
} }
@Override @Override
public CompletionStage<ClientHook> whenMoreResolved() { public CompletableFuture<ClientHook> whenMoreResolved() {
return null; return null;
} }