From c49221c2e9a2fc5a37d7ad7cd71046c6a94071df Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Wed, 14 Oct 2020 15:54:44 +0100 Subject: [PATCH] add cleanup to questions and imports --- .../src/main/java/org/capnproto/RpcState.java | 89 +++++++++++++------ 1 file changed, 64 insertions(+), 25 deletions(-) diff --git a/runtime/src/main/java/org/capnproto/RpcState.java b/runtime/src/main/java/org/capnproto/RpcState.java index 1227434..c2da6a1 100644 --- a/runtime/src/main/java/org/capnproto/RpcState.java +++ b/runtime/src/main/java/org/capnproto/RpcState.java @@ -7,7 +7,7 @@ import java.util.concurrent.CompletionStage; final class RpcState { - static final class Question { + final class Question { final int id; CompletableFuture response = new CompletableFuture<>(); List paramExports; @@ -21,10 +21,29 @@ final class RpcState { void reject(Throwable exc) { this.response.completeExceptionally(exc); + this.finish(); } void answer(RpcResponse response) { this.response.complete(response); + this.finish(); + } + + void finish() { + assert questions.find(this.id) != null : "Question ID no longer on table?"; + if (isConnected() && !this.skipFinish) { + var message = connection.newOutgoingMessage(1024); + var builder = message.getBody().getAs(RpcProtocol.Message.factory).initFinish(); + builder.setQuestionId(this.id); + builder.setReleaseResultCaps(this.isAwaitingReturn); + message.send(); + } + + // Check if the question has returned and, if so, remove it from the table. + // Remove question ID from the table. Must do this *after* sending `Finish` to ensure that + // the ID is not re-allocated before the `Finish` message can be sent. + assert !this.isAwaitingReturn; + questions.erase(id, this); } } @@ -411,7 +430,9 @@ final class RpcState { } var payload = callReturn.getResults(); var capTable = receiveCaps(payload.getCapTable(), message.getAttachedFds()); - var response = new RpcResponseImpl(question, message, capTable, payload.getContent()); + // TODO question, message unused in RpcResponseImpl + // var response = new RpcResponseImpl(question, message, capTable, payload.getContent()); + var response = new RpcResponseImpl(capTable, payload.getContent()); question.answer(response); break; @@ -587,6 +608,7 @@ final class RpcState { return; } } + private List writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List fds) { if (capTable.length == 0) { return List.of(); @@ -900,16 +922,17 @@ final class RpcState { } static class RpcResponseImpl implements RpcResponse { - private final Question question; - private final IncomingRpcMessage message; + // TODO unused? + // private final Question question; + // private final IncomingRpcMessage message; private final AnyPointer.Reader results; - RpcResponseImpl(Question question, - IncomingRpcMessage message, + RpcResponseImpl(/*Question question, + IncomingRpcMessage message,*/ List capTable, AnyPointer.Reader results) { - this.question = question; - this.message = message; + // this.question = question; + // this.message = message; this.results = results.imbue(new ReaderCapabilityTable(capTable)); } @@ -1048,7 +1071,7 @@ final class RpcState { return null; } - RpcResponse consumeRedirectedResponse() { + private RpcResponse consumeRedirectedResponse() { assert this.redirectResults; if (this.response == null) { @@ -1058,11 +1081,11 @@ final class RpcState { return ((LocallyRedirectedRpcResponse) this.response); } - void sendReturn() { + private void sendReturn() { assert !redirectResults; if (!this.cancelRequested && isDisconnected()) { - assert false: "Cancellation should have been requested on disconnect."; + assert false : "Cancellation should have been requested on disconnect."; return; } @@ -1073,17 +1096,20 @@ final class RpcState { this.returnMessage.setAnswerId(this.answerId); this.returnMessage.setReleaseParamCaps(false); - List exports; + List exports = List.of(); try { exports = ((RpcServerResponseImpl) response).send(); - } - catch (Throwable exc) { + } catch (Throwable exc) { this.responseSent = false; sendErrorReturn(exc); } + + // If no caps in the results, the pipeline is irrelevant. + boolean shouldFreePipeline = exports.isEmpty(); + cleanupAnswerTable(exports, shouldFreePipeline); } - void sendErrorReturn(Throwable exc) { + private void sendErrorReturn(Throwable exc) { assert !redirectResults; if (!isFirstResponder()) { @@ -1099,10 +1125,10 @@ final class RpcState { message.send(); } - cleanupAnswerTable(null, false); + cleanupAnswerTable(List.of(), false); } - boolean isFirstResponder() { + private boolean isFirstResponder() { if (this.responseSent) { return false; } @@ -1110,7 +1136,7 @@ final class RpcState { return true; } - void cleanupAnswerTable(List resultExports, boolean shouldFreePipeline) { + private void cleanupAnswerTable(List resultExports, boolean shouldFreePipeline) { if (this.cancelRequested) { assert resultExports.size() == 0; answers.erase(this.answerId); @@ -1151,7 +1177,7 @@ final class RpcState { WAITING, RESOLVED, BROKEN } - class RpcPipeline implements PipelineHook { + private class RpcPipeline implements PipelineHook { private final Question question; private PipelineState state = PipelineState.WAITING; @@ -1188,7 +1214,7 @@ final class RpcState { @Override public ClientHook getPipelinedCap(PipelineOp[] ops) { // TODO avoid conversion to/from ArrayList? - var key = new ArrayList<>(Arrays.asList(ops)); + var key = new ArrayList<>(Arrays.asList(ops)); var hook = this.clientMap.computeIfAbsent(key, k -> { switch (state) { case WAITING: @@ -1269,10 +1295,11 @@ final class RpcState { this.paramsBuilder = callBuilder.getParams().getContent().imbue(this.capTable); } - AnyPointer.Builder getRoot() { + private AnyPointer.Builder getRoot() { return this.paramsBuilder; } - RpcProtocol.Call.Builder getCall() { + + private RpcProtocol.Call.Builder getCall() { return this.callBuilder; } @@ -1353,7 +1380,7 @@ final class RpcState { } } - class ImportClient extends RpcClient { + private class ImportClient extends RpcClient { final int importId; int remoteRefCount = 0; @@ -1374,8 +1401,8 @@ final class RpcState { } } - public void dispose() { - // TODO manage destruction... + public void remove() { + // Remove self from the import table. var imp = imports.find(importId); if (imp != null) { if (imp.importClient == this) { @@ -1383,6 +1410,7 @@ final class RpcState { } } + // Send a message releasing our remote references. if (remoteRefCount > 0 && !isDisconnected()) { var message = connection.newOutgoingMessage(1024); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease(); @@ -1541,6 +1569,17 @@ final class RpcState { public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { return null; } + + public void remove() { + if (this.importId != null) { + // This object represents an import promise. Clean that up. + var imp = imports.find(this.importId); + if (imp.appClient != null && imp.appClient == this) { + imp.appClient = null; + imp.importClient.remove(); + } + } + } } class PipelineClient extends RpcClient {