From 4a77f678191d01f74159b2971f5a7bf0e759a0cd Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Tue, 29 Sep 2020 14:08:23 +0100 Subject: [PATCH] twoparty rpc --- .../main/java/org/capnproto/AnyPointer.java | 7 + .../main/java/org/capnproto/CallContext.java | 4 +- .../main/java/org/capnproto/Capability.java | 81 +- .../main/java/org/capnproto/ClientHook.java | 47 +- .../main/java/org/capnproto/ExportTable.java | 21 +- .../main/java/org/capnproto/PipelineHook.java | 2 +- .../main/java/org/capnproto/QueuedClient.java | 4 +- .../src/main/java/org/capnproto/Request.java | 30 +- .../src/main/java/org/capnproto/RpcState.java | 708 +++++++++++++++++- .../main/java/org/capnproto/RpcSystem.java | 42 +- .../java/org/capnproto/TwoPartyClient.java | 33 + .../java/org/capnproto/TwoPartyRpcSystem.java | 14 +- .../java/org/capnproto/TwoPartyServer.java | 64 ++ .../org/capnproto/TwoPartyVatNetwork.java | 35 +- .../main/java/org/capnproto/VatNetwork.java | 8 +- .../main/java/org/capnproto/WireHelpers.java | 6 +- .../test/java/org/capnproto/RpcStateTest.java | 5 +- .../test/java/org/capnproto/TwoPartyTest.java | 216 ++++++ .../test/java/org/capnproto/demo/Demo.java | 371 +++++++++ .../java/org/capnproto/demo/democap.capnp | 12 + .../java/org/capnproto/demo/democap.capnp.c++ | 190 +++++ .../java/org/capnproto/demo/democap.capnp.h | 216 ++++++ .../java/org/capnproto/demo/demoparams.capnp | 23 + 23 files changed, 2017 insertions(+), 122 deletions(-) create mode 100644 runtime/src/main/java/org/capnproto/TwoPartyClient.java create mode 100644 runtime/src/main/java/org/capnproto/TwoPartyServer.java create mode 100644 runtime/src/test/java/org/capnproto/TwoPartyTest.java create mode 100644 runtime/src/test/java/org/capnproto/demo/Demo.java create mode 100644 runtime/src/test/java/org/capnproto/demo/democap.capnp create mode 100644 runtime/src/test/java/org/capnproto/demo/democap.capnp.c++ create mode 100644 runtime/src/test/java/org/capnproto/demo/democap.capnp.h create mode 100644 runtime/src/test/java/org/capnproto/demo/demoparams.capnp diff --git a/runtime/src/main/java/org/capnproto/AnyPointer.java b/runtime/src/main/java/org/capnproto/AnyPointer.java index 9a43473..5c3fac2 100644 --- a/runtime/src/main/java/org/capnproto/AnyPointer.java +++ b/runtime/src/main/java/org/capnproto/AnyPointer.java @@ -73,12 +73,19 @@ public final class AnyPointer { return factory.fromPointerReader(this.segment, this.capTable, this.pointer, this.nestingLimit); } + public final Capability.Client getAsCapability() { + return new Capability.Client( + WireHelpers.readCapabilityPointer(this.segment, capTable, this.pointer, 0)); + } + public final ClientHook getPipelinedCap(PipelineOp[] ops) { for (var op: ops) { switch (op.type) { case NOOP: break; case GET_POINTER_FIELD: + var index = op.pointerIndex; + // TODO getpointerfield break; } } diff --git a/runtime/src/main/java/org/capnproto/CallContext.java b/runtime/src/main/java/org/capnproto/CallContext.java index 42b02f3..a01d9e6 100644 --- a/runtime/src/main/java/org/capnproto/CallContext.java +++ b/runtime/src/main/java/org/capnproto/CallContext.java @@ -5,8 +5,8 @@ import java.util.concurrent.CompletableFuture; public class CallContext { final CallContextHook hook; - final FromPointerReader params; - final FromPointerBuilder results; + private final FromPointerReader params; + private final FromPointerBuilder results; CallContext(FromPointerReader params, FromPointerBuilder results, diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index b38c3c4..9706fd0 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -25,7 +25,7 @@ public final class Capability { } public Client(Throwable exc) { - this(ClientHook.newBrokenCap(exc)); + this(newBrokenCap(exc)); } public ClientHook getHook() { @@ -46,12 +46,16 @@ public final class Capability { FromPointerReader reader, long interfaceId, short methodId) { var request = hook.newCall(interfaceId, methodId); - return new Request (request.params, reader, request.hook); + return new Request (builder, reader, request.params, request.hook); } public Request newCall(long interfaceId, short methodId) { return hook.newCall(interfaceId, methodId); } + + private static ClientHook makeLocalClient(Capability.Server server) { + return server.makeLocalClient(); + } } public abstract static class Server { @@ -79,7 +83,7 @@ public final class Capability { public Request newCall(long interfaceId, short methodId) { var hook = new LocalRequest(interfaceId, methodId, this); var root = hook.message.getRoot(AnyPointer.factory); - return new Request<>(root, AnyPointer.factory, hook); + return new Request<>(AnyPointer.factory, AnyPointer.factory, root, hook); } @Override @@ -99,9 +103,11 @@ public final class Capability { return new LocalPipeline(ctx); }); - pipelinePromise = ctx.onTailCall().applyToEither(pipelinePromise, pipeline -> { - return pipeline; - }); + var tailCall = ctx.onTailCall(); + // TODO implement tailCall + if (tailCall != null) { + pipelinePromise = tailCall.applyToEither(pipelinePromise, pipeline -> pipeline); + } return new VoidPromiseAndPipeline(forked, new QueuedPipeline(pipelinePromise)); } @@ -116,11 +122,11 @@ public final class Capability { return BRAND; } - CompletableFuture callInternal(long interfaceId, short methodId, CallContextHook context) { + CompletableFuture callInternal(long interfaceId, short methodId, CallContextHook context) { var result = dispatchCall( interfaceId, methodId, - new CallContext(AnyPointer.factory, AnyPointer.factory, context)); + new CallContext<>(AnyPointer.factory, AnyPointer.factory, context)); if (result.streaming) { // TODO streaming return null; @@ -141,11 +147,11 @@ public final class Capability { } } - public final class DispatchCallResult { - private final CompletableFuture promise; + final class DispatchCallResult { + private final CompletableFuture promise; private final boolean streaming; - public DispatchCallResult(CompletableFuture promise) { + public DispatchCallResult(CompletableFuture promise) { this.promise = promise; this.streaming = false; } @@ -155,7 +161,7 @@ public final class Capability { this.streaming = false; } - DispatchCallResult(CompletableFuture promise, boolean isStreaming) { + DispatchCallResult(CompletableFuture promise, boolean isStreaming) { this.promise = promise; this.streaming = isStreaming; } @@ -201,10 +207,14 @@ public final class Capability { } } - static ClientHook newLocalPromiseClient(CompletableFuture promise) { + public static ClientHook newLocalPromiseClient(CompletableFuture promise) { return new QueuedClient(promise); } + public static PipelineHook newLocalPromisePipeline(CompletableFuture promise) { + return new QueuedPipeline(promise); + } + static class LocalRequest implements RequestHook { final MessageBuilder message = new MessageBuilder(); @@ -315,4 +325,49 @@ public final class Capability { return null; } } + + public static ClientHook newBrokenCap(String reason) { + return newBrokenClient(reason, false, ClientHook.BROKEN_CAPABILITY_BRAND); + } + + public static ClientHook newBrokenCap(Throwable exc) { + return newBrokenClient(exc, false, ClientHook.BROKEN_CAPABILITY_BRAND); + } + + public static ClientHook newNullCap() { + return newBrokenClient(new RuntimeException("Called null capability"), true, ClientHook.NULL_CAPABILITY_BRAND); + } + + static private ClientHook newBrokenClient(String reason, boolean resolved, Object brand) { + return newBrokenClient(new RuntimeException(reason), resolved, brand); + } + + static private ClientHook newBrokenClient(Throwable exc, boolean resolved, Object brand) { + return new ClientHook() { + @Override + public Request newCall(long interfaceId, short methodId) { + return Request.newBrokenRequest(exc); + } + + @Override + public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { + return new VoidPromiseAndPipeline(CompletableFuture.failedFuture(exc), null); + } + + @Override + public CompletableFuture whenMoreResolved() { + if (resolved) { + return null; + } else { + return CompletableFuture.failedFuture(exc); + } + } + + @Override + public Object getBrand() { + return brand; + } + }; + } + } diff --git a/runtime/src/main/java/org/capnproto/ClientHook.java b/runtime/src/main/java/org/capnproto/ClientHook.java index 8e8ad8d..9dc49ae 100644 --- a/runtime/src/main/java/org/capnproto/ClientHook.java +++ b/runtime/src/main/java/org/capnproto/ClientHook.java @@ -43,56 +43,13 @@ public interface ClientHook { } final class VoidPromiseAndPipeline { - public final CompletableFuture promise; + public final CompletableFuture promise; public final PipelineHook pipeline; - VoidPromiseAndPipeline(CompletableFuture promise, PipelineHook pipeline) { + VoidPromiseAndPipeline(CompletableFuture promise, PipelineHook pipeline) { this.promise = promise; this.pipeline = pipeline; } } - static ClientHook newBrokenCap(String reason) { - return newBrokenClient(reason, false, BROKEN_CAPABILITY_BRAND); - } - - static ClientHook newBrokenCap(Throwable exc) { - return newBrokenClient(exc, false, BROKEN_CAPABILITY_BRAND); - } - - static ClientHook newNullCap() { - return newBrokenClient(new RuntimeException("Called null capability"), true, NULL_CAPABILITY_BRAND); - } - - static private ClientHook newBrokenClient(String reason, boolean resolved, Object brand) { - return newBrokenClient(new RuntimeException(reason), resolved, brand); - } - - static private ClientHook newBrokenClient(Throwable exc, boolean resolved, Object brand) { - return new ClientHook() { - @Override - public Request newCall(long interfaceId, short methodId) { - return Request.newBrokenRequest(exc); - } - - @Override - public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { - return new VoidPromiseAndPipeline(CompletableFuture.failedFuture(exc), null); - } - - @Override - public CompletableFuture whenMoreResolved() { - if (resolved) { - return null; - } else { - return CompletableFuture.failedFuture(exc); - } - } - - @Override - public Object getBrand() { - return brand; - } - }; - } } diff --git a/runtime/src/main/java/org/capnproto/ExportTable.java b/runtime/src/main/java/org/capnproto/ExportTable.java index d199130..f91a7e0 100644 --- a/runtime/src/main/java/org/capnproto/ExportTable.java +++ b/runtime/src/main/java/org/capnproto/ExportTable.java @@ -6,12 +6,14 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.function.Consumer; -class ExportTable implements Iterable { +abstract class ExportTable implements Iterable { final HashMap slots = new HashMap<>(); final Queue freeIds = new PriorityQueue<>(); int max = 0; + abstract T newExportable(int id); + public T find(int id) { return slots.get(id); } @@ -26,17 +28,12 @@ class ExportTable implements Iterable { } } - public int next(T value) { - if (freeIds.isEmpty()) { - var id = max; - max++; - slots.put(id, value); - return id; - } else { - var id = freeIds.remove(); - slots.put(id, value); - return id; - } + public T next() { + int id = freeIds.isEmpty() ? max++ : freeIds.remove(); + var value = newExportable(id); + var prev = slots.put(id, value); + assert prev == null; + return value; } @Override diff --git a/runtime/src/main/java/org/capnproto/PipelineHook.java b/runtime/src/main/java/org/capnproto/PipelineHook.java index d242f2d..f926c54 100644 --- a/runtime/src/main/java/org/capnproto/PipelineHook.java +++ b/runtime/src/main/java/org/capnproto/PipelineHook.java @@ -5,6 +5,6 @@ interface PipelineHook { ClientHook getPipelinedCap(PipelineOp[] ops); static PipelineHook newBrokenPipeline(Throwable exc) { - return ops -> ClientHook.newBrokenCap(exc); + return ops -> Capability.newBrokenCap(exc); } } diff --git a/runtime/src/main/java/org/capnproto/QueuedClient.java b/runtime/src/main/java/org/capnproto/QueuedClient.java index 9c53bb5..64171f1 100644 --- a/runtime/src/main/java/org/capnproto/QueuedClient.java +++ b/runtime/src/main/java/org/capnproto/QueuedClient.java @@ -18,7 +18,7 @@ class QueuedClient implements ClientHook { this.setResolutionOp = promise.thenAccept(inner -> { this.redirect = inner; }).exceptionally(exc -> { - this.redirect = ClientHook.newBrokenCap(exc); + this.redirect = Capability.newBrokenCap(exc); return null; }); } @@ -27,7 +27,7 @@ class QueuedClient implements ClientHook { public Request newCall(long interfaceId, short methodId) { var hook = new Capability.LocalRequest(interfaceId, methodId, this); var root = hook.message.getRoot(AnyPointer.factory); - return new Request<>(root, AnyPointer.factory, hook); + return new Request<>(AnyPointer.factory, AnyPointer.factory, root, hook); } @Override diff --git a/runtime/src/main/java/org/capnproto/Request.java b/runtime/src/main/java/org/capnproto/Request.java index 36cdc30..38c931c 100644 --- a/runtime/src/main/java/org/capnproto/Request.java +++ b/runtime/src/main/java/org/capnproto/Request.java @@ -4,25 +4,29 @@ import java.util.concurrent.CompletableFuture; public class Request { - final AnyPointer.Builder params; - private final FromPointerReader results; + AnyPointer.Builder params; + private final FromPointerBuilder paramsBuilder; + private final FromPointerReader resultsReader; RequestHook hook; - Request(AnyPointer.Builder params, FromPointerReader results, RequestHook hook) { + Request(FromPointerBuilder paramsBuilder, + FromPointerReader resultsReader, + AnyPointer.Builder params, RequestHook hook) { + this.paramsBuilder = paramsBuilder; + this.resultsReader = resultsReader; this.params = params; - this.results = results; this.hook = hook; } - AnyPointer.Builder params() { - return params; + Params params() { + return params.getAs(paramsBuilder); } CompletableFuture send() { var typelessPromise = hook.send(); hook = null; // prevent reuse return typelessPromise.getResponse().thenApply(response -> { - return response.getAs(results); + return response.getAs(resultsReader); }); } @@ -42,7 +46,17 @@ public class Request { }; var root = message.getRoot(AnyPointer.factory); - return new Request(root, null, hook); + return new Request(null, null, root, hook); + } + + static Request newTypelessRequest(AnyPointer.Builder root, RequestHook hook) { + return new Request<>(AnyPointer.factory, AnyPointer.factory, root, hook); + } + + static Request fromTypeless(FromPointerBuilder params, + FromPointerReader results, + Request typeless) { + return new Request<>(params, results, typeless.params(), typeless.hook); } } diff --git a/runtime/src/main/java/org/capnproto/RpcState.java b/runtime/src/main/java/org/capnproto/RpcState.java index 1e55de8..0742d19 100644 --- a/runtime/src/main/java/org/capnproto/RpcState.java +++ b/runtime/src/main/java/org/capnproto/RpcState.java @@ -3,15 +3,20 @@ package org.capnproto; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.function.Consumer; final class RpcState { static final class Question { + final int id; List paramExports; boolean isAwaitingReturn = false; boolean isTailCall = false; boolean skipFinish = false; + + Question(int id) { + this.id = id; + } + CompletableFuture response = new CompletableFuture<>(); void reject(Throwable exc) { @@ -32,9 +37,14 @@ final class RpcState { } static final class Export { + final int id; int refcount; ClientHook clientHook; CompletionStage resolveOp; + + Export(int id) { + this.id = id; + } } static final class Import { @@ -45,11 +55,27 @@ final class RpcState { } final static class Embargo { - CompletableFuture fulfiller; + final int id; + CompletableFuture disembargo; + + Embargo(int id) { + this.id = id; + } } - private final ExportTable exports = new ExportTable(); - private final ExportTable questions = new ExportTable(); + private final ExportTable exports = new ExportTable() { + @Override + Export newExportable(int id) { + return new Export(id); + } + }; + + private final ExportTable questions = new ExportTable() { + @Override + Question newExportable(int id) { + return new Question(id); + } + }; private final ImportTable answers = new ImportTable() { @Override @@ -65,25 +91,95 @@ final class RpcState { } }; - private final ExportTable embargos = new ExportTable(); + private final ExportTable embargos = new ExportTable() { + @Override + Embargo newExportable(int id) { + return new Embargo(id); + } + }; private final HashMap exportsByCap = new HashMap<>(); private final VatNetwork.Connection connection; private final Capability.Client bootstrapInterface; private Throwable disconnected = null; + private CompletableFuture messageReady = CompletableFuture.completedFuture(null); RpcState(VatNetwork.Connection connection, Capability.Client bootstrapInterface) { this.connection = connection; this.bootstrapInterface = bootstrapInterface; } - boolean isDisconnected() { + final boolean isDisconnected() { return this.disconnected != null; } - void handleMessage(IncomingRpcMessage message) throws RpcException { + final boolean isConnected() { + return !isDisconnected(); + } + + ClientHook restore() { + var question = questions.next(); + question.isAwaitingReturn = true; + question.paramExports = List.of(); + var message = connection.newOutgoingMessage(64); + var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap(); + builder.setQuestionId(question.id); + message.send(); + var pipeline = new RpcPipeline(question); + return pipeline.getPipelinedCap(new PipelineOp[0]); + } + + // run message loop once + final CompletableFuture runOnce() { + + if (isDisconnected()) { + return CompletableFuture.failedFuture(disconnected); + } + + if (!messageReady.isDone()) { + return messageReady; + } + + messageReady = connection.receiveIncomingMessage().thenAccept(message -> { + try { + handleMessage(message); + } + catch (Exception exc) { + this.disconnected = exc; + } + }).exceptionally(exc -> { + this.disconnected = exc; + return null; + }); + + return messageReady; + } + + // run message loop until promise is completed + public final CompletableFuture messageLoop(CompletableFuture done) { + if (done.isDone()) { + return done; + } + + if (isDisconnected()) { + done.completeExceptionally(disconnected); + return done; + } + + return connection.receiveIncomingMessage().thenCompose(message -> { + try { + handleMessage(message); + } + catch (Exception exc) { + done.completeExceptionally(exc); + } + return messageLoop(done); + }); + } + + synchronized void handleMessage(IncomingRpcMessage message) throws RpcException { var reader = message.getBody().getAs(RpcProtocol.Message.factory); - + System.out.println(reader.which()); switch (reader.which()) { case UNIMPLEMENTED: handleUnimplemented(reader.getUnimplemented()); @@ -194,7 +290,7 @@ final class RpcState { answer.resultExports = writeDescriptors(capTableArray, payload, fds); answer.pipeline = ops -> ops.length == 0 ? capHook - : ClientHook.newBrokenCap("Invalid pipeline transform."); + : Capability.newBrokenCap("Invalid pipeline transform."); response.send(); @@ -204,9 +300,148 @@ final class RpcState { } void handleCall(IncomingRpcMessage message, RpcProtocol.Call.Reader call) { + var cap = getMessageTarget(call.getTarget()); + if (cap == null) { + return; + } + + boolean redirectResults; + switch (call.getSendResultsTo().which()) { + case CALLER: + redirectResults = false; + break; + case YOURSELF: + redirectResults = true; + break; + default: + assert false: "Unsupported 'Call.sendResultsTo'."; + return; + } + + var payload = call.getParams(); + var capTableArray = receiveCaps(payload.getCapTable(), message.getAttachedFds()); + var answerId = call.getQuestionId(); + var cancel = new CompletableFuture(); + var context = new RpcCallContext( + answerId, message, capTableArray, + payload.getContent(), redirectResults, cancel, + call.getInterfaceId(), call.getMethodId()); + + { + var answer = answers.put(answerId); + assert !answer.active : "questionId is already in use"; + if (answer.active) { + return; + } + + answer.active = true; + answer.callContext = context; + } + + var pap = startCall(call.getInterfaceId(), call.getMethodId(), cap, context); + { + var answer = answers.find(answerId); + assert answer != null; + answer.pipeline = pap.pipeline; + + if (redirectResults) { + answer.redirectedResults = pap.promise.thenApply(x -> { + return context.consumeRedirectedResponse(); + }); + // TODO cancellation deferral + } + else { + pap.promise.thenAccept(x -> { + context.sendReturn(); + }).exceptionally(exc -> { + context.sendErrorReturn(exc); + // TODO wait on the cancellation... + return null; + }); + } + } + } + + private ClientHook.VoidPromiseAndPipeline startCall(long interfaceId, short methodId, ClientHook cap, RpcCallContext context) { + // TODO gateways...? + return cap.call(interfaceId, methodId, context); } void handleReturn(IncomingRpcMessage message, RpcProtocol.Return.Reader callReturn) { + var exportsToRelease = new ArrayList(); + + var question = questions.find(callReturn.getAnswerId()); + assert question != null : "Invalid question ID in Return message."; + if (question == null) { + return; + } + + assert question.isAwaitingReturn: "Duplicate Return"; + if (!question.isAwaitingReturn) { + return; + } + + question.isAwaitingReturn = false; + if (callReturn.getReleaseParamCaps()) { + exportsToRelease.addAll(question.paramExports); + question.paramExports = List.of(); + } + + assert !callReturn.isTakeFromOtherQuestion(): "Not implemented"; + if (callReturn.isTakeFromOtherQuestion()) { + // TODO process isTakeFromOtherQuestion... + return; + } + + switch (callReturn.which()) { + case RESULTS: + if (question.isTailCall) { + // TODO resultsSentElsewhere + return; + } + var payload = callReturn.getResults(); + var capTable = receiveCaps(payload.getCapTable(), message.getAttachedFds()); + var response = new RpcResponseImpl(question, message, capTable, payload.getContent()); + question.answer(response); + break; + case EXCEPTION: + assert !question.isTailCall : "Tail call `Return` must set `resultsSentElsewhere`, not `exception`."; + if (question.isTailCall) { + return; + } + question.reject(RpcException.toException(callReturn.getException())); + break; + case CANCELED: + assert false : "Return message falsely claims call was canceled."; + break; + case RESULTS_SENT_ELSEWHERE: + assert question.isTailCall : "`Return` had `resultsSentElsewhere` but this was not a tail call."; + if (!question.isTailCall) { + return; + } + // Tail calls are fulfilled with a null pointer. + question.answer(() -> null); + break; + + case TAKE_FROM_OTHER_QUESTION: + var other = callReturn.getTakeFromOtherQuestion(); + var answer = answers.find(other); + assert answer != null : "`Return.takeFromOtherQuestion` had invalid answer ID."; + if (answer == null) { + return; + } + assert answer.redirectedResults != null : "`Return.takeFromOtherQuestion` referenced a call that did not use `sendResultsTo.yourself`."; + if (answer.redirectedResults == null) { + return; + } + question.response = answer.redirectedResults; + answer.redirectedResults = null; + break; + default: + assert false : "Unknown 'Return' type."; + return; + } + } void handleFinish(RpcProtocol.Finish.Reader finish) { @@ -218,7 +453,6 @@ final class RpcState { void handleDisembargo(RpcProtocol.Disembargo.Reader disembargo) { } - private List writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List fds) { if (capTable.length == 0) { return List.of(); @@ -271,21 +505,20 @@ final class RpcState { } // This is the first time we've seen this capability. - var export = new Export(); + var export = exports.next(); export.refcount = 1; export.clientHook = inner; - exportId = exports.next(export); var wrapped = inner.whenMoreResolved(); if (wrapped != null) { // This is a promise. Arrange for the `Resolve` message to be sent later. - export.resolveOp = resolveExportedPromise(exportId, wrapped); - descriptor.setSenderPromise(exportId); + export.resolveOp = resolveExportedPromise(export.id, wrapped); + descriptor.setSenderPromise(export.id); } else { - descriptor.setSenderHosted(exportId); + descriptor.setSenderHosted(export.id); } - return exportId; + return export.id; } CompletionStage resolveExportedPromise(int exportId, CompletionStage promise) { @@ -394,7 +627,7 @@ final class RpcState { case RECEIVER_HOSTED: var exp = exports.find(descriptor.getReceiverHosted()); if (exp == null) { - return ClientHook.newBrokenCap("invalid 'receiverHosted' export ID"); + return Capability.newBrokenCap("invalid 'receiverHosted' export ID"); } if (exp.clientHook.getBrand() == this) { // TODO Tribble 4-way race! @@ -409,12 +642,12 @@ final class RpcState { var ops = PipelineOp.ToPipelineOps(promisedAnswer); if (answer == null || !answer.active || answer.pipeline == null || ops == null) { - return ClientHook.newBrokenCap("invalid 'receiverAnswer'"); + return Capability.newBrokenCap("invalid 'receiverAnswer'"); } var result = answer.pipeline.getPipelinedCap(ops); if (result == null) { - return ClientHook.newBrokenCap("Unrecognised pipeline ops"); + return Capability.newBrokenCap("Unrecognised pipeline ops"); } if (result.getBrand() == this) { @@ -425,14 +658,13 @@ final class RpcState { return result; case THIRD_PARTY_HOSTED: - return ClientHook.newBrokenCap("Third party caps not supported"); + return Capability.newBrokenCap("Third party caps not supported"); default: - return ClientHook.newBrokenCap("unknown CapDescriptor type"); + return Capability.newBrokenCap("unknown CapDescriptor type"); } } - private ClientHook importCap(int importId, boolean isPromise, Integer fd) { // Receive a new import. @@ -467,6 +699,39 @@ final class RpcState { : cap; } + ClientHook getMessageTarget(RpcProtocol.MessageTarget.Reader target) { + switch (target.which()) { + case IMPORTED_CAP: + var exp = exports.find(target.getImportedCap()); + assert exp != null: "Message target is not a current export ID."; + return exp != null ? exp.clientHook : null; + + case PROMISED_ANSWER: + var promisedAnswer = target.getPromisedAnswer(); + var base = answers.find(promisedAnswer.getQuestionId()); + assert base != null && base.active: "PromisedAnswer.questionId is not a current question."; + if (base == null || !base.active) { + return null; + } + + var pipeline = base.pipeline; + if (pipeline == null) { + pipeline = PipelineHook.newBrokenPipeline( + RpcException.failed("Pipeline call on a request that returned no capabilities or was already closed.")); + } + + var ops = PipelineOp.ToPipelineOps(promisedAnswer); + if (ops == null) { + return null; + } + return pipeline.getPipelinedCap(ops); + + default: + assert false: "Unknown message target type. " + target.which(); + return null; + } + } + ClientHook getInnermostClient(ClientHook client) { for (;;) { var inner = client.getResolved(); @@ -489,6 +754,79 @@ final class RpcState { AnyPointer.Reader getResults(); } + interface RpcServerResponse { + AnyPointer.Builder getResultsBuilder(); + } + + static class RpcResponseImpl implements RpcResponse { + private final Question question; + private final IncomingRpcMessage message; + private final AnyPointer.Reader results; + + RpcResponseImpl(Question question, + IncomingRpcMessage message, + List capTable, + AnyPointer.Reader results) { + this.question = question; + this.message = message; + this.results = results.imbue(new ReaderCapabilityTable(capTable)); + } + + public AnyPointer.Reader getResults() { + return results; + } + } + + class RpcServerResponseImpl implements RpcServerResponse { + + final OutgoingRpcMessage message; + final RpcProtocol.Payload.Builder payload; + final BuilderCapabilityTable capTable = new BuilderCapabilityTable(); + + RpcServerResponseImpl(OutgoingRpcMessage message, RpcProtocol.Payload.Builder payload) { + this.message = message; + this.payload = payload; + } + + @Override + public AnyPointer.Builder getResultsBuilder() { + return payload.getContent().imbue(capTable); + } + + List send() { + var capTable = this.capTable.getTable(); + var fds = List.of(); + var exports = writeDescriptors(capTable, payload, fds); + // TODO process FDs + message.setFds(fds); + + for (int ii = 0; ii < capTable.length; ++ii) { + var slot = capTable[ii]; + if (slot != null) { + capTable[ii] = getInnermostClient(slot); + } + } + + message.send(); + return exports; + } + } + + private static class LocallyRedirectedRpcResponse implements RpcServerResponse, RpcResponse { + + private final MessageBuilder message = new MessageBuilder(); + + @Override + public AnyPointer.Builder getResultsBuilder() { + return message.getRoot(AnyPointer.factory); + } + + @Override + public AnyPointer.Reader getResults() { + return getResultsBuilder().asReader(); + } + } + class RpcCallContext implements CallContextHook { final int answerId; @@ -500,9 +838,13 @@ final class RpcState { final AnyPointer.Reader params; // response - RpcResponse response; + RpcServerResponse response; RpcProtocol.Return.Builder returnMessage; boolean redirectResults = false; + boolean responseSent = false; + + boolean cancelRequested = false; + boolean cancelAllowed = false; final CompletableFuture cancelled; @@ -531,7 +873,20 @@ final class RpcState { @Override public AnyPointer.Builder getResults() { - return null; + if (response != null) { + return response.getResultsBuilder(); + } + + if (redirectResults || isDisconnected()) { + response = new LocallyRedirectedRpcResponse(); + } + else { + var message = connection.newOutgoingMessage(1024); + returnMessage = message.getBody().initAs(RpcProtocol.Message.factory).initReturn(); + response = new RpcServerResponseImpl(message, returnMessage.getResults()); + } + + return response.getResultsBuilder(); } @Override @@ -552,6 +907,147 @@ final class RpcState { public ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request) { return null; } + + RpcResponse consumeRedirectedResponse() { + assert this.redirectResults; + + if (this.response == null) { + getResults(); // force initialization of response + } + + return ((LocallyRedirectedRpcResponse)this.response); + } + + void sendReturn() { + assert !redirectResults; + + if (!this.cancelRequested && isDisconnected()) { + assert false: "Cancellation should have been requested on disconnect."; + return; + } + + if (this.response == null) { + getResults(); // force initialization + } + + this.returnMessage.setAnswerId(this.answerId); + this.returnMessage.setReleaseParamCaps(false); + + List exports; + try { + exports = ((RpcServerResponseImpl)response).send(); + } + catch (Throwable exc) { + this.responseSent = false; + sendErrorReturn(exc); + } + } + + void sendErrorReturn(Throwable exc) { + assert !redirectResults; + + if (!isFirstResponder()) { + return; + } + + if (isConnected()) { + var message = connection.newOutgoingMessage(1024); + var builder = message.getBody().initAs(RpcProtocol.Message.factory).initReturn(); + builder.setAnswerId(this.answerId); + builder.setReleaseParamCaps(false); + RpcException.fromException(exc, builder.initException()); + message.send(); + } + + cleanupAnswerTable(null, false); + } + + boolean isFirstResponder() { + if (this.responseSent) { + return false; + } + this.responseSent = true; + return true; + } + + void cleanupAnswerTable(List resultExports, boolean shouldFreePipeline) { + if (this.cancelRequested) { + assert resultExports.size() == 0; + answers.erase(this.answerId); + return; + } + else { + var answer = answers.find(answerId); + answer.callContext = null; + answer.resultExports = resultExports; + + if (shouldFreePipeline) { + assert resultExports.size() == 0; + answer.pipeline = null; + } + } + } + } + + enum PipelineState { + WAITING, RESOLVED, BROKEN + } + + class RpcPipeline implements PipelineHook { + + private final Question question; + private PipelineState state = PipelineState.WAITING; + private RpcResponse resolved; + private Throwable broken; + + final HashMap, ClientHook> clientMap = new HashMap<>(); + final CompletionStage redirectLater; + final CompletionStage resolveSelf; + + RpcPipeline(Question question, + CompletionStage redirectLater) { + this.question = question; + this.redirectLater = redirectLater; + this.resolveSelf = this.redirectLater + .thenAccept(response -> { + this.state = PipelineState.RESOLVED; + this.resolved = response; + }) + .exceptionally(exc -> { + this.state = PipelineState.BROKEN; + this.broken = exc; + return null; + }); + } + + RpcPipeline(Question question) { + // never resolves + this.question = question; + this.redirectLater = null; + this.resolveSelf = null; + } + + @Override + public ClientHook getPipelinedCap(PipelineOp[] ops) { + // TODO avoid conversion to/from ArrayList? + var key = new ArrayList<>(Arrays.asList(ops)); + var hook = this.clientMap.computeIfAbsent(key, k -> { + switch (state) { + case WAITING: + if (redirectLater != null) { + // TODO implement redirect + assert false: "redirection not implemented"; + return null; + } + return new PipelineClient(question, ops); + case RESOLVED: + return resolved.getResults().getPipelinedCap(ops); + default: + return Capability.newBrokenCap(broken); + } + }); + return hook; + } } abstract class RpcClient implements ClientHook { @@ -566,13 +1062,130 @@ final class RpcState { @Override public Request newCall(long interfaceId, short methodId) { - return null; + return newCallNoIntercept(interfaceId, methodId); } @Override public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { return null; } + + public VoidPromiseAndPipeline callNoIntercept(long interfaceId, short methodId, CallContextHook context) { + var params = context.getParams(); + var request = newCallNoIntercept(interfaceId, methodId); + var x = request.params; + context.allowCancellation(); + return context.directTailCall(request.hook); + } + + private Request newCallNoIntercept(long interfaceId, short methodId) { + if (isDisconnected()) { + return Request.newBrokenRequest(disconnected); + } + + var request = new RpcRequest(this); + var callBuilder = request.getCall(); + callBuilder.setInterfaceId(interfaceId); + callBuilder.setMethodId(methodId); + var root = request.getRoot(); + return Request.newTypelessRequest(root, request); + } + } + + class RpcRequest implements RequestHook { + + final RpcClient target; + final OutgoingRpcMessage message; + final BuilderCapabilityTable capTable = new BuilderCapabilityTable(); + final RpcProtocol.Call.Builder callBuilder; + final AnyPointer.Builder paramsBuilder; + + RpcRequest(RpcClient target) { + this.target = target; + this.message = connection.newOutgoingMessage(1024); + this.callBuilder = message.getBody().getAs(RpcProtocol.Message.factory).initCall(); + this.paramsBuilder = callBuilder.getParams().getContent(); + } + + AnyPointer.Builder getRoot() { + return this.paramsBuilder; + } + RpcProtocol.Call.Builder getCall() { + return this.callBuilder; + } + + @Override + public RemotePromise send() { + if (isDisconnected()) { + return new RemotePromise<>(CompletableFuture.failedFuture(disconnected), null); + } + + var redirect = this.target.writeTarget(this.callBuilder.getTarget()); + if (redirect != null) { + var replacement = redirect.newCall( + this.callBuilder.getInterfaceId(), this.callBuilder.getMethodId()); + replacement.params = paramsBuilder; + return replacement.hook.send(); + } + + var question = sendInternal(false); + + // The pipeline must get notified of resolution before the app does to maintain ordering. + var pipeline = new RpcPipeline(question, question.response); + + // drive the message loop until the question is answered + var appPromise = messageLoop(question.response).thenApply(response -> { + var results = response.getResults(); + return new Response(results, response); + }); + + return new RemotePromise<>(appPromise, pipeline); + } + + Question sendInternal(boolean isTailCall) { + // TODO refactor + var fds = List.of(); + var exports = writeDescriptors(capTable.getTable(), callBuilder.getParams(), fds); + message.setFds(fds); + var question = questions.next(); + question.isAwaitingReturn = true; + question.isTailCall = isTailCall; + question.paramExports = exports; + + callBuilder.setQuestionId(question.id); + if (isTailCall) { + callBuilder.getSendResultsTo().getYourself(); + } + try { + message.send(); + } catch (Exception exc) { + question.isAwaitingReturn = false; + question.skipFinish = true; + question.reject(exc); + } + return question; + } + + @Override + public Object getBrand() { + return RpcState.this; + } + + final class TailInfo { + int questionId; + CompletableFuture promise; + PipelineHook pipeline; + } + + TailInfo tailSend() { + if (isDisconnected()) { + // Disconnected; fall back to a regular send() which will fail appropriately. + return null; + } + + // TODO implement tail-calls + return null; + } } class ImportClient extends RpcClient { @@ -715,14 +1328,13 @@ final class RpcState { assert redirect == null; } - var embargo = new Embargo(); - var embargoId = embargos.next(embargo); - disembargo.getContext().setSenderLoopback(embargoId); + var embargo = embargos.next(); + disembargo.getContext().setSenderLoopback(embargo.id); - embargo.fulfiller = new CompletableFuture<>(); + embargo.disembargo = new CompletableFuture<>(); final ClientHook finalReplacement = replacement; - var embargoPromise = embargo.fulfiller.thenApply(x -> { + var embargoPromise = embargo.disembargo.thenApply(x -> { return finalReplacement; }); @@ -766,4 +1378,40 @@ final class RpcState { } } + class PipelineClient extends RpcClient { + + private final Question question; + private final PipelineOp[] ops; + + PipelineClient(Question question, PipelineOp[] ops) { + this.question = question; + this.ops = ops; + } + + @Override + public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { + return null; + } + + @Override + public CompletableFuture whenMoreResolved() { + return null; + } + + @Override + public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds) { + var promisedAnswer = descriptor.initReceiverAnswer(); + promisedAnswer.setQuestionId(question.id); + PipelineOp.FromPipelineOps(ops, promisedAnswer); + return null; + } + + @Override + public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) { + var builder = target.initPromisedAnswer(); + builder.setQuestionId(question.id); + PipelineOp.FromPipelineOps(ops, builder); + return null; + } + } } diff --git a/runtime/src/main/java/org/capnproto/RpcSystem.java b/runtime/src/main/java/org/capnproto/RpcSystem.java index 60f5aab..d6bb290 100644 --- a/runtime/src/main/java/org/capnproto/RpcSystem.java +++ b/runtime/src/main/java/org/capnproto/RpcSystem.java @@ -1,4 +1,44 @@ package org.capnproto; -public class RpcSystem { +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public abstract class RpcSystem { + + final Network network; + final Capability.Client bootstrapInterface; + final Map connections = new HashMap<>(); + CompletableFuture acceptCompleted = CompletableFuture.completedFuture(null); + + public RpcSystem(Network network, Capability.Client bootstrapInterface) { + this.network = network; + this.bootstrapInterface = bootstrapInterface; + } + + public void accept(VatNetwork.Connection connection) { + getConnectionState(connection); + } + + synchronized RpcState getConnectionState(VatNetwork.Connection connection) { + return connections.computeIfAbsent(connection, key -> + new RpcState(key, bootstrapInterface)); + } + + public final CompletableFuture runOnce() { + var done = acceptLoop(); + for (var conn : connections.values()) { + done = CompletableFuture.anyOf(done, conn.runOnce()); + } + return done; + } + + + CompletableFuture acceptLoop() { + if (this.acceptCompleted.isDone()) { + CompletableFuture accepted = this.network.baseAccept(); + this.acceptCompleted = accepted.thenAccept(this::accept); + } + return this.acceptCompleted; + } } diff --git a/runtime/src/main/java/org/capnproto/TwoPartyClient.java b/runtime/src/main/java/org/capnproto/TwoPartyClient.java new file mode 100644 index 0000000..1707e03 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/TwoPartyClient.java @@ -0,0 +1,33 @@ +package org.capnproto; + +import java.nio.channels.AsynchronousByteChannel; + +public class TwoPartyClient { + + private final TwoPartyVatNetwork network; + private final TwoPartyRpcSystem rpcSystem; + + public TwoPartyClient(AsynchronousByteChannel channel) { + this(channel, null); + } + + public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface) { + this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT); + } + + public TwoPartyClient(AsynchronousByteChannel channel, + Capability.Client bootstrapInterface, + RpcTwoPartyProtocol.Side side) { + this.network = new TwoPartyVatNetwork(channel, side); + this.rpcSystem = new TwoPartyRpcSystem(network, bootstrapInterface); + } + + public Capability.Client bootstrap() { + var message = new MessageBuilder(); + var vatId = message.getRoot(RpcTwoPartyProtocol.VatId.factory); + vatId.setSide(network.getSide() == RpcTwoPartyProtocol.Side.CLIENT + ? RpcTwoPartyProtocol.Side.SERVER + : RpcTwoPartyProtocol.Side.CLIENT); + return rpcSystem.bootstrap(vatId.asReader()); + } +} diff --git a/runtime/src/main/java/org/capnproto/TwoPartyRpcSystem.java b/runtime/src/main/java/org/capnproto/TwoPartyRpcSystem.java index 1671843..d7996b1 100644 --- a/runtime/src/main/java/org/capnproto/TwoPartyRpcSystem.java +++ b/runtime/src/main/java/org/capnproto/TwoPartyRpcSystem.java @@ -1,4 +1,16 @@ package org.capnproto; -public class TwoPartyRpcSystem extends RpcSystem { +public class TwoPartyRpcSystem + extends RpcSystem { + + public TwoPartyRpcSystem(TwoPartyVatNetwork network, Capability.Client bootstrapInterface) { + super(network, bootstrapInterface); + } + + public Capability.Client bootstrap(RpcTwoPartyProtocol.VatId.Reader vatId) { + var connection = this.network.baseConnect(vatId); + var state = getConnectionState(connection); + var hook = state.restore(); + return new Capability.Client(hook); + } } diff --git a/runtime/src/main/java/org/capnproto/TwoPartyServer.java b/runtime/src/main/java/org/capnproto/TwoPartyServer.java new file mode 100644 index 0000000..8a27315 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/TwoPartyServer.java @@ -0,0 +1,64 @@ +package org.capnproto; + +import java.nio.channels.AsynchronousByteChannel; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class TwoPartyServer { + + private class AcceptedConnection { + final AsynchronousByteChannel channel; + final TwoPartyVatNetwork network; + final TwoPartyRpcSystem rpcSystem; + + AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousByteChannel channel) { + this.channel = channel; + this.network = new TwoPartyVatNetwork(channel, RpcTwoPartyProtocol.Side.SERVER); + this.rpcSystem = new TwoPartyRpcSystem(network, bootstrapInterface); + } + + public CompletableFuture runOnce() { + return this.rpcSystem.runOnce(); + } + } + + private final Capability.Client bootstrapInterface; + private final List connections = new ArrayList<>(); + private final List listeners = new ArrayList<>(); + + public TwoPartyServer(Capability.Client bootstrapInterface) { + this.bootstrapInterface = bootstrapInterface; + } + + private synchronized void accept(AsynchronousByteChannel channel) { + var connection = new AcceptedConnection(this.bootstrapInterface, channel); + this.connections.add(connection); + } + + public void listen(AsynchronousServerSocketChannel listener) { + listener.accept(null, new CompletionHandler() { + @Override + public void completed(AsynchronousSocketChannel channel, Object attachment) { + accept(channel); + listen(listener); + } + + @Override + public void failed(Throwable exc, Object attachment) { + listeners.remove(listener); + } + }); + } + + public synchronized CompletableFuture runOnce() { + var done = new CompletableFuture(); + for (var conn: connections) { + done = CompletableFuture.anyOf(done, conn.runOnce()); + } + return done; + } +} diff --git a/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java index 75d822a..3a20469 100644 --- a/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -3,13 +3,18 @@ package org.capnproto; import java.nio.channels.AsynchronousByteChannel; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; -public class TwoPartyVatNetwork implements VatNetwork, VatNetwork.Connection { +public class TwoPartyVatNetwork + implements VatNetwork, VatNetwork.Connection { private CompletableFuture writeCompleted = CompletableFuture.completedFuture(null); + private final Executor executor = Executors.newSingleThreadExecutor(); private final AsynchronousByteChannel channel; private final RpcTwoPartyProtocol.Side side; private final MessageBuilder peerVatId = new MessageBuilder(4); + private boolean accepted; public TwoPartyVatNetwork(AsynchronousByteChannel channel, RpcTwoPartyProtocol.Side side) { this.channel = channel; @@ -28,6 +33,24 @@ public class TwoPartyVatNetwork implements VatNetwork, VatNetwork.Connection { return peerVatId.getRoot(RpcTwoPartyProtocol.VatId.factory).asReader(); } + private Connection connect(RpcTwoPartyProtocol.VatId.Reader vatId) { + if (vatId.getSide() != side) { + return this; + } + return null; + } + + private CompletableFuture accept() { + if (side == RpcTwoPartyProtocol.Side.SERVER & !accepted) { + accepted = true; + return CompletableFuture.completedFuture(this); + } + else { + // never completes + return new CompletableFuture<>(); + } + } + @Override public OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize) { return new OutgoingMessage(firstSegmentWordSize); @@ -40,6 +63,16 @@ public class TwoPartyVatNetwork implements VatNetwork, VatNetwork.Connection { }); } + @Override + public Connection baseConnect(RpcTwoPartyProtocol.VatId.Reader hostId) { + return this.connect(hostId); + } + + @Override + public CompletableFuture baseAccept() { + return this.accept().thenApply(conn -> conn); + } + final class OutgoingMessage implements OutgoingRpcMessage { final MessageBuilder message; diff --git a/runtime/src/main/java/org/capnproto/VatNetwork.java b/runtime/src/main/java/org/capnproto/VatNetwork.java index 16a4c21..96c2cf1 100644 --- a/runtime/src/main/java/org/capnproto/VatNetwork.java +++ b/runtime/src/main/java/org/capnproto/VatNetwork.java @@ -1,13 +1,17 @@ package org.capnproto; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; -public interface VatNetwork { +public interface VatNetwork { interface Connection { OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize); CompletableFuture receiveIncomingMessage(); - } + } + + Connection baseConnect(VatId hostId); + CompletableFuture baseAccept(); } diff --git a/runtime/src/main/java/org/capnproto/WireHelpers.java b/runtime/src/main/java/org/capnproto/WireHelpers.java index 8600a6a..72a1e1e 100644 --- a/runtime/src/main/java/org/capnproto/WireHelpers.java +++ b/runtime/src/main/java/org/capnproto/WireHelpers.java @@ -1332,16 +1332,16 @@ final class WireHelpers { long ref = segment.get(refOffset); if (WirePointer.isNull(ref)) { - return ClientHook.newNullCap(); + return Capability.newNullCap(); } if (WirePointer.kind(ref) != WirePointer.OTHER) { - return ClientHook.newBrokenCap("Calling capability extracted from a non-capability pointer."); + return Capability.newBrokenCap("Calling capability extracted from a non-capability pointer."); } var cap = capTable.extractCap(WirePointer.upper32Bits(ref)); if (cap == null) { - return ClientHook.newBrokenCap("Calling invalid capability pointer."); + return Capability.newBrokenCap("Calling invalid capability pointer."); } return cap; } diff --git a/runtime/src/test/java/org/capnproto/RpcStateTest.java b/runtime/src/test/java/org/capnproto/RpcStateTest.java index 43faab3..4d5f2e9 100644 --- a/runtime/src/test/java/org/capnproto/RpcStateTest.java +++ b/runtime/src/test/java/org/capnproto/RpcStateTest.java @@ -8,6 +8,8 @@ import org.junit.Test; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; public class RpcStateTest { @@ -23,6 +25,7 @@ public class RpcStateTest { class TestConnection implements VatNetwork.Connection { + Executor executor = Executors.newSingleThreadExecutor(); @Override public OutgoingRpcMessage newOutgoingMessage(int firstSegmentWordSize) { var message = new MessageBuilder(); @@ -59,7 +62,7 @@ public class RpcStateTest { @Before public void setUp() throws Exception { connection = new TestConnection(); - bootstrapInterface = new Capability.Client(ClientHook.newNullCap()); + bootstrapInterface = new Capability.Client(Capability.newNullCap()); rpc = new RpcState(connection, bootstrapInterface); } diff --git a/runtime/src/test/java/org/capnproto/TwoPartyTest.java b/runtime/src/test/java/org/capnproto/TwoPartyTest.java new file mode 100644 index 0000000..2b00a39 --- /dev/null +++ b/runtime/src/test/java/org/capnproto/TwoPartyTest.java @@ -0,0 +1,216 @@ +package org.capnproto; + +import org.capnproto.demo.Demo; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.*; + +class TestCap0 { + + static final class Client extends org.capnproto.Capability.Client { + public Client() { super(); } + + public Client(ClientHook hook) { super(hook); } + + public Client(Capability.Client cap) { super(cap.getHook()); } + + public Client(Capability.Server server) { super(server); } + + public org.capnproto.Request testMethod0Request() { + return newCall(Demo.TestParams0.factory, Demo.TestResults0.factory, 0xa65f4a3d7f622e6bL, (short) 0); + } + + public org.capnproto.Request testMethod1Request() { + return newCall(Demo.TestParams1.factory, Demo.TestResults1.factory, 0xa65f4a3d7f622e6bL, (short) 1); + } + } + + static abstract class Server extends org.capnproto.Capability.Server { + + public class TestMethod0Context extends CallContext { + public TestMethod0Context(CallContextHook hook) { + super(Demo.TestParams0.factory, Demo.TestResults0.factory, hook); + } + } + + public class TestMethod1Context extends CallContext { + public TestMethod1Context(CallContextHook hook) { + super(Demo.TestParams1.factory, Demo.TestResults1.factory, hook); + } + } + + @Override + public DispatchCallResult dispatchCall(long interfaceId, short methodId, CallContext context) { + if (interfaceId == 0xa65f4a3d7f622e6bL) { + return dispatchCallInternal(methodId, context); + } + return internalUnimplemented(Demo.class.getName(), interfaceId); + } + + private DispatchCallResult dispatchCallInternal(short methodId, CallContext ctx) { + switch (methodId) { + case 0: + return new DispatchCallResult(testMethod0(new TestMethod0Context(ctx.getHook()))); + case 1: + return new DispatchCallResult(testMethod1(new TestMethod1Context(ctx.getHook()))); + default: + return internalUnimplemented(Demo.class.getName(), 0xa27d3c231c7b9202L, methodId); + } + } + + public CompletableFuture testMethod0(TestMethod0Context ctx) { + return CompletableFuture.failedFuture(RpcException.unimplemented("testMethod0")); + } + + public CompletableFuture testMethod1(TestMethod1Context ctx) { + return CompletableFuture.failedFuture(RpcException.unimplemented("testMethod1")); + } + } +} + +class TestCap1 { + + static final class Client extends org.capnproto.Capability.Client { + public Client() { super(); } + + public Client(ClientHook hook) { super(hook); } + + public Client(Capability.Client cap) { super(cap.getHook()); } + + public Client(Capability.Server server) { super(server); } + } + + static abstract class Server extends org.capnproto.Capability.Server { + + @Override + public DispatchCallResult dispatchCall(long interfaceId, short methodId, CallContext context) { + if (interfaceId == 0x81da3f8f6079c216L) { + return dispatchCallInternal(methodId, context); + } + return internalUnimplemented(Demo.class.getName(), interfaceId); + } + + private DispatchCallResult dispatchCallInternal(short methodId, CallContext ctx) { + switch (methodId) { + default: + return internalUnimplemented(Demo.class.getName(), 0x81da3f8f6079c216L, methodId); + } + } + } +} + +class TestCap0Impl extends TestCap0.Server { + + final TestCap1.Client testCap1 = new TestCap1.Client(new TestCap1Impl()); + + public CompletableFuture testMethod0(TestCap0.Server.TestMethod0Context ctx) { + var params = ctx.getParams(); + var results = ctx.getResults(); + results.setResult0(params.getParam0()); + return CompletableFuture.completedFuture(null); + } + + public CompletableFuture testMethod1(TestCap0.Server.TestMethod1Context ctx) { + var params = ctx.getParams(); + var results = ctx.getResults(); + var res0 = results.getResult0(); + res0.setAsCapability(testCap1); + return CompletableFuture.completedFuture(null); + } +} + +class TestCap1Impl extends TestCap1.Server { +} + +public class TwoPartyTest { + + AsynchronousServerSocketChannel serverSocket; + AsynchronousSocketChannel clientSocket; + TwoPartyClient client; + + @Before + public void setUp() throws Exception { + this.serverSocket = AsynchronousServerSocketChannel.open(); + this.serverSocket.bind(null); + + this.clientSocket = AsynchronousSocketChannel.open(); + this.clientSocket.connect(this.serverSocket.getLocalAddress()).get(); + + this.client = new TwoPartyClient(clientSocket); + } + + @After + public void tearDown() throws Exception { + this.clientSocket.close(); + this.serverSocket.close(); + this.client = null; + } + + @Test + public void testNullCap() { + + var server = new TwoPartyServer(new Capability.Client()); + server.listen(serverSocket); + var cap = this.client.bootstrap(); + var resolved = cap.whenResolved(); + resolved.join(); + } + + @Test + public void testBasic() throws ExecutionException, InterruptedException { + var capServer = new TestCap0Impl(); + var server = new TwoPartyServer(new TestCap0.Client(capServer)); + server.listen(serverSocket); + var demoClient = new TestCap0.Client(this.client.bootstrap()); + var request = demoClient.testMethod0Request(); + var params = request.params(); + params.setParam0(4321); + var resultsPromise = request.send(); + while (!resultsPromise.isDone()) { + CompletableFuture.anyOf(resultsPromise, server.runOnce()).join(); + } + Assert.assertTrue(resultsPromise.isDone()); + var results = resultsPromise.get(); + Assert.assertEquals(params.getParam0(), results.getResult0()); + } + + @Test + public void testReturnCap() throws ExecutionException, InterruptedException { + // send a capabilty back from the server to the client + var capServer = new TestCap0Impl(); + var server = new TwoPartyServer(new TestCap0.Client(capServer)); + server.listen(serverSocket); + var demoClient = new TestCap0.Client(this.client.bootstrap()); + var request = demoClient.testMethod1Request(); + var params = request.params(); + var resultsPromise = request.send(); + while (!resultsPromise.isDone()) { + CompletableFuture.anyOf(resultsPromise, server.runOnce()).join(); + } + Assert.assertTrue(resultsPromise.isDone()); + var results = resultsPromise.get(); + var any = results.getResult0(); + var cap1 = any.getAsCapability(); + } + + @Test + public void testLocalServer() throws ExecutionException, InterruptedException { + var demo = new TestCap0Impl(); + var client = new TestCap0.Client(demo); + var request = client.testMethod0Request(); + var params = request.params(); + params.setParam0(4321); + var results = request.send().get(); + Assert.assertEquals(params.getParam0(), results.getResult0()); + } +} \ No newline at end of file diff --git a/runtime/src/test/java/org/capnproto/demo/Demo.java b/runtime/src/test/java/org/capnproto/demo/Demo.java new file mode 100644 index 0000000..2b531dd --- /dev/null +++ b/runtime/src/test/java/org/capnproto/demo/Demo.java @@ -0,0 +1,371 @@ +// Generated by Cap'n Proto compiler, DO NOT EDIT +// source: demoparams.capnp + +package org.capnproto.demo; + +public final class Demo { + public static class TestParams0 { + public static final org.capnproto.StructSize STRUCT_SIZE = new org.capnproto.StructSize((short)1,(short)0); + public static final class Factory extends org.capnproto.StructFactory { + public Factory() { + } + public final Reader constructReader(org.capnproto.SegmentReader segment, int data,int pointers, int dataSize, short pointerCount, int nestingLimit) { + return new Reader(segment,data,pointers,dataSize,pointerCount,nestingLimit); + } + public final Builder constructBuilder(org.capnproto.SegmentBuilder segment, int data,int pointers, int dataSize, short pointerCount) { + return new Builder(segment, data, pointers, dataSize, pointerCount); + } + public final org.capnproto.StructSize structSize() { + return TestParams0.STRUCT_SIZE; + } + public final Reader asReader(Builder builder) { + return builder.asReader(); + } + } + public static final Factory factory = new Factory(); + public static final org.capnproto.StructList.Factory listFactory = + new org.capnproto.StructList.Factory(factory); + public static final class Builder extends org.capnproto.StructBuilder { + Builder(org.capnproto.SegmentBuilder segment, int data, int pointers,int dataSize, short pointerCount){ + super(segment, data, pointers, dataSize, pointerCount); + } + public final Reader asReader() { + return new Reader(segment, data, pointers, dataSize, pointerCount, 0x7fffffff); + } + public final int getParam0() { + return _getIntField(0); + } + public final void setParam0(int value) { + _setIntField(0, value); + } + + } + + public static final class Reader extends org.capnproto.StructReader { + Reader(org.capnproto.SegmentReader segment, int data, int pointers,int dataSize, short pointerCount, int nestingLimit){ + super(segment, data, pointers, dataSize, pointerCount, nestingLimit); + } + + public final int getParam0() { + return _getIntField(0); + } + + } + + } + + + public static class TestResults0 { + public static final org.capnproto.StructSize STRUCT_SIZE = new org.capnproto.StructSize((short)1,(short)0); + public static final class Factory extends org.capnproto.StructFactory { + public Factory() { + } + public final Reader constructReader(org.capnproto.SegmentReader segment, int data,int pointers, int dataSize, short pointerCount, int nestingLimit) { + return new Reader(segment,data,pointers,dataSize,pointerCount,nestingLimit); + } + public final Builder constructBuilder(org.capnproto.SegmentBuilder segment, int data,int pointers, int dataSize, short pointerCount) { + return new Builder(segment, data, pointers, dataSize, pointerCount); + } + public final org.capnproto.StructSize structSize() { + return TestResults0.STRUCT_SIZE; + } + public final Reader asReader(Builder builder) { + return builder.asReader(); + } + } + public static final Factory factory = new Factory(); + public static final org.capnproto.StructList.Factory listFactory = + new org.capnproto.StructList.Factory(factory); + public static final class Builder extends org.capnproto.StructBuilder { + Builder(org.capnproto.SegmentBuilder segment, int data, int pointers,int dataSize, short pointerCount){ + super(segment, data, pointers, dataSize, pointerCount); + } + public final Reader asReader() { + return new Reader(segment, data, pointers, dataSize, pointerCount, 0x7fffffff); + } + public final int getResult0() { + return _getIntField(0); + } + public final void setResult0(int value) { + _setIntField(0, value); + } + + } + + public static final class Reader extends org.capnproto.StructReader { + Reader(org.capnproto.SegmentReader segment, int data, int pointers,int dataSize, short pointerCount, int nestingLimit){ + super(segment, data, pointers, dataSize, pointerCount, nestingLimit); + } + + public final int getResult0() { + return _getIntField(0); + } + + } + + } + + + public static class TestParams1 { + public static final org.capnproto.StructSize STRUCT_SIZE = new org.capnproto.StructSize((short)0,(short)1); + public static final class Factory extends org.capnproto.StructFactory { + public Factory() { + } + public final Reader constructReader(org.capnproto.SegmentReader segment, int data,int pointers, int dataSize, short pointerCount, int nestingLimit) { + return new Reader(segment,data,pointers,dataSize,pointerCount,nestingLimit); + } + public final Builder constructBuilder(org.capnproto.SegmentBuilder segment, int data,int pointers, int dataSize, short pointerCount) { + return new Builder(segment, data, pointers, dataSize, pointerCount); + } + public final org.capnproto.StructSize structSize() { + return TestParams1.STRUCT_SIZE; + } + public final Reader asReader(Builder builder) { + return builder.asReader(); + } + } + public static final Factory factory = new Factory(); + public static final org.capnproto.StructList.Factory listFactory = + new org.capnproto.StructList.Factory(factory); + public static final class Builder extends org.capnproto.StructBuilder { + Builder(org.capnproto.SegmentBuilder segment, int data, int pointers,int dataSize, short pointerCount){ + super(segment, data, pointers, dataSize, pointerCount); + } + public final Reader asReader() { + return new Reader(segment, data, pointers, dataSize, pointerCount, 0x7fffffff); + } + public final boolean hasParam0() { + return !_pointerFieldIsNull(0); + } + public org.capnproto.AnyPointer.Builder getParam0() { + return _getPointerField(org.capnproto.AnyPointer.factory, 0); + } + public org.capnproto.AnyPointer.Builder initParam0() { + return _initPointerField(org.capnproto.AnyPointer.factory, 0, 0); + } + public org.capnproto.AnyPointer.Builder initParam0(int size) { + return _initPointerField(org.capnproto.AnyPointer.factory, 0, size); + } + + } + + public static final class Reader extends org.capnproto.StructReader { + Reader(org.capnproto.SegmentReader segment, int data, int pointers,int dataSize, short pointerCount, int nestingLimit){ + super(segment, data, pointers, dataSize, pointerCount, nestingLimit); + } + + public boolean hasParam0() { + return !_pointerFieldIsNull(0); + } + public org.capnproto.AnyPointer.Reader getParam0() { + return _getPointerField(org.capnproto.AnyPointer.factory, 0); + } + } + + } + + + public static class TestResults1 { + public static final org.capnproto.StructSize STRUCT_SIZE = new org.capnproto.StructSize((short)0,(short)1); + public static final class Factory extends org.capnproto.StructFactory { + public Factory() { + } + public final Reader constructReader(org.capnproto.SegmentReader segment, int data,int pointers, int dataSize, short pointerCount, int nestingLimit) { + return new Reader(segment,data,pointers,dataSize,pointerCount,nestingLimit); + } + public final Builder constructBuilder(org.capnproto.SegmentBuilder segment, int data,int pointers, int dataSize, short pointerCount) { + return new Builder(segment, data, pointers, dataSize, pointerCount); + } + public final org.capnproto.StructSize structSize() { + return TestResults1.STRUCT_SIZE; + } + public final Reader asReader(Builder builder) { + return builder.asReader(); + } + } + public static final Factory factory = new Factory(); + public static final org.capnproto.StructList.Factory listFactory = + new org.capnproto.StructList.Factory(factory); + public static final class Builder extends org.capnproto.StructBuilder { + Builder(org.capnproto.SegmentBuilder segment, int data, int pointers,int dataSize, short pointerCount){ + super(segment, data, pointers, dataSize, pointerCount); + } + public final Reader asReader() { + return new Reader(segment, data, pointers, dataSize, pointerCount, 0x7fffffff); + } + public final boolean hasResult0() { + return !_pointerFieldIsNull(0); + } + public org.capnproto.AnyPointer.Builder getResult0() { + return _getPointerField(org.capnproto.AnyPointer.factory, 0); + } + public org.capnproto.AnyPointer.Builder initResult0() { + return _initPointerField(org.capnproto.AnyPointer.factory, 0, 0); + } + public org.capnproto.AnyPointer.Builder initResult0(int size) { + return _initPointerField(org.capnproto.AnyPointer.factory, 0, size); + } + + } + + public static final class Reader extends org.capnproto.StructReader { + Reader(org.capnproto.SegmentReader segment, int data, int pointers,int dataSize, short pointerCount, int nestingLimit){ + super(segment, data, pointers, dataSize, pointerCount, nestingLimit); + } + + public boolean hasResult0() { + return !_pointerFieldIsNull(0); + } + public org.capnproto.AnyPointer.Reader getResult0() { + return _getPointerField(org.capnproto.AnyPointer.factory, 0); + } + } + + } + + + +public static final class Schemas { +public static final org.capnproto.SegmentReader b_b301b4acd180f012 = + org.capnproto.GeneratedClassSupport.decodeRawBytes( + "\u0000\u0000\u0000\u0000\u0005\u0000\u0006\u0000" + + "\u0012\u00f0\u0080\u00d1\u00ac\u00b4\u0001\u00b3" + + "\u0011\u0000\u0000\u0000\u0001\u0000\u0001\u0000" + + "\u00c4\u0053\u0042\u00d6\u0097\u0077\u00b5\u0091" + + "\u0000\u0000\u0007\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0015\u0000\u0000\u0000\u00ea\u0000\u0000\u0000" + + "\u0021\u0000\u0000\u0000\u0007\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u001d\u0000\u0000\u0000\u003f\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0064\u0065\u006d\u006f\u0070\u0061\u0072\u0061" + + "\u006d\u0073\u002e\u0063\u0061\u0070\u006e\u0070" + + "\u003a\u0054\u0065\u0073\u0074\u0050\u0061\u0072" + + "\u0061\u006d\u0073\u0030\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000" + + "\u0004\u0000\u0000\u0000\u0003\u0000\u0004\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0001\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\r\u0000\u0000\u0000\u003a\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0008\u0000\u0000\u0000\u0003\u0000\u0001\u0000" + + "\u0014\u0000\u0000\u0000\u0002\u0000\u0001\u0000" + + "\u0070\u0061\u0072\u0061\u006d\u0030\u0000\u0000" + + "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + ""); +public static final org.capnproto.SegmentReader b_96a42e5421b52881 = + org.capnproto.GeneratedClassSupport.decodeRawBytes( + "\u0000\u0000\u0000\u0000\u0005\u0000\u0006\u0000" + + "\u0081\u0028\u00b5\u0021\u0054\u002e\u00a4\u0096" + + "\u0011\u0000\u0000\u0000\u0001\u0000\u0001\u0000" + + "\u00c4\u0053\u0042\u00d6\u0097\u0077\u00b5\u0091" + + "\u0000\u0000\u0007\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0015\u0000\u0000\u0000\u00f2\u0000\u0000\u0000" + + "\u0021\u0000\u0000\u0000\u0007\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u001d\u0000\u0000\u0000\u003f\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0064\u0065\u006d\u006f\u0070\u0061\u0072\u0061" + + "\u006d\u0073\u002e\u0063\u0061\u0070\u006e\u0070" + + "\u003a\u0054\u0065\u0073\u0074\u0052\u0065\u0073" + + "\u0075\u006c\u0074\u0073\u0030\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000" + + "\u0004\u0000\u0000\u0000\u0003\u0000\u0004\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0001\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\r\u0000\u0000\u0000\u0042\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0008\u0000\u0000\u0000\u0003\u0000\u0001\u0000" + + "\u0014\u0000\u0000\u0000\u0002\u0000\u0001\u0000" + + "\u0072\u0065\u0073\u0075\u006c\u0074\u0030\u0000" + + "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + ""); +public static final org.capnproto.SegmentReader b_e10363a8b6220957 = + org.capnproto.GeneratedClassSupport.decodeRawBytes( + "\u0000\u0000\u0000\u0000\u0005\u0000\u0006\u0000" + + "\u0057\u0009\"\u00b6\u00a8\u0063\u0003\u00e1" + + "\u0011\u0000\u0000\u0000\u0001\u0000\u0000\u0000" + + "\u00c4\u0053\u0042\u00d6\u0097\u0077\u00b5\u0091" + + "\u0001\u0000\u0007\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0015\u0000\u0000\u0000\u00ea\u0000\u0000\u0000" + + "\u0021\u0000\u0000\u0000\u0007\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u001d\u0000\u0000\u0000\u003f\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0064\u0065\u006d\u006f\u0070\u0061\u0072\u0061" + + "\u006d\u0073\u002e\u0063\u0061\u0070\u006e\u0070" + + "\u003a\u0054\u0065\u0073\u0074\u0050\u0061\u0072" + + "\u0061\u006d\u0073\u0031\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000" + + "\u0004\u0000\u0000\u0000\u0003\u0000\u0004\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0001\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\r\u0000\u0000\u0000\u003a\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0008\u0000\u0000\u0000\u0003\u0000\u0001\u0000" + + "\u0014\u0000\u0000\u0000\u0002\u0000\u0001\u0000" + + "\u0070\u0061\u0072\u0061\u006d\u0030\u0000\u0000" + + "\u0012\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0012\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + ""); +public static final org.capnproto.SegmentReader b_99852ee4d45ddb3e = + org.capnproto.GeneratedClassSupport.decodeRawBytes( + "\u0000\u0000\u0000\u0000\u0005\u0000\u0006\u0000" + + "\u003e\u00db\u005d\u00d4\u00e4\u002e\u0085\u0099" + + "\u0011\u0000\u0000\u0000\u0001\u0000\u0000\u0000" + + "\u00c4\u0053\u0042\u00d6\u0097\u0077\u00b5\u0091" + + "\u0001\u0000\u0007\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0015\u0000\u0000\u0000\u00f2\u0000\u0000\u0000" + + "\u0021\u0000\u0000\u0000\u0007\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u001d\u0000\u0000\u0000\u003f\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0064\u0065\u006d\u006f\u0070\u0061\u0072\u0061" + + "\u006d\u0073\u002e\u0063\u0061\u0070\u006e\u0070" + + "\u003a\u0054\u0065\u0073\u0074\u0052\u0065\u0073" + + "\u0075\u006c\u0074\u0073\u0031\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000" + + "\u0004\u0000\u0000\u0000\u0003\u0000\u0004\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0001\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\r\u0000\u0000\u0000\u0042\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0008\u0000\u0000\u0000\u0003\u0000\u0001\u0000" + + "\u0014\u0000\u0000\u0000\u0002\u0000\u0001\u0000" + + "\u0072\u0065\u0073\u0075\u006c\u0074\u0030\u0000" + + "\u0012\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0012\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + + "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000" + ""); +} +} + diff --git a/runtime/src/test/java/org/capnproto/demo/democap.capnp b/runtime/src/test/java/org/capnproto/demo/democap.capnp new file mode 100644 index 0000000..a195446 --- /dev/null +++ b/runtime/src/test/java/org/capnproto/demo/democap.capnp @@ -0,0 +1,12 @@ +@0xf29f4ba3b0a5a945; + +using Params = import "demoparams.capnp"; + +interface TestCap0 { + testMethod0 @0 Params.TestParams0 -> Params.TestResults0; + testMethod1 @1 Params.TestParams1 -> Params.TestResults1; +} + +interface TestCap1 { +} + diff --git a/runtime/src/test/java/org/capnproto/demo/democap.capnp.c++ b/runtime/src/test/java/org/capnproto/demo/democap.capnp.c++ new file mode 100644 index 0000000..bcdec32 --- /dev/null +++ b/runtime/src/test/java/org/capnproto/demo/democap.capnp.c++ @@ -0,0 +1,190 @@ +// Generated by Cap'n Proto compiler, DO NOT EDIT +// source: democap.capnp + +#include "democap.capnp.h" + +namespace capnp { +namespace schemas { +static const ::capnp::_::AlignedData<40> b_a65f4a3d7f622e6b = { + { 0, 0, 0, 0, 5, 0, 6, 0, + 107, 46, 98, 127, 61, 74, 95, 166, + 14, 0, 0, 0, 3, 0, 0, 0, + 69, 169, 165, 176, 163, 75, 159, 242, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 21, 0, 0, 0, 186, 0, 0, 0, + 29, 0, 0, 0, 7, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 25, 0, 0, 0, 135, 0, 0, 0, + 113, 0, 0, 0, 7, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 100, 101, 109, 111, 99, 97, 112, 46, + 99, 97, 112, 110, 112, 58, 84, 101, + 115, 116, 67, 97, 112, 48, 0, 0, + 0, 0, 0, 0, 1, 0, 1, 0, + 8, 0, 0, 0, 3, 0, 5, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 18, 240, 128, 209, 172, 180, 1, 179, + 129, 40, 181, 33, 84, 46, 164, 150, + 49, 0, 0, 0, 98, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 41, 0, 0, 0, 7, 0, 0, 0, + 1, 0, 0, 0, 0, 0, 0, 0, + 87, 9, 34, 182, 168, 99, 3, 225, + 62, 219, 93, 212, 228, 46, 133, 153, + 29, 0, 0, 0, 98, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 21, 0, 0, 0, 7, 0, 0, 0, + 116, 101, 115, 116, 77, 101, 116, 104, + 111, 100, 48, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 1, 0, + 116, 101, 115, 116, 77, 101, 116, 104, + 111, 100, 49, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 1, 0, + 0, 0, 0, 0, 1, 0, 1, 0, } +}; +::capnp::word const* const bp_a65f4a3d7f622e6b = b_a65f4a3d7f622e6b.words; +#if !CAPNP_LITE +static const ::capnp::_::RawSchema* const d_a65f4a3d7f622e6b[] = { + &s_96a42e5421b52881, + &s_99852ee4d45ddb3e, + &s_b301b4acd180f012, + &s_e10363a8b6220957, +}; +static const uint16_t m_a65f4a3d7f622e6b[] = {0, 1}; +const ::capnp::_::RawSchema s_a65f4a3d7f622e6b = { + 0xa65f4a3d7f622e6b, b_a65f4a3d7f622e6b.words, 40, d_a65f4a3d7f622e6b, m_a65f4a3d7f622e6b, + 4, 2, nullptr, nullptr, nullptr, { &s_a65f4a3d7f622e6b, nullptr, nullptr, 0, 0, nullptr } +}; +#endif // !CAPNP_LITE +static const ::capnp::_::AlignedData<18> b_81da3f8f6079c216 = { + { 0, 0, 0, 0, 5, 0, 6, 0, + 22, 194, 121, 96, 143, 63, 218, 129, + 14, 0, 0, 0, 3, 0, 0, 0, + 69, 169, 165, 176, 163, 75, 159, 242, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 21, 0, 0, 0, 186, 0, 0, 0, + 29, 0, 0, 0, 7, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 25, 0, 0, 0, 7, 0, 0, 0, + 25, 0, 0, 0, 7, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 100, 101, 109, 111, 99, 97, 112, 46, + 99, 97, 112, 110, 112, 58, 84, 101, + 115, 116, 67, 97, 112, 49, 0, 0, + 0, 0, 0, 0, 1, 0, 1, 0, + 0, 0, 0, 0, 3, 0, 5, 0, + 0, 0, 0, 0, 1, 0, 1, 0, } +}; +::capnp::word const* const bp_81da3f8f6079c216 = b_81da3f8f6079c216.words; +#if !CAPNP_LITE +const ::capnp::_::RawSchema s_81da3f8f6079c216 = { + 0x81da3f8f6079c216, b_81da3f8f6079c216.words, 18, nullptr, nullptr, + 0, 0, nullptr, nullptr, nullptr, { &s_81da3f8f6079c216, nullptr, nullptr, 0, 0, nullptr } +}; +#endif // !CAPNP_LITE +} // namespace schemas +} // namespace capnp + +// ======================================================================================= + + +#if !CAPNP_LITE +::capnp::Request< ::TestParams0, ::TestResults0> +TestCap0::Client::testMethod0Request(::kj::Maybe< ::capnp::MessageSize> sizeHint) { + return newCall< ::TestParams0, ::TestResults0>( + 0xa65f4a3d7f622e6bull, 0, sizeHint); +} +::kj::Promise TestCap0::Server::testMethod0(TestMethod0Context) { + return ::capnp::Capability::Server::internalUnimplemented( + "democap.capnp:TestCap0", "testMethod0", + 0xa65f4a3d7f622e6bull, 0); +} +::capnp::Request< ::TestParams1, ::TestResults1> +TestCap0::Client::testMethod1Request(::kj::Maybe< ::capnp::MessageSize> sizeHint) { + return newCall< ::TestParams1, ::TestResults1>( + 0xa65f4a3d7f622e6bull, 1, sizeHint); +} +::kj::Promise TestCap0::Server::testMethod1(TestMethod1Context) { + return ::capnp::Capability::Server::internalUnimplemented( + "democap.capnp:TestCap0", "testMethod1", + 0xa65f4a3d7f622e6bull, 1); +} +::capnp::Capability::Server::DispatchCallResult TestCap0::Server::dispatchCall( + uint64_t interfaceId, uint16_t methodId, + ::capnp::CallContext< ::capnp::AnyPointer, ::capnp::AnyPointer> context) { + switch (interfaceId) { + case 0xa65f4a3d7f622e6bull: + return dispatchCallInternal(methodId, context); + default: + return internalUnimplemented("democap.capnp:TestCap0", interfaceId); + } +} +::capnp::Capability::Server::DispatchCallResult TestCap0::Server::dispatchCallInternal( + uint16_t methodId, + ::capnp::CallContext< ::capnp::AnyPointer, ::capnp::AnyPointer> context) { + switch (methodId) { + case 0: + return { + testMethod0(::capnp::Capability::Server::internalGetTypedContext< + ::TestParams0, ::TestResults0>(context)), + false + }; + case 1: + return { + testMethod1(::capnp::Capability::Server::internalGetTypedContext< + ::TestParams1, ::TestResults1>(context)), + false + }; + default: + (void)context; + return ::capnp::Capability::Server::internalUnimplemented( + "democap.capnp:TestCap0", + 0xa65f4a3d7f622e6bull, methodId); + } +} +#endif // !CAPNP_LITE + +// TestCap0 +#if !CAPNP_LITE +constexpr ::capnp::Kind TestCap0::_capnpPrivate::kind; +constexpr ::capnp::_::RawSchema const* TestCap0::_capnpPrivate::schema; +#endif // !CAPNP_LITE + +#if !CAPNP_LITE +::capnp::Capability::Server::DispatchCallResult TestCap1::Server::dispatchCall( + uint64_t interfaceId, uint16_t methodId, + ::capnp::CallContext< ::capnp::AnyPointer, ::capnp::AnyPointer> context) { + switch (interfaceId) { + case 0x81da3f8f6079c216ull: + return dispatchCallInternal(methodId, context); + default: + return internalUnimplemented("democap.capnp:TestCap1", interfaceId); + } +} +::capnp::Capability::Server::DispatchCallResult TestCap1::Server::dispatchCallInternal( + uint16_t methodId, + ::capnp::CallContext< ::capnp::AnyPointer, ::capnp::AnyPointer> context) { + switch (methodId) { + default: + (void)context; + return ::capnp::Capability::Server::internalUnimplemented( + "democap.capnp:TestCap1", + 0x81da3f8f6079c216ull, methodId); + } +} +#endif // !CAPNP_LITE + +// TestCap1 +#if !CAPNP_LITE +constexpr ::capnp::Kind TestCap1::_capnpPrivate::kind; +constexpr ::capnp::_::RawSchema const* TestCap1::_capnpPrivate::schema; +#endif // !CAPNP_LITE + + + diff --git a/runtime/src/test/java/org/capnproto/demo/democap.capnp.h b/runtime/src/test/java/org/capnproto/demo/democap.capnp.h new file mode 100644 index 0000000..b60b09e --- /dev/null +++ b/runtime/src/test/java/org/capnproto/demo/democap.capnp.h @@ -0,0 +1,216 @@ +// Generated by Cap'n Proto compiler, DO NOT EDIT +// source: democap.capnp + +#pragma once + +#include +#include +#if !CAPNP_LITE +#include +#endif // !CAPNP_LITE + +#if CAPNP_VERSION != 8000 +#error "Version mismatch between generated code and library headers. You must use the same version of the Cap'n Proto compiler and library." +#endif + +#include "demoparams.capnp.h" + +namespace capnp { +namespace schemas { + +CAPNP_DECLARE_SCHEMA(a65f4a3d7f622e6b); +CAPNP_DECLARE_SCHEMA(81da3f8f6079c216); + +} // namespace schemas +} // namespace capnp + + +struct TestCap0 { + TestCap0() = delete; + +#if !CAPNP_LITE + class Client; + class Server; +#endif // !CAPNP_LITE + + + #if !CAPNP_LITE + struct _capnpPrivate { + CAPNP_DECLARE_INTERFACE_HEADER(a65f4a3d7f622e6b) + static constexpr ::capnp::_::RawBrandedSchema const* brand() { return &schema->defaultBrand; } + }; + #endif // !CAPNP_LITE +}; + +struct TestCap1 { + TestCap1() = delete; + +#if !CAPNP_LITE + class Client; + class Server; +#endif // !CAPNP_LITE + + + #if !CAPNP_LITE + struct _capnpPrivate { + CAPNP_DECLARE_INTERFACE_HEADER(81da3f8f6079c216) + static constexpr ::capnp::_::RawBrandedSchema const* brand() { return &schema->defaultBrand; } + }; + #endif // !CAPNP_LITE +}; + +// ======================================================================================= + +#if !CAPNP_LITE +class TestCap0::Client + : public virtual ::capnp::Capability::Client { +public: + typedef TestCap0 Calls; + typedef TestCap0 Reads; + + Client(decltype(nullptr)); + explicit Client(::kj::Own< ::capnp::ClientHook>&& hook); + template ()>> + Client(::kj::Own<_t>&& server); + template ()>> + Client(::kj::Promise<_t>&& promise); + Client(::kj::Exception&& exception); + Client(Client&) = default; + Client(Client&&) = default; + Client& operator=(Client& other); + Client& operator=(Client&& other); + + ::capnp::Request< ::TestParams0, ::TestResults0> testMethod0Request( + ::kj::Maybe< ::capnp::MessageSize> sizeHint = nullptr); + ::capnp::Request< ::TestParams1, ::TestResults1> testMethod1Request( + ::kj::Maybe< ::capnp::MessageSize> sizeHint = nullptr); + +protected: + Client() = default; +}; + +class TestCap0::Server + : public virtual ::capnp::Capability::Server { +public: + typedef TestCap0 Serves; + + ::capnp::Capability::Server::DispatchCallResult dispatchCall( + uint64_t interfaceId, uint16_t methodId, + ::capnp::CallContext< ::capnp::AnyPointer, ::capnp::AnyPointer> context) + override; + +protected: + typedef ::capnp::CallContext< ::TestParams0, ::TestResults0> TestMethod0Context; + virtual ::kj::Promise testMethod0(TestMethod0Context context); + typedef ::capnp::CallContext< ::TestParams1, ::TestResults1> TestMethod1Context; + virtual ::kj::Promise testMethod1(TestMethod1Context context); + + inline ::TestCap0::Client thisCap() { + return ::capnp::Capability::Server::thisCap() + .template castAs< ::TestCap0>(); + } + + ::capnp::Capability::Server::DispatchCallResult dispatchCallInternal( + uint16_t methodId, + ::capnp::CallContext< ::capnp::AnyPointer, ::capnp::AnyPointer> context); +}; +#endif // !CAPNP_LITE + +#if !CAPNP_LITE +class TestCap1::Client + : public virtual ::capnp::Capability::Client { +public: + typedef TestCap1 Calls; + typedef TestCap1 Reads; + + Client(decltype(nullptr)); + explicit Client(::kj::Own< ::capnp::ClientHook>&& hook); + template ()>> + Client(::kj::Own<_t>&& server); + template ()>> + Client(::kj::Promise<_t>&& promise); + Client(::kj::Exception&& exception); + Client(Client&) = default; + Client(Client&&) = default; + Client& operator=(Client& other); + Client& operator=(Client&& other); + + +protected: + Client() = default; +}; + +class TestCap1::Server + : public virtual ::capnp::Capability::Server { +public: + typedef TestCap1 Serves; + + ::capnp::Capability::Server::DispatchCallResult dispatchCall( + uint64_t interfaceId, uint16_t methodId, + ::capnp::CallContext< ::capnp::AnyPointer, ::capnp::AnyPointer> context) + override; + +protected: + + inline ::TestCap1::Client thisCap() { + return ::capnp::Capability::Server::thisCap() + .template castAs< ::TestCap1>(); + } + + ::capnp::Capability::Server::DispatchCallResult dispatchCallInternal( + uint16_t methodId, + ::capnp::CallContext< ::capnp::AnyPointer, ::capnp::AnyPointer> context); +}; +#endif // !CAPNP_LITE + +// ======================================================================================= + +#if !CAPNP_LITE +inline TestCap0::Client::Client(decltype(nullptr)) + : ::capnp::Capability::Client(nullptr) {} +inline TestCap0::Client::Client( + ::kj::Own< ::capnp::ClientHook>&& hook) + : ::capnp::Capability::Client(::kj::mv(hook)) {} +template +inline TestCap0::Client::Client(::kj::Own<_t>&& server) + : ::capnp::Capability::Client(::kj::mv(server)) {} +template +inline TestCap0::Client::Client(::kj::Promise<_t>&& promise) + : ::capnp::Capability::Client(::kj::mv(promise)) {} +inline TestCap0::Client::Client(::kj::Exception&& exception) + : ::capnp::Capability::Client(::kj::mv(exception)) {} +inline ::TestCap0::Client& TestCap0::Client::operator=(Client& other) { + ::capnp::Capability::Client::operator=(other); + return *this; +} +inline ::TestCap0::Client& TestCap0::Client::operator=(Client&& other) { + ::capnp::Capability::Client::operator=(kj::mv(other)); + return *this; +} + +#endif // !CAPNP_LITE +#if !CAPNP_LITE +inline TestCap1::Client::Client(decltype(nullptr)) + : ::capnp::Capability::Client(nullptr) {} +inline TestCap1::Client::Client( + ::kj::Own< ::capnp::ClientHook>&& hook) + : ::capnp::Capability::Client(::kj::mv(hook)) {} +template +inline TestCap1::Client::Client(::kj::Own<_t>&& server) + : ::capnp::Capability::Client(::kj::mv(server)) {} +template +inline TestCap1::Client::Client(::kj::Promise<_t>&& promise) + : ::capnp::Capability::Client(::kj::mv(promise)) {} +inline TestCap1::Client::Client(::kj::Exception&& exception) + : ::capnp::Capability::Client(::kj::mv(exception)) {} +inline ::TestCap1::Client& TestCap1::Client::operator=(Client& other) { + ::capnp::Capability::Client::operator=(other); + return *this; +} +inline ::TestCap1::Client& TestCap1::Client::operator=(Client&& other) { + ::capnp::Capability::Client::operator=(kj::mv(other)); + return *this; +} + +#endif // !CAPNP_LITE + diff --git a/runtime/src/test/java/org/capnproto/demo/demoparams.capnp b/runtime/src/test/java/org/capnproto/demo/demoparams.capnp new file mode 100644 index 0000000..5c2cae4 --- /dev/null +++ b/runtime/src/test/java/org/capnproto/demo/demoparams.capnp @@ -0,0 +1,23 @@ +@0x91b57797d64253c4; + +using Java = import "/capnp/java.capnp"; +$Java.package("org.capnproto.demo"); +$Java.outerClassname("Demo"); + +struct TestParams0 { + param0 @0 :Int32; +} + +struct TestResults0 { + result0 @0 :Int32; +} + +struct TestParams1 { + param0 @0 :AnyPointer; +} + +struct TestResults1 { + result0 @0 :AnyPointer; +} + +