implement capability client and server

This commit is contained in:
Vaci Koblizek 2020-09-29 12:24:49 +01:00
parent 37c93cc5d2
commit f5e4630aef
5 changed files with 292 additions and 2 deletions

View file

@ -72,6 +72,19 @@ public final class AnyPointer {
public final <T> T getAs(FromPointerReader<T> factory) { public final <T> T getAs(FromPointerReader<T> factory) {
return factory.fromPointerReader(this.segment, this.capTable, this.pointer, this.nestingLimit); return factory.fromPointerReader(this.segment, this.capTable, this.pointer, this.nestingLimit);
} }
public final ClientHook getPipelinedCap(PipelineOp[] ops) {
for (var op: ops) {
switch (op.type) {
case NOOP:
break;
case GET_POINTER_FIELD:
break;
}
}
// TODO implement getPipelinedCap
return null;
}
} }
public static final class Builder { public static final class Builder {

View file

@ -0,0 +1,46 @@
package org.capnproto;
import java.util.concurrent.CompletableFuture;
public class CallContext<Params, Results> {
final CallContextHook hook;
final FromPointerReader<Params> params;
final FromPointerBuilder<Results> results;
CallContext(FromPointerReader<Params> params,
FromPointerBuilder<Results> results,
CallContextHook hook) {
this.hook = hook;
this.params = params;
this.results = results;
}
public final Params getParams() {
return hook.getParams().getAs(params);
}
public final void releaseParams() {
this.hook.releaseParams();
}
public final Results getResults() {
return this.hook.getResults().getAs(results);
}
public final Results initResults() {
return this.hook.getResults().initAs(results);
}
public final <SubParams> CompletableFuture<java.lang.Void> tailCall(Request<SubParams, Results> tailRequest) {
return hook.tailCall(tailRequest.hook);
}
public final void allowCancellation() {
this.hook.allowCancellation();
}
public final CallContextHook getHook() {
return this.hook;
}
}

View file

@ -8,9 +8,197 @@ public final class Capability {
final ClientHook hook; final ClientHook hook;
public Client() {
this.hook = null;
}
public Client(ClientHook hook) { public Client(ClientHook hook) {
this.hook = hook; this.hook = hook;
} }
public Client(Server server) {
this(server.makeLocalClient());
}
public Client(CompletableFuture<ClientHook> promise) {
this(Capability.newLocalPromiseClient(promise));
}
public Client(Throwable exc) {
this(ClientHook.newBrokenCap(exc));
}
public ClientHook getHook() {
return this.hook;
}
CompletableFuture<?> whenResolved() {
return hook.whenResolved();
}
Request<AnyPointer.Builder, AnyPointer.Reader> typelessRequest(
long interfaceId,
short methodId) {
return hook.newCall(interfaceId, methodId);
}
public <T, U> Request<T, U> newCall(FromPointerBuilder<T> builder,
FromPointerReader<U> reader,
long interfaceId, short methodId) {
var request = hook.newCall(interfaceId, methodId);
return new Request<T, U> (request.params, reader, request.hook);
}
public Request<AnyPointer.Builder, AnyPointer.Reader> newCall(long interfaceId, short methodId) {
return hook.newCall(interfaceId, methodId);
}
}
public abstract static class Server {
private static final Object BRAND = new Object();
ClientHook hook;
public ClientHook makeLocalClient() {
return new LocalClient();
}
private final class LocalClient implements ClientHook {
CompletableFuture<java.lang.Void> resolveTask;
ClientHook resolved;
boolean blocked = false;
Exception brokenException;
LocalClient() {
Server.this.hook = this;
startResolveTask();
}
@Override
public Request<AnyPointer.Builder, AnyPointer.Reader> newCall(long interfaceId, short methodId) {
var hook = new LocalRequest(interfaceId, methodId, this);
var root = hook.message.getRoot(AnyPointer.factory);
return new Request<>(root, AnyPointer.factory, hook);
}
@Override
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook ctx) {
assert !blocked: "Blocked condition not implemented";
if (blocked) {
// TODO implement blocked behaviour
return null;
}
// TODO re-visit promises
var promise = callInternal(interfaceId, methodId, ctx);
var forked = promise.copy();
CompletableFuture<PipelineHook> pipelinePromise = promise.thenApply(x -> {
ctx.releaseParams();
return new LocalPipeline(ctx);
});
pipelinePromise = ctx.onTailCall().applyToEither(pipelinePromise, pipeline -> {
return pipeline;
});
return new VoidPromiseAndPipeline(forked, new QueuedPipeline(pipelinePromise));
}
@Override
public CompletableFuture<java.lang.Void> whenResolved() {
return null;
}
@Override
public Object getBrand() {
return BRAND;
}
CompletableFuture<java.lang.Void> callInternal(long interfaceId, short methodId, CallContextHook context) {
var result = dispatchCall(
interfaceId,
methodId,
new CallContext(AnyPointer.factory, AnyPointer.factory, context));
if (result.streaming) {
// TODO streaming
return null;
}
else {
return result.promise;
}
}
void startResolveTask() {
var resolver = Server.this.shortenPath();
if (resolver == null) {
return;
}
this.resolveTask = resolver.thenAccept(client -> {
this.resolved = client.getHook();
});
}
}
public final class DispatchCallResult {
private final CompletableFuture<java.lang.Void> promise;
private final boolean streaming;
public DispatchCallResult(CompletableFuture<java.lang.Void> promise) {
this.promise = promise;
this.streaming = false;
}
DispatchCallResult(Throwable exc) {
this.promise = CompletableFuture.failedFuture(exc);
this.streaming = false;
}
DispatchCallResult(CompletableFuture<java.lang.Void> promise, boolean isStreaming) {
this.promise = promise;
this.streaming = isStreaming;
}
public CompletableFuture<?> getPromise() {
return promise;
}
public boolean isStreaming() {
return streaming;
}
}
public CompletableFuture<Client> shortenPath() {
return null;
}
protected Client thisCap() {
return new Client(hook);
}
protected final <Params, Results> CallContext<Params, Results> internalGetTypedContext(
FromPointerReader<Params> paramsFactory,
FromPointerBuilder<Results> resultsFactory,
CallContext<AnyPointer.Reader, AnyPointer.Builder> typeless) {
return new CallContext<>(paramsFactory, resultsFactory, typeless.hook);
}
public abstract DispatchCallResult dispatchCall(long interfaceId, short methodId, CallContext<AnyPointer.Reader, AnyPointer.Builder> context);
protected DispatchCallResult internalUnimplemented(String actualInterfaceName, long requestedTypeId) {
return new DispatchCallResult(RpcException.unimplemented(
"Method not implemented. " + actualInterfaceName + " " + requestedTypeId));
}
protected DispatchCallResult internalUnimplemented(String interfaceName, long typeId, short methodId) {
return new DispatchCallResult(RpcException.unimplemented(
"Method not implemented. " + interfaceName + " " + typeId + " " + methodId));
}
protected DispatchCallResult internalUnimplemented(String interfaceName, String methodName, long typeId, short methodId) {
return new DispatchCallResult(RpcException.unimplemented(
"Method not implemented. " + interfaceName + " " + typeId + " " + methodName + " " + methodId));
}
} }
static ClientHook newLocalPromiseClient(CompletableFuture<ClientHook> promise) { static ClientHook newLocalPromiseClient(CompletableFuture<ClientHook> promise) {
@ -49,6 +237,21 @@ public final class Capability {
} }
} }
static final class LocalPipeline implements PipelineHook {
final CallContextHook context;
final AnyPointer.Reader results;
public LocalPipeline(CallContextHook context) {
this.context = context;
this.results = context.getResults().asReader();
}
@Override
public final ClientHook getPipelinedCap(PipelineOp[] ops) {
return this.results.getPipelinedCap(ops);
}
}
static class LocalResponse implements ResponseHook { static class LocalResponse implements ResponseHook {
final MessageBuilder message = new MessageBuilder(); final MessageBuilder message = new MessageBuilder();
} }

View file

@ -0,0 +1,28 @@
package org.capnproto;
import java.util.concurrent.CompletableFuture;
final class QueuedPipeline implements PipelineHook {
final CompletableFuture<PipelineHook> promise;
final CompletableFuture<Void> selfResolutionOp;
PipelineHook redirect;
public QueuedPipeline(CompletableFuture<PipelineHook> promiseParam) {
this.promise = promiseParam.copy();
this.selfResolutionOp = promise.handle((pipeline, exc) -> {
this.redirect = exc == null
? pipeline
: PipelineHook.newBrokenPipeline(exc);
return null;
});
}
@Override
public final ClientHook getPipelinedCap(PipelineOp[] ops) {
return redirect != null
? redirect.getPipelinedCap(ops)
: new QueuedClient(this.promise.thenApply(
pipeline -> pipeline.getPipelinedCap(ops)));
}
}

View file

@ -4,9 +4,9 @@ import java.util.concurrent.CompletableFuture;
public class Request<Params, Results> { public class Request<Params, Results> {
private final AnyPointer.Builder params; final AnyPointer.Builder params;
private final FromPointerReader<Results> results; private final FromPointerReader<Results> results;
private RequestHook hook; RequestHook hook;
Request(AnyPointer.Builder params, FromPointerReader<Results> results, RequestHook hook) { Request(AnyPointer.Builder params, FromPointerReader<Results> results, RequestHook hook) {
this.params = params; this.params = params;