From 7b939d7c0b056c35920688354dfac5d4311d4ec1 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Wed, 21 Oct 2020 21:20:58 +0100 Subject: [PATCH] add moar size hints --- .../src/main/java/org/capnproto/RpcState.java | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/runtime/src/main/java/org/capnproto/RpcState.java b/runtime/src/main/java/org/capnproto/RpcState.java index ff9844e..a125c93 100644 --- a/runtime/src/main/java/org/capnproto/RpcState.java +++ b/runtime/src/main/java/org/capnproto/RpcState.java @@ -15,6 +15,10 @@ final class RpcState { return 1 + RpcProtocol.Message.factory.structSize().total(); } + private static int exceptionSizeHint(Throwable exc) { + return RpcProtocol.Exception.factory.structSize().total() + exc.getMessage().length(); + } + private static int MESSAGE_TARGET_SIZE_HINT = RpcProtocol.MessageTarget.factory.structSize().total() + RpcProtocol.PromisedAnswer.factory.structSize().total() @@ -59,12 +63,12 @@ final class RpcState { } } - private final class QuestionRef extends WeakReference { + private static final class QuestionRef extends WeakReference { private final QuestionDisposer disposer; - QuestionRef(Question question) { - super(question, questionRefQueue); + QuestionRef(Question question, ReferenceQueue queue) { + super(question, queue); this.disposer = question.disposer; } @@ -84,10 +88,14 @@ final class RpcState { this.disposer = new QuestionDisposer(id); } - public int getId() { + int getId() { return this.disposer.id; } + boolean isAwaitingReturn() { + return this.disposer.isAwaitingReturn; + } + public void setAwaitingReturn(boolean value) { this.disposer.isAwaitingReturn = value; } @@ -100,11 +108,7 @@ final class RpcState { this.response.complete(response); } - public boolean isAwaitingReturn() { - return this.disposer.isAwaitingReturn; - } - - public void setSkipFinish(boolean value) { + void setSkipFinish(boolean value) { this.disposer.skipFinish = value; } } @@ -133,7 +137,7 @@ final class RpcState { public Question next() { int id = freeIds.isEmpty() ? max++ : freeIds.remove(); var value = new Question(id); - var prev = slots.put(id, new QuestionRef(value)); + var prev = slots.put(id, new QuestionRef(value, questionRefs)); assert prev == null; return value; } @@ -173,7 +177,7 @@ final class RpcState { final int exportId; int refcount; ClientHook clientHook; - CompletionStage resolveOp; + CompletionStage resolveOp; Export(int exportId) { this.exportId = exportId; @@ -204,8 +208,9 @@ final class RpcState { } // Send a message releasing our remote references. - if (remoteRefCount > 0 && !isDisconnected()) { - var message = connection.newOutgoingMessage(1024); + if (this.remoteRefCount > 0 && isConnected()) { + int sizeHint = messageSizeHint() + RpcProtocol.Release.factory.structSize().total(); + var message = connection.newOutgoingMessage(sizeHint); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease(); builder.setId(importId); builder.setReferenceCount(remoteRefCount); @@ -238,7 +243,8 @@ final class RpcState { } }; */ - private final QuestionExportTable questions = new QuestionExportTable(); /*{ + private final QuestionExportTable questions = new QuestionExportTable(); + /*{ @Override Question newExportable(int id) { return new Question(id); @@ -272,9 +278,9 @@ final class RpcState { private final CompletableFuture onDisconnect; private Throwable disconnected = null; private CompletableFuture messageReady = CompletableFuture.completedFuture(null); - private final String name; private final CompletableFuture messageLoop; - private final ReferenceQueue questionRefQueue = new ReferenceQueue<>(); + private final ReferenceQueue questionRefs = new ReferenceQueue<>(); + private final ReferenceQueue importRefs = new ReferenceQueue<>(); RpcState( Capability.Client bootstrapInterface, VatNetwork.Connection connection, @@ -283,13 +289,6 @@ final class RpcState { this.connection = connection; this.onDisconnect = onDisconnect; this.messageLoop = this.doMessageLoop(); - - if (this.connection instanceof TwoPartyVatNetwork) { - this.name = ((TwoPartyVatNetwork)this.connection).getSide().toString(); - } - else { - this.name = this.toString(); - } } public CompletableFuture getMessageLoop() { @@ -311,7 +310,7 @@ final class RpcState { List pipelinesToRelease = new ArrayList<>(); List clientsToRelease = new ArrayList<>(); List> tailCallsToRelease = new ArrayList<>(); - List> resolveOpsToRelease = new ArrayList<>(); + List> resolveOpsToRelease = new ArrayList<>(); for (var answer : answers) { if (answer.pipeline != null) { @@ -350,12 +349,13 @@ final class RpcState { } try { - var message = this.connection.newOutgoingMessage(1024); - RpcException.fromException(exc, message.getBody().getAs(RpcProtocol.Message.factory).initAbort()); + int sizeHint = messageSizeHint() + exceptionSizeHint(exc); + var message = this.connection.newOutgoingMessage(sizeHint); + var abort = message.getBody().getAs(RpcProtocol.Message.factory).initAbort(); + RpcException.fromException(exc, abort); message.send(); } - catch (Exception abortFailed) { - // no-op + catch (Throwable abortFailed) { } var onShutdown = this.connection.shutdown().handle((x, ioExc) -> { @@ -738,7 +738,7 @@ final class RpcState { this.releaseExports(exportsToRelease); } - void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { + private void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { var imp = this.imports.find(resolve.getPromiseId()); if (imp == null) { return; @@ -769,7 +769,7 @@ final class RpcState { this.releaseExport(release.getId(), release.getReferenceCount()); } - void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { + private void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { var ctx = disembargo.getContext(); switch (ctx.which()) { case SENDER_LOOPBACK: @@ -904,7 +904,7 @@ final class RpcState { return export.exportId; } - CompletionStage resolveExportedPromise(int exportId, CompletionStage promise) { + CompletionStage resolveExportedPromise(int exportId, CompletionStage promise) { return promise.thenCompose(resolution -> { if (isDisconnected()) { return CompletableFuture.completedFuture(null); @@ -949,7 +949,10 @@ final class RpcState { if (exc == null) { return; } - var message = connection.newOutgoingMessage(1024); + int sizeHint = messageSizeHint() + + RpcProtocol.Resolve.factory.structSize().total() + + exceptionSizeHint(exc); + var message = connection.newOutgoingMessage(sizeHint); var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve(); resolve.setPromiseId(exportId); RpcException.fromException(exc, resolve.initException()); @@ -1642,8 +1645,6 @@ final class RpcState { } } - private ReferenceQueue importRefs = new ReferenceQueue<>(); - private class ImportRef extends WeakReference { final int importId; @@ -1709,7 +1710,7 @@ final class RpcState { private void cleanupQuestions() { while (true) { - var ref = (QuestionRef)this.questionRefQueue.poll(); + var ref = (QuestionRef)this.questionRefs.poll(); if (ref == null) { break; }