add logging for inbound messages

This commit is contained in:
Vaci Koblizek 2020-11-17 16:57:06 +00:00
parent d6112f0be7
commit 709751a885
3 changed files with 53 additions and 7 deletions

View file

@ -11,9 +11,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.logging.*;
final class RpcState<VatId> { final class RpcState<VatId> {
private static final Logger LOGGER = Logger.getLogger(RpcState.class.getName());
private static int messageSizeHint() { private static int messageSizeHint() {
return 1 + RpcProtocol.Message.factory.structSize().total(); return 1 + RpcProtocol.Message.factory.structSize().total();
} }
@ -68,6 +71,7 @@ final class RpcState<VatId> {
var builder = message.getBody().getAs(RpcProtocol.Message.factory).initFinish(); var builder = message.getBody().getAs(RpcProtocol.Message.factory).initFinish();
builder.setQuestionId(this.id); builder.setQuestionId(this.id);
builder.setReleaseResultCaps(this.isAwaitingReturn); builder.setReleaseResultCaps(this.isAwaitingReturn);
LOGGER.info(() -> RpcState.this.toString() + ": > FINISH question=" + this.id);
message.send(); message.send();
} }
this.skipFinish = true; this.skipFinish = true;
@ -185,6 +189,7 @@ final class RpcState<VatId> {
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease(); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease();
builder.setId(importId); builder.setId(importId);
builder.setReferenceCount(remoteRefCount); builder.setReferenceCount(remoteRefCount);
LOGGER.info(() -> this.toString() + ": > RELEASE import=" + importId);
message.send(); message.send();
} }
} }
@ -253,6 +258,11 @@ final class RpcState<VatId> {
startMessageLoop(); startMessageLoop();
} }
@Override
public String toString() {
return super.toString() + ": " + this.connection.toString();
}
CompletableFuture<java.lang.Void> onDisconnection() { CompletableFuture<java.lang.Void> onDisconnection() {
return this.messageLoop; return this.messageLoop;
} }
@ -315,6 +325,7 @@ final class RpcState<VatId> {
var message = this.connection.newOutgoingMessage(sizeHint); var message = this.connection.newOutgoingMessage(sizeHint);
var abort = message.getBody().getAs(RpcProtocol.Message.factory).initAbort(); var abort = message.getBody().getAs(RpcProtocol.Message.factory).initAbort();
FromException(exc, abort); FromException(exc, abort);
LOGGER.log(Level.INFO, this.toString() + ": > ABORT", exc.getMessage());
message.send(); message.send();
} }
catch (Exception ignored) { catch (Exception ignored) {
@ -374,6 +385,7 @@ final class RpcState<VatId> {
var message = connection.newOutgoingMessage(sizeHint); var message = connection.newOutgoingMessage(sizeHint);
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap(); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap();
builder.setQuestionId(question.id); builder.setQuestionId(question.id);
LOGGER.info(() -> this.toString() + ": > BOOTSTRAP question=" + question.id);
message.send(); message.send();
return pipeline.getPipelinedCap(new PipelineOp[0]); return pipeline.getPipelinedCap(new PipelineOp[0]);
@ -407,15 +419,12 @@ final class RpcState<VatId> {
} }
}); });
messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose(exc -> { messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose(
//System.out.println("Exception in startMessageLoop!" + exc); exc -> CompletableFuture.failedFuture(exc));
return CompletableFuture.failedFuture(exc);
});
} }
private void handleMessage(IncomingRpcMessage message) throws RpcException { private void handleMessage(IncomingRpcMessage message) throws RpcException {
var reader = message.getBody().getAs(RpcProtocol.Message.factory); var reader = message.getBody().getAs(RpcProtocol.Message.factory);
//System.out.println(this + ": Received message: " + reader.which());
switch (reader.which()) { switch (reader.which()) {
case UNIMPLEMENTED: case UNIMPLEMENTED:
handleUnimplemented(reader.getUnimplemented()); handleUnimplemented(reader.getUnimplemented());
@ -444,14 +453,17 @@ final class RpcState<VatId> {
case RELEASE: case RELEASE:
handleRelease(reader.getRelease()); handleRelease(reader.getRelease());
break; break;
default: default: {
LOGGER.warning(() -> this.toString() + ": < Unhandled RPC message: " + reader.which().toString());
if (!isDisconnected()) { if (!isDisconnected()) {
// boomin' back atcha // boomin' back atcha
var msg = connection.newOutgoingMessage(); var msg = connection.newOutgoingMessage();
msg.getBody().initAs(RpcProtocol.Message.factory).setUnimplemented(reader); msg.getBody().initAs(RpcProtocol.Message.factory).setUnimplemented(reader);
LOGGER.info(() -> this.toString() + ": > UNIMPLEMENTED");
msg.send(); msg.send();
} }
break; break;
}
} }
this.cleanupImports(); this.cleanupImports();
@ -459,6 +471,8 @@ final class RpcState<VatId> {
} }
void handleUnimplemented(RpcProtocol.Message.Reader message) { void handleUnimplemented(RpcProtocol.Message.Reader message) {
LOGGER.info(() -> this.toString() + ": < UNIMPLEMENTED");
switch (message.which()) { switch (message.which()) {
case RESOLVE: case RESOLVE:
var resolve = message.getResolve(); var resolve = message.getResolve();
@ -495,10 +509,13 @@ final class RpcState<VatId> {
} }
void handleAbort(RpcProtocol.Exception.Reader abort) throws RpcException { void handleAbort(RpcProtocol.Exception.Reader abort) throws RpcException {
throw ToException(abort); var exc = ToException(abort);
LOGGER.log(Level.INFO, this.toString() + ": < ABORT ", exc.getMessage());
throw exc;
} }
void handleBootstrap(IncomingRpcMessage message, RpcProtocol.Bootstrap.Reader bootstrap) { void handleBootstrap(IncomingRpcMessage message, RpcProtocol.Bootstrap.Reader bootstrap) {
LOGGER.info(() -> this.toString() + ": < BOOTSTRAP question=" + bootstrap.getQuestionId());
if (isDisconnected()) { if (isDisconnected()) {
return; return;
} }
@ -537,6 +554,7 @@ final class RpcState<VatId> {
? capHook ? capHook
: Capability.newBrokenCap("Invalid pipeline transform."); : Capability.newBrokenCap("Invalid pipeline transform.");
LOGGER.info(() -> this.toString() + ": > RETURN answer=" + answerId);
response.send(); response.send();
assert answer.active; assert answer.active;
@ -622,6 +640,8 @@ final class RpcState<VatId> {
} }
void handleReturn(IncomingRpcMessage message, RpcProtocol.Return.Reader callReturn) { void handleReturn(IncomingRpcMessage message, RpcProtocol.Return.Reader callReturn) {
LOGGER.info(() -> this.toString() + ": < RETURN answer=" + callReturn.getAnswerId());
var question = questions.find(callReturn.getAnswerId()); var question = questions.find(callReturn.getAnswerId());
if (question == null) { if (question == null) {
assert false: "Invalid question ID in Return message."; assert false: "Invalid question ID in Return message.";
@ -721,6 +741,8 @@ final class RpcState<VatId> {
} }
void handleFinish(RpcProtocol.Finish.Reader finish) { void handleFinish(RpcProtocol.Finish.Reader finish) {
LOGGER.info(() -> this.toString() + ": < FINISH question=" + finish.getQuestionId());
var answer = answers.find(finish.getQuestionId()); var answer = answers.find(finish.getQuestionId());
if (answer == null || !answer.active) { if (answer == null || !answer.active) {
assert false: "'Finish' for invalid question ID."; assert false: "'Finish' for invalid question ID.";
@ -750,6 +772,8 @@ final class RpcState<VatId> {
} }
private void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { private void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) {
LOGGER.info(() -> this.toString() + ": < RESOLVE promise=" + resolve.getPromiseId());
ClientHook cap = null; ClientHook cap = null;
Throwable exc = null; Throwable exc = null;
@ -790,10 +814,13 @@ final class RpcState<VatId> {
} }
private void handleRelease(RpcProtocol.Release.Reader release) { private void handleRelease(RpcProtocol.Release.Reader release) {
LOGGER.info(() -> this.toString() + ": < RELEASE promise=" + release.getId());
this.releaseExport(release.getId(), release.getReferenceCount()); this.releaseExport(release.getId(), release.getReferenceCount());
} }
private void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { private void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) {
LOGGER.info(() -> this.toString() + ": < DISEMBARGO");
var ctx = disembargo.getContext(); var ctx = disembargo.getContext();
switch (ctx.which()) { switch (ctx.which()) {
case SENDER_LOOPBACK: case SENDER_LOOPBACK:
@ -837,6 +864,7 @@ final class RpcState<VatId> {
return null; return null;
} }
builder.getContext().setReceiverLoopback(embargoId); builder.getContext().setReceiverLoopback(embargoId);
LOGGER.info(() -> this.toString() + ": > DISEMBARGO");
message.send(); message.send();
return null; return null;
}; };
@ -973,6 +1001,7 @@ final class RpcState<VatId> {
var fds = List.<Integer>of(); var fds = List.<Integer>of();
writeDescriptor(exp.clientHook, resolve.initCap(), fds); writeDescriptor(exp.clientHook, resolve.initCap(), fds);
message.setFds(fds); message.setFds(fds);
LOGGER.info(() -> this.toString() + ": > RESOLVE export=" + exportId);
message.send(); message.send();
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
}).whenComplete((value, exc) -> { }).whenComplete((value, exc) -> {
@ -984,6 +1013,7 @@ final class RpcState<VatId> {
var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve(); var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve();
resolve.setPromiseId(exportId); resolve.setPromiseId(exportId);
FromException(exc, resolve.initException()); FromException(exc, resolve.initException());
LOGGER.log(Level.INFO, this.toString() + ": > RESOLVE", exc.getMessage());
message.send(); message.send();
// TODO disconnect? // TODO disconnect?
@ -1381,6 +1411,7 @@ final class RpcState<VatId> {
builder.setAnswerId(this.answerId); builder.setAnswerId(this.answerId);
builder.setReleaseParamCaps(false); builder.setReleaseParamCaps(false);
builder.setTakeFromOtherQuestion(tailInfo.questionId); builder.setTakeFromOtherQuestion(tailInfo.questionId);
LOGGER.info(() -> this.toString() + ": > RETURN answer=" + answerId);
message.send(); message.send();
} }
@ -1423,6 +1454,8 @@ final class RpcState<VatId> {
this.returnMessage.setAnswerId(this.answerId); this.returnMessage.setAnswerId(this.answerId);
this.returnMessage.setReleaseParamCaps(false); this.returnMessage.setReleaseParamCaps(false);
LOGGER.info(() -> RpcState.this.toString() + ": > RETURN answer=" + this.answerId);
int[] exports = null; int[] exports = null;
try { try {
exports = ((RpcServerResponseImpl) response).send(); exports = ((RpcServerResponseImpl) response).send();
@ -1446,6 +1479,7 @@ final class RpcState<VatId> {
builder.setAnswerId(this.answerId); builder.setAnswerId(this.answerId);
builder.setReleaseParamCaps(false); builder.setReleaseParamCaps(false);
FromException(exc, builder.initException()); FromException(exc, builder.initException());
LOGGER.log(Level.INFO, this.toString() + ": > RETURN", exc.getMessage());
message.send(); message.send();
} }
@ -1703,6 +1737,7 @@ final class RpcState<VatId> {
callBuilder.getSendResultsTo().getYourself(); callBuilder.getSendResultsTo().getYourself();
} }
try { try {
LOGGER.info(() -> RpcState.this.toString() + ": > CALL question=" + question.id);
message.send(); message.send();
} catch (Exception exc) { } catch (Exception exc) {
question.isAwaitingReturn = false; question.isAwaitingReturn = false;
@ -1954,6 +1989,7 @@ final class RpcState<VatId> {
var embargoPromise = embargo.disembargo.thenApply( var embargoPromise = embargo.disembargo.thenApply(
void_ -> finalReplacement); void_ -> finalReplacement);
replacement = Capability.newLocalPromiseClient(embargoPromise); replacement = Capability.newLocalPromiseClient(embargoPromise);
LOGGER.info(() -> RpcState.this.toString() + ": > DISEMBARGO");
message.send(); message.send();
} }

View file

@ -47,6 +47,11 @@ public class TwoPartyVatNetwork
} }
} }
@Override
public String toString() {
return this.getSide().toString();
}
public RpcTwoPartyProtocol.Side getSide() { public RpcTwoPartyProtocol.Side getSide() {
return side; return side;
} }

View file

@ -71,6 +71,11 @@ public class RpcTest {
this.peerId = peerId; this.peerId = peerId;
} }
@Override
public String toString() {
return this.isClient ? "CLIENT" : "SERVER";
}
void attach(Connection other) { void attach(Connection other) {
Assert.assertNull(this.partner); Assert.assertNull(this.partner);
Assert.assertNull(other.partner); Assert.assertNull(other.partner);