handleFinish

This commit is contained in:
Vaci Koblizek 2020-10-02 22:46:29 +01:00
parent 6bd1411c26
commit 734af71659

View file

@ -445,6 +445,30 @@ final class RpcState {
} }
void handleFinish(RpcProtocol.Finish.Reader finish) { void handleFinish(RpcProtocol.Finish.Reader finish) {
List<Integer> exportsToRelease = null;
var answer = answers.find(finish.getQuestionId());
if (answer == null || !answer.active) {
assert false: "'Finish' for invalid question ID.";
return;
}
if (finish.getReleaseResultCaps()) {
exportsToRelease = answer.resultExports;
}
answer.resultExports = null;
var pipelineToRelease = answer.pipeline;
answer.pipeline = null;
// If the call isn't actually done yet, cancel it. Otherwise, we can go ahead and erase the
// question from the table.
var ctx = answer.callContext;
if (ctx != null) {
ctx.requestCancel();
}
else {
answers.erase(finish.getQuestionId());
}
} }
void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) {
@ -1003,15 +1027,15 @@ final class RpcState {
} }
RpcResponse consumeRedirectedResponse() { RpcResponse consumeRedirectedResponse() {
assert this.redirectResults; assert this.redirectResults;
if (this.response == null) { if (this.response == null) {
getResults(); // force initialization of response getResults(); // force initialization of response
}
return ((LocallyRedirectedRpcResponse)this.response);
} }
return ((LocallyRedirectedRpcResponse) this.response);
}
void sendReturn() { void sendReturn() {
assert !redirectResults; assert !redirectResults;
@ -1029,7 +1053,7 @@ final class RpcState {
List<Integer> exports; List<Integer> exports;
try { try {
exports = ((RpcServerResponseImpl)response).send(); exports = ((RpcServerResponseImpl) response).send();
} }
catch (Throwable exc) { catch (Throwable exc) {
this.responseSent = false; this.responseSent = false;
@ -1081,6 +1105,24 @@ final class RpcState {
} }
} }
} }
public void requestCancel() {
// Hints that the caller wishes to cancel this call. At the next time when cancellation is
// deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes
// safe, the RpcCallContext will send a normal return when the call completes. Either way
// the RpcCallContext is now responsible for cleaning up the entry in the answer table, since
// a Finish message was already received.
boolean previouslyAllowedButNotRequested = (this.cancelAllowed && !this.cancelRequested);
this.cancelRequested = true;
if (previouslyAllowedButNotRequested) {
// We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously. Initiate
// the cancellation.
this.cancelled.complete(null);
}
// TODO do we care about cancelRequested if further completions are effectively ignored?
}
} }
enum PipelineState { enum PipelineState {