From c2423d453e7f62a6be9e32edd107476bf6ea7510 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Mon, 16 Nov 2020 10:39:05 +0000 Subject: [PATCH] improve question lifecycle handling A specialised export table was a bad idea. Stick more closely to C++ implentation of QuestionRef. --- .../src/main/java/org/capnproto/RpcState.java | 262 +++++++++--------- .../src/test/java/org/capnproto/RpcTest.java | 3 + 2 files changed, 129 insertions(+), 136 deletions(-) diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index aa221a4..f739710 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -11,7 +11,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import java.util.function.Consumer; final class RpcState { @@ -46,14 +45,20 @@ final class RpcState { } } - private final class QuestionDisposer { + private final class Question { final int id; boolean skipFinish; boolean isAwaitingReturn; + int[] paramExports = new int[0]; + boolean isTailCall = false; + QuestionRef selfRef; + private final WeakReference disposer; - QuestionDisposer(int id) { + Question(int id) { this.id = id; + this.selfRef = new QuestionRef(this.id);; + this.disposer = new QuestionDisposer(this.selfRef); } void finish() { @@ -66,109 +71,62 @@ final class RpcState { message.send(); } this.skipFinish = true; - } - - void dispose() { - this.finish(); // Check if the question has returned and, if so, remove it from the table. // Remove question ID from the table. Must do this *after* sending `Finish` to ensure that // the ID is not re-allocated before the `Finish` message can be sent. if (!this.isAwaitingReturn) { - questions.erase(this.id); + questions.erase(this.id, this); } } } - private final class QuestionRef extends WeakReference { - private final QuestionDisposer disposer; + /** + * A reference to an entry on the question table. + */ + private final class QuestionRef { - QuestionRef(Question question, ReferenceQueue queue) { - super(question, queue); - this.disposer = question.disposer; + private final int questionId; + CompletableFuture response = new CompletableFuture<>(); + + QuestionRef(int questionId) { + this.questionId = questionId; + } + + void fulfill(Throwable exc) { + this.response.completeExceptionally(exc); + this.finish(); + } + + void fulfill(RpcResponse response) { + this.response.complete(response); + this.finish(); + } + + private void finish() { + // We no longer need access to the questionRef in order to complete it. + // Dropping the selfRef releases the question for disposal once all other + // references are gone. + var question = questions.find(this.questionId); + if (question != null) { + question.selfRef = null; + } + } + } + + private final class QuestionDisposer extends WeakReference { + private final int questionId; + + QuestionDisposer(QuestionRef questionRef) { + super(questionRef, questionRefs); + this.questionId = questionRef.questionId; } void dispose() { - this.disposer.dispose(); - } - } - - private class Question { - - CompletableFuture response = new CompletableFuture<>(); - int[] paramExports = new int[0]; - private final QuestionDisposer disposer; - boolean isTailCall = false; - - Question(int id) { - this.disposer = new QuestionDisposer(id); - } - - int getId() { - return this.disposer.id; - } - - boolean isAwaitingReturn() { - return this.disposer.isAwaitingReturn; - } - - public void setAwaitingReturn(boolean value) { - this.disposer.isAwaitingReturn = value; - } - - void reject(Throwable exc) { - this.response.completeExceptionally(exc); - } - - void answer(RpcResponse response) { - this.response.complete(response); - } - - void setSkipFinish(boolean value) { - this.disposer.skipFinish = value; - } - - public void finish() { - this.disposer.finish(); - } - } - - class QuestionExportTable { - private final HashMap> slots = new HashMap<>(); - private final Queue freeIds = new PriorityQueue<>(); - private int max = 0; - - public Question find(int id) { - var ref = this.slots.get(id); - return ref == null ? null : ref.get(); - } - - public Question erase(int id) { - var value = this.slots.get(id); - if (value != null) { - freeIds.add(id); - this.slots.remove(id); - return value.get(); - } else { - return null; - } - } - - public Question next() { - int id = freeIds.isEmpty() ? max++ : freeIds.remove(); - var value = new Question(id); - var prev = slots.put(id, new QuestionRef(value, questionRefs)); - assert prev == null; - return value; - } - - public void forEach(Consumer action) { - for (var entry: this.slots.values()) { - var question = entry.get(); - if (question != null) { - action.accept(question); - } + var question = questions.find(this.questionId); + if (question != null) { + question.finish(); } } } @@ -247,7 +205,12 @@ final class RpcState { } }; - private final QuestionExportTable questions = new QuestionExportTable(); + private final ExportTable questions = new ExportTable<>() { + @Override + Question newExportable(int id) { + return new Question(id); + } + }; private final ImportTable answers = new ImportTable<>() { @Override @@ -278,7 +241,7 @@ final class RpcState { private CompletableFuture messageReady = CompletableFuture.completedFuture(null); private final CompletableFuture messageLoop = new CompletableFuture<>(); // completes when the message loop exits - private final ReferenceQueue questionRefs = new ReferenceQueue<>(); + private final ReferenceQueue questionRefs = new ReferenceQueue<>(); private final ReferenceQueue importRefs = new ReferenceQueue<>(); RpcState(BootstrapFactory bootstrapFactory, @@ -303,7 +266,12 @@ final class RpcState { var networkExc = RpcException.disconnected(exc.getMessage()); // All current questions complete with exceptions. - questions.forEach(question -> question.reject(networkExc)); + for (var question: questions) { + var questionRef = question.selfRef; + if (questionRef != null) { + questionRef.fulfill(networkExc); + } + } List pipelinesToRelease = new ArrayList<>(); List clientsToRelease = new ArrayList<>(); @@ -406,14 +374,17 @@ final class RpcState { ClientHook restore() { var question = questions.next(); - question.setAwaitingReturn(true); + question.isAwaitingReturn = true; + var questionRef = question.selfRef; var promise = new CompletableFuture(); + var pipeline = new RpcPipeline(questionRef, promise); + int sizeHint = messageSizeHint(RpcProtocol.Bootstrap.factory); var message = connection.newOutgoingMessage(sizeHint); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap(); - builder.setQuestionId(question.getId()); + builder.setQuestionId(question.id); message.send(); - var pipeline = new RpcPipeline(question, promise); + return pipeline.getPipelinedCap(new PipelineOp[0]); } @@ -440,7 +411,8 @@ final class RpcState { }); messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose(exc -> { - assert exc == null: "Exception in startMessageLoop!"; + //System.out.println("Exception in startMessageLoop!"); + //exc.printStackTrace(); return CompletableFuture.failedFuture(exc); }); } @@ -660,11 +632,11 @@ final class RpcState { return; } - if (!question.isAwaitingReturn()) { + if (!question.isAwaitingReturn) { assert false: "Duplicate Return"; return; } - question.setAwaitingReturn(false); + question.isAwaitingReturn = false; int[] exportsToRelease = null; if (callReturn.getReleaseParamCaps()) { @@ -672,11 +644,21 @@ final class RpcState { question.paramExports = null; } - if (callReturn.isTakeFromOtherQuestion()) { - var answer = this.answers.find(callReturn.getTakeFromOtherQuestion()); - if (answer != null) { - answer.redirectedResults = null; + var questionRef = question.selfRef; + if (questionRef == null) { + if (callReturn.isTakeFromOtherQuestion()) { + var answer = this.answers.find(callReturn.getTakeFromOtherQuestion()); + if (answer != null) { + answer.redirectedResults = null; + } } + + // Looks like this question was canceled earlier, so `Finish` was already sent, with + // `releaseResultCaps` set true so that we don't have to release them here. We can go + // ahead and delete it from the table. + // TODO Should we do this? + questions.erase(callReturn.getAnswerId(), question); + if (exportsToRelease != null) { this.releaseExports(exportsToRelease); } @@ -692,8 +674,8 @@ final class RpcState { var payload = callReturn.getResults(); var capTable = receiveCaps(payload.getCapTable(), message.getAttachedFds()); - var response = new RpcResponseImpl(capTable, payload.getContent()); - question.answer(response); + var response = new RpcResponseImpl(questionRef, message, capTable, payload.getContent()); + questionRef.fulfill(response); break; case EXCEPTION: @@ -701,7 +683,7 @@ final class RpcState { assert false: "Tail call `Return` must set `resultsSentElsewhere`, not `exception`."; break; } - question.reject(ToException(callReturn.getException())); + questionRef.fulfill(ToException(callReturn.getException())); break; case CANCELED: @@ -714,7 +696,7 @@ final class RpcState { break; } // Tail calls are fulfilled with a null pointer. - question.answer(() -> null); + questionRef.fulfill(() -> null); break; case TAKE_FROM_OTHER_QUESTION: @@ -728,7 +710,7 @@ final class RpcState { assert false: "`Return.takeFromOtherQuestion` referenced a call that did not use `sendResultsTo.yourself`."; break; } - question.response = answer.redirectedResults; + questionRef.response = answer.redirectedResults; answer.redirectedResults = null; break; @@ -1225,10 +1207,16 @@ final class RpcState { class RpcResponseImpl implements RpcResponse { + private final IncomingRpcMessage message; + private final QuestionRef questionRef; private final AnyPointer.Reader results; - RpcResponseImpl(List capTable, + RpcResponseImpl(QuestionRef questionRef, + IncomingRpcMessage message, + List capTable, AnyPointer.Reader results) { + this.questionRef = questionRef; + this.message = message; this.results = results.imbue(new ReaderCapabilityTable(capTable)); } @@ -1521,20 +1509,20 @@ final class RpcState { private class RpcPipeline implements PipelineHook { - private final Question question; + private final QuestionRef questionRef; final HashMap, ClientHook> clientMap = new HashMap<>(); final CompletableFuture redirectLater; - RpcPipeline(Question question, + RpcPipeline(QuestionRef questionRef, CompletableFuture redirectLater) { - this.question = question; + this.questionRef = questionRef; assert redirectLater != null; this.redirectLater = redirectLater; } - RpcPipeline(Question question) { - this(question, null); + RpcPipeline(QuestionRef questionRef) { + this(questionRef, null); // never resolves } @@ -1549,7 +1537,7 @@ final class RpcState { // TODO avoid conversion to/from ArrayList? var key = new ArrayList<>(Arrays.asList(ops)); return this.clientMap.computeIfAbsent(key, k -> { - var pipelineClient = new PipelineClient(this.question, ops); + var pipelineClient = new PipelineClient(this.questionRef, ops); if (this.redirectLater == null) { // This pipeline will never get redirected, so just return the PipelineClient. return pipelineClient; @@ -1563,7 +1551,7 @@ final class RpcState { @Override public void cancel(Throwable exc) { - this.question.reject(exc); + this.questionRef.fulfill(exc); } } @@ -1658,12 +1646,12 @@ final class RpcState { return replacement.send(); } - final var question = sendInternal(false); + final var questionRef = sendInternal(false); // The pipeline must get notified of resolution before the app does to maintain ordering. - var pipeline = new RpcPipeline(question, question.response); + var pipeline = new RpcPipeline(questionRef, questionRef.response); - var appPromise = question.response.thenApply( + var appPromise = questionRef.response.thenApply( hook -> new Response<>(hook.getResults(), hook)); return new RemotePromise<>(appPromise, new AnyPointer.Pipeline(pipeline)); @@ -1675,28 +1663,30 @@ final class RpcState { return send(); } - Question sendInternal(boolean isTailCall) { + QuestionRef sendInternal(boolean isTailCall) { // TODO refactor var fds = List.of(); var exports = writeDescriptors(capTable.getTable(), callBuilder.getParams(), fds); message.setFds(fds); var question = questions.next(); - question.setAwaitingReturn(true); + question.isAwaitingReturn = true; question.isTailCall = isTailCall; question.paramExports = exports; - callBuilder.setQuestionId(question.getId()); + var questionRef = question.selfRef; + + callBuilder.setQuestionId(question.id); if (isTailCall) { callBuilder.getSendResultsTo().getYourself(); } try { message.send(); } catch (Exception exc) { - question.setAwaitingReturn(false); - question.setSkipFinish(true); - question.reject(exc); + question.isAwaitingReturn = false; + question.skipFinish = true; + questionRef.fulfill(exc); } - return question; + return questionRef; } @Override @@ -1781,11 +1771,11 @@ final class RpcState { private void cleanupQuestions() { while (true) { - var ref = (QuestionRef)this.questionRefs.poll(); - if (ref == null) { + var disposer = (QuestionDisposer)this.questionRefs.poll(); + if (disposer == null) { break; } - ref.dispose(); + disposer.dispose(); } } @@ -1914,11 +1904,11 @@ final class RpcState { private class PipelineClient extends RpcClient { - private final Question question; + private final QuestionRef questionRef; private final PipelineOp[] ops; - PipelineClient(Question question, PipelineOp[] ops) { - this.question = question; + PipelineClient(QuestionRef questionRef, PipelineOp[] ops) { + this.questionRef = questionRef; this.ops = ops; } @@ -1935,7 +1925,7 @@ final class RpcState { @Override public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds) { var promisedAnswer = descriptor.initReceiverAnswer(); - promisedAnswer.setQuestionId(question.getId()); + promisedAnswer.setQuestionId(questionRef.questionId); FromPipelineOps(ops, promisedAnswer); return null; } @@ -1943,7 +1933,7 @@ final class RpcState { @Override public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) { var builder = target.initPromisedAnswer(); - builder.setQuestionId(question.getId()); + builder.setQuestionId(questionRef.questionId); FromPipelineOps(ops, builder); return null; } diff --git a/runtime-rpc/src/test/java/org/capnproto/RpcTest.java b/runtime-rpc/src/test/java/org/capnproto/RpcTest.java index 89b918d..33c7188 100644 --- a/runtime-rpc/src/test/java/org/capnproto/RpcTest.java +++ b/runtime-rpc/src/test/java/org/capnproto/RpcTest.java @@ -371,6 +371,9 @@ public class RpcTest { handle1 = null; handle2 = null; + + System.gc(); + client.echoRequest().send().join(); } @org.junit.Test