question cleanup

This commit is contained in:
Vaci Koblizek 2020-10-20 21:42:20 +01:00
parent 7134461e7d
commit 730ca1abf5
24 changed files with 9438 additions and 407 deletions

View file

@ -1758,7 +1758,7 @@ private:
" }\n"), " }\n"),
kj::strTree( kj::strTree(
" protected java.util.concurrent.CompletableFuture<?> ", identifierName, "(org.capnproto.StreamingCallContext<", shortParamType, ".Reader> context) {\n" " protected java.util.concurrent.CompletableFuture<java.lang.Void> ", identifierName, "(org.capnproto.StreamingCallContext<", shortParamType, ".Reader> context) {\n"
" return org.capnproto.Capability.Server.internalUnimplemented(\n" " return org.capnproto.Capability.Server.internalUnimplemented(\n"
" \"", interfaceProto.getDisplayName(), "\", \"", methodName, "\",\n" " \"", interfaceProto.getDisplayName(), "\", \"", methodName, "\",\n"
" 0x", interfaceIdHex, "L, (short)", methodId, ");\n" " 0x", interfaceIdHex, "L, (short)", methodId, ");\n"
@ -1780,7 +1780,7 @@ private:
" }\n"), " }\n"),
kj::strTree( kj::strTree(
" protected java.util.concurrent.CompletableFuture<?> ", identifierName, "(org.capnproto.CallContext<", shortParamType, ".Reader, ", shortResultType, ".Builder> context) {\n" " protected java.util.concurrent.CompletableFuture<java.lang.Void> ", identifierName, "(org.capnproto.CallContext<", shortParamType, ".Reader, ", shortResultType, ".Builder> context) {\n"
" return org.capnproto.Capability.Server.internalUnimplemented(\n" " return org.capnproto.Capability.Server.internalUnimplemented(\n"
" \"", interfaceProto.getDisplayName(), "\", \"", methodName, "\",\n" " \"", interfaceProto.getDisplayName(), "\", \"", methodName, "\",\n"
" 0x", interfaceIdHex, "L, (short)", methodId, ");\n" " 0x", interfaceIdHex, "L, (short)", methodId, ");\n"

View file

@ -7,7 +7,11 @@ public interface CallContextHook {
void releaseParams(); void releaseParams();
AnyPointer.Builder getResults(); default AnyPointer.Builder getResults() {
return getResults(0);
}
AnyPointer.Builder getResults(int sizeHint);
CompletableFuture<?> tailCall(RequestHook request); CompletableFuture<?> tailCall(RequestHook request);

View file

@ -162,7 +162,7 @@ public final class Capability {
return BRAND; return BRAND;
} }
CompletableFuture<?> callInternal(long interfaceId, short methodId, CallContextHook context) { CompletableFuture<java.lang.Void> callInternal(long interfaceId, short methodId, CallContextHook context) {
var result = dispatchCall( var result = dispatchCall(
interfaceId, interfaceId,
methodId, methodId,
@ -212,7 +212,7 @@ public final class Capability {
long interfaceId, short methodId, long interfaceId, short methodId,
CallContext<AnyPointer.Reader, AnyPointer.Builder> context); CallContext<AnyPointer.Reader, AnyPointer.Builder> context);
protected static DispatchCallResult streamResult(CompletableFuture<?> result) { protected static DispatchCallResult streamResult(CompletableFuture<java.lang.Void> result) {
// For streaming calls, we need to add an evalNow() here so that exceptions thrown // For streaming calls, we need to add an evalNow() here so that exceptions thrown
// directly from the call can propagate to later calls. If we don't capture the // directly from the call can propagate to later calls. If we don't capture the
// exception properly then the caller will never find out that this is a streaming // exception properly then the caller will never find out that this is a streaming
@ -222,35 +222,35 @@ public final class Capability {
return new DispatchCallResult(result, true); return new DispatchCallResult(result, true);
} }
protected static DispatchCallResult result(CompletableFuture<?> result) { protected static DispatchCallResult result(CompletableFuture<java.lang.Void> result) {
return new DispatchCallResult(result, false); return new DispatchCallResult(result, false);
} }
protected static CompletableFuture<?> internalUnimplemented(String actualInterfaceName, long requestedTypeId) { protected static CompletableFuture<java.lang.Void> internalUnimplemented(String actualInterfaceName, long requestedTypeId) {
return CompletableFuture.failedFuture(RpcException.unimplemented( return CompletableFuture.failedFuture(RpcException.unimplemented(
"Method not implemented. " + actualInterfaceName + " " + requestedTypeId)); "Method not implemented. " + actualInterfaceName + " " + requestedTypeId));
} }
protected static CompletableFuture<?> internalUnimplemented(String interfaceName, long typeId, short methodId) { protected static CompletableFuture<java.lang.Void> internalUnimplemented(String interfaceName, long typeId, short methodId) {
return CompletableFuture.failedFuture(RpcException.unimplemented( return CompletableFuture.failedFuture(RpcException.unimplemented(
"Method not implemented. " + interfaceName + " " + typeId + " " + methodId)); "Method not implemented. " + interfaceName + " " + typeId + " " + methodId));
} }
protected static CompletableFuture<?> internalUnimplemented(String interfaceName, String methodName, long typeId, short methodId) { protected static CompletableFuture<java.lang.Void> internalUnimplemented(String interfaceName, String methodName, long typeId, short methodId) {
return CompletableFuture.failedFuture(RpcException.unimplemented( return CompletableFuture.failedFuture(RpcException.unimplemented(
"Method not implemented. " + interfaceName + " " + typeId + " " + methodName + " " + methodId)); "Method not implemented. " + interfaceName + " " + typeId + " " + methodName + " " + methodId));
} }
} }
public static ClientHook newLocalPromiseClient(CompletionStage<ClientHook> promise) { public static ClientHook newLocalPromiseClient(CompletionStage<ClientHook> promise) {
return new QueuedClient(promise); return new QueuedClient(promise.toCompletableFuture());
} }
public static PipelineHook newLocalPromisePipeline(CompletionStage<PipelineHook> promise) { public static PipelineHook newLocalPromisePipeline(CompletionStage<PipelineHook> promise) {
return new QueuedPipeline(promise); return new QueuedPipeline(promise.toCompletableFuture());
} }
static class LocalRequest implements RequestHook { private static class LocalRequest implements RequestHook {
final MessageBuilder message = new MessageBuilder(); final MessageBuilder message = new MessageBuilder();
final long interfaceId; final long interfaceId;
@ -290,11 +290,11 @@ public final class Capability {
} }
} }
static final class LocalPipeline implements PipelineHook { private static final class LocalPipeline implements PipelineHook {
final CallContextHook context; private final CallContextHook context;
final AnyPointer.Reader results; private final AnyPointer.Reader results;
public LocalPipeline(CallContextHook context) { LocalPipeline(CallContextHook context) {
this.context = context; this.context = context;
this.results = context.getResults().asReader(); this.results = context.getResults().asReader();
} }
@ -305,11 +305,16 @@ public final class Capability {
} }
} }
static class LocalResponse implements ResponseHook { private static final class LocalResponse implements ResponseHook {
final MessageBuilder message = new MessageBuilder();
final MessageBuilder message;
LocalResponse(int sizeHint) {
this.message = new MessageBuilder(sizeHint);
}
} }
static class LocalCallContext implements CallContextHook { private static class LocalCallContext implements CallContextHook {
final CompletableFuture<?> cancelAllowed; final CompletableFuture<?> cancelAllowed;
MessageBuilder request; MessageBuilder request;
@ -336,9 +341,9 @@ public final class Capability {
} }
@Override @Override
public AnyPointer.Builder getResults() { public AnyPointer.Builder getResults(int sizeHint) {
if (this.response == null) { if (this.response == null) {
var localResponse = new LocalResponse(); var localResponse = new LocalResponse(sizeHint);
this.responseBuilder = localResponse.message.getRoot(AnyPointer.factory); this.responseBuilder = localResponse.message.getRoot(AnyPointer.factory);
this.response = new Response<>(this.responseBuilder.asReader(), localResponse); this.response = new Response<>(this.responseBuilder.asReader(), localResponse);
} }
@ -409,4 +414,82 @@ public final class Capability {
}; };
} }
// Call queues
//
// These classes handle pipelining in the case where calls need to be queued in-memory until some
// local operation completes.
// A PipelineHook which simply queues calls while waiting for a PipelineHook to which to forward them.
private static final class QueuedPipeline implements PipelineHook {
private final CompletableFuture<PipelineHook> promise;
private final CompletionStage<Void> selfResolutionOp;
PipelineHook redirect;
QueuedPipeline(CompletableFuture<PipelineHook> promiseParam) {
this.promise = promiseParam;
this.selfResolutionOp = promise.handle((pipeline, exc) -> {
this.redirect = exc == null
? pipeline
: PipelineHook.newBrokenPipeline(exc);
return null;
});
}
@Override
public final ClientHook getPipelinedCap(PipelineOp[] ops) {
return redirect != null
? redirect.getPipelinedCap(ops)
: new QueuedClient(this.promise.thenApply(
pipeline -> pipeline.getPipelinedCap(ops)));
}
}
// A ClientHook which simply queues calls while waiting for a ClientHook to which to forward them.
private static class QueuedClient implements ClientHook {
private final CompletableFuture<ClientHook> promise;
private final CompletableFuture<ClientHook> promiseForCallForwarding;
private final CompletableFuture<ClientHook> promiseForClientResolution;
private final CompletableFuture<java.lang.Void> setResolutionOp;
private ClientHook redirect;
QueuedClient(CompletableFuture<ClientHook> promise) {
// TODO revisit futures
this.promise = promise;
this.promiseForCallForwarding = promise.toCompletableFuture().copy();
this.promiseForClientResolution = promise.toCompletableFuture().copy();
this.setResolutionOp = promise.thenAccept(inner -> {
this.redirect = inner;
}).exceptionally(exc -> {
this.redirect = newBrokenCap(exc);
return null;
});
}
@Override
public Request<AnyPointer.Builder, AnyPointer.Pipeline> newCall(long interfaceId, short methodId) {
var hook = new LocalRequest(interfaceId, methodId, this);
var root = hook.message.getRoot(AnyPointer.factory);
return new Request<>(root, AnyPointer.factory, hook);
}
@Override
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) {
var callResultPromise = this.promiseForCallForwarding.thenApply(client -> client.call(interfaceId, methodId, ctx));
var pipelinePromise = callResultPromise.thenApply(callResult -> callResult.pipeline);
var pipeline = new QueuedPipeline(pipelinePromise);
return new VoidPromiseAndPipeline(callResultPromise.thenAccept(x -> {}), pipeline);
}
@Override
public ClientHook getResolved() {
return redirect;
}
@Override
public CompletionStage<ClientHook> whenMoreResolved() {
return promiseForClientResolution.copy();
}
}
} }

View file

@ -44,10 +44,10 @@ public interface ClientHook {
} }
final class VoidPromiseAndPipeline { final class VoidPromiseAndPipeline {
public final CompletionStage<?> promise; public final CompletionStage<java.lang.Void> promise;
public final PipelineHook pipeline; public final PipelineHook pipeline;
VoidPromiseAndPipeline(CompletionStage<?> promise, PipelineHook pipeline) { VoidPromiseAndPipeline(CompletionStage<java.lang.Void> promise, PipelineHook pipeline) {
this.promise = promise; this.promise = promise;
this.pipeline = pipeline; this.pipeline = pipeline;
} }

View file

@ -4,10 +4,10 @@ import java.util.concurrent.CompletableFuture;
public final class DispatchCallResult { public final class DispatchCallResult {
private final CompletableFuture<?> promise; private final CompletableFuture<java.lang.Void> promise;
private final boolean streaming; private final boolean streaming;
public DispatchCallResult(CompletableFuture<?> promise, boolean isStreaming) { public DispatchCallResult(CompletableFuture<java.lang.Void> promise, boolean isStreaming) {
this.promise = promise; this.promise = promise;
this.streaming = isStreaming; this.streaming = isStreaming;
} }
@ -16,8 +16,8 @@ public final class DispatchCallResult {
this(CompletableFuture.failedFuture(exc), false); this(CompletableFuture.failedFuture(exc), false);
} }
public CompletableFuture<?> getPromise() { public CompletableFuture<java.lang.Void> getPromise() {
return promise; return promise.copy();
} }
public boolean isStreaming() { public boolean isStreaming() {

View file

@ -8,9 +8,9 @@ import java.util.function.Consumer;
abstract class ExportTable<T> implements Iterable<T> { abstract class ExportTable<T> implements Iterable<T> {
final HashMap<Integer, T> slots = new HashMap<>(); private final HashMap<Integer, T> slots = new HashMap<>();
final Queue<Integer> freeIds = new PriorityQueue<>(); private final Queue<Integer> freeIds = new PriorityQueue<>();
int max = 0; private int max = 0;
abstract T newExportable(int id); abstract T newExportable(int id);

View file

@ -1,50 +0,0 @@
package org.capnproto;
import java.util.concurrent.CompletionStage;
class QueuedClient implements ClientHook {
final CompletionStage<ClientHook> promise;
final CompletionStage<ClientHook> promiseForCallForwarding;
final CompletionStage<ClientHook> promiseForClientResolution;
final CompletionStage<java.lang.Void> setResolutionOp;
ClientHook redirect;
QueuedClient(CompletionStage<ClientHook> promise) {
// TODO revisit futures
this.promise = promise.toCompletableFuture().copy();
this.promiseForCallForwarding = promise.toCompletableFuture().copy();
this.promiseForClientResolution = promise.toCompletableFuture().copy();
this.setResolutionOp = promise.thenAccept(inner -> {
this.redirect = inner;
}).exceptionally(exc -> {
this.redirect = Capability.newBrokenCap(exc);
return null;
});
}
@Override
public Request<AnyPointer.Builder, AnyPointer.Pipeline> newCall(long interfaceId, short methodId) {
var hook = new Capability.LocalRequest(interfaceId, methodId, this);
var root = hook.message.getRoot(AnyPointer.factory);
return new Request<>(root, AnyPointer.factory, hook);
}
@Override
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) {
var callResultPromise = this.promiseForCallForwarding.thenApply(client -> client.call(interfaceId, methodId, ctx));
var pipelinePromise = callResultPromise.thenApply(callResult -> callResult.pipeline);
var pipeline = new QueuedPipeline(pipelinePromise);
return new VoidPromiseAndPipeline(callResultPromise, pipeline);
}
@Override
public ClientHook getResolved() {
return redirect;
}
@Override
public CompletionStage<ClientHook> whenMoreResolved() {
return promiseForClientResolution;
}
}

View file

@ -1,28 +0,0 @@
package org.capnproto;
import java.util.concurrent.CompletionStage;
final class QueuedPipeline implements PipelineHook {
final CompletionStage<PipelineHook> promise;
final CompletionStage<Void> selfResolutionOp;
PipelineHook redirect;
public QueuedPipeline(CompletionStage<PipelineHook> promiseParam) {
this.promise = promiseParam;
this.selfResolutionOp = promise.handle((pipeline, exc) -> {
this.redirect = exc == null
? pipeline
: PipelineHook.newBrokenPipeline(exc);
return null;
});
}
@Override
public final ClientHook getPipelinedCap(PipelineOp[] ops) {
return redirect != null
? redirect.getPipelinedCap(ops)
: new QueuedClient(this.promise.thenApply(
pipeline -> pipeline.getPipelinedCap(ops)));
}
}

View file

@ -0,0 +1,103 @@
package org.capnproto;
import java.util.HashMap;
import java.util.Map;
public class RpcDumper {
private final Map<Long, Schema.Node.Reader> schemas = new HashMap<>();
private final Map<Integer, Long> clientReturnTypes = new HashMap<>();
private final Map<Integer, Long> serverReturnTypes = new HashMap<>();
void addSchema(long schemaId, Schema.Node.Reader node) {
this.schemas.put(schemaId, node);
}
private void setReturnType(RpcTwoPartyProtocol.Side side, int schemaId, long schema) {
switch (side) {
case CLIENT:
clientReturnTypes.put(schemaId, schema);
break;
case SERVER:
serverReturnTypes.put(schemaId, schema);
default:
break;
}
}
private Long getReturnType(RpcTwoPartyProtocol.Side side, int schemaId) {
switch (side) {
case CLIENT:
return clientReturnTypes.get(schemaId);
case SERVER:
return serverReturnTypes.get(schemaId);
default:
break;
}
return -1L;
}
String dump(RpcProtocol.Message.Reader message, RpcTwoPartyProtocol.Side sender) {
switch (message.which()) {
case CALL: {
var call = message.getCall();
var iface = call.getInterfaceId();
var schema = this.schemas.get(iface);
if (schema == null || !schema.isInterface()) {
break;
}
var interfaceSchema = schema.getInterface();
var methods = interfaceSchema.getMethods();
if (call.getMethodId() >= methods.size()) {
break;
}
var method = methods.get(call.getMethodId());
var interfaceName = schema.getDisplayName().toString();
var paramType = method.getParamStructType();
var resultType = method.getResultStructType();
if (call.getSendResultsTo().isCaller()) {
var questionId = call.getQuestionId();
setReturnType(sender, call.getQuestionId(), resultType);
}
var payload = call.getParams();
var params = payload.getContent();
var sendResultsTo = call.getSendResultsTo();
return sender.name() + "(" + call.getQuestionId() + "): call " +
call.getTarget() + " <- " + interfaceName + "." +
method.getName().toString() + " " + params + " caps:[" +
payload.getCapTable() + "]" + (sendResultsTo.isCaller() ? "" : (" sendResultsTo:" + sendResultsTo));
}
case RETURN: {
var ret = message.getReturn();
var returnType = getReturnType(
sender == RpcTwoPartyProtocol.Side.CLIENT
? RpcTwoPartyProtocol.Side.SERVER
: RpcTwoPartyProtocol.Side.CLIENT,
ret.getAnswerId());
if (ret.which() != RpcProtocol.Return.Which.RESULTS) {
break;
}
var payload = ret.getResults();
return sender.name() + "(" + ret.getAnswerId() + "): return " + payload +
" caps:[" + payload.getCapTable() + "]";
}
case BOOTSTRAP: {
var restore = message.getBootstrap();
setReturnType(sender, restore.getQuestionId(), 0);
return sender.name() + "(" + restore.getQuestionId() + "): bootstrap " +
restore.getDeprecatedObjectId();
}
default:
break;
}
return "";
}
}

View file

@ -5,7 +5,8 @@ public final class RpcException extends java.lang.Exception {
public enum Type { public enum Type {
UNKNOWN, UNKNOWN,
UNIMPLEMENTED, UNIMPLEMENTED,
FAILED FAILED,
DISCONNECTED
} }
private Type type; private Type type;
@ -27,6 +28,10 @@ public final class RpcException extends java.lang.Exception {
return new RpcException(Type.FAILED, message); return new RpcException(Type.FAILED, message);
} }
public static RpcException disconnected(String message) {
return new RpcException(Type.DISCONNECTED, message);
}
static void fromException(Throwable exc, RpcProtocol.Exception.Builder builder) { static void fromException(Throwable exc, RpcProtocol.Exception.Builder builder) {
builder.setReason(exc.getMessage()); builder.setReason(exc.getMessage());
builder.setType(RpcProtocol.Exception.Type.FAILED); builder.setType(RpcProtocol.Exception.Type.FAILED);

View file

@ -1,40 +1,50 @@
package org.capnproto; package org.capnproto;
import java.io.IOException;
import java.lang.ref.ReferenceQueue; import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.*; import java.util.*;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
final class RpcState { final class RpcState {
final class Question { private static int messageSizeHint() {
final int id; return 1 + RpcProtocol.Message.factory.structSize().total();
CompletableFuture<RpcResponse> response = new CompletableFuture<>(); }
List<Integer> paramExports;
boolean isAwaitingReturn = false;
boolean isTailCall = false;
boolean skipFinish = false;
Question(int id) { private static int MESSAGE_TARGET_SIZE_HINT
= RpcProtocol.MessageTarget.factory.structSize().total()
+ RpcProtocol.PromisedAnswer.factory.structSize().total()
+ 16;
private static int CAP_DESCRIPTOR_SIZE_HINT
= RpcProtocol.CapDescriptor.factory.structSize().total()
+ RpcProtocol.PromisedAnswer.factory.structSize().total();
private final class QuestionDisposer {
final int id;
boolean skipFinish;
boolean isAwaitingReturn;
QuestionDisposer(int id) {
this.id = id; this.id = id;
} }
void reject(Throwable exc) { void dispose() {
this.response.completeExceptionally(exc); var ref = questions.find(this.id);
this.finish(); if (ref != null) {
assert false: "Question ID no longer on table?";
return;
} }
void answer(RpcResponse response) {
this.response.complete(response);
this.finish();
}
void finish() {
assert questions.find(this.id) != null : "Question ID no longer on table?";
if (isConnected() && !this.skipFinish) { if (isConnected() && !this.skipFinish) {
var message = connection.newOutgoingMessage(1024); var sizeHint = messageSizeHint()
+ RpcProtocol.Finish.factory.structSize().total();
var message = connection.newOutgoingMessage(sizeHint);
var builder = message.getBody().getAs(RpcProtocol.Message.factory).initFinish(); var builder = message.getBody().getAs(RpcProtocol.Message.factory).initFinish();
builder.setQuestionId(this.id); builder.setQuestionId(this.id);
builder.setReleaseResultCaps(this.isAwaitingReturn); builder.setReleaseResultCaps(this.isAwaitingReturn);
@ -45,7 +55,104 @@ final class RpcState {
// Remove question ID from the table. Must do this *after* sending `Finish` to ensure that // Remove question ID from the table. Must do this *after* sending `Finish` to ensure that
// the ID is not re-allocated before the `Finish` message can be sent. // the ID is not re-allocated before the `Finish` message can be sent.
assert !this.isAwaitingReturn; assert !this.isAwaitingReturn;
questions.erase(id, this); questions.erase(id);
}
}
private final class QuestionRef extends WeakReference<Question> {
private final QuestionDisposer disposer;
QuestionRef(Question question) {
super(question, questionRefQueue);
this.disposer = question.disposer;
}
void dispose() {
this.disposer.dispose();
}
}
private final class Question {
CompletableFuture<RpcResponse> response = new CompletableFuture<>();
int[] paramExports = new int[0];
private final QuestionDisposer disposer;
boolean isTailCall = false;
Question(int id) {
this.disposer = new QuestionDisposer(id);
}
public int getId() {
return this.disposer.id;
}
public void setAwaitingReturn(boolean value) {
this.disposer.isAwaitingReturn = value;
}
void reject(Throwable exc) {
this.response.completeExceptionally(exc);
}
void answer(RpcResponse response) {
this.response.complete(response);
}
public boolean isAwaitingReturn() {
return this.disposer.isAwaitingReturn;
}
public void setSkipFinish(boolean value) {
this.disposer.skipFinish = value;
}
}
class QuestionExportTable implements Iterable<Question> {
private final HashMap<Integer, WeakReference<Question>> slots = new HashMap<>();
private final Queue<Integer> freeIds = new PriorityQueue<>();
private int max = 0;
public Question find(int id) {
var ref = this.slots.get(id);
return ref == null ? null : ref.get();
}
public Question erase(int id) {
var value = this.slots.get(id);
if (value != null) {
freeIds.add(id);
this.slots.remove(id);
return value.get();
} else {
return null;
}
}
public Question next() {
int id = freeIds.isEmpty() ? max++ : freeIds.remove();
var value = new Question(id);
var prev = slots.put(id, new QuestionRef(value));
assert prev == null;
return value;
}
@Override
public Iterator<Question> iterator() {
return this.slots.values()
.stream()
.map(ref -> ref.get())
.filter(question -> question != null)
.iterator();
}
@Override
public void forEach(Consumer<? super Question> action) {
var iter = this.iterator();
while (iter.hasNext()) {
action.accept(iter.next());
}
} }
} }
@ -55,7 +162,7 @@ final class RpcState {
PipelineHook pipeline; PipelineHook pipeline;
CompletionStage<RpcResponse> redirectedResults; CompletionStage<RpcResponse> redirectedResults;
RpcCallContext callContext; RpcCallContext callContext;
List<Integer> resultExports; int[] resultExports;
Answer(int answerId) { Answer(int answerId) {
this.answerId = answerId; this.answerId = answerId;
@ -123,12 +230,20 @@ final class RpcState {
} }
}; };
private final ExportTable<Question> questions = new ExportTable<Question>() { /*
private final ExportTable<QuestionRef> questions = new ExportTable<>() {
@Override
QuestionRef newExportable(int id) {
return new QuestionRef(new Question(id));
}
};
*/
private final QuestionExportTable questions = new QuestionExportTable(); /*{
@Override @Override
Question newExportable(int id) { Question newExportable(int id) {
return new Question(id); return new Question(id);
} }
}; */
private final ImportTable<Answer> answers = new ImportTable<>() { private final ImportTable<Answer> answers = new ImportTable<>() {
@Override @Override
@ -151,15 +266,117 @@ final class RpcState {
} }
}; };
private final HashMap<ClientHook, Integer> exportsByCap = new HashMap<>(); private final Map<ClientHook, Integer> exportsByCap = new HashMap<>();
private final VatNetwork.Connection connection;
private final Capability.Client bootstrapInterface; private final Capability.Client bootstrapInterface;
private final VatNetwork.Connection connection;
private final CompletableFuture<java.lang.Void> onDisconnect;
private Throwable disconnected = null; private Throwable disconnected = null;
private CompletableFuture<?> messageReady = CompletableFuture.completedFuture(null); private CompletableFuture<java.lang.Void> messageReady = CompletableFuture.completedFuture(null);
private final String name;
private final CompletableFuture<java.lang.Void> messageLoop;
private final ReferenceQueue<Question> questionRefQueue = new ReferenceQueue<>();
RpcState(VatNetwork.Connection connection, Capability.Client bootstrapInterface) { RpcState( Capability.Client bootstrapInterface,
this.connection = connection; VatNetwork.Connection connection,
CompletableFuture<java.lang.Void> onDisconnect) {
this.bootstrapInterface = bootstrapInterface; this.bootstrapInterface = bootstrapInterface;
this.connection = connection;
this.onDisconnect = onDisconnect;
this.messageLoop = this.doMessageLoop();
if (this.connection instanceof TwoPartyVatNetwork) {
this.name = ((TwoPartyVatNetwork)this.connection).getSide().toString();
}
else {
this.name = this.toString();
}
}
public CompletableFuture<java.lang.Void> getMessageLoop() {
return this.messageLoop;
}
CompletableFuture<java.lang.Void> disconnect(Throwable exc) {
if (isDisconnected()) {
return CompletableFuture.failedFuture(this.disconnected);
}
var networkExc = RpcException.disconnected(exc.getMessage());
// All current questions complete with exceptions.
for (var question: questions) {
question.reject(networkExc);
}
List<PipelineHook> pipelinesToRelease = new ArrayList<>();
List<ClientHook> clientsToRelease = new ArrayList<>();
List<CompletionStage<RpcResponse>> tailCallsToRelease = new ArrayList<>();
List<CompletionStage<?>> resolveOpsToRelease = new ArrayList<>();
for (var answer : answers) {
if (answer.pipeline != null) {
pipelinesToRelease.add(answer.pipeline);
answer.pipeline = null;
}
if (answer.redirectedResults != null) {
tailCallsToRelease.add(answer.redirectedResults);
answer.redirectedResults = null;
}
if (answer.callContext != null) {
answer.callContext.requestCancel();
}
}
for (var export : exports) {
clientsToRelease.add(export.clientHook);
resolveOpsToRelease.add(export.resolveOp);
export.clientHook = null;
export.resolveOp = null;
export.refcount = 0;
}
for (var imp : imports) {
if (imp.promise != null) {
imp.promise.completeExceptionally(networkExc);
}
}
for (var embargo : embargos) {
if (embargo.disembargo != null) {
embargo.disembargo.completeExceptionally(networkExc);
}
}
try {
var message = this.connection.newOutgoingMessage(1024);
RpcException.fromException(exc, message.getBody().getAs(RpcProtocol.Message.factory).initAbort());
message.send();
}
catch (Exception abortFailed) {
// no-op
}
var onShutdown = this.connection.shutdown().handle((x, ioExc) -> {
if (ioExc == null) {
return CompletableFuture.completedFuture(null);
}
// TODO IOException?
assert !(ioExc instanceof IOException);
if (ioExc instanceof RpcException) {
var rpcExc = (RpcException)exc;
if (rpcExc.getType() == RpcException.Type.DISCONNECTED) {
return CompletableFuture.completedFuture(null);
}
}
return CompletableFuture.failedFuture(ioExc);
});
this.disconnected = networkExc;
return onShutdown.thenCompose(x -> CompletableFuture.failedFuture(networkExc));
} }
final boolean isDisconnected() { final boolean isDisconnected() {
@ -171,83 +388,52 @@ final class RpcState {
} }
// Run func() before the next IO event. // Run func() before the next IO event.
private <T> CompletableFuture<T> evalLast(Callable<T> func) { private <T> void evalLast(Callable<T> func) {
return this.messageReady.thenCompose(x -> { this.messageReady = this.messageReady.thenCompose(x -> {
try { try {
return CompletableFuture.completedFuture(func.call()); func.call();
} }
catch (java.lang.Exception exc) { catch (java.lang.Exception exc) {
return CompletableFuture.failedFuture(exc); return CompletableFuture.failedFuture(exc);
} }
return CompletableFuture.completedFuture(null);
}); });
} }
ClientHook restore() { ClientHook restore() {
var question = questions.next(); var question = questions.next();
question.isAwaitingReturn = true; question.setAwaitingReturn(true);
question.paramExports = List.of();
var message = connection.newOutgoingMessage(64); var message = connection.newOutgoingMessage(64);
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap(); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap();
builder.setQuestionId(question.id); builder.setQuestionId(question.getId());
message.send(); message.send();
var pipeline = new RpcPipeline(question); var pipeline = new RpcPipeline(question);
return pipeline.getPipelinedCap(new PipelineOp[0]); return pipeline.getPipelinedCap(new PipelineOp[0]);
} }
// run message loop once private final CompletableFuture<java.lang.Void> doMessageLoop() {
final CompletableFuture<?> runOnce() {
this.cleanupImports(); this.cleanupImports();
this.cleanupQuestions();
if (isDisconnected()) { if (isDisconnected()) {
return CompletableFuture.failedFuture(disconnected); return CompletableFuture.failedFuture(this.disconnected);
}
if (!messageReady.isDone()) {
return messageReady;
}
messageReady = connection.receiveIncomingMessage().thenAccept(message -> {
try {
handleMessage(message);
}
catch (Exception exc) {
this.disconnected = exc;
}
}).exceptionally(exc -> {
this.disconnected = exc;
return null;
});
return messageReady;
}
// run message loop until promise is completed
public final <T> CompletableFuture<T> messageLoop(CompletableFuture<T> done) {
this.cleanupImports();
if (done.isDone()) {
return done;
}
if (isDisconnected()) {
done.completeExceptionally(disconnected);
return done;
} }
return connection.receiveIncomingMessage().thenCompose(message -> { return connection.receiveIncomingMessage().thenCompose(message -> {
try { try {
handleMessage(message); handleMessage(message);
} catch (Throwable rpcExc) {
// either we received an Abort message from peer
// or internal RpcState is bad.
return this.disconnect(rpcExc);
} }
catch (Exception exc) { return this.doMessageLoop();
done.completeExceptionally(exc);
} }).exceptionallyCompose(exc -> this.disconnect(exc));
return messageLoop(done);
});
} }
synchronized void handleMessage(IncomingRpcMessage message) throws RpcException { synchronized void handleMessage(IncomingRpcMessage message) throws RpcException {
var reader = message.getBody().getAs(RpcProtocol.Message.factory); var reader = message.getBody().getAs(RpcProtocol.Message.factory);
System.out.println(reader.which());
switch (reader.which()) { switch (reader.which()) {
case UNIMPLEMENTED: case UNIMPLEMENTED:
handleUnimplemented(reader.getUnimplemented()); handleUnimplemented(reader.getUnimplemented());
@ -437,7 +623,6 @@ final class RpcState {
} }
void handleReturn(IncomingRpcMessage message, RpcProtocol.Return.Reader callReturn) { void handleReturn(IncomingRpcMessage message, RpcProtocol.Return.Reader callReturn) {
var exportsToRelease = new ArrayList<Integer>();
var question = questions.find(callReturn.getAnswerId()); var question = questions.find(callReturn.getAnswerId());
if (question == null) { if (question == null) {
@ -445,29 +630,35 @@ final class RpcState {
return; return;
} }
if (!question.isAwaitingReturn) { if (!question.isAwaitingReturn()) {
assert false: "Duplicate Return"; assert false: "Duplicate Return";
return; return;
} }
question.setAwaitingReturn(false);
question.isAwaitingReturn = false; var exportsToRelease = new int[0];
if (callReturn.getReleaseParamCaps()) { if (callReturn.getReleaseParamCaps()) {
exportsToRelease.addAll(question.paramExports); exportsToRelease = question.paramExports;
question.paramExports = List.of(); question.paramExports = new int[0];
} }
if (callReturn.isTakeFromOtherQuestion()) { if (callReturn.isTakeFromOtherQuestion()) {
assert false: "Not implemented"; var answer = this.answers.find(callReturn.getTakeFromOtherQuestion());
// TODO process isTakeFromOtherQuestion... if (answer != null) {
answer.redirectedResults = null;
}
//this.questions.erase(callReturn.getAnswerId());
this.releaseExports(exportsToRelease);
return; return;
} }
switch (callReturn.which()) { switch (callReturn.which()) {
case RESULTS: case RESULTS:
if (question.isTailCall) { if (question.isTailCall) {
// TODO resultsSentElsewhere assert false: "Tail call `Return` must set `resultsSentElsewhere`, not `results`.";
return; break;
} }
var payload = callReturn.getResults(); var payload = callReturn.getResults();
var capTable = receiveCaps(payload.getCapTable(), message.getAttachedFds()); var capTable = receiveCaps(payload.getCapTable(), message.getAttachedFds());
// TODO question, message unused in RpcResponseImpl // TODO question, message unused in RpcResponseImpl
@ -478,7 +669,7 @@ final class RpcState {
case EXCEPTION: case EXCEPTION:
if (question.isTailCall) { if (question.isTailCall) {
assert false: "Tail call `Return` must set `resultsSentElsewhere`, not `exception`."; assert false: "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.";
return; break;
} }
question.reject(RpcException.toException(callReturn.getException())); question.reject(RpcException.toException(callReturn.getException()));
break; break;
@ -490,7 +681,7 @@ final class RpcState {
case RESULTS_SENT_ELSEWHERE: case RESULTS_SENT_ELSEWHERE:
if (!question.isTailCall) { if (!question.isTailCall) {
assert false: "`Return` had `resultsSentElsewhere` but this was not a tail call."; assert false: "`Return` had `resultsSentElsewhere` but this was not a tail call.";
return; break;
} }
// Tail calls are fulfilled with a null pointer. // Tail calls are fulfilled with a null pointer.
question.answer(() -> null); question.answer(() -> null);
@ -501,11 +692,11 @@ final class RpcState {
var answer = answers.find(other); var answer = answers.find(other);
if (answer == null) { if (answer == null) {
assert false: "`Return.takeFromOtherQuestion` had invalid answer ID."; assert false: "`Return.takeFromOtherQuestion` had invalid answer ID.";
return; break;
} }
if (answer.redirectedResults == null) { if (answer.redirectedResults == null) {
assert false: "`Return.takeFromOtherQuestion` referenced a call that did not use `sendResultsTo.yourself`."; assert false: "`Return.takeFromOtherQuestion` referenced a call that did not use `sendResultsTo.yourself`.";
return; break;
} }
question.response = answer.redirectedResults.toCompletableFuture(); question.response = answer.redirectedResults.toCompletableFuture();
answer.redirectedResults = null; answer.redirectedResults = null;
@ -513,24 +704,24 @@ final class RpcState {
default: default:
assert false : "Unknown 'Return' type."; assert false : "Unknown 'Return' type.";
return; break;
} }
this.releaseExports(exportsToRelease);
} }
void handleFinish(RpcProtocol.Finish.Reader finish) { void handleFinish(RpcProtocol.Finish.Reader finish) {
List<Integer> exportsToRelease = null;
var answer = answers.find(finish.getQuestionId()); var answer = answers.find(finish.getQuestionId());
if (answer == null || !answer.active) { if (answer == null || !answer.active) {
assert false: "'Finish' for invalid question ID."; assert false: "'Finish' for invalid question ID.";
return; return;
} }
if (finish.getReleaseResultCaps()) { var exportsToRelease = finish.getReleaseResultCaps()
exportsToRelease = answer.resultExports; ? answer.resultExports
} : null;
answer.resultExports = null;
var pipelineToRelease = answer.pipeline; answer.resultExports = null;
answer.pipeline = null; answer.pipeline = null;
// If the call isn't actually done yet, cancel it. Otherwise, we can go ahead and erase the // If the call isn't actually done yet, cancel it. Otherwise, we can go ahead and erase the
@ -540,13 +731,15 @@ final class RpcState {
ctx.requestCancel(); ctx.requestCancel();
} }
else { else {
answers.erase(finish.getQuestionId()); var questionId = finish.getQuestionId();
answers.erase(questionId);
} }
this.releaseExports(exportsToRelease);
} }
void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) { void handleResolve(IncomingRpcMessage message, RpcProtocol.Resolve.Reader resolve) {
var imp = this.imports.find(resolve.getPromiseId());
var imp = imports.find(resolve.getPromiseId());
if (imp == null) { if (imp == null) {
return; return;
} }
@ -559,10 +752,12 @@ final class RpcState {
switch (resolve.which()) { switch (resolve.which()) {
case CAP: case CAP:
imp.promise.complete(receiveCap(resolve.getCap(), message.getAttachedFds())); var cap = receiveCap(resolve.getCap(), message.getAttachedFds());
imp.promise.complete(cap);
break; break;
case EXCEPTION: case EXCEPTION:
imp.promise.completeExceptionally(RpcException.toException(resolve.getException())); var exc = RpcException.toException(resolve.getException());
imp.promise.completeExceptionally(exc);
break; break;
default: default:
assert false; assert false;
@ -571,7 +766,7 @@ final class RpcState {
} }
private void handleRelease(RpcProtocol.Release.Reader release) { private void handleRelease(RpcProtocol.Release.Reader release) {
releaseExport(release.getId(), release.getReferenceCount()); this.releaseExport(release.getId(), release.getReferenceCount());
} }
void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) {
@ -640,9 +835,9 @@ final class RpcState {
} }
} }
private List<Integer> writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List<Integer> fds) { private int[] writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List<Integer> fds) {
if (capTable.length == 0) { if (capTable.length == 0) {
return List.of(); return new int[0];
} }
var capTableBuilder = payload.initCapTable(capTable.length); var capTableBuilder = payload.initCapTable(capTable.length);
@ -655,14 +850,15 @@ final class RpcState {
} }
var exportId = writeDescriptor(cap, capTableBuilder.get(ii), fds); var exportId = writeDescriptor(cap, capTableBuilder.get(ii), fds);
if (exportId != null) {
exports.add(exportId); exports.add(exportId);
} }
}
return exports; return exports.stream()
.mapToInt(Integer::intValue)
.toArray();
} }
private Integer writeDescriptor(ClientHook cap, RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) { private int writeDescriptor(ClientHook cap, RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) {
ClientHook inner = cap; ClientHook inner = cap;
for (;;) { for (;;) {
var resolved = inner.getResolved(); var resolved = inner.getResolved();
@ -709,7 +905,6 @@ final class RpcState {
} }
CompletionStage<?> resolveExportedPromise(int exportId, CompletionStage<ClientHook> promise) { CompletionStage<?> resolveExportedPromise(int exportId, CompletionStage<ClientHook> promise) {
return promise.thenCompose(resolution -> { return promise.thenCompose(resolution -> {
if (isDisconnected()) { if (isDisconnected()) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
@ -764,6 +959,12 @@ final class RpcState {
}); });
} }
void releaseExports(int[] exports) {
for (var exportId : exports) {
this.releaseExport(exportId, 1);
}
}
void releaseExport(int exportId, int refcount) { void releaseExport(int exportId, int refcount) {
var export = exports.find(exportId); var export = exports.find(exportId);
if (export == null) { if (export == null) {
@ -910,8 +1111,9 @@ final class RpcState {
case PROMISED_ANSWER: case PROMISED_ANSWER:
var promisedAnswer = target.getPromisedAnswer(); var promisedAnswer = target.getPromisedAnswer();
var base = answers.find(promisedAnswer.getQuestionId()); var questionId = promisedAnswer.getQuestionId();
if (base == null || !base.active) { var base = answers.put(questionId);
if (!base.active) {
assert false: "PromisedAnswer.questionId is not a current question."; assert false: "PromisedAnswer.questionId is not a current question.";
return null; return null;
} }
@ -995,7 +1197,7 @@ final class RpcState {
return payload.getContent().imbue(capTable); return payload.getContent().imbue(capTable);
} }
List<Integer> send() { int[] send() {
var capTable = this.capTable.getTable(); var capTable = this.capTable.getTable();
var fds = List.<Integer>of(); var fds = List.<Integer>of();
var exports = writeDescriptors(capTable, payload, fds); var exports = writeDescriptors(capTable, payload, fds);
@ -1014,18 +1216,20 @@ final class RpcState {
} }
} }
private static final class LocallyRedirectedRpcResponse implements RpcServerResponse, RpcResponse { private static final class LocallyRedirectedRpcResponse
implements RpcServerResponse,
RpcResponse {
private final MessageBuilder message = new MessageBuilder(); private final MessageBuilder message = new MessageBuilder();
@Override @Override
public AnyPointer.Builder getResultsBuilder() { public AnyPointer.Builder getResultsBuilder() {
return message.getRoot(AnyPointer.factory); return this.message.getRoot(AnyPointer.factory);
} }
@Override @Override
public AnyPointer.Reader getResults() { public AnyPointer.Reader getResults() {
return getResultsBuilder().asReader(); return this.getResultsBuilder().asReader();
} }
} }
@ -1044,6 +1248,7 @@ final class RpcState {
private RpcProtocol.Return.Builder returnMessage; private RpcProtocol.Return.Builder returnMessage;
private boolean redirectResults = false; private boolean redirectResults = false;
private boolean responseSent = false; private boolean responseSent = false;
private CompletableFuture<PipelineHook> tailCallPipelineFuture;
private boolean cancelRequested = false; private boolean cancelRequested = false;
private boolean cancelAllowed = false; private boolean cancelAllowed = false;
@ -1074,14 +1279,17 @@ final class RpcState {
} }
@Override @Override
public AnyPointer.Builder getResults() { public AnyPointer.Builder getResults(int sizeHint) {
if (this.response == null) { if (this.response == null) {
if (this.redirectResults || isDisconnected()) { if (this.redirectResults || isDisconnected()) {
this.response = new LocallyRedirectedRpcResponse(); this.response = new LocallyRedirectedRpcResponse();
} }
else { else {
var message = connection.newOutgoingMessage(1024); sizeHint += messageSizeHint()
+ RpcProtocol.Payload.factory.structSize().total()
+ RpcProtocol.Return.factory.structSize().total();
var message = connection.newOutgoingMessage(sizeHint);
this.returnMessage = message.getBody().initAs(RpcProtocol.Message.factory).initReturn(); this.returnMessage = message.getBody().initAs(RpcProtocol.Message.factory).initReturn();
this.response = new RpcServerResponseImpl(message, returnMessage.getResults()); this.response = new RpcServerResponseImpl(message, returnMessage.getResults());
} }
@ -1091,8 +1299,12 @@ final class RpcState {
} }
@Override @Override
public CompletableFuture<?> tailCall(RequestHook request) { public CompletableFuture<java.lang.Void> tailCall(RequestHook request) {
return null; var result = this.directTailCall(request);
if (this.tailCallPipelineFuture != null) {
this.tailCallPipelineFuture.complete(result.pipeline);
}
return result.promise.toCompletableFuture().copy();
} }
@Override @Override
@ -1134,7 +1346,7 @@ final class RpcState {
this.returnMessage.setAnswerId(this.answerId); this.returnMessage.setAnswerId(this.answerId);
this.returnMessage.setReleaseParamCaps(false); this.returnMessage.setReleaseParamCaps(false);
List<Integer> exports = List.of(); var exports = new int[0];
try { try {
exports = ((RpcServerResponseImpl) response).send(); exports = ((RpcServerResponseImpl) response).send();
} catch (Throwable exc) { } catch (Throwable exc) {
@ -1143,7 +1355,7 @@ final class RpcState {
} }
// If no caps in the results, the pipeline is irrelevant. // If no caps in the results, the pipeline is irrelevant.
boolean shouldFreePipeline = exports.isEmpty(); boolean shouldFreePipeline = exports.length == 0;
cleanupAnswerTable(exports, shouldFreePipeline); cleanupAnswerTable(exports, shouldFreePipeline);
} }
@ -1163,7 +1375,7 @@ final class RpcState {
message.send(); message.send();
} }
cleanupAnswerTable(List.of(), false); cleanupAnswerTable(new int[0], false);
} }
private boolean isFirstResponder() { private boolean isFirstResponder() {
@ -1174,9 +1386,9 @@ final class RpcState {
return true; return true;
} }
private void cleanupAnswerTable(List<Integer> resultExports, boolean shouldFreePipeline) { private void cleanupAnswerTable(int[] resultExports, boolean shouldFreePipeline) {
if (this.cancelRequested) { if (this.cancelRequested) {
assert resultExports.size() == 0; assert resultExports.length == 0;
answers.erase(this.answerId); answers.erase(this.answerId);
return; return;
} }
@ -1186,7 +1398,7 @@ final class RpcState {
answer.resultExports = resultExports; answer.resultExports = resultExports;
if (shouldFreePipeline) { if (shouldFreePipeline) {
assert resultExports.size() == 0; assert resultExports.length == 0;
answer.pipeline = null; answer.pipeline = null;
} }
} }
@ -1320,15 +1532,22 @@ final class RpcState {
class RpcRequest implements RequestHook { class RpcRequest implements RequestHook {
final RpcClient target; private final RpcClient target;
final OutgoingRpcMessage message; private final OutgoingRpcMessage message;
final BuilderCapabilityTable capTable = new BuilderCapabilityTable(); private final BuilderCapabilityTable capTable = new BuilderCapabilityTable();
final RpcProtocol.Call.Builder callBuilder; private final RpcProtocol.Call.Builder callBuilder;
final AnyPointer.Builder paramsBuilder; private final AnyPointer.Builder paramsBuilder;
RpcRequest(RpcClient target) { RpcRequest(RpcClient target) {
this(target, 0);
}
RpcRequest(RpcClient target, int sizeHint) {
this.target = target; this.target = target;
this.message = connection.newOutgoingMessage(1024); sizeHint += RpcProtocol.Call.factory.structSize().total()
+ RpcProtocol.Payload.factory.structSize().total()
+ MESSAGE_TARGET_SIZE_HINT;
this.message = connection.newOutgoingMessage(sizeHint);
this.callBuilder = message.getBody().getAs(RpcProtocol.Message.factory).initCall(); this.callBuilder = message.getBody().getAs(RpcProtocol.Message.factory).initCall();
this.paramsBuilder = callBuilder.getParams().getContent().imbue(this.capTable); this.paramsBuilder = callBuilder.getParams().getContent().imbue(this.capTable);
} }
@ -1355,15 +1574,20 @@ final class RpcState {
return replacement.hook.send(); return replacement.hook.send();
} }
var question = sendInternal(false); final var question = sendInternal(false);
// The pipeline must get notified of resolution before the app does to maintain ordering. // The pipeline must get notified of resolution before the app does to maintain ordering.
var pipeline = new RpcPipeline(question, question.response); var pipeline = new RpcPipeline(question, question.response);
var appPromise = question.response.thenApply(hook -> { var appPromise = question.response.thenApply(
return new Response<>(hook.getResults(), hook); hook -> new Response<>(hook.getResults(), hook));
});
return new RemotePromise<>(appPromise, pipeline); // complete when either the message loop completes (exceptionally) or
// the appPromise is fulfilled
var loop = CompletableFuture.anyOf(
getMessageLoop(), appPromise).thenCompose(x -> appPromise);
return new RemotePromise<>(loop, pipeline);
} }
@Override @Override
@ -1378,19 +1602,19 @@ final class RpcState {
var exports = writeDescriptors(capTable.getTable(), callBuilder.getParams(), fds); var exports = writeDescriptors(capTable.getTable(), callBuilder.getParams(), fds);
message.setFds(fds); message.setFds(fds);
var question = questions.next(); var question = questions.next();
question.isAwaitingReturn = true; question.setAwaitingReturn(true);
question.isTailCall = isTailCall; question.isTailCall = isTailCall;
question.paramExports = exports; question.paramExports = exports;
callBuilder.setQuestionId(question.id); callBuilder.setQuestionId(question.getId());
if (isTailCall) { if (isTailCall) {
callBuilder.getSendResultsTo().getYourself(); callBuilder.getSendResultsTo().getYourself();
} }
try { try {
message.send(); message.send();
} catch (Exception exc) { } catch (Exception exc) {
question.isAwaitingReturn = false; question.setAwaitingReturn(false);
question.skipFinish = true; question.setSkipFinish(true);
question.reject(exc); question.reject(exc);
} }
return question; return question;
@ -1483,6 +1707,16 @@ final class RpcState {
} }
} }
private void cleanupQuestions() {
while (true) {
var ref = (QuestionRef)this.questionRefQueue.poll();
if (ref == null) {
break;
}
ref.dispose();
}
}
enum ResolutionType { enum ResolutionType {
UNRESOLVED, UNRESOLVED,
REMOTE, REMOTE,
@ -1491,14 +1725,15 @@ final class RpcState {
BROKEN BROKEN
} }
class PromiseClient extends RpcClient { private class PromiseClient extends RpcClient {
final ClientHook cap;
final Integer importId;
final CompletableFuture<ClientHook> promise;
boolean receivedCall = false;
ResolutionType resolutionType = ResolutionType.UNRESOLVED;
public PromiseClient(RpcClient initial, private final ClientHook cap;
private final Integer importId;
private final CompletableFuture<ClientHook> promise;
private boolean receivedCall = false;
private ResolutionType resolutionType = ResolutionType.UNRESOLVED;
PromiseClient(RpcClient initial,
CompletableFuture<ClientHook> eventual, CompletableFuture<ClientHook> eventual,
Integer importId) { Integer importId) {
this.cap = initial; this.cap = initial;
@ -1538,8 +1773,8 @@ final class RpcState {
} }
} }
else { else {
if (replacementBrand == NULL_CAPABILITY_BRAND || if (replacementBrand == NULL_CAPABILITY_BRAND
replacementBrand == BROKEN_CAPABILITY_BRAND) { || replacementBrand == BROKEN_CAPABILITY_BRAND) {
resolutionType = ResolutionType.BROKEN; resolutionType = ResolutionType.BROKEN;
} }
else { else {
@ -1554,25 +1789,19 @@ final class RpcState {
if (resolutionType == ResolutionType.REFLECTED && receivedCall && !isDisconnected()) { if (resolutionType == ResolutionType.REFLECTED && receivedCall && !isDisconnected()) {
var message = connection.newOutgoingMessage(1024); var message = connection.newOutgoingMessage(1024);
var disembargo = message.getBody().initAs(RpcProtocol.Message.factory).initDisembargo(); var disembargo = message.getBody().initAs(RpcProtocol.Message.factory).initDisembargo();
{
var redirect = RpcState.this.writeTarget(cap, disembargo.initTarget()); var redirect = RpcState.this.writeTarget(cap, disembargo.initTarget());
assert redirect == null; assert redirect == null;
}
var embargo = embargos.next(); var embargo = embargos.next();
embargo.disembargo = new CompletableFuture<>();
disembargo.getContext().setSenderLoopback(embargo.id); disembargo.getContext().setSenderLoopback(embargo.id);
embargo.disembargo = new CompletableFuture<>();
final ClientHook finalReplacement = replacement; final ClientHook finalReplacement = replacement;
var embargoPromise = embargo.disembargo.thenApply(x -> { var embargoPromise = embargo.disembargo.thenApply(x -> finalReplacement);
return finalReplacement;
});
replacement = Capability.newLocalPromiseClient(embargoPromise); replacement = Capability.newLocalPromiseClient(embargoPromise);
message.send(); message.send();
} }
return replacement; return replacement;
} }
@ -1587,19 +1816,19 @@ final class RpcState {
@Override @Override
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder target, List<Integer> fds) { public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder target, List<Integer> fds) {
receivedCall = true; this.receivedCall = true;
return RpcState.this.writeDescriptor(cap, target, fds); return RpcState.this.writeDescriptor(cap, target, fds);
} }
@Override @Override
public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) { public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) {
receivedCall = true; this.receivedCall = true;
return RpcState.this.writeTarget(cap, target); return RpcState.this.writeTarget(cap, target);
} }
@Override @Override
public ClientHook getInnermostClient() { public ClientHook getInnermostClient() {
receivedCall = true; this.receivedCall = true;
return RpcState.this.getInnermostClient(cap); return RpcState.this.getInnermostClient(cap);
} }
@ -1609,7 +1838,7 @@ final class RpcState {
} }
} }
class PipelineClient extends RpcClient { private class PipelineClient extends RpcClient {
private final Question question; private final Question question;
private final PipelineOp[] ops; private final PipelineOp[] ops;
@ -1632,7 +1861,7 @@ final class RpcState {
@Override @Override
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) { public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) {
var promisedAnswer = descriptor.initReceiverAnswer(); var promisedAnswer = descriptor.initReceiverAnswer();
promisedAnswer.setQuestionId(question.id); promisedAnswer.setQuestionId(question.getId());
PipelineOp.FromPipelineOps(ops, promisedAnswer); PipelineOp.FromPipelineOps(ops, promisedAnswer);
return null; return null;
} }
@ -1640,7 +1869,7 @@ final class RpcState {
@Override @Override
public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) { public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) {
var builder = target.initPromisedAnswer(); var builder = target.initPromisedAnswer();
builder.setQuestionId(question.id); builder.setQuestionId(question.getId());
PipelineOp.FromPipelineOps(ops, builder); PipelineOp.FromPipelineOps(ops, builder);
return null; return null;
} }

View file

@ -9,11 +9,22 @@ public abstract class RpcSystem<VatId> {
final VatNetwork<VatId> network; final VatNetwork<VatId> network;
final Capability.Client bootstrapInterface; final Capability.Client bootstrapInterface;
final Map<VatNetwork.Connection, RpcState> connections = new HashMap<>(); final Map<VatNetwork.Connection, RpcState> connections = new HashMap<>();
CompletableFuture<?> acceptCompleted = CompletableFuture.completedFuture(null); final CompletableFuture<java.lang.Void> messageLoop;
final CompletableFuture<java.lang.Void> acceptLoop;
public RpcSystem(VatNetwork<VatId> network, Capability.Client bootstrapInterface) { public RpcSystem(VatNetwork<VatId> network, Capability.Client bootstrapInterface) {
this.network = network; this.network = network;
this.bootstrapInterface = bootstrapInterface; this.bootstrapInterface = bootstrapInterface;
this.acceptLoop = doAcceptLoop();
this.messageLoop = doMessageLoop();
}
public CompletableFuture<java.lang.Void> getMessageLoop() {
return this.messageLoop;
}
private CompletableFuture<java.lang.Void> getAcceptLoop() {
return this.acceptLoop;
} }
public void accept(VatNetwork.Connection connection) { public void accept(VatNetwork.Connection connection) {
@ -21,24 +32,27 @@ public abstract class RpcSystem<VatId> {
} }
synchronized RpcState getConnectionState(VatNetwork.Connection connection) { synchronized RpcState getConnectionState(VatNetwork.Connection connection) {
var onDisconnect = new CompletableFuture<VatNetwork.Connection>().thenAccept(lostConnection -> {
this.connections.remove(lostConnection);
});
return connections.computeIfAbsent(connection, key -> return connections.computeIfAbsent(connection, key ->
new RpcState(key, bootstrapInterface)); new RpcState(bootstrapInterface, connection, onDisconnect));
} }
public final CompletableFuture<?> runOnce() { private final CompletableFuture<java.lang.Void> doAcceptLoop() {
var done = acceptLoop(); return this.network.baseAccept().thenCompose(connection -> {
this.accept(connection);
return this.doAcceptLoop();
});
}
private final CompletableFuture<java.lang.Void> doMessageLoop() {
var accept = this.getAcceptLoop();
for (var conn : connections.values()) { for (var conn : connections.values()) {
done = CompletableFuture.anyOf(done, conn.runOnce()); accept = accept.acceptEither(conn.getMessageLoop(), x -> {});
} }
return done; return accept.thenCompose(x -> this.doMessageLoop());
}
CompletableFuture<?> acceptLoop() {
if (this.acceptCompleted.isDone()) {
var accepted = this.network.baseAccept();
this.acceptCompleted = accepted.thenAccept(this::accept);
}
return this.acceptCompleted;
} }
} }

File diff suppressed because it is too large Load diff

View file

@ -342,7 +342,7 @@ public final class Serialize {
table.putInt(4 * (ii + 1), segments[ii].limit() / 8); table.putInt(4 * (ii + 1), segments[ii].limit() / 8);
} }
outputChannel.write(table, 0, new CompletionHandler<Integer, Integer>() { outputChannel.write(table, 0, new CompletionHandler<>() {
@Override @Override
public void completed(Integer result, Integer attachment) { public void completed(Integer result, Integer attachment) {

View file

@ -1,6 +1,6 @@
package org.capnproto; package org.capnproto;
import java.nio.channels.AsynchronousByteChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public class TwoPartyClient { public class TwoPartyClient {
@ -8,15 +8,15 @@ public class TwoPartyClient {
private final TwoPartyVatNetwork network; private final TwoPartyVatNetwork network;
private final TwoPartyRpcSystem rpcSystem; private final TwoPartyRpcSystem rpcSystem;
public TwoPartyClient(AsynchronousByteChannel channel) { public TwoPartyClient(AsynchronousSocketChannel channel) {
this(channel, null); this(channel, null);
} }
public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface) { public TwoPartyClient(AsynchronousSocketChannel channel, Capability.Client bootstrapInterface) {
this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT); this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT);
} }
public TwoPartyClient(AsynchronousByteChannel channel, public TwoPartyClient(AsynchronousSocketChannel channel,
Capability.Client bootstrapInterface, Capability.Client bootstrapInterface,
RpcTwoPartyProtocol.Side side) { RpcTwoPartyProtocol.Side side) {
this.network = new TwoPartyVatNetwork(channel, side); this.network = new TwoPartyVatNetwork(channel, side);
@ -32,7 +32,13 @@ public class TwoPartyClient {
return rpcSystem.bootstrap(vatId.asReader()); return rpcSystem.bootstrap(vatId.asReader());
} }
public CompletableFuture<java.lang.Void> onDisconnect() {
return this.network.onDisconnect();
}
/*
public CompletableFuture<?> runOnce() { public CompletableFuture<?> runOnce() {
return this.rpcSystem.runOnce(); return this.rpcSystem.runOnce();
} }
*/
} }

View file

@ -7,6 +7,10 @@ public class TwoPartyRpcSystem
super(network, bootstrapInterface); super(network, bootstrapInterface);
} }
public TwoPartyRpcSystem(TwoPartyVatNetwork network, Capability.Server bootstrapInterface) {
super(network, new Capability.Client(bootstrapInterface));
}
public Capability.Client bootstrap(RpcTwoPartyProtocol.VatId.Reader vatId) { public Capability.Client bootstrap(RpcTwoPartyProtocol.VatId.Reader vatId) {
var connection = this.network.baseConnect(vatId); var connection = this.network.baseConnect(vatId);
var state = getConnectionState(connection); var state = getConnectionState(connection);

View file

@ -1,64 +1,145 @@
package org.capnproto; package org.capnproto;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class TwoPartyServer { public class TwoPartyServer {
private class AcceptedConnection { private class AcceptedConnection {
final AsynchronousByteChannel channel; final AsynchronousSocketChannel channel;
final TwoPartyVatNetwork network; final TwoPartyVatNetwork network;
final TwoPartyRpcSystem rpcSystem; final TwoPartyRpcSystem rpcSystem;
private final CompletableFuture<?> messageLoop;
AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousByteChannel channel) { AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousSocketChannel channel) {
this.channel = channel; this.channel = channel;
this.network = new TwoPartyVatNetwork(channel, RpcTwoPartyProtocol.Side.SERVER); this.network = new TwoPartyVatNetwork(channel, RpcTwoPartyProtocol.Side.SERVER);
this.rpcSystem = new TwoPartyRpcSystem(network, bootstrapInterface); this.rpcSystem = new TwoPartyRpcSystem(network, bootstrapInterface);
this.messageLoop = this.rpcSystem.getMessageLoop().exceptionally(exc -> {
connections.remove(this);
return null;
});
} }
public CompletableFuture<?> runOnce() { public CompletableFuture<?> getMessageLoop() {
return this.rpcSystem.runOnce(); return this.messageLoop;
}
}
class ConnectionReceiver {
AsynchronousServerSocketChannel listener;
final CompletableFuture<?> messageLoop;
public ConnectionReceiver(AsynchronousServerSocketChannel listener) {
this.listener = listener;
this.messageLoop = doMessageLoop();
}
public CompletableFuture<?> getMessageLoop() {
return this.messageLoop;
}
private CompletableFuture<?> doMessageLoop() {
final var accepted = new CompletableFuture<AsynchronousSocketChannel>();
listener.accept(null, new CompletionHandler<>() {
@Override
public void completed(AsynchronousSocketChannel channel, Object attachment) {
accepted.complete(channel);
}
@Override
public void failed(Throwable exc, Object attachment) {
accepted.completeExceptionally(exc);
}
});
return accepted.thenCompose(channel -> CompletableFuture.allOf(
accept(channel),
doMessageLoop()));
} }
} }
private final Capability.Client bootstrapInterface; private final Capability.Client bootstrapInterface;
private final List<AcceptedConnection> connections = new ArrayList<>(); private final List<AcceptedConnection> connections = new ArrayList<>();
private final List<AsynchronousServerSocketChannel> listeners = new ArrayList<>(); private final List<ConnectionReceiver> listeners = new ArrayList<>();
private final CompletableFuture<?> messageLoop;
public TwoPartyServer(Capability.Client bootstrapInterface) { public TwoPartyServer(Capability.Client bootstrapInterface) {
this.bootstrapInterface = bootstrapInterface; this.bootstrapInterface = bootstrapInterface;
this.messageLoop = doMessageLoop();
} }
private synchronized void accept(AsynchronousByteChannel channel) { public TwoPartyServer(Capability.Server bootstrapServer) {
this(new Capability.Client(bootstrapServer));
}
private CompletableFuture<?> getMessageLoop() {
return this.messageLoop;
}
public CompletableFuture<?> drain() {
CompletableFuture<java.lang.Void> done = new CompletableFuture<>();
for (var conn: this.connections) {
done = CompletableFuture.allOf(done, conn.getMessageLoop());
}
return done;
}
private CompletableFuture<java.lang.Void> accept(AsynchronousSocketChannel channel) {
var connection = new AcceptedConnection(this.bootstrapInterface, channel); var connection = new AcceptedConnection(this.bootstrapInterface, channel);
this.connections.add(connection); this.connections.add(connection);
return connection.network.onDisconnect().whenComplete((x, exc) -> {
this.connections.remove(connection);
});
} }
/*
private final CompletableFuture<?> acceptLoop(AsynchronousServerSocketChannel listener) {
final var accepted = new CompletableFuture<AsynchronousSocketChannel>();
listener.accept(null, new CompletionHandler<>() {
public void listen(AsynchronousServerSocketChannel listener) {
listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override @Override
public void completed(AsynchronousSocketChannel channel, Object attachment) { public void completed(AsynchronousSocketChannel channel, Object attachment) {
accept(channel); accepted.complete(channel);
listen(listener);
} }
@Override @Override
public void failed(Throwable exc, Object attachment) { public void failed(Throwable exc, Object attachment) {
listeners.remove(listener); accepted.completeExceptionally(exc);
} }
}); });
return accepted.thenCompose(channel -> CompletableFuture.anyOf(
accept(channel),
acceptLoop(listener)));
}
*/
public CompletableFuture<?> listen(AsynchronousServerSocketChannel listener) {
var receiver = new ConnectionReceiver(listener);
this.listeners.add(receiver);
return receiver.getMessageLoop();
} }
public synchronized CompletableFuture<?> runOnce() { private CompletableFuture<?> doMessageLoop() {
var done = new CompletableFuture<>();
for (var conn: this.connections) {
done = CompletableFuture.anyOf(done, conn.getMessageLoop());
}
for (var listener: this.listeners) {
done = CompletableFuture.anyOf(done, listener.getMessageLoop());
}
return done.thenCompose(x -> doMessageLoop());
}
/*
public CompletableFuture<?> runOnce() {
var done = new CompletableFuture<>(); var done = new CompletableFuture<>();
for (var conn: connections) { for (var conn: connections) {
done = CompletableFuture.anyOf(done, conn.runOnce()); done = CompletableFuture.anyOf(done, conn.runOnce());
} }
return done; return done;
} }
*/
} }

View file

@ -1,22 +1,23 @@
package org.capnproto; package org.capnproto;
import java.nio.channels.AsynchronousByteChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class TwoPartyVatNetwork public class TwoPartyVatNetwork
implements VatNetwork<RpcTwoPartyProtocol.VatId.Reader>, VatNetwork.Connection { implements VatNetwork<RpcTwoPartyProtocol.VatId.Reader>,
VatNetwork.Connection {
private CompletableFuture<?> writeCompleted = CompletableFuture.completedFuture(null); private CompletableFuture<java.lang.Void> previousWrite = CompletableFuture.completedFuture(null);
private final Executor executor = Executors.newSingleThreadExecutor(); private final CompletableFuture<java.lang.Void> peerDisconnected = new CompletableFuture<>();
private final AsynchronousByteChannel channel; private final AsynchronousSocketChannel channel;
private final RpcTwoPartyProtocol.Side side; private final RpcTwoPartyProtocol.Side side;
private final MessageBuilder peerVatId = new MessageBuilder(4); private final MessageBuilder peerVatId = new MessageBuilder(4);
private boolean accepted; private boolean accepted;
public TwoPartyVatNetwork(AsynchronousByteChannel channel, RpcTwoPartyProtocol.Side side) { public final RpcDumper dumper = new RpcDumper();
public TwoPartyVatNetwork(AsynchronousSocketChannel channel, RpcTwoPartyProtocol.Side side) {
this.channel = channel; this.channel = channel;
this.side = side; this.side = side;
this.peerVatId.initRoot(RpcTwoPartyProtocol.VatId.factory).setSide( this.peerVatId.initRoot(RpcTwoPartyProtocol.VatId.factory).setSide(
@ -33,11 +34,12 @@ public class TwoPartyVatNetwork
return peerVatId.getRoot(RpcTwoPartyProtocol.VatId.factory).asReader(); return peerVatId.getRoot(RpcTwoPartyProtocol.VatId.factory).asReader();
} }
private Connection connect(RpcTwoPartyProtocol.VatId.Reader vatId) { public VatNetwork.Connection asConnection() {
if (vatId.getSide() != side) {
return this; return this;
} }
return null;
private Connection connect(RpcTwoPartyProtocol.VatId.Reader vatId) {
return vatId.getSide() != side ? this : null;
} }
private CompletableFuture<Connection> accept() { private CompletableFuture<Connection> accept() {
@ -58,8 +60,33 @@ public class TwoPartyVatNetwork
@Override @Override
public CompletableFuture<IncomingRpcMessage> receiveIncomingMessage() { public CompletableFuture<IncomingRpcMessage> receiveIncomingMessage() {
return Serialize.readAsync(channel).thenApply(message -> { return Serialize.readAsync(channel).whenComplete((x, exc) -> {
return new IncomingMessage(message); if (exc != null) {
this.peerDisconnected.complete(null);
}
}).thenApply(reader -> {
var msg = new IncomingMessage(reader);
var dump = this.dumper.dump(msg.getBody().getAs(RpcProtocol.Message.factory), getSide());
if (!dump.isEmpty()) {
System.out.println(dump);
}
return msg;
});
}
@Override
public CompletableFuture<java.lang.Void> onDisconnect() {
return this.peerDisconnected.copy();
}
@Override
public CompletableFuture<java.lang.Void> shutdown() {
return this.previousWrite.whenComplete((x, exc) -> {
try {
this.channel.shutdownOutput();
}
catch (Exception ioExc) {
}
}); });
} }
@ -70,7 +97,7 @@ public class TwoPartyVatNetwork
@Override @Override
public CompletableFuture<Connection> baseAccept() { public CompletableFuture<Connection> baseAccept() {
return this.accept().thenApply(conn -> conn); return this.accept();
} }
final class OutgoingMessage implements OutgoingRpcMessage { final class OutgoingMessage implements OutgoingRpcMessage {
@ -94,7 +121,7 @@ public class TwoPartyVatNetwork
@Override @Override
public void send() { public void send() {
writeCompleted = writeCompleted.thenCompose( previousWrite = previousWrite.thenCompose(
x -> Serialize.writeAsync(channel, message) x -> Serialize.writeAsync(channel, message)
); );
} }

View file

@ -7,6 +7,8 @@ public interface VatNetwork<VatId> {
interface Connection { interface Connection {
OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize); OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize);
CompletableFuture<IncomingRpcMessage> receiveIncomingMessage(); CompletableFuture<IncomingRpcMessage> receiveIncomingMessage();
CompletableFuture<java.lang.Void> onDisconnect();
CompletableFuture<java.lang.Void> shutdown();
} }
Connection baseConnect(VatId hostId); Connection baseConnect(VatId hostId);

View file

@ -0,0 +1,533 @@
# Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
# Licensed under the MIT License:
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
using Cxx = import "/capnp/c++.capnp";
@0xa93fc509624c72d9;
$Cxx.namespace("capnp::schema");
using Java = import "/capnp/java.capnp";
$Java.package("org.capnproto");
$Java.outerClassname("Schema");
using Id = UInt64;
# The globally-unique ID of a file, type, or annotation.
struct Node {
id @0 :Id;
displayName @1 :Text;
# Name to present to humans to identify this Node. You should not attempt to parse this. Its
# format could change. It is not guaranteed to be unique.
#
# (On Zooko's triangle, this is the node's nickname.)
displayNamePrefixLength @2 :UInt32;
# If you want a shorter version of `displayName` (just naming this node, without its surrounding
# scope), chop off this many characters from the beginning of `displayName`.
scopeId @3 :Id;
# ID of the lexical parent node. Typically, the scope node will have a NestedNode pointing back
# at this node, but robust code should avoid relying on this (and, in fact, group nodes are not
# listed in the outer struct's nestedNodes, since they are listed in the fields). `scopeId` is
# zero if the node has no parent, which is normally only the case with files, but should be
# allowed for any kind of node (in order to make runtime type generation easier).
parameters @32 :List(Parameter);
# If this node is parameterized (generic), the list of parameters. Empty for non-generic types.
isGeneric @33 :Bool;
# True if this node is generic, meaning that it or one of its parent scopes has a non-empty
# `parameters`.
struct Parameter {
# Information about one of the node's parameters.
name @0 :Text;
}
nestedNodes @4 :List(NestedNode);
# List of nodes nested within this node, along with the names under which they were declared.
struct NestedNode {
name @0 :Text;
# Unqualified symbol name. Unlike Node.displayName, this *can* be used programmatically.
#
# (On Zooko's triangle, this is the node's petname according to its parent scope.)
id @1 :Id;
# ID of the nested node. Typically, the target node's scopeId points back to this node, but
# robust code should avoid relying on this.
}
annotations @5 :List(Annotation);
# Annotations applied to this node.
union {
# Info specific to each kind of node.
file @6 :Void;
struct :group {
dataWordCount @7 :UInt16;
# Size of the data section, in words.
pointerCount @8 :UInt16;
# Size of the pointer section, in pointers (which are one word each).
preferredListEncoding @9 :ElementSize;
# The preferred element size to use when encoding a list of this struct. If this is anything
# other than `inlineComposite` then the struct is one word or less in size and is a candidate
# for list packing optimization.
isGroup @10 :Bool;
# If true, then this "struct" node is actually not an independent node, but merely represents
# some named union or group within a particular parent struct. This node's scopeId refers
# to the parent struct, which may itself be a union/group in yet another struct.
#
# All group nodes share the same dataWordCount and pointerCount as the top-level
# struct, and their fields live in the same ordinal and offset spaces as all other fields in
# the struct.
#
# Note that a named union is considered a special kind of group -- in fact, a named union
# is exactly equivalent to a group that contains nothing but an unnamed union.
discriminantCount @11 :UInt16;
# Number of fields in this struct which are members of an anonymous union, and thus may
# overlap. If this is non-zero, then a 16-bit discriminant is present indicating which
# of the overlapping fields is active. This can never be 1 -- if it is non-zero, it must be
# two or more.
#
# Note that the fields of an unnamed union are considered fields of the scope containing the
# union -- an unnamed union is not its own group. So, a top-level struct may contain a
# non-zero discriminant count. Named unions, on the other hand, are equivalent to groups
# containing unnamed unions. So, a named union has its own independent schema node, with
# `isGroup` = true.
discriminantOffset @12 :UInt32;
# If `discriminantCount` is non-zero, this is the offset of the union discriminant, in
# multiples of 16 bits.
fields @13 :List(Field);
# Fields defined within this scope (either the struct's top-level fields, or the fields of
# a particular group; see `isGroup`).
#
# The fields are sorted by ordinal number, but note that because groups share the same
# ordinal space, the field's index in this list is not necessarily exactly its ordinal.
# On the other hand, the field's position in this list does remain the same even as the
# protocol evolves, since it is not possible to insert or remove an earlier ordinal.
# Therefore, for most use cases, if you want to identify a field by number, it may make the
# most sense to use the field's index in this list rather than its ordinal.
}
enum :group {
enumerants@14 :List(Enumerant);
# Enumerants ordered by numeric value (ordinal).
}
interface :group {
methods @15 :List(Method);
# Methods ordered by ordinal.
superclasses @31 :List(Superclass);
# Superclasses of this interface.
}
const :group {
type @16 :Type;
value @17 :Value;
}
annotation :group {
type @18 :Type;
targetsFile @19 :Bool;
targetsConst @20 :Bool;
targetsEnum @21 :Bool;
targetsEnumerant @22 :Bool;
targetsStruct @23 :Bool;
targetsField @24 :Bool;
targetsUnion @25 :Bool;
targetsGroup @26 :Bool;
targetsInterface @27 :Bool;
targetsMethod @28 :Bool;
targetsParam @29 :Bool;
targetsAnnotation @30 :Bool;
}
}
struct SourceInfo {
# Additional information about a node which is not needed at runtime, but may be useful for
# documentation or debugging purposes. This is kept in a separate struct to make sure it
# doesn't accidentally get included in contexts where it is not needed. The
# `CodeGeneratorRequest` includes this information in a separate array.
id @0 :Id;
# ID of the Node which this info describes.
docComment @1 :Text;
# The top-level doc comment for the Node.
members @2 :List(Member);
# Information about each member -- i.e. fields (for structs), enumerants (for enums), or
# methods (for interfaces).
#
# This list is the same length and order as the corresponding list in the Node, i.e.
# Node.struct.fields, Node.enum.enumerants, or Node.interface.methods.
struct Member {
docComment @0 :Text;
# Doc comment on the member.
}
# TODO(someday): Record location of the declaration in the original source code.
}
}
struct Field {
# Schema for a field of a struct.
name @0 :Text;
codeOrder @1 :UInt16;
# Indicates where this member appeared in the code, relative to other members.
# Code ordering may have semantic relevance -- programmers tend to place related fields
# together. So, using code ordering makes sense in human-readable formats where ordering is
# otherwise irrelevant, like JSON. The values of codeOrder are tightly-packed, so the maximum
# value is count(members) - 1. Fields that are members of a union are only ordered relative to
# the other members of that union, so the maximum value there is count(union.members).
annotations @2 :List(Annotation);
const noDiscriminant :UInt16 = 0xffff;
discriminantValue @3 :UInt16 = Field.noDiscriminant;
# If the field is in a union, this is the value which the union's discriminant should take when
# the field is active. If the field is not in a union, this is 0xffff.
union {
slot :group {
# A regular, non-group, non-fixed-list field.
offset @4 :UInt32;
# Offset, in units of the field's size, from the beginning of the section in which the field
# resides. E.g. for a UInt32 field, multiply this by 4 to get the byte offset from the
# beginning of the data section.
type @5 :Type;
defaultValue @6 :Value;
hadExplicitDefault @10 :Bool;
# Whether the default value was specified explicitly. Non-explicit default values are always
# zero or empty values. Usually, whether the default value was explicit shouldn't matter.
# The main use case for this flag is for structs representing method parameters:
# explicitly-defaulted parameters may be allowed to be omitted when calling the method.
}
group :group {
# A group.
typeId @7 :Id;
# The ID of the group's node.
}
}
ordinal :union {
implicit @8 :Void;
explicit @9 :UInt16;
# The original ordinal number given to the field. You probably should NOT use this; if you need
# a numeric identifier for a field, use its position within the field array for its scope.
# The ordinal is given here mainly just so that the original schema text can be reproduced given
# the compiled version -- i.e. so that `capnp compile -ocapnp` can do its job.
}
}
struct Enumerant {
# Schema for member of an enum.
name @0 :Text;
codeOrder @1 :UInt16;
# Specifies order in which the enumerants were declared in the code.
# Like Struct.Field.codeOrder.
annotations @2 :List(Annotation);
}
struct Superclass {
id @0 :Id;
brand @1 :Brand;
}
struct Method {
# Schema for method of an interface.
name @0 :Text;
codeOrder @1 :UInt16;
# Specifies order in which the methods were declared in the code.
# Like Struct.Field.codeOrder.
implicitParameters @7 :List(Node.Parameter);
# The parameters listed in [] (typically, type / generic parameters), whose bindings are intended
# to be inferred rather than specified explicitly, although not all languages support this.
paramStructType @2 :Id;
# ID of the parameter struct type. If a named parameter list was specified in the method
# declaration (rather than a single struct parameter type) then a corresponding struct type is
# auto-generated. Such an auto-generated type will not be listed in the interface's
# `nestedNodes` and its `scopeId` will be zero -- it is completely detached from the namespace.
# (Awkwardly, it does of course inherit generic parameters from the method's scope, which makes
# this a situation where you can't just climb the scope chain to find where a particular
# generic parameter was introduced. Making the `scopeId` zero was a mistake.)
paramBrand @5 :Brand;
# Brand of param struct type.
resultStructType @3 :Id;
# ID of the return struct type; similar to `paramStructType`.
resultBrand @6 :Brand;
# Brand of result struct type.
annotations @4 :List(Annotation);
}
struct Type {
# Represents a type expression.
union {
# The ordinals intentionally match those of Value.
void @0 :Void;
bool @1 :Void;
int8 @2 :Void;
int16 @3 :Void;
int32 @4 :Void;
int64 @5 :Void;
uint8 @6 :Void;
uint16 @7 :Void;
uint32 @8 :Void;
uint64 @9 :Void;
float32 @10 :Void;
float64 @11 :Void;
text @12 :Void;
data @13 :Void;
list :group {
elementType @14 :Type;
}
enum :group {
typeId @15 :Id;
brand @21 :Brand;
}
struct :group {
typeId @16 :Id;
brand @22 :Brand;
}
interface :group {
typeId @17 :Id;
brand @23 :Brand;
}
anyPointer :union {
unconstrained :union {
# A regular AnyPointer.
#
# The name "unconstrained" means as opposed to constraining it to match a type parameter.
# In retrospect this name is probably a poor choice given that it may still be constrained
# to be a struct, list, or capability.
anyKind @18 :Void; # truly AnyPointer
struct @25 :Void; # AnyStruct
list @26 :Void; # AnyList
capability @27 :Void; # Capability
}
parameter :group {
# This is actually a reference to a type parameter defined within this scope.
scopeId @19 :Id;
# ID of the generic type whose parameter we're referencing. This should be a parent of the
# current scope.
parameterIndex @20 :UInt16;
# Index of the parameter within the generic type's parameter list.
}
implicitMethodParameter :group {
# This is actually a reference to an implicit (generic) parameter of a method. The only
# legal context for this type to appear is inside Method.paramBrand or Method.resultBrand.
parameterIndex @24 :UInt16;
}
}
}
}
struct Brand {
# Specifies bindings for parameters of generics. Since these bindings turn a generic into a
# non-generic, we call it the "brand".
scopes @0 :List(Scope);
# For each of the target type and each of its parent scopes, a parameterization may be included
# in this list. If no parameterization is included for a particular relevant scope, then either
# that scope has no parameters or all parameters should be considered to be `AnyPointer`.
struct Scope {
scopeId @0 :Id;
# ID of the scope to which these params apply.
union {
bind @1 :List(Binding);
# List of parameter bindings.
inherit @2 :Void;
# The place where this Brand appears is actually within this scope or a sub-scope,
# and the bindings for this scope should be inherited from the reference point.
}
}
struct Binding {
union {
unbound @0 :Void;
type @1 :Type;
# TODO(someday): Allow non-type parameters? Unsure if useful.
}
}
}
struct Value {
# Represents a value, e.g. a field default value, constant value, or annotation value.
union {
# The ordinals intentionally match those of Type.
void @0 :Void;
bool @1 :Bool;
int8 @2 :Int8;
int16 @3 :Int16;
int32 @4 :Int32;
int64 @5 :Int64;
uint8 @6 :UInt8;
uint16 @7 :UInt16;
uint32 @8 :UInt32;
uint64 @9 :UInt64;
float32 @10 :Float32;
float64 @11 :Float64;
text @12 :Text;
data @13 :Data;
list @14 :AnyPointer;
enum @15 :UInt16;
struct @16 :AnyPointer;
interface @17 :Void;
# The only interface value that can be represented statically is "null", whose methods always
# throw exceptions.
anyPointer @18 :AnyPointer;
}
}
struct Annotation {
# Describes an annotation applied to a declaration. Note AnnotationNode describes the
# annotation's declaration, while this describes a use of the annotation.
id @0 :Id;
# ID of the annotation node.
brand @2 :Brand;
# Brand of the annotation.
#
# Note that the annotation itself is not allowed to be parameterized, but its scope might be.
value @1 :Value;
}
enum ElementSize {
# Possible element sizes for encoded lists. These correspond exactly to the possible values of
# the 3-bit element size component of a list pointer.
empty @0; # aka "void", but that's a keyword.
bit @1;
byte @2;
twoBytes @3;
fourBytes @4;
eightBytes @5;
pointer @6;
inlineComposite @7;
}
struct CapnpVersion {
major @0 :UInt16;
minor @1 :UInt8;
micro @2 :UInt8;
}
struct CodeGeneratorRequest {
capnpVersion @2 :CapnpVersion;
# Version of the `capnp` executable. Generally, code generators should ignore this, but the code
# generators that ship with `capnp` itself will print a warning if this mismatches since that
# probably indicates something is misconfigured.
#
# The first version of 'capnp' to set this was 0.6.0. So, if it's missing, the compiler version
# is older than that.
nodes @0 :List(Node);
# All nodes parsed by the compiler, including for the files on the command line and their
# imports.
sourceInfo @3 :List(Node.SourceInfo);
# Information about the original source code for each node, where available. This array may be
# omitted or may be missing some nodes if no info is available for them.
requestedFiles @1 :List(RequestedFile);
# Files which were listed on the command line.
struct RequestedFile {
id @0 :Id;
# ID of the file.
filename @1 :Text;
# Name of the file as it appeared on the command-line (minus the src-prefix). You may use
# this to decide where to write the output.
imports @2 :List(Import);
# List of all imported paths seen in this file.
struct Import {
id @0 :Id;
# ID of the imported file.
name @1 :Text;
# Name which *this* file used to refer to the foreign file. This may be a relative name.
# This information is provided because it might be useful for code generation, e.g. to
# generate #include directives in C++. We don't put this in Node.file because this
# information is only meaningful at compile time anyway.
#
# (On Zooko's triangle, this is the import's petname according to the importing file.)
}
}
}

View file

@ -0,0 +1,66 @@
package org.capnproto;
import org.capnproto.demo.Demo;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class LocalCapabilityTest {
@Test
public void testLocalServer() throws ExecutionException, InterruptedException {
var demo = new TestCap0Impl();
var client = new Demo.TestCap0.Client(demo);
var request = client.testMethod0Request();
var params = request.getParams();
params.setParam0(4321);
var response = request.send();
var results = response.get();
Assert.assertEquals(params.getParam0(), results.getResult0());
}
@Test
public void testGenericServer() throws ExecutionException, InterruptedException {
var demo = new TestCap0Impl();
var client = new Demo.TestCap0.Client(demo);
var request = client.testMethod0Request();
var params = request.getParams();
var response = request.send();
var results = response.get();
Assert.assertEquals(params.getParam0(), results.getResult0());
}
@Test
public void testLocalTwoStagePipeline() {
var server0 = new Demo.Iface0.Server() {
boolean method0called = false;
@Override
protected CompletableFuture<java.lang.Void> method0(CallContext<Demo.Iface0.Method0Params.Reader, Demo.Iface0.Method0Results.Builder> ctx) {
method0called = true;
return CompletableFuture.completedFuture(null);
}
};
var server1 = new Demo.Iface1.Server() {
@Override
protected CompletableFuture<java.lang.Void> method1(CallContext<Demo.Iface1.Method1Params.Reader, Demo.Iface1.Method1Results.Builder> ctx) {
ctx.getResults().setResult0(new Demo.Iface0.Client(server0));
return CompletableFuture.completedFuture(null);
}
};
var iface1Client = new Demo.Iface1.Client(server1);
var request1 = iface1Client.method1Request();
var response = request1.send();
var iface0 = response.getResult0();
var request0 = iface0.method0Request();
var response0 = request0.send();
response0.join();
Assert.assertTrue(!response0.isCompletedExceptionally());
Assert.assertTrue(server0.method0called);
}
}

View file

@ -25,7 +25,8 @@ public class RpcStateTest {
class TestConnection implements VatNetwork.Connection { class TestConnection implements VatNetwork.Connection {
Executor executor = Executors.newSingleThreadExecutor(); private final CompletableFuture<java.lang.Void> disconnect = new CompletableFuture<>();
@Override @Override
public OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize) { public OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize) {
var message = new MessageBuilder(); var message = new MessageBuilder();
@ -52,6 +53,17 @@ public class RpcStateTest {
public CompletableFuture<IncomingRpcMessage> receiveIncomingMessage() { public CompletableFuture<IncomingRpcMessage> receiveIncomingMessage() {
return null; return null;
} }
@Override
public CompletableFuture<java.lang.Void> onDisconnect() {
return this.disconnect.copy();
}
@Override
public CompletableFuture<java.lang.Void> shutdown() {
this.disconnect.complete(null);
return this.disconnect.copy();
}
} }
TestConnection connection; TestConnection connection;
@ -63,7 +75,7 @@ public class RpcStateTest {
public void setUp() throws Exception { public void setUp() throws Exception {
connection = new TestConnection(); connection = new TestConnection();
bootstrapInterface = new Capability.Client(Capability.newNullCap()); bootstrapInterface = new Capability.Client(Capability.newNullCap());
rpc = new RpcState(connection, bootstrapInterface); rpc = new RpcState(bootstrapInterface, connection, connection.disconnect);
} }
@After @After

View file

@ -6,24 +6,27 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
class TestCap0Impl extends Demo.TestCap0.Server { class TestCap0Impl extends Demo.TestCap0.Server {
final Demo.TestCap1.Client testCap1a = new Demo.TestCap1.Client(new TestCap1Impl()); final Demo.TestCap1.Client testCap1a = new Demo.TestCap1.Client(new TestCap1Impl());
final Demo.TestCap1.Client testCap1b = new Demo.TestCap1.Client(new TestCap1Impl()); final Demo.TestCap1.Client testCap1b = new Demo.TestCap1.Client(new TestCap1Impl());
public CompletableFuture<?> testMethod0(CallContext<Demo.TestParams0.Reader, Demo.TestResults0.Builder> ctx) { public CompletableFuture<java.lang.Void> testMethod0(CallContext<Demo.TestParams0.Reader, Demo.TestResults0.Builder> ctx) {
var params = ctx.getParams(); var params = ctx.getParams();
var results = ctx.getResults(); var results = ctx.getResults();
results.setResult0(params.getParam0()); results.setResult0(params.getParam0());
ctx.releaseParams();
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
public CompletableFuture<?> testMethod1(CallContext<Demo.TestParams1.Reader, Demo.TestResults1.Builder> ctx) { public CompletableFuture<java.lang.Void> testMethod1(CallContext<Demo.TestParams1.Reader, Demo.TestResults1.Builder> ctx) {
var params = ctx.getParams(); var params = ctx.getParams();
var results = ctx.getResults(); var results = ctx.getResults();
var res0 = results.getResult0(); var res0 = results.getResult0();
@ -39,11 +42,29 @@ class TestCap0Impl extends Demo.TestCap0.Server {
class TestCap1Impl extends Demo.TestCap1.Server { class TestCap1Impl extends Demo.TestCap1.Server {
} }
public class TwoPartyTest { public class TwoPartyTest {
private Thread runServer(TwoPartyVatNetwork network) {
var thread = new Thread(() -> {
try {
network.onDisconnect().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}, "Server");
thread.start();
return thread;
}
AsynchronousServerSocketChannel serverSocket; AsynchronousServerSocketChannel serverSocket;
AsynchronousSocketChannel clientSocket; AsynchronousSocketChannel clientSocket;
TwoPartyClient client; TwoPartyClient client;
TwoPartyVatNetwork serverNetwork;
Thread serverThread;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -52,14 +73,19 @@ public class TwoPartyTest {
this.clientSocket = AsynchronousSocketChannel.open(); this.clientSocket = AsynchronousSocketChannel.open();
this.clientSocket.connect(this.serverSocket.getLocalAddress()).get(); this.clientSocket.connect(this.serverSocket.getLocalAddress()).get();
this.client = new TwoPartyClient(clientSocket); this.client = new TwoPartyClient(clientSocket);
var socket = serverSocket.accept().get();
this.serverNetwork = new TwoPartyVatNetwork(socket, RpcTwoPartyProtocol.Side.SERVER);
//this.serverNetwork.dumper.addSchema(Demo.TestCap1);
this.serverThread = runServer(this.serverNetwork);
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
this.clientSocket.close(); this.clientSocket.close();
this.serverSocket.close(); this.serverSocket.close();
this.serverThread.join();
this.client = null; this.client = null;
} }
@ -73,36 +99,87 @@ public class TwoPartyTest {
} }
@Test @Test
public void testBasic() throws ExecutionException, InterruptedException { public void testBasic() throws ExecutionException, InterruptedException, IOException {
var capServer = new TestCap0Impl(); var server = new TwoPartyRpcSystem(this.serverNetwork, new TestCap0Impl());
var server = new TwoPartyServer(new Demo.TestCap0.Client(capServer));
server.listen(serverSocket); var demo = new Demo.TestCap0.Client(this.client.bootstrap());
var demoClient = new Demo.TestCap0.Client(this.client.bootstrap()); var request = demo.testMethod0Request();
var request = demoClient.testMethod0Request();
var params = request.getParams(); var params = request.getParams();
params.setParam0(4321); params.setParam0(4321);
var response = request.send(); var response = request.send();
while (!response.isDone()) { response.get();
CompletableFuture.anyOf(response, this.client.runOnce(), server.runOnce()).join();
}
Assert.assertTrue(response.isDone()); Assert.assertTrue(response.isDone());
var results = response.get(); var results = response.get();
Assert.assertEquals(params.getParam0(), results.getResult0()); Assert.assertEquals(params.getParam0(), results.getResult0());
this.clientSocket.shutdownOutput();
serverThread.join();
}
@Test
public void testBasicCleanup() throws ExecutionException, InterruptedException, TimeoutException {
var server = new TwoPartyRpcSystem(this.serverNetwork, new TestCap0Impl());
var demo = new Demo.TestCap0.Client(this.client.bootstrap());
var request = demo.testMethod0Request();
var params = request.getParams();
params.setParam0(4321);
var response = request.send();
response.get();
Assert.assertTrue(response.isDone());
var results = response.get();
Assert.assertEquals(params.getParam0(), results.getResult0());
demo = null;
}
@Test
public void testShutdown() throws InterruptedException, IOException {
var server = new TwoPartyRpcSystem(this.serverNetwork, new TestCap0Impl());
var demo = new Demo.TestCap0.Client(this.client.bootstrap());
this.clientSocket.shutdownOutput();
serverThread.join();
}
@Test
public void testCallThrows() throws ExecutionException, InterruptedException {
var impl = new Demo.TestCap0.Server() {
public CompletableFuture<java.lang.Void> testMethod0(CallContext<Demo.TestParams0.Reader, Demo.TestResults0.Builder> ctx) {
return CompletableFuture.failedFuture(new RuntimeException("Call to testMethod0 failed"));
}
public CompletableFuture<java.lang.Void> testMethod1(CallContext<Demo.TestParams1.Reader, Demo.TestResults1.Builder> ctx) {
return CompletableFuture.completedFuture(null);
}
};
var rpcSystem = new TwoPartyRpcSystem(this.serverNetwork, impl);
var demoClient = new Demo.TestCap0.Client(this.client.bootstrap());
{
var request = demoClient.testMethod0Request();
var response = request.send();
while (!response.isDone()) {
CompletableFuture.anyOf(response).exceptionally(exc -> { return null; });
}
Assert.assertTrue(response.isCompletedExceptionally());
}
// test that the system is still valid
{
var request = demoClient.testMethod1Request();
var response = request.send();
response.get();
Assert.assertFalse(response.isCompletedExceptionally());
}
} }
@Test @Test
public void testReturnCap() throws ExecutionException, InterruptedException { public void testReturnCap() throws ExecutionException, InterruptedException {
// send a capability back from the server to the client // send a capability back from the server to the client
var capServer = new TestCap0Impl(); var capServer = new TestCap0Impl();
var server = new TwoPartyServer(new Demo.TestCap0.Client(capServer)); var rpcSystem = new TwoPartyRpcSystem(this.serverNetwork, capServer);
server.listen(serverSocket);
var demoClient = new Demo.TestCap0.Client(this.client.bootstrap()); var demoClient = new Demo.TestCap0.Client(this.client.bootstrap());
var request = demoClient.testMethod1Request(); var request = demoClient.testMethod1Request();
var params = request.getParams();
var response = request.send(); var response = request.send();
while (!response.isDone()) { response.get();
CompletableFuture.anyOf(response, this.client.runOnce(), server.runOnce()).join();
}
Assert.assertTrue(response.isDone()); Assert.assertTrue(response.isDone());
var results = response.get(); var results = response.get();
@ -114,58 +191,5 @@ public class TwoPartyTest {
Assert.assertFalse(cap2.isNull()); Assert.assertFalse(cap2.isNull());
} }
@Test
public void testLocalServer() throws ExecutionException, InterruptedException {
var demo = new TestCap0Impl();
var client = new Demo.TestCap0.Client(demo);
var request = client.testMethod0Request();
var params = request.getParams();
params.setParam0(4321);
var response = request.send();
var results = response.get();
Assert.assertEquals(params.getParam0(), results.getResult0());
}
@Test
public void testGenericServer() throws ExecutionException, InterruptedException {
var demo = new TestCap0Impl();
var client = new Demo.TestCap0.Client(demo);
var request = client.testMethod0Request();
var params = request.getParams();
var response = request.send();
var results = response.get();
Assert.assertEquals(params.getParam0(), results.getResult0());
}
@Test
public void testLocalTwoStagePipeline() {
var server0 = new Demo.Iface0.Server() {
boolean method0called = false;
@Override
protected CompletableFuture<?> method0(CallContext<Demo.Iface0.Method0Params.Reader, Demo.Iface0.Method0Results.Builder> ctx) {
method0called = true;
return CompletableFuture.completedFuture(null);
}
};
var server1 = new Demo.Iface1.Server() {
@Override
protected CompletableFuture<?> method1(CallContext<Demo.Iface1.Method1Params.Reader, Demo.Iface1.Method1Results.Builder> ctx) {
ctx.getResults().setResult0(new Demo.Iface0.Client(server0));
return CompletableFuture.completedFuture(null);
}
};
var iface1Client = new Demo.Iface1.Client(server1);
var request1 = iface1Client.method1Request();
var response = request1.send();
var iface0 = response.getResult0();
var request0 = iface0.method0Request();
var response0 = request0.send();
response0.join();
Assert.assertTrue(!response0.isCompletedExceptionally());
Assert.assertTrue(server0.method0called);
}
} }

View file

@ -411,13 +411,13 @@ public final class Demo {
} }
} }
protected java.util.concurrent.CompletableFuture<?> method0(org.capnproto.CallContext<Method0Params.Reader, Method0Results.Builder> context) { protected java.util.concurrent.CompletableFuture<java.lang.Void> method0(org.capnproto.CallContext<Method0Params.Reader, Method0Results.Builder> context) {
return org.capnproto.Capability.Server.internalUnimplemented( return org.capnproto.Capability.Server.internalUnimplemented(
"runtime/src/test/java/org/capnproto/demo/demo.capnp:Iface0", "method0", "runtime/src/test/java/org/capnproto/demo/demo.capnp:Iface0", "method0",
0xac6d126c2fac16ebL, (short)0); 0xac6d126c2fac16ebL, (short)0);
} }
protected java.util.concurrent.CompletableFuture<?> method1(org.capnproto.StreamingCallContext<Method1Params.Reader> context) { protected java.util.concurrent.CompletableFuture<java.lang.Void> method1(org.capnproto.StreamingCallContext<Method1Params.Reader> context) {
return org.capnproto.Capability.Server.internalUnimplemented( return org.capnproto.Capability.Server.internalUnimplemented(
"runtime/src/test/java/org/capnproto/demo/demo.capnp:Iface0", "method1", "runtime/src/test/java/org/capnproto/demo/demo.capnp:Iface0", "method1",
0xac6d126c2fac16ebL, (short)1); 0xac6d126c2fac16ebL, (short)1);
@ -715,13 +715,13 @@ public final class Demo {
} }
} }
protected java.util.concurrent.CompletableFuture<?> testMethod0(org.capnproto.CallContext<org.capnproto.demo.Demo.TestParams0.Reader, org.capnproto.demo.Demo.TestResults0.Builder> context) { protected java.util.concurrent.CompletableFuture<java.lang.Void> testMethod0(org.capnproto.CallContext<org.capnproto.demo.Demo.TestParams0.Reader, org.capnproto.demo.Demo.TestResults0.Builder> context) {
return org.capnproto.Capability.Server.internalUnimplemented( return org.capnproto.Capability.Server.internalUnimplemented(
"runtime/src/test/java/org/capnproto/demo/demo.capnp:TestCap0", "testMethod0", "runtime/src/test/java/org/capnproto/demo/demo.capnp:TestCap0", "testMethod0",
0x9c0c5ee4bb0cc725L, (short)0); 0x9c0c5ee4bb0cc725L, (short)0);
} }
protected java.util.concurrent.CompletableFuture<?> testMethod1(org.capnproto.CallContext<org.capnproto.demo.Demo.TestParams1.Reader, org.capnproto.demo.Demo.TestResults1.Builder> context) { protected java.util.concurrent.CompletableFuture<java.lang.Void> testMethod1(org.capnproto.CallContext<org.capnproto.demo.Demo.TestParams1.Reader, org.capnproto.demo.Demo.TestResults1.Builder> context) {
return org.capnproto.Capability.Server.internalUnimplemented( return org.capnproto.Capability.Server.internalUnimplemented(
"runtime/src/test/java/org/capnproto/demo/demo.capnp:TestCap0", "testMethod1", "runtime/src/test/java/org/capnproto/demo/demo.capnp:TestCap0", "testMethod1",
0x9c0c5ee4bb0cc725L, (short)1); 0x9c0c5ee4bb0cc725L, (short)1);
@ -829,13 +829,13 @@ public final class Demo {
} }
} }
protected java.util.concurrent.CompletableFuture<?> method0(org.capnproto.CallContext<Method0Params.Reader, Method0Results.Builder> context) { protected java.util.concurrent.CompletableFuture<java.lang.Void> method0(org.capnproto.CallContext<Method0Params.Reader, Method0Results.Builder> context) {
return org.capnproto.Capability.Server.internalUnimplemented( return org.capnproto.Capability.Server.internalUnimplemented(
"runtime/src/test/java/org/capnproto/demo/demo.capnp:Iface1", "method0", "runtime/src/test/java/org/capnproto/demo/demo.capnp:Iface1", "method0",
0xd52dcf38c9f6f7c0L, (short)0); 0xd52dcf38c9f6f7c0L, (short)0);
} }
protected java.util.concurrent.CompletableFuture<?> method1(org.capnproto.CallContext<Method1Params.Reader, Method1Results.Builder> context) { protected java.util.concurrent.CompletableFuture<java.lang.Void> method1(org.capnproto.CallContext<Method1Params.Reader, Method1Results.Builder> context) {
return org.capnproto.Capability.Server.internalUnimplemented( return org.capnproto.Capability.Server.internalUnimplemented(
"runtime/src/test/java/org/capnproto/demo/demo.capnp:Iface1", "method1", "runtime/src/test/java/org/capnproto/demo/demo.capnp:Iface1", "method1",
0xd52dcf38c9f6f7c0L, (short)1); 0xd52dcf38c9f6f7c0L, (short)1);