diff --git a/runtime/src/main/java/org/capnproto/AnyPointer.java b/runtime/src/main/java/org/capnproto/AnyPointer.java index 64f4acc..9a43473 100644 --- a/runtime/src/main/java/org/capnproto/AnyPointer.java +++ b/runtime/src/main/java/org/capnproto/AnyPointer.java @@ -72,6 +72,19 @@ public final class AnyPointer { public final T getAs(FromPointerReader factory) { return factory.fromPointerReader(this.segment, this.capTable, this.pointer, this.nestingLimit); } + + public final ClientHook getPipelinedCap(PipelineOp[] ops) { + for (var op: ops) { + switch (op.type) { + case NOOP: + break; + case GET_POINTER_FIELD: + break; + } + } + // TODO implement getPipelinedCap + return null; + } } public static final class Builder { diff --git a/runtime/src/main/java/org/capnproto/CallContext.java b/runtime/src/main/java/org/capnproto/CallContext.java new file mode 100644 index 0000000..42b02f3 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/CallContext.java @@ -0,0 +1,46 @@ +package org.capnproto; + +import java.util.concurrent.CompletableFuture; + +public class CallContext { + + final CallContextHook hook; + final FromPointerReader params; + final FromPointerBuilder results; + + CallContext(FromPointerReader params, + FromPointerBuilder results, + CallContextHook hook) { + this.hook = hook; + this.params = params; + this.results = results; + } + + public final Params getParams() { + return hook.getParams().getAs(params); + } + + public final void releaseParams() { + this.hook.releaseParams(); + } + + public final Results getResults() { + return this.hook.getResults().getAs(results); + } + + public final Results initResults() { + return this.hook.getResults().initAs(results); + } + + public final CompletableFuture tailCall(Request tailRequest) { + return hook.tailCall(tailRequest.hook); + } + + public final void allowCancellation() { + this.hook.allowCancellation(); + } + + public final CallContextHook getHook() { + return this.hook; + } +} diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index 0718748..b38c3c4 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -8,9 +8,197 @@ public final class Capability { final ClientHook hook; + public Client() { + this.hook = null; + } + public Client(ClientHook hook) { this.hook = hook; } + + public Client(Server server) { + this(server.makeLocalClient()); + } + + public Client(CompletableFuture promise) { + this(Capability.newLocalPromiseClient(promise)); + } + + public Client(Throwable exc) { + this(ClientHook.newBrokenCap(exc)); + } + + public ClientHook getHook() { + return this.hook; + } + + CompletableFuture whenResolved() { + return hook.whenResolved(); + } + + Request typelessRequest( + long interfaceId, + short methodId) { + return hook.newCall(interfaceId, methodId); + } + + public Request newCall(FromPointerBuilder builder, + FromPointerReader reader, + long interfaceId, short methodId) { + var request = hook.newCall(interfaceId, methodId); + return new Request (request.params, reader, request.hook); + } + + public Request newCall(long interfaceId, short methodId) { + return hook.newCall(interfaceId, methodId); + } + } + + public abstract static class Server { + + private static final Object BRAND = new Object(); + ClientHook hook; + + public ClientHook makeLocalClient() { + return new LocalClient(); + } + + private final class LocalClient implements ClientHook { + + CompletableFuture resolveTask; + ClientHook resolved; + boolean blocked = false; + Exception brokenException; + + LocalClient() { + Server.this.hook = this; + startResolveTask(); + } + + @Override + public Request newCall(long interfaceId, short methodId) { + var hook = new LocalRequest(interfaceId, methodId, this); + var root = hook.message.getRoot(AnyPointer.factory); + return new Request<>(root, AnyPointer.factory, hook); + } + + @Override + public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) { + assert !blocked: "Blocked condition not implemented"; + if (blocked) { + // TODO implement blocked behaviour + return null; + } + + // TODO re-visit promises + var promise = callInternal(interfaceId, methodId, ctx); + var forked = promise.copy(); + + CompletableFuture pipelinePromise = promise.thenApply(x -> { + ctx.releaseParams(); + return new LocalPipeline(ctx); + }); + + pipelinePromise = ctx.onTailCall().applyToEither(pipelinePromise, pipeline -> { + return pipeline; + }); + + return new VoidPromiseAndPipeline(forked, new QueuedPipeline(pipelinePromise)); + } + + @Override + public CompletableFuture whenResolved() { + return null; + } + + @Override + public Object getBrand() { + return BRAND; + } + + CompletableFuture callInternal(long interfaceId, short methodId, CallContextHook context) { + var result = dispatchCall( + interfaceId, + methodId, + new CallContext(AnyPointer.factory, AnyPointer.factory, context)); + if (result.streaming) { + // TODO streaming + return null; + } + else { + return result.promise; + } + } + + void startResolveTask() { + var resolver = Server.this.shortenPath(); + if (resolver == null) { + return; + } + this.resolveTask = resolver.thenAccept(client -> { + this.resolved = client.getHook(); + }); + } + } + + public final class DispatchCallResult { + private final CompletableFuture promise; + private final boolean streaming; + + public DispatchCallResult(CompletableFuture promise) { + this.promise = promise; + this.streaming = false; + } + + DispatchCallResult(Throwable exc) { + this.promise = CompletableFuture.failedFuture(exc); + this.streaming = false; + } + + DispatchCallResult(CompletableFuture promise, boolean isStreaming) { + this.promise = promise; + this.streaming = isStreaming; + } + + public CompletableFuture getPromise() { + return promise; + } + + public boolean isStreaming() { + return streaming; + } + } + + public CompletableFuture shortenPath() { + return null; + } + + protected Client thisCap() { + return new Client(hook); + } + + protected final CallContext internalGetTypedContext( + FromPointerReader paramsFactory, + FromPointerBuilder resultsFactory, + CallContext typeless) { + return new CallContext<>(paramsFactory, resultsFactory, typeless.hook); + } + + public abstract DispatchCallResult dispatchCall(long interfaceId, short methodId, CallContext context); + + protected DispatchCallResult internalUnimplemented(String actualInterfaceName, long requestedTypeId) { + return new DispatchCallResult(RpcException.unimplemented( + "Method not implemented. " + actualInterfaceName + " " + requestedTypeId)); + } + protected DispatchCallResult internalUnimplemented(String interfaceName, long typeId, short methodId) { + return new DispatchCallResult(RpcException.unimplemented( + "Method not implemented. " + interfaceName + " " + typeId + " " + methodId)); + } + + protected DispatchCallResult internalUnimplemented(String interfaceName, String methodName, long typeId, short methodId) { + return new DispatchCallResult(RpcException.unimplemented( + "Method not implemented. " + interfaceName + " " + typeId + " " + methodName + " " + methodId)); + } } static ClientHook newLocalPromiseClient(CompletableFuture promise) { @@ -49,6 +237,21 @@ public final class Capability { } } + static final class LocalPipeline implements PipelineHook { + final CallContextHook context; + final AnyPointer.Reader results; + + public LocalPipeline(CallContextHook context) { + this.context = context; + this.results = context.getResults().asReader(); + } + + @Override + public final ClientHook getPipelinedCap(PipelineOp[] ops) { + return this.results.getPipelinedCap(ops); + } + } + static class LocalResponse implements ResponseHook { final MessageBuilder message = new MessageBuilder(); } diff --git a/runtime/src/main/java/org/capnproto/QueuedPipeline.java b/runtime/src/main/java/org/capnproto/QueuedPipeline.java new file mode 100644 index 0000000..8256563 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/QueuedPipeline.java @@ -0,0 +1,28 @@ +package org.capnproto; + +import java.util.concurrent.CompletableFuture; + +final class QueuedPipeline implements PipelineHook { + + final CompletableFuture promise; + final CompletableFuture selfResolutionOp; + PipelineHook redirect; + + public QueuedPipeline(CompletableFuture promiseParam) { + this.promise = promiseParam.copy(); + this.selfResolutionOp = promise.handle((pipeline, exc) -> { + this.redirect = exc == null + ? pipeline + : PipelineHook.newBrokenPipeline(exc); + return null; + }); + } + + @Override + public final ClientHook getPipelinedCap(PipelineOp[] ops) { + return redirect != null + ? redirect.getPipelinedCap(ops) + : new QueuedClient(this.promise.thenApply( + pipeline -> pipeline.getPipelinedCap(ops))); + } +} diff --git a/runtime/src/main/java/org/capnproto/Request.java b/runtime/src/main/java/org/capnproto/Request.java index 9a00f95..36cdc30 100644 --- a/runtime/src/main/java/org/capnproto/Request.java +++ b/runtime/src/main/java/org/capnproto/Request.java @@ -4,9 +4,9 @@ import java.util.concurrent.CompletableFuture; public class Request { - private final AnyPointer.Builder params; + final AnyPointer.Builder params; private final FromPointerReader results; - private RequestHook hook; + RequestHook hook; Request(AnyPointer.Builder params, FromPointerReader results, RequestHook hook) { this.params = params;