diff --git a/runtime/src/main/java/org/capnproto/RpcState.java b/runtime/src/main/java/org/capnproto/RpcState.java index da61be9..b76cb41 100644 --- a/runtime/src/main/java/org/capnproto/RpcState.java +++ b/runtime/src/main/java/org/capnproto/RpcState.java @@ -1,8 +1,11 @@ package org.capnproto; import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.FutureTask; +import java.util.function.Function; final class RpcState { @@ -117,6 +120,18 @@ final class RpcState { return !isDisconnected(); } + // Run func() before the next IO event. + private CompletableFuture evalLast(Callable func) { + return this.messageReady.thenCompose(x -> { + try { + return CompletableFuture.completedFuture(func.call()); + } + catch (java.lang.Exception exc) { + return CompletableFuture.failedFuture(exc); + } + }); + } + ClientHook restore() { var question = questions.next(); question.isAwaitingReturn = true; @@ -276,7 +291,7 @@ final class RpcState { var payload = ret.initResults(); var content = payload.getContent().imbue(capTable); - content.setAsCapability(bootstrapInterface); + content.setAsCap(bootstrapInterface); var capTableArray = capTable.getTable(); assert capTableArray.length != 0; @@ -531,28 +546,31 @@ final class RpcState { return; } - var embargoId = ctx.getSenderLoopback(); + final var embargoId = ctx.getSenderLoopback(); + final var rpcTarget = (RpcClient) target; - // TODO run this later... - if (isDisconnected()) { - return; - } + Callable sendDisembargo = () -> { + if (isDisconnected()) { + return null; + } - var rpcTarget = (RpcClient) target; - var message = connection.newOutgoingMessage(1024); - var builder = message.getBody().initAs(RpcProtocol.Message.factory).initDisembargo(); - var redirect = rpcTarget.writeTarget(builder.initTarget()); - // Disembargoes should only be sent to capabilities that were previously the subject of - // a `Resolve` message. But `writeTarget` only ever returns non-null when called on - // a PromiseClient. The code which sends `Resolve` and `Return` should have replaced - // any promise with a direct node in order to solve the Tribble 4-way race condition. - // See the documentation of Disembargo in rpc.capnp for more. - if (redirect == null) { - assert false: "'Disembargo' of type 'senderLoopback' sent to an object that does not appear to have been the subject of a previous 'Resolve' message."; - return; - } - builder.getContext().setReceiverLoopback(embargoId); - message.send(); + var message = connection.newOutgoingMessage(1024); + var builder = message.getBody().initAs(RpcProtocol.Message.factory).initDisembargo(); + var redirect = rpcTarget.writeTarget(builder.initTarget()); + // Disembargoes should only be sent to capabilities that were previously the subject of + // a `Resolve` message. But `writeTarget` only ever returns non-null when called on + // a PromiseClient. The code which sends `Resolve` and `Return` should have replaced + // any promise with a direct node in order to solve the Tribble 4-way race condition. + // See the documentation of Disembargo in rpc.capnp for more. + if (redirect == null) { + assert false : "'Disembargo' of type 'senderLoopback' sent to an object that does not appear to have been the subject of a previous 'Resolve' message."; + return null; + } + builder.getContext().setReceiverLoopback(embargoId); + message.send(); + return null; + }; + evalLast(sendDisembargo); break; case RECEIVER_LOOPBACK: @@ -1274,12 +1292,18 @@ final class RpcState { var appPromise = question.response.thenApply(response -> { var results = response.getResults(); - return new Response(results, response); + return new Response<>(AnyPointer.factory, results, response); }); return new RemotePromise<>(appPromise, pipeline); } + @Override + public CompletableFuture sendStreaming() { + // TODO falling back to regular send for now... + return send().ignoreResult(); + } + Question sendInternal(boolean isTailCall) { // TODO refactor var fds = List.of();