cleanup minor code quality issues in RpcState

simplify handleResolve

use enhanced switch in getMessageTarget
This commit is contained in:
Vaci Koblizek 2020-11-19 15:48:51 +00:00
parent f0fbaacae1
commit af229ccb99

View file

@ -60,7 +60,7 @@ final class RpcState<VatId> {
Question(int id) { Question(int id) {
this.id = id; this.id = id;
this.selfRef = new QuestionRef(this.id);; this.selfRef = new QuestionRef(this.id);
this.disposer = new QuestionDisposer(this.selfRef); this.disposer = new QuestionDisposer(this.selfRef);
} }
@ -239,9 +239,9 @@ final class RpcState<VatId> {
}; };
private final Map<ClientHook, Integer> exportsByCap = new HashMap<>(); private final Map<ClientHook, Integer> exportsByCap = new HashMap<>();
private final BootstrapFactory<VatId> bootstrapFactory; private final BootstrapFactory<? super VatId> bootstrapFactory;
private final VatNetwork.Connection<VatId> connection; private final VatNetwork.Connection<VatId> connection;
private final CompletableFuture<DisconnectInfo> disconnectFulfiller; private final CompletableFuture<? super DisconnectInfo> disconnectFulfiller;
private Throwable disconnected = null; private Throwable disconnected = null;
private final CompletableFuture<java.lang.Void> messageLoop = new CompletableFuture<>(); private final CompletableFuture<java.lang.Void> messageLoop = new CompletableFuture<>();
// completes when the message loop exits // completes when the message loop exits
@ -249,9 +249,9 @@ final class RpcState<VatId> {
private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>(); private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
private final Queue<Callable<java.lang.Void>> lastEvals = new ArrayDeque<>(); private final Queue<Callable<java.lang.Void>> lastEvals = new ArrayDeque<>();
RpcState(BootstrapFactory<VatId> bootstrapFactory, RpcState(BootstrapFactory<? super VatId> bootstrapFactory,
VatNetwork.Connection<VatId> connection, VatNetwork.Connection<VatId> connection,
CompletableFuture<DisconnectInfo> disconnectFulfiller) { CompletableFuture<? super DisconnectInfo> disconnectFulfiller) {
this.bootstrapFactory = bootstrapFactory; this.bootstrapFactory = bootstrapFactory;
this.connection = connection; this.connection = connection;
this.disconnectFulfiller = disconnectFulfiller; this.disconnectFulfiller = disconnectFulfiller;
@ -420,40 +420,22 @@ final class RpcState<VatId> {
}); });
messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose( messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose(
exc -> CompletableFuture.failedFuture(exc)); CompletableFuture::failedFuture);
} }
private void handleMessage(IncomingRpcMessage message) throws RpcException { private void handleMessage(IncomingRpcMessage message) throws RpcException {
var reader = message.getBody().getAs(RpcProtocol.Message.factory); var reader = message.getBody().getAs(RpcProtocol.Message.factory);
switch (reader.which()) { switch (reader.which()) {
case UNIMPLEMENTED: case UNIMPLEMENTED -> handleUnimplemented(reader.getUnimplemented());
handleUnimplemented(reader.getUnimplemented()); case ABORT -> handleAbort(reader.getAbort());
break; case BOOTSTRAP -> handleBootstrap(reader.getBootstrap());
case ABORT: case CALL -> handleCall(message, reader.getCall());
handleAbort(reader.getAbort()); case RETURN -> handleReturn(message, reader.getReturn());
break; case FINISH -> handleFinish(reader.getFinish());
case BOOTSTRAP: case RESOLVE -> handleResolve(message, reader.getResolve());
handleBootstrap(message, reader.getBootstrap()); case DISEMBARGO -> handleDisembargo(reader.getDisembargo());
break; case RELEASE -> handleRelease(reader.getRelease());
case CALL: default -> {
handleCall(message, reader.getCall());
return;
case RETURN:
handleReturn(message, reader.getReturn());
break;
case FINISH:
handleFinish(reader.getFinish());
break;
case RESOLVE:
handleResolve(message, reader.getResolve());
break;
case DISEMBARGO:
handleDisembargo(reader.getDisembargo());
break;
case RELEASE:
handleRelease(reader.getRelease());
break;
default: {
LOGGER.warning(() -> this.toString() + ": < Unhandled RPC message: " + reader.which().toString()); LOGGER.warning(() -> this.toString() + ": < Unhandled RPC message: " + reader.which().toString());
if (!isDisconnected()) { if (!isDisconnected()) {
// boomin' back atcha // boomin' back atcha
@ -462,7 +444,6 @@ final class RpcState<VatId> {
LOGGER.info(() -> this.toString() + ": > UNIMPLEMENTED"); LOGGER.info(() -> this.toString() + ": > UNIMPLEMENTED");
msg.send(); msg.send();
} }
break;
} }
} }
@ -514,7 +495,7 @@ final class RpcState<VatId> {
throw exc; throw exc;
} }
void handleBootstrap(IncomingRpcMessage message, RpcProtocol.Bootstrap.Reader bootstrap) { void handleBootstrap(RpcProtocol.Bootstrap.Reader bootstrap) {
LOGGER.info(() -> this.toString() + ": < BOOTSTRAP question=" + bootstrap.getQuestionId()); LOGGER.info(() -> this.toString() + ": < BOOTSTRAP question=" + bootstrap.getQuestionId());
if (isDisconnected()) { if (isDisconnected()) {
return; return;
@ -570,16 +551,13 @@ final class RpcState<VatId> {
boolean redirectResults; boolean redirectResults;
switch (call.getSendResultsTo().which()) { switch (call.getSendResultsTo().which()) {
case CALLER: case CALLER -> redirectResults = false;
redirectResults = false; case YOURSELF -> redirectResults = true;
break; default -> {
case YOURSELF: assert false : "Unsupported 'Call.sendResultsTo'.";
redirectResults = true;
break;
default:
assert false: "Unsupported 'Call.sendResultsTo'.";
return; return;
} }
}
var payload = call.getParams(); var payload = call.getParams();
var capTableArray = receiveCaps(payload.getCapTable(), message.getAttachedFds()); var capTableArray = receiveCaps(payload.getCapTable(), message.getAttachedFds());
@ -774,43 +752,34 @@ final class RpcState<VatId> {
private void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { private void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) {
LOGGER.info(() -> this.toString() + ": < RESOLVE promise=" + resolve.getPromiseId()); LOGGER.info(() -> this.toString() + ": < RESOLVE promise=" + resolve.getPromiseId());
ClientHook cap = null;
Throwable exc = null;
switch (resolve.which()) {
case CAP:
cap = receiveCap(resolve.getCap(), message.getAttachedFds());
break;
case EXCEPTION:
exc = ToException(resolve.getException());
break;
default:
assert false: "Unknown 'Resolve' type.";
return;
}
var importId = resolve.getPromiseId(); var importId = resolve.getPromiseId();
var imp = this.imports.find(resolve.getPromiseId()); var imp = this.imports.find(importId);
if (imp == null) { if (imp == null) {
return; return;
} }
if (imp.promise != null) { if (imp.promise == null) {
// This import is an unfulfilled promise. assert imp.importClient == null : "Import already resolved.";
// It appears this is a valid entry on the import table, but was not expected to be a
assert !imp.promise.isDone(); // promise.
if (exc == null) {
imp.promise.complete(cap);
}
else {
imp.promise.completeExceptionally(exc);
}
return; return;
} }
// It appears this is a valid entry on the import table, but was not expected to be a // This import is an unfulfilled promise.
// promise. assert !imp.promise.isDone();
assert imp.importClient == null : "Import already resolved."; switch (resolve.which()) {
case CAP -> {
var cap = receiveCap(resolve.getCap(), message.getAttachedFds());
imp.promise.complete(cap);
}
case EXCEPTION -> {
var exc = ToException(resolve.getException());
imp.promise.completeExceptionally(exc);
}
default -> {
assert false : "Unknown 'Resolve' type.";
}
}
} }
private void handleRelease(RpcProtocol.Release.Reader release) { private void handleRelease(RpcProtocol.Release.Reader release) {
@ -842,8 +811,8 @@ final class RpcState<VatId> {
return; return;
} }
final var embargoId = ctx.getSenderLoopback(); var embargoId = ctx.getSenderLoopback();
final var rpcTarget = (RpcClient) target; var rpcTarget = (RpcClient) target;
Callable<java.lang.Void> sendDisembargo = () -> { Callable<java.lang.Void> sendDisembargo = () -> {
if (isDisconnected()) { if (isDisconnected()) {
@ -877,14 +846,13 @@ final class RpcState<VatId> {
assert false: "Invalid embargo ID in 'Disembargo.context.receiverLoopback'."; assert false: "Invalid embargo ID in 'Disembargo.context.receiverLoopback'.";
return; return;
} }
assert embargo.disembargo != null;
embargo.disembargo.complete(null); embargo.disembargo.complete(null);
embargos.erase(ctx.getReceiverLoopback(), embargo); embargos.erase(ctx.getReceiverLoopback(), embargo);
break; break;
default: default:
assert false: "Unimplemented Disembargo type. " + ctx.which(); assert false: "Unimplemented Disembargo type. " + ctx.which();
return; break;
} }
} }
@ -1169,7 +1137,7 @@ final class RpcState<VatId> {
ClientHook getMessageTarget(RpcProtocol.MessageTarget.Reader target) { ClientHook getMessageTarget(RpcProtocol.MessageTarget.Reader target) {
switch (target.which()) { switch (target.which()) {
case IMPORTED_CAP: case IMPORTED_CAP -> {
var exp = exports.find(target.getImportedCap()); var exp = exports.find(target.getImportedCap());
if (exp != null) { if (exp != null) {
return exp.clientHook; return exp.clientHook;
@ -1178,8 +1146,8 @@ final class RpcState<VatId> {
assert false: "Message target is not a current export ID."; assert false: "Message target is not a current export ID.";
return null; return null;
} }
}
case PROMISED_ANSWER: case PROMISED_ANSWER -> {
var promisedAnswer = target.getPromisedAnswer(); var promisedAnswer = target.getPromisedAnswer();
var questionId = promisedAnswer.getQuestionId(); var questionId = promisedAnswer.getQuestionId();
var base = answers.put(questionId); var base = answers.put(questionId);
@ -1187,24 +1155,23 @@ final class RpcState<VatId> {
assert false: "PromisedAnswer.questionId is not a current question."; assert false: "PromisedAnswer.questionId is not a current question.";
return null; return null;
} }
var pipeline = base.pipeline; var pipeline = base.pipeline;
if (pipeline == null) { if (pipeline == null) {
pipeline = PipelineHook.newBrokenPipeline( pipeline = PipelineHook.newBrokenPipeline(
RpcException.failed("Pipeline call on a request that returned no capabilities or was already closed.")); RpcException.failed("Pipeline call on a request that returned no capabilities or was already closed."));
} }
var ops = ToPipelineOps(promisedAnswer); var ops = ToPipelineOps(promisedAnswer);
if (ops == null) { if (ops == null) {
return null; return null;
} }
return pipeline.getPipelinedCap(ops); return pipeline.getPipelinedCap(ops);
}
default: default -> {
assert false: "Unknown message target type. " + target.which(); assert false: "Unknown message target type. " + target.which();
return null; return null;
} }
} }
}
ClientHook getInnermostClient(ClientHook client) { ClientHook getInnermostClient(ClientHook client) {
for (;;) { for (;;) {
@ -1317,7 +1284,7 @@ final class RpcState<VatId> {
// response // response
private RpcServerResponse response; private RpcServerResponse response;
private RpcProtocol.Return.Builder returnMessage; private RpcProtocol.Return.Builder returnMessage;
private boolean redirectResults = false; private final boolean redirectResults;
private boolean responseSent = false; private boolean responseSent = false;
private CompletableFuture<AnyPointer.Pipeline> tailCallPipeline; private CompletableFuture<AnyPointer.Pipeline> tailCallPipeline;
@ -1377,7 +1344,7 @@ final class RpcState<VatId> {
@Override @Override
public void allowCancellation() { public void allowCancellation() {
boolean previouslyRequestedButNotAllowed = (this.cancelAllowed == false && this.cancelRequested == true); boolean previouslyRequestedButNotAllowed = (!this.cancelAllowed && this.cancelRequested);
this.cancelAllowed = true; this.cancelAllowed = true;
if (previouslyRequestedButNotAllowed) { if (previouslyRequestedButNotAllowed) {
@ -1423,9 +1390,8 @@ final class RpcState<VatId> {
// Just forward to another local call // Just forward to another local call
var response = request.send(); var response = request.send();
var promise = response.thenAccept(results -> { var promise = response.thenAccept(
getResults(0).setAs(AnyPointer.factory, results); results -> getResults(0).setAs(AnyPointer.factory, results));
});
return new ClientHook.VoidPromiseAndPipeline(promise, response.pipeline().hook); return new ClientHook.VoidPromiseAndPipeline(promise, response.pipeline().hook);
} }
@ -1703,7 +1669,7 @@ final class RpcState<VatId> {
return replacement.send(); return replacement.send();
} }
final var questionRef = sendInternal(false); var questionRef = sendInternal(false);
// The pipeline must get notified of resolution before the app does to maintain ordering. // The pipeline must get notified of resolution before the app does to maintain ordering.
var pipeline = new RpcPipeline(questionRef, questionRef.response); var pipeline = new RpcPipeline(questionRef, questionRef.response);
@ -1815,7 +1781,7 @@ final class RpcState<VatId> {
private void cleanupImports() { private void cleanupImports() {
while (true) { while (true) {
var ref = (ImportRef) this.importRefs.poll(); var ref = (ImportRef)this.importRefs.poll();
if (ref == null) { if (ref == null) {
return; return;
} }
@ -1985,7 +1951,7 @@ final class RpcState<VatId> {
var embargo = embargos.next(); var embargo = embargos.next();
disembargo.getContext().setSenderLoopback(embargo.id); disembargo.getContext().setSenderLoopback(embargo.id);
final ClientHook finalReplacement = replacement; ClientHook finalReplacement = replacement;
var embargoPromise = embargo.disembargo.thenApply( var embargoPromise = embargo.disembargo.thenApply(
void_ -> finalReplacement); void_ -> finalReplacement);
replacement = Capability.newLocalPromiseClient(embargoPromise); replacement = Capability.newLocalPromiseClient(embargoPromise);
@ -2038,13 +2004,10 @@ final class RpcState<VatId> {
static void FromPipelineOps(PipelineOp[] ops, RpcProtocol.PromisedAnswer.Builder builder) { static void FromPipelineOps(PipelineOp[] ops, RpcProtocol.PromisedAnswer.Builder builder) {
var transforms = builder.initTransform(ops.length); var transforms = builder.initTransform(ops.length);
for (int ii = 0; ii < ops.length; ++ii) { for (int ii = 0; ii < ops.length; ++ii) {
var transform = transforms.get(ii);
switch (ops[ii].type) { switch (ops[ii].type) {
case NOOP: case NOOP -> transform.setNoop(null);
transforms.get(ii).setNoop(null); case GET_POINTER_FIELD -> transform.setGetPointerField(ops[ii].pointerIndex);
break;
case GET_POINTER_FIELD:
transforms.get(ii).setGetPointerField(ops[ii].pointerIndex);
break;
} }
} }
} }
@ -2079,7 +2042,6 @@ final class RpcState<VatId> {
case OVERLOADED -> RpcProtocol.Exception.Type.OVERLOADED; case OVERLOADED -> RpcProtocol.Exception.Type.OVERLOADED;
case DISCONNECTED -> RpcProtocol.Exception.Type.DISCONNECTED; case DISCONNECTED -> RpcProtocol.Exception.Type.DISCONNECTED;
case UNIMPLEMENTED -> RpcProtocol.Exception.Type.UNIMPLEMENTED; case UNIMPLEMENTED -> RpcProtocol.Exception.Type.UNIMPLEMENTED;
default -> RpcProtocol.Exception.Type.FAILED;
}; };
} }
builder.setType(type); builder.setType(type);
@ -2091,7 +2053,6 @@ final class RpcState<VatId> {
static RpcException ToException(RpcProtocol.Exception.Reader reader) { static RpcException ToException(RpcProtocol.Exception.Reader reader) {
var type = switch (reader.getType()) { var type = switch (reader.getType()) {
case FAILED -> RpcException.Type.FAILED;
case OVERLOADED -> RpcException.Type.OVERLOADED; case OVERLOADED -> RpcException.Type.OVERLOADED;
case DISCONNECTED -> RpcException.Type.DISCONNECTED; case DISCONNECTED -> RpcException.Type.DISCONNECTED;
case UNIMPLEMENTED -> RpcException.Type.UNIMPLEMENTED; case UNIMPLEMENTED -> RpcException.Type.UNIMPLEMENTED;