diff --git a/runtime/src/main/java/org/capnproto/RpcState.java b/runtime/src/main/java/org/capnproto/RpcState.java index e56fde5..453be6e 100644 --- a/runtime/src/main/java/org/capnproto/RpcState.java +++ b/runtime/src/main/java/org/capnproto/RpcState.java @@ -445,6 +445,30 @@ final class RpcState { } void handleFinish(RpcProtocol.Finish.Reader finish) { + List exportsToRelease = null; + var answer = answers.find(finish.getQuestionId()); + if (answer == null || !answer.active) { + assert false: "'Finish' for invalid question ID."; + return; + } + + if (finish.getReleaseResultCaps()) { + exportsToRelease = answer.resultExports; + } + answer.resultExports = null; + + var pipelineToRelease = answer.pipeline; + answer.pipeline = null; + + // If the call isn't actually done yet, cancel it. Otherwise, we can go ahead and erase the + // question from the table. + var ctx = answer.callContext; + if (ctx != null) { + ctx.requestCancel(); + } + else { + answers.erase(finish.getQuestionId()); + } } void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { @@ -1003,15 +1027,15 @@ final class RpcState { } RpcResponse consumeRedirectedResponse() { - assert this.redirectResults; + assert this.redirectResults; - if (this.response == null) { - getResults(); // force initialization of response - } - - return ((LocallyRedirectedRpcResponse)this.response); + if (this.response == null) { + getResults(); // force initialization of response } + return ((LocallyRedirectedRpcResponse) this.response); + } + void sendReturn() { assert !redirectResults; @@ -1029,7 +1053,7 @@ final class RpcState { List exports; try { - exports = ((RpcServerResponseImpl)response).send(); + exports = ((RpcServerResponseImpl) response).send(); } catch (Throwable exc) { this.responseSent = false; @@ -1081,6 +1105,24 @@ final class RpcState { } } } + + public void requestCancel() { + // Hints that the caller wishes to cancel this call. At the next time when cancellation is + // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes + // safe, the RpcCallContext will send a normal return when the call completes. Either way + // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since + // a Finish message was already received. + + boolean previouslyAllowedButNotRequested = (this.cancelAllowed && !this.cancelRequested); + this.cancelRequested = true; + + if (previouslyAllowedButNotRequested) { + // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously. Initiate + // the cancellation. + this.cancelled.complete(null); + } + // TODO do we care about cancelRequested if further completions are effectively ignored? + } } enum PipelineState {