From c01228c31cf2a45de0f95c9397f5a31434c1e906 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Wed, 21 Oct 2020 21:21:44 +0100 Subject: [PATCH] add tap for dumping rpc messages --- .../main/java/org/capnproto/RpcDumper.java | 103 +++++++++++++----- .../org/capnproto/TwoPartyVatNetwork.java | 44 +++++--- .../test/java/org/capnproto/TwoPartyTest.java | 22 ++++ 3 files changed, 122 insertions(+), 47 deletions(-) diff --git a/runtime/src/main/java/org/capnproto/RpcDumper.java b/runtime/src/main/java/org/capnproto/RpcDumper.java index 0a2f902..d6e8a76 100644 --- a/runtime/src/main/java/org/capnproto/RpcDumper.java +++ b/runtime/src/main/java/org/capnproto/RpcDumper.java @@ -37,41 +37,67 @@ public class RpcDumper { return -1L; } + private String dumpCap(RpcProtocol.CapDescriptor.Reader cap) { + return cap.which().toString(); + } + private String dumpCaps(StructList.Reader capTable) { + switch (capTable.size()) { + case 0: + return ""; + case 1: + return dumpCap(capTable.get(0)); + default: + { + var text = dumpCap(capTable.get(0)); + for (int ii = 1; ii< capTable.size(); ++ii) { + text += ", " + dumpCap(capTable.get(ii)); + } + return text; + } + } + } + String dump(RpcProtocol.Message.Reader message, RpcTwoPartyProtocol.Side sender) { switch (message.which()) { case CALL: { var call = message.getCall(); var iface = call.getInterfaceId(); - var schema = this.schemas.get(iface); - if (schema == null || !schema.isInterface()) { - break; - } - - var interfaceSchema = schema.getInterface(); - - var methods = interfaceSchema.getMethods(); - if (call.getMethodId() >= methods.size()) { - break; - } - - var method = methods.get(call.getMethodId()); - var interfaceName = schema.getDisplayName().toString(); - var paramType = method.getParamStructType(); - var resultType = method.getResultStructType(); - - if (call.getSendResultsTo().isCaller()) { - var questionId = call.getQuestionId(); - setReturnType(sender, call.getQuestionId(), resultType); - } + var interfaceName = String.format("0x%x", iface); + var methodName = String.format("method#%d", call.getMethodId()); var payload = call.getParams(); var params = payload.getContent(); var sendResultsTo = call.getSendResultsTo(); + var schema = this.schemas.get(iface); + if (schema != null) { + interfaceName = schema.getDisplayName().toString(); + if (schema.isInterface()) { + + interfaceName = schema.getDisplayName().toString(); + var interfaceSchema = schema.getInterface(); + + var methods = interfaceSchema.getMethods(); + if (call.getMethodId() < methods.size()) { + var method = methods.get(call.getMethodId()); + methodName = method.getName().toString(); + var paramType = method.getParamStructType(); + var resultType = method.getResultStructType(); + + if (call.getSendResultsTo().isCaller()) { + var questionId = call.getQuestionId(); + setReturnType(sender, call.getQuestionId(), resultType); + } + + } + } + } + return sender.name() + "(" + call.getQuestionId() + "): call " + call.getTarget() + " <- " + interfaceName + "." + - method.getName().toString() + " " + params + " caps:[" + - payload.getCapTable() + "]" + (sendResultsTo.isCaller() ? "" : (" sendResultsTo:" + sendResultsTo)); + methodName + " " + params.getClass().getName() + " caps:[" + + dumpCaps(payload.getCapTable()) + "]" + + (sendResultsTo.isCaller() ? "" : (" sendResultsTo:" + sendResultsTo)); } case RETURN: { @@ -81,12 +107,22 @@ public class RpcDumper { ? RpcTwoPartyProtocol.Side.SERVER : RpcTwoPartyProtocol.Side.CLIENT, ret.getAnswerId()); - if (ret.which() != RpcProtocol.Return.Which.RESULTS) { - break; + switch (ret.which()) { + case RESULTS: { + var payload = ret.getResults(); + return sender.name() + "(" + ret.getAnswerId() + "): return " + payload + + " caps:[" + dumpCaps(payload.getCapTable()) + "]"; + } + case EXCEPTION: { + var exc = ret.getException(); + return sender.name() + "(" + ret.getAnswerId() + "): exception " + + exc.getType().toString() + + " " + exc.getReason(); + } + default: { + return sender.name() + "(" + ret.getAnswerId() + "): " + ret.which().name(); + } } - var payload = ret.getResults(); - return sender.name() + "(" + ret.getAnswerId() + "): return " + payload + - " caps:[" + payload.getCapTable() + "]"; } case BOOTSTRAP: { @@ -95,9 +131,16 @@ public class RpcDumper { return sender.name() + "(" + restore.getQuestionId() + "): bootstrap " + restore.getDeprecatedObjectId(); } + + case ABORT: { + var abort = message.getAbort(); + return sender.name() + ": abort " + + abort.getType().toString() + + " \"" + abort.getReason().toString() + "\""; + } + default: - break; + return sender.name() + ": " + message.which().name(); } - return ""; } } diff --git a/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java index c1e60f1..f5be461 100644 --- a/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -8,14 +8,18 @@ public class TwoPartyVatNetwork implements VatNetwork, VatNetwork.Connection { + public interface MessageTap { + void outgoing(OutgoingRpcMessage message, RpcTwoPartyProtocol.Side side); + void incoming(IncomingRpcMessage message, RpcTwoPartyProtocol.Side side); + } + private CompletableFuture previousWrite = CompletableFuture.completedFuture(null); private final CompletableFuture peerDisconnected = new CompletableFuture<>(); private final AsynchronousSocketChannel channel; private final RpcTwoPartyProtocol.Side side; private final MessageBuilder peerVatId = new MessageBuilder(4); private boolean accepted; - - public final RpcDumper dumper = new RpcDumper(); + private MessageTap tap; public TwoPartyVatNetwork(AsynchronousSocketChannel channel, RpcTwoPartyProtocol.Side side) { this.channel = channel; @@ -34,6 +38,10 @@ public class TwoPartyVatNetwork return peerVatId.getRoot(RpcTwoPartyProtocol.VatId.factory).asReader(); } + public void setTap(MessageTap tap) { + this.tap = tap; + } + public VatNetwork.Connection asConnection() { return this; } @@ -60,18 +68,22 @@ public class TwoPartyVatNetwork @Override public CompletableFuture receiveIncomingMessage() { - return Serialize.readAsync(channel).whenComplete((x, exc) -> { - if (exc != null) { - this.peerDisconnected.complete(null); - } - }).thenApply(reader -> { - var msg = new IncomingMessage(reader); - var dump = this.dumper.dump(msg.getBody().getAs(RpcProtocol.Message.factory), getSide()); - if (!dump.isEmpty()) { - System.out.println(dump); - } - return msg; - }); + return Serialize.readAsync(channel) + .thenApply(reader -> (IncomingRpcMessage) new IncomingMessage(reader)) + .whenComplete((msg, exc) -> { + if (exc != null) { + this.peerDisconnected.complete(null); + } + }) + .whenComplete((msg, exc) -> { + if (this.tap != null && msg != null) { + this.tap.incoming( + msg, + this.getSide() == RpcTwoPartyProtocol.Side.CLIENT + ? RpcTwoPartyProtocol.Side.SERVER + : RpcTwoPartyProtocol.Side.CLIENT); + } + }); } @Override @@ -121,9 +133,7 @@ public class TwoPartyVatNetwork @Override public void send() { - previousWrite = previousWrite.thenCompose( - x -> Serialize.writeAsync(channel, message) - ); + previousWrite = previousWrite.thenCompose(x -> Serialize.writeAsync(channel, message)); } @Override diff --git a/runtime/src/test/java/org/capnproto/TwoPartyTest.java b/runtime/src/test/java/org/capnproto/TwoPartyTest.java index 2dd81f8..8cd52a6 100644 --- a/runtime/src/test/java/org/capnproto/TwoPartyTest.java +++ b/runtime/src/test/java/org/capnproto/TwoPartyTest.java @@ -42,6 +42,26 @@ class TestCap0Impl extends Demo.TestCap0.Server { class TestCap1Impl extends Demo.TestCap1.Server { } +class Tap implements TwoPartyVatNetwork.MessageTap { + + final RpcDumper dumper = new RpcDumper(); + + @Override + public void outgoing(OutgoingRpcMessage message, RpcTwoPartyProtocol.Side side) { + var text = this.dumper.dump(message.getBody().asReader().getAs(RpcProtocol.Message.factory), side); + if (text.length() > 0) { + System.out.println(text); + } + } + + @Override + public void incoming(IncomingRpcMessage message, RpcTwoPartyProtocol.Side side) { + var text = this.dumper.dump(message.getBody().getAs(RpcProtocol.Message.factory), side); + if (text.length() > 0) { + System.out.println(text); + } + } +} public class TwoPartyTest { @@ -74,9 +94,11 @@ public class TwoPartyTest { this.clientSocket = AsynchronousSocketChannel.open(); this.clientSocket.connect(this.serverSocket.getLocalAddress()).get(); this.client = new TwoPartyClient(clientSocket); + this.client.getNetwork().setTap(new Tap()); var socket = serverSocket.accept().get(); this.serverNetwork = new TwoPartyVatNetwork(socket, RpcTwoPartyProtocol.Side.SERVER); + this.serverNetwork.setTap(new Tap()); //this.serverNetwork.dumper.addSchema(Demo.TestCap1); this.serverThread = runServer(this.serverNetwork); }