diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index af29813..740bf8e 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -77,11 +77,36 @@ public final class Capability { return this.hook; } + /** + * If the capability's server implemented {@link Server.getFd} returning non-null, and all + * RPC links between the client and server support FD passing, returns a file descriptor pointing + * to the same underlying file description as the server did. Returns null if the server provided + * no FD or if FD passing was unavailable at some intervening link. + *

+ * This returns a Promise to handle the case of an unresolved promise capability, e.g. a + * pipelined capability. The promise resolves no later than when the capability settles, i.e. + * the same time `whenResolved()` would complete. + *

+ * The file descriptor will remain open at least as long as the {@link Client} remains alive. + * If you need it to last longer, you will need to `dup()` it. + */ + public CompletableFuture getFd() { + var fd = this.hook.getFd(); + if (fd != null) { + return CompletableFuture.completedFuture(fd); + } + var promise = this.hook.whenMoreResolved(); + if (promise != null) { + return promise.thenCompose(newHook -> new Client(newHook).getFd()); + } + return CompletableFuture.completedFuture(null); + } + private static ClientHook makeLocalClient(Server server) { return server.makeLocalClient(); } - CompletionStage whenResolved() { + CompletionStage whenResolved() { return this.hook.whenResolved(); } @@ -109,14 +134,19 @@ public final class Capability { private final class LocalClient implements ClientHook { - private CompletableFuture resolveTask; + private final CompletableFuture resolveTask; private ClientHook resolved; private boolean blocked = false; private Exception brokenException; LocalClient() { Server.this.hook = this; - startResolveTask(); + var resolver = shortenPath(); + this.resolveTask = resolver == null + ? CompletableFuture.completedFuture(null) + : resolver.thenAccept(client -> { + this.resolved = client.hook; + }); } @Override @@ -134,9 +164,7 @@ public final class Capability { return null; } - // TODO re-visit promises var promise = callInternal(interfaceId, methodId, ctx); - var forked = promise.copy(); CompletableFuture pipelinePromise = promise.thenApply(x -> { ctx.releaseParams(); @@ -144,17 +172,25 @@ public final class Capability { }); var tailCall = ctx.onTailCall(); - // TODO implement tailCall if (tailCall != null) { pipelinePromise = tailCall.applyToEither(pipelinePromise, pipeline -> pipeline); } - return new VoidPromiseAndPipeline(forked, new QueuedPipeline(pipelinePromise)); + return new VoidPromiseAndPipeline( + promise.copy(), + new QueuedPipeline(pipelinePromise)); } @Override - public CompletableFuture whenResolved() { - return null; + public ClientHook getResolved() { + return this.resolved; + } + + @Override + public CompletableFuture whenMoreResolved() { + return this.resolved != null + ? CompletableFuture.completedFuture(this.resolved) + : this.resolveTask.thenApply(x -> this.resolved); } @Override @@ -175,16 +211,6 @@ public final class Capability { return result.getPromise(); } } - - void startResolveTask() { - var resolver = Server.this.shortenPath(); - if (resolver == null) { - return; - } - this.resolveTask = resolver.thenAccept(client -> { - this.resolved = client.hook; - }); - } } public CompletableFuture shortenPath() { @@ -403,7 +429,7 @@ public final class Capability { } @Override - public CompletionStage whenMoreResolved() { + public CompletableFuture whenMoreResolved() { return resolved ? null : CompletableFuture.failedFuture(exc); } @@ -423,7 +449,7 @@ public final class Capability { private static final class QueuedPipeline implements PipelineHook { private final CompletableFuture promise; - private final CompletionStage selfResolutionOp; + private final CompletableFuture selfResolutionOp; PipelineHook redirect; QueuedPipeline(CompletableFuture promiseParam) { @@ -476,10 +502,10 @@ public final class Capability { @Override public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) { - var callResultPromise = this.promiseForCallForwarding.thenApply(client -> client.call(interfaceId, methodId, ctx)); - var pipelinePromise = callResultPromise.thenApply(callResult -> callResult.pipeline); - var pipeline = new QueuedPipeline(pipelinePromise); - return new VoidPromiseAndPipeline(callResultPromise.thenAccept(x -> {}), pipeline); + var callResult = this.promiseForCallForwarding.thenApply( + client -> client.call(interfaceId, methodId, ctx)); + var pipeline = new QueuedPipeline(callResult.thenApply(result -> result.pipeline)); + return new VoidPromiseAndPipeline(callResult.thenAccept(x -> {}), pipeline); } @Override @@ -488,8 +514,8 @@ public final class Capability { } @Override - public CompletionStage whenMoreResolved() { - return promiseForClientResolution.copy(); + public CompletableFuture whenMoreResolved() { + return this.promiseForClientResolution.copy(); } } } diff --git a/runtime/src/main/java/org/capnproto/ClientHook.java b/runtime/src/main/java/org/capnproto/ClientHook.java index b5dc613..b3a30c6 100644 --- a/runtime/src/main/java/org/capnproto/ClientHook.java +++ b/runtime/src/main/java/org/capnproto/ClientHook.java @@ -12,33 +12,66 @@ public interface ClientHook { VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context); + /** + If this ClientHook is a promise that has already resolved, returns the inner, resolved version + of the capability. The caller may permanently replace this client with the resolved one if + desired. Returns null if the client isn't a promise or hasn't resolved yet -- use + `whenMoreResolved()` to distinguish between them. + + @return the resolved capability + */ default ClientHook getResolved() { return null; } - default CompletionStage whenMoreResolved() { + /** + If this client is a settled reference (not a promise), return nullptr. Otherwise, return a + promise that eventually resolves to a new client that is closer to being the final, settled + client (i.e. the value eventually returned by `getResolved()`). Calling this repeatedly + should eventually produce a settled client. + */ + default CompletableFuture whenMoreResolved() { return null; } + /** + Returns an opaque object that identifies who made this client. This can be used by an RPC adapter to + discover when a capability it needs to marshal is one that it created in the first place, and + therefore it can transfer the capability without proxying. + */ default Object getBrand() { return NULL_CAPABILITY_BRAND; } - default CompletionStage whenResolved() { + /** + * Repeatedly calls whenMoreResolved() until it returns nullptr. + */ + default CompletionStage whenResolved() { var promise = whenMoreResolved(); return promise != null ? promise.thenCompose(ClientHook::whenResolved) : CompletableFuture.completedFuture(null); } + /** + * Returns true if the capability was created as a result of assigning a Client to null or by + * reading a null pointer out of a Cap'n Proto message. + */ default boolean isNull() { return getBrand() == NULL_CAPABILITY_BRAND; } + /** + * Returns true if the capability was created by newBrokenCap(). + */ default boolean isError() { return getBrand() == BROKEN_CAPABILITY_BRAND; } + /** + * Implements {@link Capability.Client.getFd}. If this returns null but whenMoreResolved() returns + * non-null, then Capability::Client::getFd() waits for resolution and tries again. + */ default Integer getFd() { return null; } diff --git a/runtime/src/main/java/org/capnproto/RpcState.java b/runtime/src/main/java/org/capnproto/RpcState.java index 66ff27a..68379c1 100644 --- a/runtime/src/main/java/org/capnproto/RpcState.java +++ b/runtime/src/main/java/org/capnproto/RpcState.java @@ -403,17 +403,23 @@ final class RpcState { ClientHook restore() { var question = questions.next(); question.setAwaitingReturn(true); + + // Run the message loop until the boostrap promise is resolved. + var promise = new CompletableFuture(); + var loop = CompletableFuture.anyOf( + getMessageLoop(), promise).thenCompose(x -> promise); + int sizeHint = messageSizeHint() + RpcProtocol.Bootstrap.factory.structSize().total(); var message = connection.newOutgoingMessage(sizeHint); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap(); builder.setQuestionId(question.getId()); message.send(); - var pipeline = new RpcPipeline(question); + var pipeline = new RpcPipeline(question, promise); return pipeline.getPipelinedCap(new PipelineOp[0]); } - private final CompletableFuture doMessageLoop() { + private CompletableFuture doMessageLoop() { this.cleanupImports(); this.cleanupQuestions(); @@ -424,7 +430,7 @@ final class RpcState { return connection.receiveIncomingMessage().thenCompose(message -> { try { handleMessage(message); - } catch (Throwable rpcExc) { + } catch (Exception rpcExc) { // either we received an Abort message from peer // or internal RpcState is bad. return this.disconnect(rpcExc); @@ -467,7 +473,7 @@ final class RpcState { default: if (!isDisconnected()) { // boomin' back atcha - var msg = connection.newOutgoingMessage(BuilderArena.SUGGESTED_FIRST_SEGMENT_WORDS); + var msg = connection.newOutgoingMessage(); msg.getBody().initAs(RpcProtocol.Message.factory).setUnimplemented(reader); msg.send(); } @@ -493,17 +499,20 @@ final class RpcState { releaseExport(cap.getThirdPartyHosted().getVineId(), 1); break; case NONE: + // Should never happen. case RECEIVER_ANSWER: case RECEIVER_HOSTED: + // Nothing to do. break; } break; case EXCEPTION: + // Nothing to do break; } break; default: - // Peer unimplemented + assert false: "Peer did not implement required RPC message type. " + message.which().name(); break; } } @@ -537,17 +546,15 @@ final class RpcState { var payload = ret.initResults(); var content = payload.getContent().imbue(capTable); content.setAsCap(bootstrapInterface); - - var capTableArray = capTable.getTable(); - assert capTableArray.length != 0; - - var capHook = capTableArray[0]; - assert capHook != null; + var caps = capTable.getTable(); + var capHook = caps.length != 0 + ? caps[0] + : Capability.newNullCap(); var fds = List.of(); response.setFds(List.of()); - answer.resultExports = writeDescriptors(capTableArray, payload, fds); + answer.resultExports = writeDescriptors(caps, payload, fds); answer.pipeline = ops -> ops.length == 0 ? capHook : Capability.newBrokenCap("Invalid pipeline transform."); @@ -749,11 +756,7 @@ final class RpcState { return; } - if (imp.importClient != null) { - // It appears this is a valid entry on the import table, but was not expected to be a - // promise. - assert false: "Import already resolved."; - } + assert imp.importClient == null : "Import already resolved."; switch (resolve.which()) { case CAP: @@ -918,7 +921,7 @@ final class RpcState { return CompletableFuture.completedFuture(null); } - resolution = getInnermostClient(resolution); + resolution = this.getInnermostClient(resolution); var exp = exports.find(exportId); exportsByCap.remove(exp.clientHook); @@ -939,7 +942,7 @@ final class RpcState { // The new promise was not already in the table, therefore the existing export table // entry has now been repurposed to represent it. There is no need to send a resolve // message at all. We do, however, have to start resolving the next promise. - return resolveExportedPromise(exportId, more); + return this.resolveExportedPromise(exportId, more); } } } @@ -1381,7 +1384,7 @@ final class RpcState { } if (isConnected()) { - var message = connection.newOutgoingMessage(1024); + var message = connection.newOutgoingMessage(); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initReturn(); builder.setAnswerId(this.answerId); builder.setReleaseParamCaps(false); @@ -1449,11 +1452,11 @@ final class RpcState { private Throwable broken; final HashMap, ClientHook> clientMap = new HashMap<>(); - final CompletionStage redirectLater; - final CompletionStage resolveSelf; + final CompletableFuture redirectLater; + final CompletableFuture resolveSelf; RpcPipeline(Question question, - CompletionStage redirectLater) { + CompletableFuture redirectLater) { this.question = question; this.redirectLater = redirectLater; this.resolveSelf = this.redirectLater @@ -1481,15 +1484,21 @@ final class RpcState { var key = new ArrayList<>(Arrays.asList(ops)); var hook = this.clientMap.computeIfAbsent(key, k -> { switch (state) { - case WAITING: - if (redirectLater != null) { - // TODO implement redirect - assert false: "redirection not implemented"; - return null; + case WAITING: { + var pipelineClient = new PipelineClient(this.question, ops); + if (this.redirectLater == null) { + // This pipeline will never get redirected, so just return the PipelineClient. + return pipelineClient; } - return new PipelineClient(question, ops); + + var resolutionPromise = this.redirectLater.thenApply( + response -> response.getResults().getPipelinedCap(ops)); + return new PromiseClient(pipelineClient, resolutionPromise, null); + } + case RESOLVED: return resolved.getResults().getPipelinedCap(ops); + default: return Capability.newBrokenCap(broken); } @@ -1700,7 +1709,7 @@ final class RpcState { } @Override - public CompletionStage whenMoreResolved() { + public CompletableFuture whenMoreResolved() { return null; } } @@ -1868,7 +1877,7 @@ final class RpcState { } @Override - public CompletionStage whenMoreResolved() { + public CompletableFuture whenMoreResolved() { return null; }