From 709751a885ebceafbb95bc3e7517fcfdb52ec6ba Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Tue, 17 Nov 2020 16:57:06 +0000 Subject: [PATCH] add logging for inbound messages --- .../src/main/java/org/capnproto/RpcState.java | 50 ++++++++++++++++--- .../org/capnproto/TwoPartyVatNetwork.java | 5 ++ .../src/test/java/org/capnproto/RpcTest.java | 5 ++ 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index 03a800c..df32bd7 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -11,9 +11,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.logging.*; final class RpcState { + private static final Logger LOGGER = Logger.getLogger(RpcState.class.getName()); + private static int messageSizeHint() { return 1 + RpcProtocol.Message.factory.structSize().total(); } @@ -68,6 +71,7 @@ final class RpcState { var builder = message.getBody().getAs(RpcProtocol.Message.factory).initFinish(); builder.setQuestionId(this.id); builder.setReleaseResultCaps(this.isAwaitingReturn); + LOGGER.info(() -> RpcState.this.toString() + ": > FINISH question=" + this.id); message.send(); } this.skipFinish = true; @@ -185,6 +189,7 @@ final class RpcState { var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease(); builder.setId(importId); builder.setReferenceCount(remoteRefCount); + LOGGER.info(() -> this.toString() + ": > RELEASE import=" + importId); message.send(); } } @@ -253,6 +258,11 @@ final class RpcState { startMessageLoop(); } + @Override + public String toString() { + return super.toString() + ": " + this.connection.toString(); + } + CompletableFuture onDisconnection() { return this.messageLoop; } @@ -315,6 +325,7 @@ final class RpcState { var message = this.connection.newOutgoingMessage(sizeHint); var abort = message.getBody().getAs(RpcProtocol.Message.factory).initAbort(); FromException(exc, abort); + LOGGER.log(Level.INFO, this.toString() + ": > ABORT", exc.getMessage()); message.send(); } catch (Exception ignored) { @@ -374,6 +385,7 @@ final class RpcState { var message = connection.newOutgoingMessage(sizeHint); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap(); builder.setQuestionId(question.id); + LOGGER.info(() -> this.toString() + ": > BOOTSTRAP question=" + question.id); message.send(); return pipeline.getPipelinedCap(new PipelineOp[0]); @@ -407,15 +419,12 @@ final class RpcState { } }); - messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose(exc -> { - //System.out.println("Exception in startMessageLoop!" + exc); - return CompletableFuture.failedFuture(exc); - }); + messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose( + exc -> CompletableFuture.failedFuture(exc)); } private void handleMessage(IncomingRpcMessage message) throws RpcException { var reader = message.getBody().getAs(RpcProtocol.Message.factory); - //System.out.println(this + ": Received message: " + reader.which()); switch (reader.which()) { case UNIMPLEMENTED: handleUnimplemented(reader.getUnimplemented()); @@ -444,14 +453,17 @@ final class RpcState { case RELEASE: handleRelease(reader.getRelease()); break; - default: + default: { + LOGGER.warning(() -> this.toString() + ": < Unhandled RPC message: " + reader.which().toString()); if (!isDisconnected()) { // boomin' back atcha var msg = connection.newOutgoingMessage(); msg.getBody().initAs(RpcProtocol.Message.factory).setUnimplemented(reader); + LOGGER.info(() -> this.toString() + ": > UNIMPLEMENTED"); msg.send(); } break; + } } this.cleanupImports(); @@ -459,6 +471,8 @@ final class RpcState { } void handleUnimplemented(RpcProtocol.Message.Reader message) { + LOGGER.info(() -> this.toString() + ": < UNIMPLEMENTED"); + switch (message.which()) { case RESOLVE: var resolve = message.getResolve(); @@ -495,10 +509,13 @@ final class RpcState { } void handleAbort(RpcProtocol.Exception.Reader abort) throws RpcException { - throw ToException(abort); + var exc = ToException(abort); + LOGGER.log(Level.INFO, this.toString() + ": < ABORT ", exc.getMessage()); + throw exc; } void handleBootstrap(IncomingRpcMessage message, RpcProtocol.Bootstrap.Reader bootstrap) { + LOGGER.info(() -> this.toString() + ": < BOOTSTRAP question=" + bootstrap.getQuestionId()); if (isDisconnected()) { return; } @@ -537,6 +554,7 @@ final class RpcState { ? capHook : Capability.newBrokenCap("Invalid pipeline transform."); + LOGGER.info(() -> this.toString() + ": > RETURN answer=" + answerId); response.send(); assert answer.active; @@ -622,6 +640,8 @@ final class RpcState { } void handleReturn(IncomingRpcMessage message, RpcProtocol.Return.Reader callReturn) { + LOGGER.info(() -> this.toString() + ": < RETURN answer=" + callReturn.getAnswerId()); + var question = questions.find(callReturn.getAnswerId()); if (question == null) { assert false: "Invalid question ID in Return message."; @@ -721,6 +741,8 @@ final class RpcState { } void handleFinish(RpcProtocol.Finish.Reader finish) { + LOGGER.info(() -> this.toString() + ": < FINISH question=" + finish.getQuestionId()); + var answer = answers.find(finish.getQuestionId()); if (answer == null || !answer.active) { assert false: "'Finish' for invalid question ID."; @@ -750,6 +772,8 @@ final class RpcState { } private void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { + LOGGER.info(() -> this.toString() + ": < RESOLVE promise=" + resolve.getPromiseId()); + ClientHook cap = null; Throwable exc = null; @@ -790,10 +814,13 @@ final class RpcState { } private void handleRelease(RpcProtocol.Release.Reader release) { + LOGGER.info(() -> this.toString() + ": < RELEASE promise=" + release.getId()); this.releaseExport(release.getId(), release.getReferenceCount()); } private void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { + LOGGER.info(() -> this.toString() + ": < DISEMBARGO"); + var ctx = disembargo.getContext(); switch (ctx.which()) { case SENDER_LOOPBACK: @@ -837,6 +864,7 @@ final class RpcState { return null; } builder.getContext().setReceiverLoopback(embargoId); + LOGGER.info(() -> this.toString() + ": > DISEMBARGO"); message.send(); return null; }; @@ -973,6 +1001,7 @@ final class RpcState { var fds = List.of(); writeDescriptor(exp.clientHook, resolve.initCap(), fds); message.setFds(fds); + LOGGER.info(() -> this.toString() + ": > RESOLVE export=" + exportId); message.send(); return CompletableFuture.completedFuture(null); }).whenComplete((value, exc) -> { @@ -984,6 +1013,7 @@ final class RpcState { var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve(); resolve.setPromiseId(exportId); FromException(exc, resolve.initException()); + LOGGER.log(Level.INFO, this.toString() + ": > RESOLVE", exc.getMessage()); message.send(); // TODO disconnect? @@ -1381,6 +1411,7 @@ final class RpcState { builder.setAnswerId(this.answerId); builder.setReleaseParamCaps(false); builder.setTakeFromOtherQuestion(tailInfo.questionId); + LOGGER.info(() -> this.toString() + ": > RETURN answer=" + answerId); message.send(); } @@ -1423,6 +1454,8 @@ final class RpcState { this.returnMessage.setAnswerId(this.answerId); this.returnMessage.setReleaseParamCaps(false); + LOGGER.info(() -> RpcState.this.toString() + ": > RETURN answer=" + this.answerId); + int[] exports = null; try { exports = ((RpcServerResponseImpl) response).send(); @@ -1446,6 +1479,7 @@ final class RpcState { builder.setAnswerId(this.answerId); builder.setReleaseParamCaps(false); FromException(exc, builder.initException()); + LOGGER.log(Level.INFO, this.toString() + ": > RETURN", exc.getMessage()); message.send(); } @@ -1703,6 +1737,7 @@ final class RpcState { callBuilder.getSendResultsTo().getYourself(); } try { + LOGGER.info(() -> RpcState.this.toString() + ": > CALL question=" + question.id); message.send(); } catch (Exception exc) { question.isAwaitingReturn = false; @@ -1954,6 +1989,7 @@ final class RpcState { var embargoPromise = embargo.disembargo.thenApply( void_ -> finalReplacement); replacement = Capability.newLocalPromiseClient(embargoPromise); + LOGGER.info(() -> RpcState.this.toString() + ": > DISEMBARGO"); message.send(); } diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java index 40b4ba1..84a758f 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -47,6 +47,11 @@ public class TwoPartyVatNetwork } } + @Override + public String toString() { + return this.getSide().toString(); + } + public RpcTwoPartyProtocol.Side getSide() { return side; } diff --git a/runtime-rpc/src/test/java/org/capnproto/RpcTest.java b/runtime-rpc/src/test/java/org/capnproto/RpcTest.java index c7f2718..93523e4 100644 --- a/runtime-rpc/src/test/java/org/capnproto/RpcTest.java +++ b/runtime-rpc/src/test/java/org/capnproto/RpcTest.java @@ -71,6 +71,11 @@ public class RpcTest { this.peerId = peerId; } + @Override + public String toString() { + return this.isClient ? "CLIENT" : "SERVER"; + } + void attach(Connection other) { Assert.assertNull(this.partner); Assert.assertNull(other.partner);