add tap for dumping rpc messages

This commit is contained in:
Vaci Koblizek 2020-10-21 21:21:44 +01:00
parent 7b939d7c0b
commit c01228c31c
3 changed files with 122 additions and 47 deletions

View file

@ -37,25 +37,50 @@ public class RpcDumper {
return -1L;
}
private String dumpCap(RpcProtocol.CapDescriptor.Reader cap) {
return cap.which().toString();
}
private String dumpCaps(StructList.Reader<RpcProtocol.CapDescriptor.Reader> capTable) {
switch (capTable.size()) {
case 0:
return "";
case 1:
return dumpCap(capTable.get(0));
default:
{
var text = dumpCap(capTable.get(0));
for (int ii = 1; ii< capTable.size(); ++ii) {
text += ", " + dumpCap(capTable.get(ii));
}
return text;
}
}
}
String dump(RpcProtocol.Message.Reader message, RpcTwoPartyProtocol.Side sender) {
switch (message.which()) {
case CALL: {
var call = message.getCall();
var iface = call.getInterfaceId();
var schema = this.schemas.get(iface);
if (schema == null || !schema.isInterface()) {
break;
}
var interfaceName = String.format("0x%x", iface);
var methodName = String.format("method#%d", call.getMethodId());
var payload = call.getParams();
var params = payload.getContent();
var sendResultsTo = call.getSendResultsTo();
var schema = this.schemas.get(iface);
if (schema != null) {
interfaceName = schema.getDisplayName().toString();
if (schema.isInterface()) {
interfaceName = schema.getDisplayName().toString();
var interfaceSchema = schema.getInterface();
var methods = interfaceSchema.getMethods();
if (call.getMethodId() >= methods.size()) {
break;
}
if (call.getMethodId() < methods.size()) {
var method = methods.get(call.getMethodId());
var interfaceName = schema.getDisplayName().toString();
methodName = method.getName().toString();
var paramType = method.getParamStructType();
var resultType = method.getResultStructType();
@ -64,14 +89,15 @@ public class RpcDumper {
setReturnType(sender, call.getQuestionId(), resultType);
}
var payload = call.getParams();
var params = payload.getContent();
var sendResultsTo = call.getSendResultsTo();
}
}
}
return sender.name() + "(" + call.getQuestionId() + "): call " +
call.getTarget() + " <- " + interfaceName + "." +
method.getName().toString() + " " + params + " caps:[" +
payload.getCapTable() + "]" + (sendResultsTo.isCaller() ? "" : (" sendResultsTo:" + sendResultsTo));
methodName + " " + params.getClass().getName() + " caps:[" +
dumpCaps(payload.getCapTable()) + "]" +
(sendResultsTo.isCaller() ? "" : (" sendResultsTo:" + sendResultsTo));
}
case RETURN: {
@ -81,12 +107,22 @@ public class RpcDumper {
? RpcTwoPartyProtocol.Side.SERVER
: RpcTwoPartyProtocol.Side.CLIENT,
ret.getAnswerId());
if (ret.which() != RpcProtocol.Return.Which.RESULTS) {
break;
}
switch (ret.which()) {
case RESULTS: {
var payload = ret.getResults();
return sender.name() + "(" + ret.getAnswerId() + "): return " + payload +
" caps:[" + payload.getCapTable() + "]";
" caps:[" + dumpCaps(payload.getCapTable()) + "]";
}
case EXCEPTION: {
var exc = ret.getException();
return sender.name() + "(" + ret.getAnswerId() + "): exception "
+ exc.getType().toString() +
" " + exc.getReason();
}
default: {
return sender.name() + "(" + ret.getAnswerId() + "): " + ret.which().name();
}
}
}
case BOOTSTRAP: {
@ -95,9 +131,16 @@ public class RpcDumper {
return sender.name() + "(" + restore.getQuestionId() + "): bootstrap " +
restore.getDeprecatedObjectId();
}
default:
break;
case ABORT: {
var abort = message.getAbort();
return sender.name() + ": abort "
+ abort.getType().toString()
+ " \"" + abort.getReason().toString() + "\"";
}
default:
return sender.name() + ": " + message.which().name();
}
return "";
}
}

View file

@ -8,14 +8,18 @@ public class TwoPartyVatNetwork
implements VatNetwork<RpcTwoPartyProtocol.VatId.Reader>,
VatNetwork.Connection {
public interface MessageTap {
void outgoing(OutgoingRpcMessage message, RpcTwoPartyProtocol.Side side);
void incoming(IncomingRpcMessage message, RpcTwoPartyProtocol.Side side);
}
private CompletableFuture<java.lang.Void> previousWrite = CompletableFuture.completedFuture(null);
private final CompletableFuture<java.lang.Void> peerDisconnected = new CompletableFuture<>();
private final AsynchronousSocketChannel channel;
private final RpcTwoPartyProtocol.Side side;
private final MessageBuilder peerVatId = new MessageBuilder(4);
private boolean accepted;
public final RpcDumper dumper = new RpcDumper();
private MessageTap tap;
public TwoPartyVatNetwork(AsynchronousSocketChannel channel, RpcTwoPartyProtocol.Side side) {
this.channel = channel;
@ -34,6 +38,10 @@ public class TwoPartyVatNetwork
return peerVatId.getRoot(RpcTwoPartyProtocol.VatId.factory).asReader();
}
public void setTap(MessageTap tap) {
this.tap = tap;
}
public VatNetwork.Connection asConnection() {
return this;
}
@ -60,17 +68,21 @@ public class TwoPartyVatNetwork
@Override
public CompletableFuture<IncomingRpcMessage> receiveIncomingMessage() {
return Serialize.readAsync(channel).whenComplete((x, exc) -> {
return Serialize.readAsync(channel)
.thenApply(reader -> (IncomingRpcMessage) new IncomingMessage(reader))
.whenComplete((msg, exc) -> {
if (exc != null) {
this.peerDisconnected.complete(null);
}
}).thenApply(reader -> {
var msg = new IncomingMessage(reader);
var dump = this.dumper.dump(msg.getBody().getAs(RpcProtocol.Message.factory), getSide());
if (!dump.isEmpty()) {
System.out.println(dump);
})
.whenComplete((msg, exc) -> {
if (this.tap != null && msg != null) {
this.tap.incoming(
msg,
this.getSide() == RpcTwoPartyProtocol.Side.CLIENT
? RpcTwoPartyProtocol.Side.SERVER
: RpcTwoPartyProtocol.Side.CLIENT);
}
return msg;
});
}
@ -121,9 +133,7 @@ public class TwoPartyVatNetwork
@Override
public void send() {
previousWrite = previousWrite.thenCompose(
x -> Serialize.writeAsync(channel, message)
);
previousWrite = previousWrite.thenCompose(x -> Serialize.writeAsync(channel, message));
}
@Override

View file

@ -42,6 +42,26 @@ class TestCap0Impl extends Demo.TestCap0.Server {
class TestCap1Impl extends Demo.TestCap1.Server {
}
class Tap implements TwoPartyVatNetwork.MessageTap {
final RpcDumper dumper = new RpcDumper();
@Override
public void outgoing(OutgoingRpcMessage message, RpcTwoPartyProtocol.Side side) {
var text = this.dumper.dump(message.getBody().asReader().getAs(RpcProtocol.Message.factory), side);
if (text.length() > 0) {
System.out.println(text);
}
}
@Override
public void incoming(IncomingRpcMessage message, RpcTwoPartyProtocol.Side side) {
var text = this.dumper.dump(message.getBody().getAs(RpcProtocol.Message.factory), side);
if (text.length() > 0) {
System.out.println(text);
}
}
}
public class TwoPartyTest {
@ -74,9 +94,11 @@ public class TwoPartyTest {
this.clientSocket = AsynchronousSocketChannel.open();
this.clientSocket.connect(this.serverSocket.getLocalAddress()).get();
this.client = new TwoPartyClient(clientSocket);
this.client.getNetwork().setTap(new Tap());
var socket = serverSocket.accept().get();
this.serverNetwork = new TwoPartyVatNetwork(socket, RpcTwoPartyProtocol.Side.SERVER);
this.serverNetwork.setTap(new Tap());
//this.serverNetwork.dumper.addSchema(Demo.TestCap1);
this.serverThread = runServer(this.serverNetwork);
}