diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index df32bd7..e922b66 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -60,7 +60,7 @@ final class RpcState { Question(int id) { this.id = id; - this.selfRef = new QuestionRef(this.id);; + this.selfRef = new QuestionRef(this.id); this.disposer = new QuestionDisposer(this.selfRef); } @@ -239,9 +239,9 @@ final class RpcState { }; private final Map exportsByCap = new HashMap<>(); - private final BootstrapFactory bootstrapFactory; + private final BootstrapFactory bootstrapFactory; private final VatNetwork.Connection connection; - private final CompletableFuture disconnectFulfiller; + private final CompletableFuture disconnectFulfiller; private Throwable disconnected = null; private final CompletableFuture messageLoop = new CompletableFuture<>(); // completes when the message loop exits @@ -249,9 +249,9 @@ final class RpcState { private final ReferenceQueue importRefs = new ReferenceQueue<>(); private final Queue> lastEvals = new ArrayDeque<>(); - RpcState(BootstrapFactory bootstrapFactory, + RpcState(BootstrapFactory bootstrapFactory, VatNetwork.Connection connection, - CompletableFuture disconnectFulfiller) { + CompletableFuture disconnectFulfiller) { this.bootstrapFactory = bootstrapFactory; this.connection = connection; this.disconnectFulfiller = disconnectFulfiller; @@ -420,40 +420,22 @@ final class RpcState { }); messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose( - exc -> CompletableFuture.failedFuture(exc)); + CompletableFuture::failedFuture); } private void handleMessage(IncomingRpcMessage message) throws RpcException { var reader = message.getBody().getAs(RpcProtocol.Message.factory); switch (reader.which()) { - case UNIMPLEMENTED: - handleUnimplemented(reader.getUnimplemented()); - break; - case ABORT: - handleAbort(reader.getAbort()); - break; - case BOOTSTRAP: - handleBootstrap(message, reader.getBootstrap()); - break; - case CALL: - handleCall(message, reader.getCall()); - return; - case RETURN: - handleReturn(message, reader.getReturn()); - break; - case FINISH: - handleFinish(reader.getFinish()); - break; - case RESOLVE: - handleResolve(message, reader.getResolve()); - break; - case DISEMBARGO: - handleDisembargo(reader.getDisembargo()); - break; - case RELEASE: - handleRelease(reader.getRelease()); - break; - default: { + case UNIMPLEMENTED -> handleUnimplemented(reader.getUnimplemented()); + case ABORT -> handleAbort(reader.getAbort()); + case BOOTSTRAP -> handleBootstrap(reader.getBootstrap()); + case CALL -> handleCall(message, reader.getCall()); + case RETURN -> handleReturn(message, reader.getReturn()); + case FINISH -> handleFinish(reader.getFinish()); + case RESOLVE -> handleResolve(message, reader.getResolve()); + case DISEMBARGO -> handleDisembargo(reader.getDisembargo()); + case RELEASE -> handleRelease(reader.getRelease()); + default -> { LOGGER.warning(() -> this.toString() + ": < Unhandled RPC message: " + reader.which().toString()); if (!isDisconnected()) { // boomin' back atcha @@ -462,7 +444,6 @@ final class RpcState { LOGGER.info(() -> this.toString() + ": > UNIMPLEMENTED"); msg.send(); } - break; } } @@ -514,7 +495,7 @@ final class RpcState { throw exc; } - void handleBootstrap(IncomingRpcMessage message, RpcProtocol.Bootstrap.Reader bootstrap) { + void handleBootstrap(RpcProtocol.Bootstrap.Reader bootstrap) { LOGGER.info(() -> this.toString() + ": < BOOTSTRAP question=" + bootstrap.getQuestionId()); if (isDisconnected()) { return; @@ -570,15 +551,12 @@ final class RpcState { boolean redirectResults; switch (call.getSendResultsTo().which()) { - case CALLER: - redirectResults = false; - break; - case YOURSELF: - redirectResults = true; - break; - default: - assert false: "Unsupported 'Call.sendResultsTo'."; + case CALLER -> redirectResults = false; + case YOURSELF -> redirectResults = true; + default -> { + assert false : "Unsupported 'Call.sendResultsTo'."; return; + } } var payload = call.getParams(); @@ -774,43 +752,34 @@ final class RpcState { private void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { LOGGER.info(() -> this.toString() + ": < RESOLVE promise=" + resolve.getPromiseId()); - ClientHook cap = null; - Throwable exc = null; - - switch (resolve.which()) { - case CAP: - cap = receiveCap(resolve.getCap(), message.getAttachedFds()); - break; - case EXCEPTION: - exc = ToException(resolve.getException()); - break; - default: - assert false: "Unknown 'Resolve' type."; - return; - } - var importId = resolve.getPromiseId(); - var imp = this.imports.find(resolve.getPromiseId()); + var imp = this.imports.find(importId); if (imp == null) { return; } - if (imp.promise != null) { - // This import is an unfulfilled promise. - - assert !imp.promise.isDone(); - if (exc == null) { - imp.promise.complete(cap); - } - else { - imp.promise.completeExceptionally(exc); - } + if (imp.promise == null) { + assert imp.importClient == null : "Import already resolved."; + // It appears this is a valid entry on the import table, but was not expected to be a + // promise. return; } - // It appears this is a valid entry on the import table, but was not expected to be a - // promise. - assert imp.importClient == null : "Import already resolved."; + // This import is an unfulfilled promise. + assert !imp.promise.isDone(); + switch (resolve.which()) { + case CAP -> { + var cap = receiveCap(resolve.getCap(), message.getAttachedFds()); + imp.promise.complete(cap); + } + case EXCEPTION -> { + var exc = ToException(resolve.getException()); + imp.promise.completeExceptionally(exc); + } + default -> { + assert false : "Unknown 'Resolve' type."; + } + } } private void handleRelease(RpcProtocol.Release.Reader release) { @@ -842,8 +811,8 @@ final class RpcState { return; } - final var embargoId = ctx.getSenderLoopback(); - final var rpcTarget = (RpcClient) target; + var embargoId = ctx.getSenderLoopback(); + var rpcTarget = (RpcClient) target; Callable sendDisembargo = () -> { if (isDisconnected()) { @@ -877,14 +846,13 @@ final class RpcState { assert false: "Invalid embargo ID in 'Disembargo.context.receiverLoopback'."; return; } - assert embargo.disembargo != null; embargo.disembargo.complete(null); embargos.erase(ctx.getReceiverLoopback(), embargo); break; default: assert false: "Unimplemented Disembargo type. " + ctx.which(); - return; + break; } } @@ -1169,7 +1137,7 @@ final class RpcState { ClientHook getMessageTarget(RpcProtocol.MessageTarget.Reader target) { switch (target.which()) { - case IMPORTED_CAP: + case IMPORTED_CAP -> { var exp = exports.find(target.getImportedCap()); if (exp != null) { return exp.clientHook; @@ -1178,8 +1146,8 @@ final class RpcState { assert false: "Message target is not a current export ID."; return null; } - - case PROMISED_ANSWER: + } + case PROMISED_ANSWER -> { var promisedAnswer = target.getPromisedAnswer(); var questionId = promisedAnswer.getQuestionId(); var base = answers.put(questionId); @@ -1187,22 +1155,21 @@ final class RpcState { assert false: "PromisedAnswer.questionId is not a current question."; return null; } - var pipeline = base.pipeline; if (pipeline == null) { pipeline = PipelineHook.newBrokenPipeline( RpcException.failed("Pipeline call on a request that returned no capabilities or was already closed.")); } - var ops = ToPipelineOps(promisedAnswer); if (ops == null) { return null; } return pipeline.getPipelinedCap(ops); - - default: + } + default -> { assert false: "Unknown message target type. " + target.which(); return null; + } } } @@ -1317,7 +1284,7 @@ final class RpcState { // response private RpcServerResponse response; private RpcProtocol.Return.Builder returnMessage; - private boolean redirectResults = false; + private final boolean redirectResults; private boolean responseSent = false; private CompletableFuture tailCallPipeline; @@ -1377,7 +1344,7 @@ final class RpcState { @Override public void allowCancellation() { - boolean previouslyRequestedButNotAllowed = (this.cancelAllowed == false && this.cancelRequested == true); + boolean previouslyRequestedButNotAllowed = (!this.cancelAllowed && this.cancelRequested); this.cancelAllowed = true; if (previouslyRequestedButNotAllowed) { @@ -1423,9 +1390,8 @@ final class RpcState { // Just forward to another local call var response = request.send(); - var promise = response.thenAccept(results -> { - getResults(0).setAs(AnyPointer.factory, results); - }); + var promise = response.thenAccept( + results -> getResults(0).setAs(AnyPointer.factory, results)); return new ClientHook.VoidPromiseAndPipeline(promise, response.pipeline().hook); } @@ -1703,7 +1669,7 @@ final class RpcState { return replacement.send(); } - final var questionRef = sendInternal(false); + var questionRef = sendInternal(false); // The pipeline must get notified of resolution before the app does to maintain ordering. var pipeline = new RpcPipeline(questionRef, questionRef.response); @@ -1815,7 +1781,7 @@ final class RpcState { private void cleanupImports() { while (true) { - var ref = (ImportRef) this.importRefs.poll(); + var ref = (ImportRef)this.importRefs.poll(); if (ref == null) { return; } @@ -1985,7 +1951,7 @@ final class RpcState { var embargo = embargos.next(); disembargo.getContext().setSenderLoopback(embargo.id); - final ClientHook finalReplacement = replacement; + ClientHook finalReplacement = replacement; var embargoPromise = embargo.disembargo.thenApply( void_ -> finalReplacement); replacement = Capability.newLocalPromiseClient(embargoPromise); @@ -2038,13 +2004,10 @@ final class RpcState { static void FromPipelineOps(PipelineOp[] ops, RpcProtocol.PromisedAnswer.Builder builder) { var transforms = builder.initTransform(ops.length); for (int ii = 0; ii < ops.length; ++ii) { + var transform = transforms.get(ii); switch (ops[ii].type) { - case NOOP: - transforms.get(ii).setNoop(null); - break; - case GET_POINTER_FIELD: - transforms.get(ii).setGetPointerField(ops[ii].pointerIndex); - break; + case NOOP -> transform.setNoop(null); + case GET_POINTER_FIELD -> transform.setGetPointerField(ops[ii].pointerIndex); } } } @@ -2079,7 +2042,6 @@ final class RpcState { case OVERLOADED -> RpcProtocol.Exception.Type.OVERLOADED; case DISCONNECTED -> RpcProtocol.Exception.Type.DISCONNECTED; case UNIMPLEMENTED -> RpcProtocol.Exception.Type.UNIMPLEMENTED; - default -> RpcProtocol.Exception.Type.FAILED; }; } builder.setType(type); @@ -2091,7 +2053,6 @@ final class RpcState { static RpcException ToException(RpcProtocol.Exception.Reader reader) { var type = switch (reader.getType()) { - case FAILED -> RpcException.Type.FAILED; case OVERLOADED -> RpcException.Type.OVERLOADED; case DISCONNECTED -> RpcException.Type.DISCONNECTED; case UNIMPLEMENTED -> RpcException.Type.UNIMPLEMENTED;