From 77133166839ce7576d01b0a699a02fae9e062d5e Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Mon, 28 Sep 2020 00:42:37 +0100 Subject: [PATCH] incoming and outgoing rpc messages --- .../org/capnproto/IncomingRpcMessage.java | 10 +++ .../org/capnproto/OutgoingRpcMessage.java | 14 +++ .../main/java/org/capnproto/RpcSystem.java | 4 + .../java/org/capnproto/TwoPartyRpcSystem.java | 4 + .../org/capnproto/TwoPartyVatNetwork.java | 88 +++++++++++++++++++ .../main/java/org/capnproto/VatNetwork.java | 13 +++ 6 files changed, 133 insertions(+) create mode 100644 runtime/src/main/java/org/capnproto/IncomingRpcMessage.java create mode 100644 runtime/src/main/java/org/capnproto/OutgoingRpcMessage.java create mode 100644 runtime/src/main/java/org/capnproto/RpcSystem.java create mode 100644 runtime/src/main/java/org/capnproto/TwoPartyRpcSystem.java create mode 100644 runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java create mode 100644 runtime/src/main/java/org/capnproto/VatNetwork.java diff --git a/runtime/src/main/java/org/capnproto/IncomingRpcMessage.java b/runtime/src/main/java/org/capnproto/IncomingRpcMessage.java new file mode 100644 index 0000000..c398297 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/IncomingRpcMessage.java @@ -0,0 +1,10 @@ +package org.capnproto; + +import java.util.List; + +public interface IncomingRpcMessage { + + AnyPointer.Reader getBody(); + + List getAttachedFds(); +} diff --git a/runtime/src/main/java/org/capnproto/OutgoingRpcMessage.java b/runtime/src/main/java/org/capnproto/OutgoingRpcMessage.java new file mode 100644 index 0000000..7b342d1 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/OutgoingRpcMessage.java @@ -0,0 +1,14 @@ +package org.capnproto; + +import java.util.List; + +public interface OutgoingRpcMessage { + + AnyPointer.Builder getBody(); + + void setFds(List fds); + + void send(); + + int sizeInWords(); +} diff --git a/runtime/src/main/java/org/capnproto/RpcSystem.java b/runtime/src/main/java/org/capnproto/RpcSystem.java new file mode 100644 index 0000000..60f5aab --- /dev/null +++ b/runtime/src/main/java/org/capnproto/RpcSystem.java @@ -0,0 +1,4 @@ +package org.capnproto; + +public class RpcSystem { +} diff --git a/runtime/src/main/java/org/capnproto/TwoPartyRpcSystem.java b/runtime/src/main/java/org/capnproto/TwoPartyRpcSystem.java new file mode 100644 index 0000000..1671843 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/TwoPartyRpcSystem.java @@ -0,0 +1,4 @@ +package org.capnproto; + +public class TwoPartyRpcSystem extends RpcSystem { +} diff --git a/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java new file mode 100644 index 0000000..caf4699 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -0,0 +1,88 @@ +package org.capnproto; + +import java.nio.channels.AsynchronousByteChannel; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class TwoPartyVatNetwork implements VatNetwork, VatNetwork.Connection { + + private CompletableFuture writeCompleted = CompletableFuture.completedFuture(null); + private final AsynchronousByteChannel channel; + + public TwoPartyVatNetwork(AsynchronousByteChannel channel) { + this.channel = channel; + } + + @Override + public OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize) { + return new OutgoingMessage(firstSegmentWordSize); + } + + @Override + public CompletableFuture receiveIncomingMessage() { + return Serialize.readAsync(channel).thenApply(message -> { + return new IncomingMessage(message); + }); + } + + final class OutgoingMessage implements OutgoingRpcMessage { + + final MessageBuilder message; + List fds = List.of(); + + OutgoingMessage(int firstSegmentWordSize) { + this.message = new MessageBuilder(firstSegmentWordSize); + } + + @Override + public AnyPointer.Builder getBody() { + return message.getRoot(AnyPointer.factory); + } + + @Override + public void setFds(List fds) { + this.fds = fds; + } + + @Override + public void send() { + writeCompleted = writeCompleted.thenCompose( + x -> Serialize.writeAsync(channel, message) + ); + } + + @Override + public int sizeInWords() { + int size = 0; + for (var segment: message.getSegmentsForOutput()) { + size += segment.position(); + } + return size / 2; + } + } + + final class IncomingMessage implements IncomingRpcMessage { + + final MessageReader message; + final List fds; + + IncomingMessage(MessageReader message) { + this(message, List.of()); + } + + IncomingMessage(MessageReader message, List fds) { + this.message = message; + this.fds = fds; + } + + @Override + public AnyPointer.Reader getBody() { + return message.getRoot(AnyPointer.factory); + } + + @Override + public List getAttachedFds() { + return fds; + } + } +} diff --git a/runtime/src/main/java/org/capnproto/VatNetwork.java b/runtime/src/main/java/org/capnproto/VatNetwork.java new file mode 100644 index 0000000..16a4c21 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/VatNetwork.java @@ -0,0 +1,13 @@ +package org.capnproto; + +import java.util.concurrent.CompletableFuture; + +public interface VatNetwork { + + interface Connection { + + OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize); + + CompletableFuture receiveIncomingMessage(); + } +}