add moar size hints

This commit is contained in:
Vaci Koblizek 2020-10-21 21:20:58 +01:00
parent 215f485883
commit 7b939d7c0b

View file

@ -15,6 +15,10 @@ final class RpcState {
return 1 + RpcProtocol.Message.factory.structSize().total(); return 1 + RpcProtocol.Message.factory.structSize().total();
} }
private static int exceptionSizeHint(Throwable exc) {
return RpcProtocol.Exception.factory.structSize().total() + exc.getMessage().length();
}
private static int MESSAGE_TARGET_SIZE_HINT private static int MESSAGE_TARGET_SIZE_HINT
= RpcProtocol.MessageTarget.factory.structSize().total() = RpcProtocol.MessageTarget.factory.structSize().total()
+ RpcProtocol.PromisedAnswer.factory.structSize().total() + RpcProtocol.PromisedAnswer.factory.structSize().total()
@ -59,12 +63,12 @@ final class RpcState {
} }
} }
private final class QuestionRef extends WeakReference<Question> { private static final class QuestionRef extends WeakReference<Question> {
private final QuestionDisposer disposer; private final QuestionDisposer disposer;
QuestionRef(Question question) { QuestionRef(Question question, ReferenceQueue<Question> queue) {
super(question, questionRefQueue); super(question, queue);
this.disposer = question.disposer; this.disposer = question.disposer;
} }
@ -84,10 +88,14 @@ final class RpcState {
this.disposer = new QuestionDisposer(id); this.disposer = new QuestionDisposer(id);
} }
public int getId() { int getId() {
return this.disposer.id; return this.disposer.id;
} }
boolean isAwaitingReturn() {
return this.disposer.isAwaitingReturn;
}
public void setAwaitingReturn(boolean value) { public void setAwaitingReturn(boolean value) {
this.disposer.isAwaitingReturn = value; this.disposer.isAwaitingReturn = value;
} }
@ -100,11 +108,7 @@ final class RpcState {
this.response.complete(response); this.response.complete(response);
} }
public boolean isAwaitingReturn() { void setSkipFinish(boolean value) {
return this.disposer.isAwaitingReturn;
}
public void setSkipFinish(boolean value) {
this.disposer.skipFinish = value; this.disposer.skipFinish = value;
} }
} }
@ -133,7 +137,7 @@ final class RpcState {
public Question next() { public Question next() {
int id = freeIds.isEmpty() ? max++ : freeIds.remove(); int id = freeIds.isEmpty() ? max++ : freeIds.remove();
var value = new Question(id); var value = new Question(id);
var prev = slots.put(id, new QuestionRef(value)); var prev = slots.put(id, new QuestionRef(value, questionRefs));
assert prev == null; assert prev == null;
return value; return value;
} }
@ -173,7 +177,7 @@ final class RpcState {
final int exportId; final int exportId;
int refcount; int refcount;
ClientHook clientHook; ClientHook clientHook;
CompletionStage<?> resolveOp; CompletionStage<java.lang.Void> resolveOp;
Export(int exportId) { Export(int exportId) {
this.exportId = exportId; this.exportId = exportId;
@ -204,8 +208,9 @@ final class RpcState {
} }
// Send a message releasing our remote references. // Send a message releasing our remote references.
if (remoteRefCount > 0 && !isDisconnected()) { if (this.remoteRefCount > 0 && isConnected()) {
var message = connection.newOutgoingMessage(1024); int sizeHint = messageSizeHint() + RpcProtocol.Release.factory.structSize().total();
var message = connection.newOutgoingMessage(sizeHint);
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease(); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease();
builder.setId(importId); builder.setId(importId);
builder.setReferenceCount(remoteRefCount); builder.setReferenceCount(remoteRefCount);
@ -238,7 +243,8 @@ final class RpcState {
} }
}; };
*/ */
private final QuestionExportTable questions = new QuestionExportTable(); /*{ private final QuestionExportTable questions = new QuestionExportTable();
/*{
@Override @Override
Question newExportable(int id) { Question newExportable(int id) {
return new Question(id); return new Question(id);
@ -272,9 +278,9 @@ final class RpcState {
private final CompletableFuture<java.lang.Void> onDisconnect; private final CompletableFuture<java.lang.Void> onDisconnect;
private Throwable disconnected = null; private Throwable disconnected = null;
private CompletableFuture<java.lang.Void> messageReady = CompletableFuture.completedFuture(null); private CompletableFuture<java.lang.Void> messageReady = CompletableFuture.completedFuture(null);
private final String name;
private final CompletableFuture<java.lang.Void> messageLoop; private final CompletableFuture<java.lang.Void> messageLoop;
private final ReferenceQueue<Question> questionRefQueue = new ReferenceQueue<>(); private final ReferenceQueue<Question> questionRefs = new ReferenceQueue<>();
private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
RpcState( Capability.Client bootstrapInterface, RpcState( Capability.Client bootstrapInterface,
VatNetwork.Connection connection, VatNetwork.Connection connection,
@ -283,13 +289,6 @@ final class RpcState {
this.connection = connection; this.connection = connection;
this.onDisconnect = onDisconnect; this.onDisconnect = onDisconnect;
this.messageLoop = this.doMessageLoop(); this.messageLoop = this.doMessageLoop();
if (this.connection instanceof TwoPartyVatNetwork) {
this.name = ((TwoPartyVatNetwork)this.connection).getSide().toString();
}
else {
this.name = this.toString();
}
} }
public CompletableFuture<java.lang.Void> getMessageLoop() { public CompletableFuture<java.lang.Void> getMessageLoop() {
@ -311,7 +310,7 @@ final class RpcState {
List<PipelineHook> pipelinesToRelease = new ArrayList<>(); List<PipelineHook> pipelinesToRelease = new ArrayList<>();
List<ClientHook> clientsToRelease = new ArrayList<>(); List<ClientHook> clientsToRelease = new ArrayList<>();
List<CompletionStage<RpcResponse>> tailCallsToRelease = new ArrayList<>(); List<CompletionStage<RpcResponse>> tailCallsToRelease = new ArrayList<>();
List<CompletionStage<?>> resolveOpsToRelease = new ArrayList<>(); List<CompletionStage<java.lang.Void>> resolveOpsToRelease = new ArrayList<>();
for (var answer : answers) { for (var answer : answers) {
if (answer.pipeline != null) { if (answer.pipeline != null) {
@ -350,12 +349,13 @@ final class RpcState {
} }
try { try {
var message = this.connection.newOutgoingMessage(1024); int sizeHint = messageSizeHint() + exceptionSizeHint(exc);
RpcException.fromException(exc, message.getBody().getAs(RpcProtocol.Message.factory).initAbort()); var message = this.connection.newOutgoingMessage(sizeHint);
var abort = message.getBody().getAs(RpcProtocol.Message.factory).initAbort();
RpcException.fromException(exc, abort);
message.send(); message.send();
} }
catch (Exception abortFailed) { catch (Throwable abortFailed) {
// no-op
} }
var onShutdown = this.connection.shutdown().handle((x, ioExc) -> { var onShutdown = this.connection.shutdown().handle((x, ioExc) -> {
@ -738,7 +738,7 @@ final class RpcState {
this.releaseExports(exportsToRelease); this.releaseExports(exportsToRelease);
} }
void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { private void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) {
var imp = this.imports.find(resolve.getPromiseId()); var imp = this.imports.find(resolve.getPromiseId());
if (imp == null) { if (imp == null) {
return; return;
@ -769,7 +769,7 @@ final class RpcState {
this.releaseExport(release.getId(), release.getReferenceCount()); this.releaseExport(release.getId(), release.getReferenceCount());
} }
void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { private void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) {
var ctx = disembargo.getContext(); var ctx = disembargo.getContext();
switch (ctx.which()) { switch (ctx.which()) {
case SENDER_LOOPBACK: case SENDER_LOOPBACK:
@ -904,7 +904,7 @@ final class RpcState {
return export.exportId; return export.exportId;
} }
CompletionStage<?> resolveExportedPromise(int exportId, CompletionStage<ClientHook> promise) { CompletionStage<java.lang.Void> resolveExportedPromise(int exportId, CompletionStage<ClientHook> promise) {
return promise.thenCompose(resolution -> { return promise.thenCompose(resolution -> {
if (isDisconnected()) { if (isDisconnected()) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
@ -949,7 +949,10 @@ final class RpcState {
if (exc == null) { if (exc == null) {
return; return;
} }
var message = connection.newOutgoingMessage(1024); int sizeHint = messageSizeHint()
+ RpcProtocol.Resolve.factory.structSize().total()
+ exceptionSizeHint(exc);
var message = connection.newOutgoingMessage(sizeHint);
var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve(); var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve();
resolve.setPromiseId(exportId); resolve.setPromiseId(exportId);
RpcException.fromException(exc, resolve.initException()); RpcException.fromException(exc, resolve.initException());
@ -1642,8 +1645,6 @@ final class RpcState {
} }
} }
private ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
private class ImportRef extends WeakReference<ImportClient> { private class ImportRef extends WeakReference<ImportClient> {
final int importId; final int importId;
@ -1709,7 +1710,7 @@ final class RpcState {
private void cleanupQuestions() { private void cleanupQuestions() {
while (true) { while (true) {
var ref = (QuestionRef)this.questionRefQueue.poll(); var ref = (QuestionRef)this.questionRefs.poll();
if (ref == null) { if (ref == null) {
break; break;
} }