handle bootstrapping request

This commit is contained in:
Vaci Koblizek 2020-09-28 16:43:38 +01:00
parent 66ae27e805
commit b3c5b030c5
5 changed files with 238 additions and 31 deletions

View file

@ -110,6 +110,10 @@ public final class AnyPointer {
factory.setPointerBuilder(this.segment, this.pointer, reader); factory.setPointerBuilder(this.segment, this.pointer, reader);
} }
public final void setAsCapability(Capability.Client cap) {
WireHelpers.setCapabilityPointer(this.segment, capTable, this.pointer, cap.hook);
}
public final Reader asReader() { public final Reader asReader() {
return new Reader(segment, pointer, java.lang.Integer.MAX_VALUE); return new Reader(segment, pointer, java.lang.Integer.MAX_VALUE);
} }

View file

@ -0,0 +1,15 @@
package org.capnproto;
public class Capability {
public static class Client {
final ClientHook hook;
public Client(ClientHook hook) {
this.hook = hook;
}
}
}

View file

@ -6,14 +6,12 @@ import java.util.PriorityQueue;
import java.util.Queue; import java.util.Queue;
import java.util.function.Consumer; import java.util.function.Consumer;
abstract class ExportTable<T> implements Iterable<T> { class ExportTable<T> implements Iterable<T> {
final HashMap<Integer, T> slots = new HashMap<>(); final HashMap<Integer, T> slots = new HashMap<>();
final Queue<Integer> freeIds = new PriorityQueue<>(); final Queue<Integer> freeIds = new PriorityQueue<>();
int max = 0; int max = 0;
protected abstract T newExportable();
public T find(int id) { public T find(int id) {
return slots.get(id); return slots.get(id);
} }
@ -28,18 +26,16 @@ abstract class ExportTable<T> implements Iterable<T> {
} }
} }
public T next() { public int next(T value) {
if (freeIds.isEmpty()) { if (freeIds.isEmpty()) {
var id = max; var id = max;
max++; max++;
var value = newExportable();
slots.put(id, value); slots.put(id, value);
return value; return id;
} else { } else {
var id = freeIds.remove(); var id = freeIds.remove();
var value = newExportable();
slots.put(id, value); slots.put(id, value);
return value; return id;
} }
} }

View file

@ -2,6 +2,7 @@ package org.capnproto;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer; import java.util.function.Consumer;
final class RpcState { final class RpcState {
@ -33,7 +34,7 @@ final class RpcState {
static final class Export { static final class Export {
int refcount; int refcount;
ClientHook clientHook; ClientHook clientHook;
CompletableFuture<java.lang.Void> resolveOp; CompletionStage<?> resolveOp;
} }
static final class Import { static final class Import {
@ -47,19 +48,8 @@ final class RpcState {
CompletableFuture<java.lang.Void> fulfiller; CompletableFuture<java.lang.Void> fulfiller;
} }
private final ExportTable<Export> exports = new ExportTable<Export>() { private final ExportTable<Export> exports = new ExportTable<Export>();
@Override private final ExportTable<Question> questions = new ExportTable<Question>();
protected Export newExportable() {
return new Export();
}
};
private final ExportTable<Question> questions = new ExportTable<Question>() {
@Override
protected Question newExportable() {
return new Question();
}
};
private final ImportTable<Answer> answers = new ImportTable<Answer>() { private final ImportTable<Answer> answers = new ImportTable<Answer>() {
@Override @Override
@ -75,19 +65,20 @@ final class RpcState {
} }
}; };
private final ExportTable<Embargo> embargos = new ExportTable<Embargo>() { private final ExportTable<Embargo> embargos = new ExportTable<Embargo>();
@Override
protected Embargo newExportable() {
return new Embargo();
}
};
private final HashMap<ClientHook, Integer> exportsByCap = new HashMap<>(); private final HashMap<ClientHook, Integer> exportsByCap = new HashMap<>();
private final VatNetwork.Connection connection; private final VatNetwork.Connection connection;
private final Capability.Client bootstrapInterface;
private Throwable disconnected = null;
RpcState(VatNetwork.Connection connection, Capability.Client bootstrapInterface) {
RpcState(VatNetwork.Connection connection) {
this.connection = connection; this.connection = connection;
this.bootstrapInterface = bootstrapInterface;
}
boolean isDisconnected() {
return this.disconnected != null;
} }
void handleMessage(IncomingRpcMessage message) { void handleMessage(IncomingRpcMessage message) {
@ -131,6 +122,47 @@ final class RpcState {
} }
void handleBootstrap(IncomingRpcMessage message, RpcProtocol.Bootstrap.Reader bootstrap) { void handleBootstrap(IncomingRpcMessage message, RpcProtocol.Bootstrap.Reader bootstrap) {
if (isDisconnected()) {
return;
}
var answerId = bootstrap.getQuestionId();
var answer = answers.put(answerId);
if (answer.active) {
// questionId already in use
return;
}
answer.active = true;
var capTable = new BuilderCapabilityTable();
var response = connection.newOutgoingMessage(1024);
var ret = response.getBody().getAs(RpcProtocol.Message.factory).initReturn();
ret.setAnswerId(answerId);
var payload = ret.initResults();
var content = payload.getContent().imbue(capTable);
content.setAsCapability(bootstrapInterface);
var capTableArray = capTable.getTable();
assert capTableArray.length != 0;
var capHook = capTableArray[0];
assert capHook != null;
var fds = List.<Integer>of();
response.setFds(List.of());
answer.resultExports = writeDescriptors(capTableArray, payload, fds);
answer.pipeline = ops -> ops.length == 0
? capHook
: ClientHook.newBrokenCap("Invalid pipeline transform.");
response.send();
assert answer.active;
assert answer.resultExports != null;
assert answer.pipeline != null;
} }
void handleCall(IncomingRpcMessage message, RpcProtocol.Call.Reader call) { void handleCall(IncomingRpcMessage message, RpcProtocol.Call.Reader call) {
@ -148,6 +180,150 @@ final class RpcState {
void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) {
} }
private List<Integer> writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List<Integer> fds) {
if (capTable.length == 0) {
return List.of();
}
var capTableBuilder = payload.initCapTable(capTable.length);
var exports = new ArrayList<Integer>();
for (int ii = 0; ii < capTable.length; ++ii) {
var cap = capTable[ii];
if (cap == null) {
capTableBuilder.get(ii).setNone(null);
continue;
}
var exportId = writeDescriptor(cap, capTableBuilder.get(ii), fds);
if (exportId != null) {
exports.add(exportId);
}
}
return exports;
}
private Integer writeDescriptor(ClientHook cap, RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) {
ClientHook inner = cap;
for (;;) {
var resolved = inner.getResolved();
if (resolved != null) {
inner = resolved;
} else {
break;
}
}
var fd = inner.getFd();
if (fd != null) {
fds.add(fd);
}
if (inner.getBrand() == this) {
return ((RpcClient) inner).writeDescriptor(descriptor, fds);
}
var exportId = exportsByCap.get(inner);
if (exportId != null) {
// We've already seen and exported this capability before.
var export = exports.find(exportId);
export.refcount++;
descriptor.setSenderHosted(exportId);
return exportId;
}
// This is the first time we've seen this capability.
var export = new Export();
export.refcount = 1;
export.clientHook = inner;
exportId = exports.next(export);
var wrapped = inner.whenMoreResolved();
if (wrapped != null) {
// This is a promise. Arrange for the `Resolve` message to be sent later.
export.resolveOp = resolveExportedPromise(exportId, wrapped);
descriptor.setSenderPromise(exportId);
}
else {
descriptor.setSenderHosted(exportId);
}
return exportId;
}
CompletionStage<?> resolveExportedPromise(int exportId, CompletionStage<ClientHook> promise) {
return promise.thenCompose(resolution -> {
if (isDisconnected()) {
return CompletableFuture.completedFuture(null);
}
resolution = getInnermostClient(resolution);
var exp = exports.find(exportId);
exportsByCap.remove(exp.clientHook);
exp.clientHook = resolution;
if (exp.clientHook.getBrand() != this) {
// We're resolving to a local capability. If we're resolving to a promise, we might be
// able to reuse our export table entry and avoid sending a message.
var more = exp.clientHook.whenMoreResolved();
if (more != null) {
// We're replacing a promise with another local promise. In this case, we might actually
// be able to just reuse the existing export table entry to represent the new promise --
// unless it already has an entry. Let's check.
var insertResult = exportsByCap.put(exp.clientHook, exportId);
// TODO check this behaviour
if (insertResult == null) {
// The new promise was not already in the table, therefore the existing export table
// entry has now been repurposed to represent it. There is no need to send a resolve
// message at all. We do, however, have to start resolving the next promise.
return resolveExportedPromise(exportId, more);
}
}
}
// send a Resolve message
var message = connection.newOutgoingMessage(1024);
var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve();
resolve.setPromiseId(exportId);
var fds = List.<Integer>of();
writeDescriptor(exp.clientHook, resolve.initCap(), fds);
message.setFds(fds);
message.send();
return CompletableFuture.completedFuture(null);
}).whenComplete((value, exc) -> {
if (exc == null) {
return;
}
var message = connection.newOutgoingMessage(1024);
var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve();
resolve.setPromiseId(exportId);
RpcException.fromException(exc, resolve.initException());
message.send();
// TODO disconnect?
});
}
ClientHook getInnermostClient(ClientHook client) {
for (;;) {
var inner = client.getResolved();
if (inner != null) {
client = inner;
}
else {
break;
}
}
if (client.getBrand() == this) {
return ((RpcClient)client).getInnermostClient();
}
return client;
}
interface RpcResponse extends ResponseHook { interface RpcResponse extends ResponseHook {
AnyPointer.Reader getResults(); AnyPointer.Reader getResults();
} }

View file

@ -54,13 +54,15 @@ public class RpcStateTest {
} }
TestConnection connection; TestConnection connection;
Capability.Client bootstrapInterface;
RpcState rpc; RpcState rpc;
final Queue<OutgoingRpcMessage> sent = new ArrayDeque<>(); final Queue<OutgoingRpcMessage> sent = new ArrayDeque<>();
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
connection = new TestConnection(); connection = new TestConnection();
rpc = new RpcState(connection); bootstrapInterface = new Capability.Client(ClientHook.newNullCap());
rpc = new RpcState(connection, bootstrapInterface);
} }
@After @After
@ -84,6 +86,20 @@ public class RpcStateTest {
@Test @Test
public void handleBootstrap() { public void handleBootstrap() {
var msg = new TestMessage();
var bootstrap = msg.builder.getRoot(RpcProtocol.Message.factory).initBootstrap();
bootstrap.setQuestionId(0);
rpc.handleMessage(msg);
Assert.assertFalse(sent.isEmpty());
var reply = sent.remove();
var rpcMsg = reply.getBody().getAs(RpcProtocol.Message.factory);
Assert.assertEquals(rpcMsg.which(), RpcProtocol.Message.Which.RETURN);
var ret = rpcMsg.getReturn();
Assert.assertEquals(ret.getAnswerId(), 0);
Assert.assertEquals(ret.which(), RpcProtocol.Return.Which.RESULTS);
var results = ret.getResults();
Assert.assertEquals(results.getCapTable().size(), 1); // got a capability!
Assert.assertTrue(results.hasContent());
} }
@Test @Test