diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index d6894e6..03a800c 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -238,11 +238,11 @@ final class RpcState { private final VatNetwork.Connection connection; private final CompletableFuture disconnectFulfiller; private Throwable disconnected = null; - private CompletableFuture messageReady = CompletableFuture.completedFuture(null); private final CompletableFuture messageLoop = new CompletableFuture<>(); // completes when the message loop exits private final ReferenceQueue questionRefs = new ReferenceQueue<>(); private final ReferenceQueue importRefs = new ReferenceQueue<>(); + private final Queue> lastEvals = new ArrayDeque<>(); RpcState(BootstrapFactory bootstrapFactory, VatNetwork.Connection connection, @@ -360,16 +360,8 @@ final class RpcState { } // Run func() before the next IO event. - private void evalLast(Callable func) { - this.messageReady = this.messageReady.thenCompose(x -> { - try { - func.call(); - } - catch (java.lang.Exception exc) { - return CompletableFuture.failedFuture(exc); - } - return CompletableFuture.completedFuture(null); - }); + private void evalLast(Callable func) { + this.lastEvals.add(func); } ClientHook restore() { @@ -402,7 +394,13 @@ final class RpcState { } try { this.handleMessage(message); - } catch (Exception rpcExc) { + + while (!this.lastEvals.isEmpty()) { + this.lastEvals.remove().call(); + } + + } + catch (Throwable rpcExc) { // either we received an Abort message from peer // or internal RpcState is bad. this.disconnect(rpcExc); @@ -410,8 +408,7 @@ final class RpcState { }); messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose(exc -> { - //System.out.println("Exception in startMessageLoop!"); - //exc.printStackTrace(); + //System.out.println("Exception in startMessageLoop!" + exc); return CompletableFuture.failedFuture(exc); }); } @@ -790,7 +787,6 @@ final class RpcState { // It appears this is a valid entry on the import table, but was not expected to be a // promise. assert imp.importClient == null : "Import already resolved."; - } private void handleRelease(RpcProtocol.Release.Reader release) { @@ -1053,9 +1049,11 @@ final class RpcState { var exp = exports.find(descriptor.getReceiverHosted()); if (exp == null) { return Capability.newBrokenCap("invalid 'receiverHosted' export ID"); - } else if (exp.clientHook.getBrand() == this) { + } + else if (exp.clientHook.getBrand() == this) { return new TribbleRaceBlocker(exp.clientHook); - } else { + } + else { return exp.clientHook; } } @@ -1513,9 +1511,9 @@ final class RpcState { private RpcResponse resolved; private Throwable broken; - final HashMap, ClientHook> clientMap = new HashMap<>(); - final CompletableFuture redirectLater; - final CompletableFuture resolveSelf; + private final HashMap, ClientHook> clientMap = new HashMap<>(); + private final CompletableFuture redirectLater; + private final CompletableFuture resolveSelf; RpcPipeline(QuestionRef questionRef, CompletableFuture redirectLater) { @@ -1523,9 +1521,10 @@ final class RpcState { assert redirectLater != null; this.redirectLater = redirectLater; this.resolveSelf = this.redirectLater - .thenAccept(response -> { + .thenApply(response -> { this.state = PipelineState.RESOLVED; this.resolved = response; + return response; }) .exceptionally(exc -> { this.state = PipelineState.BROKEN; @@ -1534,8 +1533,12 @@ final class RpcState { }); } + /** + * Construct a new RpcPipeline that is never expected to resolve. + */ RpcPipeline(QuestionRef questionRef) { this(questionRef, null); + // TODO implement tail calls... } @Override @@ -1552,13 +1555,15 @@ final class RpcState { return pipelineClient; } - var resolutionPromise = this.redirectLater.thenApply( + assert this.resolveSelf != null; + var resolutionPromise = this.resolveSelf.thenApply( response -> response.getResults().getPipelinedCap(ops)); return new PromiseClient(pipelineClient, resolutionPromise, null); } case RESOLVED: - return resolved.getResults().getPipelinedCap(ops); + assert this.resolved != null; + return this.resolved.getResults().getPipelinedCap(ops); default: return Capability.newBrokenCap(broken); @@ -1807,7 +1812,7 @@ final class RpcState { private class PromiseClient extends RpcClient { - private final ClientHook cap; + private ClientHook cap; private final Integer importId; private boolean receivedCall = false; private ResolutionType resolutionType = ResolutionType.UNRESOLVED; @@ -1940,7 +1945,7 @@ final class RpcState { var message = connection.newOutgoingMessage(sizeHint); var disembargo = message.getBody().initAs(RpcProtocol.Message.factory).initDisembargo(); var redirect = RpcState.this.writeTarget(cap, disembargo.initTarget()); - assert redirect == null; + assert redirect == null: "Original promise target should always be from this RPC connection."; var embargo = embargos.next(); disembargo.getContext().setSenderLoopback(embargo.id); @@ -1952,6 +1957,7 @@ final class RpcState { message.send(); } + this.cap = replacement; return replacement; } }