From b3c5b030c5abd106c269649387e0ff0205916df5 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Mon, 28 Sep 2020 16:43:38 +0100 Subject: [PATCH] handle bootstrapping request --- .../main/java/org/capnproto/AnyPointer.java | 4 + .../main/java/org/capnproto/Capability.java | 15 ++ .../main/java/org/capnproto/ExportTable.java | 12 +- .../src/main/java/org/capnproto/RpcState.java | 220 ++++++++++++++++-- .../test/java/org/capnproto/RpcStateTest.java | 18 +- 5 files changed, 238 insertions(+), 31 deletions(-) create mode 100644 runtime/src/main/java/org/capnproto/Capability.java diff --git a/runtime/src/main/java/org/capnproto/AnyPointer.java b/runtime/src/main/java/org/capnproto/AnyPointer.java index 34a05c1..64f4acc 100644 --- a/runtime/src/main/java/org/capnproto/AnyPointer.java +++ b/runtime/src/main/java/org/capnproto/AnyPointer.java @@ -110,6 +110,10 @@ public final class AnyPointer { factory.setPointerBuilder(this.segment, this.pointer, reader); } + public final void setAsCapability(Capability.Client cap) { + WireHelpers.setCapabilityPointer(this.segment, capTable, this.pointer, cap.hook); + } + public final Reader asReader() { return new Reader(segment, pointer, java.lang.Integer.MAX_VALUE); } diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java new file mode 100644 index 0000000..05338c3 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -0,0 +1,15 @@ +package org.capnproto; + +public class Capability { + + public static class Client { + + final ClientHook hook; + + public Client(ClientHook hook) { + this.hook = hook; + } + + } + +} diff --git a/runtime/src/main/java/org/capnproto/ExportTable.java b/runtime/src/main/java/org/capnproto/ExportTable.java index 74ae69d..d199130 100644 --- a/runtime/src/main/java/org/capnproto/ExportTable.java +++ b/runtime/src/main/java/org/capnproto/ExportTable.java @@ -6,14 +6,12 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.function.Consumer; -abstract class ExportTable implements Iterable { +class ExportTable implements Iterable { final HashMap slots = new HashMap<>(); final Queue freeIds = new PriorityQueue<>(); int max = 0; - protected abstract T newExportable(); - public T find(int id) { return slots.get(id); } @@ -28,18 +26,16 @@ abstract class ExportTable implements Iterable { } } - public T next() { + public int next(T value) { if (freeIds.isEmpty()) { var id = max; max++; - var value = newExportable(); slots.put(id, value); - return value; + return id; } else { var id = freeIds.remove(); - var value = newExportable(); slots.put(id, value); - return value; + return id; } } diff --git a/runtime/src/main/java/org/capnproto/RpcState.java b/runtime/src/main/java/org/capnproto/RpcState.java index 5483194..93e6a6d 100644 --- a/runtime/src/main/java/org/capnproto/RpcState.java +++ b/runtime/src/main/java/org/capnproto/RpcState.java @@ -2,6 +2,7 @@ package org.capnproto; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Consumer; final class RpcState { @@ -33,7 +34,7 @@ final class RpcState { static final class Export { int refcount; ClientHook clientHook; - CompletableFuture resolveOp; + CompletionStage resolveOp; } static final class Import { @@ -47,19 +48,8 @@ final class RpcState { CompletableFuture fulfiller; } - private final ExportTable exports = new ExportTable() { - @Override - protected Export newExportable() { - return new Export(); - } - }; - - private final ExportTable questions = new ExportTable() { - @Override - protected Question newExportable() { - return new Question(); - } - }; + private final ExportTable exports = new ExportTable(); + private final ExportTable questions = new ExportTable(); private final ImportTable answers = new ImportTable() { @Override @@ -75,19 +65,20 @@ final class RpcState { } }; - private final ExportTable embargos = new ExportTable() { - @Override - protected Embargo newExportable() { - return new Embargo(); - } - }; + private final ExportTable embargos = new ExportTable(); private final HashMap exportsByCap = new HashMap<>(); private final VatNetwork.Connection connection; + private final Capability.Client bootstrapInterface; + private Throwable disconnected = null; - - RpcState(VatNetwork.Connection connection) { + RpcState(VatNetwork.Connection connection, Capability.Client bootstrapInterface) { this.connection = connection; + this.bootstrapInterface = bootstrapInterface; + } + + boolean isDisconnected() { + return this.disconnected != null; } void handleMessage(IncomingRpcMessage message) { @@ -131,6 +122,47 @@ final class RpcState { } void handleBootstrap(IncomingRpcMessage message, RpcProtocol.Bootstrap.Reader bootstrap) { + if (isDisconnected()) { + return; + } + + var answerId = bootstrap.getQuestionId(); + var answer = answers.put(answerId); + if (answer.active) { + // questionId already in use + return; + } + answer.active = true; + + var capTable = new BuilderCapabilityTable(); + var response = connection.newOutgoingMessage(1024); + + var ret = response.getBody().getAs(RpcProtocol.Message.factory).initReturn(); + ret.setAnswerId(answerId); + + var payload = ret.initResults(); + var content = payload.getContent().imbue(capTable); + content.setAsCapability(bootstrapInterface); + + var capTableArray = capTable.getTable(); + assert capTableArray.length != 0; + + var capHook = capTableArray[0]; + assert capHook != null; + + var fds = List.of(); + response.setFds(List.of()); + + answer.resultExports = writeDescriptors(capTableArray, payload, fds); + answer.pipeline = ops -> ops.length == 0 + ? capHook + : ClientHook.newBrokenCap("Invalid pipeline transform."); + + response.send(); + + assert answer.active; + assert answer.resultExports != null; + assert answer.pipeline != null; } void handleCall(IncomingRpcMessage message, RpcProtocol.Call.Reader call) { @@ -148,6 +180,150 @@ final class RpcState { void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { } + + private List writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List fds) { + if (capTable.length == 0) { + return List.of(); + } + + var capTableBuilder = payload.initCapTable(capTable.length); + var exports = new ArrayList(); + for (int ii = 0; ii < capTable.length; ++ii) { + var cap = capTable[ii]; + if (cap == null) { + capTableBuilder.get(ii).setNone(null); + continue; + } + + var exportId = writeDescriptor(cap, capTableBuilder.get(ii), fds); + if (exportId != null) { + exports.add(exportId); + } + } + return exports; + } + + private Integer writeDescriptor(ClientHook cap, RpcProtocol.CapDescriptor.Builder descriptor, List fds) { + ClientHook inner = cap; + for (;;) { + var resolved = inner.getResolved(); + if (resolved != null) { + inner = resolved; + } else { + break; + } + } + + var fd = inner.getFd(); + if (fd != null) { + fds.add(fd); + } + + if (inner.getBrand() == this) { + return ((RpcClient) inner).writeDescriptor(descriptor, fds); + } + + var exportId = exportsByCap.get(inner); + if (exportId != null) { + // We've already seen and exported this capability before. + var export = exports.find(exportId); + export.refcount++; + descriptor.setSenderHosted(exportId); + return exportId; + } + + // This is the first time we've seen this capability. + var export = new Export(); + export.refcount = 1; + export.clientHook = inner; + exportId = exports.next(export); + + var wrapped = inner.whenMoreResolved(); + if (wrapped != null) { + // This is a promise. Arrange for the `Resolve` message to be sent later. + export.resolveOp = resolveExportedPromise(exportId, wrapped); + descriptor.setSenderPromise(exportId); + } + else { + descriptor.setSenderHosted(exportId); + } + return exportId; + } + + CompletionStage resolveExportedPromise(int exportId, CompletionStage promise) { + + return promise.thenCompose(resolution -> { + if (isDisconnected()) { + return CompletableFuture.completedFuture(null); + } + + resolution = getInnermostClient(resolution); + + var exp = exports.find(exportId); + exportsByCap.remove(exp.clientHook); + exp.clientHook = resolution; + + if (exp.clientHook.getBrand() != this) { + // We're resolving to a local capability. If we're resolving to a promise, we might be + // able to reuse our export table entry and avoid sending a message. + var more = exp.clientHook.whenMoreResolved(); + if (more != null) { + // We're replacing a promise with another local promise. In this case, we might actually + // be able to just reuse the existing export table entry to represent the new promise -- + // unless it already has an entry. Let's check. + + var insertResult = exportsByCap.put(exp.clientHook, exportId); + // TODO check this behaviour + if (insertResult == null) { + // The new promise was not already in the table, therefore the existing export table + // entry has now been repurposed to represent it. There is no need to send a resolve + // message at all. We do, however, have to start resolving the next promise. + return resolveExportedPromise(exportId, more); + } + } + } + + // send a Resolve message + var message = connection.newOutgoingMessage(1024); + var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve(); + resolve.setPromiseId(exportId); + var fds = List.of(); + writeDescriptor(exp.clientHook, resolve.initCap(), fds); + message.setFds(fds); + message.send(); + return CompletableFuture.completedFuture(null); + }).whenComplete((value, exc) -> { + if (exc == null) { + return; + } + var message = connection.newOutgoingMessage(1024); + var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve(); + resolve.setPromiseId(exportId); + RpcException.fromException(exc, resolve.initException()); + message.send(); + + // TODO disconnect? + }); + } + + ClientHook getInnermostClient(ClientHook client) { + for (;;) { + var inner = client.getResolved(); + if (inner != null) { + client = inner; + } + else { + break; + } + } + + if (client.getBrand() == this) { + return ((RpcClient)client).getInnermostClient(); + } + + return client; + } + interface RpcResponse extends ResponseHook { AnyPointer.Reader getResults(); } diff --git a/runtime/src/test/java/org/capnproto/RpcStateTest.java b/runtime/src/test/java/org/capnproto/RpcStateTest.java index 72bf771..1d0e64e 100644 --- a/runtime/src/test/java/org/capnproto/RpcStateTest.java +++ b/runtime/src/test/java/org/capnproto/RpcStateTest.java @@ -54,13 +54,15 @@ public class RpcStateTest { } TestConnection connection; + Capability.Client bootstrapInterface; RpcState rpc; final Queue sent = new ArrayDeque<>(); @Before public void setUp() throws Exception { connection = new TestConnection(); - rpc = new RpcState(connection); + bootstrapInterface = new Capability.Client(ClientHook.newNullCap()); + rpc = new RpcState(connection, bootstrapInterface); } @After @@ -84,6 +86,20 @@ public class RpcStateTest { @Test public void handleBootstrap() { + var msg = new TestMessage(); + var bootstrap = msg.builder.getRoot(RpcProtocol.Message.factory).initBootstrap(); + bootstrap.setQuestionId(0); + rpc.handleMessage(msg); + Assert.assertFalse(sent.isEmpty()); + var reply = sent.remove(); + var rpcMsg = reply.getBody().getAs(RpcProtocol.Message.factory); + Assert.assertEquals(rpcMsg.which(), RpcProtocol.Message.Which.RETURN); + var ret = rpcMsg.getReturn(); + Assert.assertEquals(ret.getAnswerId(), 0); + Assert.assertEquals(ret.which(), RpcProtocol.Return.Which.RESULTS); + var results = ret.getResults(); + Assert.assertEquals(results.getCapTable().size(), 1); // got a capability! + Assert.assertTrue(results.hasContent()); } @Test