From b94f2d6c8ce1b8c4b347805237452bf0560b0341 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Tue, 6 Oct 2020 17:44:54 +0100 Subject: [PATCH] prep for generated code --- .../main/java/org/capnproto/AnyPointer.java | 9 ++- .../java/org/capnproto/CapTableReader.java | 2 +- .../main/java/org/capnproto/Capability.java | 73 ++++++++----------- .../org/capnproto/DispatchCallResult.java | 26 +++++++ 4 files changed, 66 insertions(+), 44 deletions(-) create mode 100644 runtime/src/main/java/org/capnproto/DispatchCallResult.java diff --git a/runtime/src/main/java/org/capnproto/AnyPointer.java b/runtime/src/main/java/org/capnproto/AnyPointer.java index 9e5d6c9..1fb76f6 100644 --- a/runtime/src/main/java/org/capnproto/AnyPointer.java +++ b/runtime/src/main/java/org/capnproto/AnyPointer.java @@ -69,7 +69,7 @@ public final class AnyPointer { return factory.fromPointerReader(this.segment, this.capTable, this.pointer, this.nestingLimit); } - public final Capability.Client getAsCapability() { + public final Capability.Client getAsCap() { return new Capability.Client( WireHelpers.readCapabilityPointer(this.segment, capTable, this.pointer, 0)); } @@ -129,7 +129,12 @@ public final class AnyPointer { factory.setPointerBuilder(this.segment, this.capTable, this.pointer, reader); } - public final void setAsCapability(Capability.Client cap) { + public final Capability.Client getAsCap() { + return new Capability.Client( + WireHelpers.readCapabilityPointer(this.segment, capTable, this.pointer, 0)); + } + + public final void setAsCap(Capability.Client cap) { WireHelpers.setCapabilityPointer(this.segment, capTable, this.pointer, cap.hook); } diff --git a/runtime/src/main/java/org/capnproto/CapTableReader.java b/runtime/src/main/java/org/capnproto/CapTableReader.java index 0b9ddd5..e8f5be5 100644 --- a/runtime/src/main/java/org/capnproto/CapTableReader.java +++ b/runtime/src/main/java/org/capnproto/CapTableReader.java @@ -1,5 +1,5 @@ package org.capnproto; -interface CapTableReader { +public interface CapTableReader { ClientHook extractCap(int index); } diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index 7762809..3087bda 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -4,11 +4,11 @@ import java.util.concurrent.CompletableFuture; public final class Capability { - static abstract class BuilderContext { + public static abstract class BuilderContext { CapTableBuilder capTable; } - static class ReaderContext { + public static class ReaderContext { CapTableReader capTable; } @@ -36,6 +36,10 @@ public final class Capability { this(newBrokenCap(exc)); } + public ClientHook getHook() { + return this.hook; + } + private static ClientHook makeLocalClient(Server server) { return server.makeLocalClient(); } @@ -127,12 +131,12 @@ public final class Capability { interfaceId, methodId, new CallContext<>(AnyPointer.factory, AnyPointer.factory, context)); - if (result.streaming) { + if (result.isStreaming()) { // TODO streaming return null; } else { - return result.promise; + return result.getPromise(); } } @@ -147,34 +151,6 @@ public final class Capability { } } - public static final class DispatchCallResult { - private final CompletableFuture promise; - private final boolean streaming; - - public DispatchCallResult(CompletableFuture promise) { - this.promise = promise; - this.streaming = false; - } - - DispatchCallResult(Throwable exc) { - this.promise = CompletableFuture.failedFuture(exc); - this.streaming = false; - } - - DispatchCallResult(CompletableFuture promise, boolean isStreaming) { - this.promise = promise; - this.streaming = isStreaming; - } - - public CompletableFuture getPromise() { - return promise; - } - - public boolean isStreaming() { - return streaming; - } - } - public CompletableFuture shortenPath() { return null; } @@ -183,28 +159,43 @@ public final class Capability { return new Client(this.hook); } - protected final CallContext internalGetTypedContext( + protected static CallContext typedContext( FromPointerReader paramsFactory, FromPointerBuilder resultsFactory, CallContext typeless) { return new CallContext<>(paramsFactory, resultsFactory, typeless.hook); } - public abstract DispatchCallResult dispatchCall(long interfaceId, short methodId, - CallContext context); + protected abstract DispatchCallResult dispatchCall( + long interfaceId, short methodId, + CallContext context); - protected DispatchCallResult internalUnimplemented(String actualInterfaceName, long requestedTypeId) { - return new DispatchCallResult(RpcException.unimplemented( + protected static DispatchCallResult streamResult(CompletableFuture result) { + // For streaming calls, we need to add an evalNow() here so that exceptions thrown + // directly from the call can propagate to later calls. If we don't capture the + // exception properly then the caller will never find out that this is a streaming + // call (indicated by the boolean in the return value) so won't know to propagate + // the exception. + // TODO the above comment... + return new DispatchCallResult(result, true); + } + + protected static DispatchCallResult result(CompletableFuture result) { + return new DispatchCallResult(result, false); + } + + protected static CompletableFuture internalUnimplemented(String actualInterfaceName, long requestedTypeId) { + return CompletableFuture.failedFuture(RpcException.unimplemented( "Method not implemented. " + actualInterfaceName + " " + requestedTypeId)); } - protected DispatchCallResult internalUnimplemented(String interfaceName, long typeId, short methodId) { - return new DispatchCallResult(RpcException.unimplemented( + protected static CompletableFuture internalUnimplemented(String interfaceName, long typeId, short methodId) { + return CompletableFuture.failedFuture(RpcException.unimplemented( "Method not implemented. " + interfaceName + " " + typeId + " " + methodId)); } - protected DispatchCallResult internalUnimplemented(String interfaceName, String methodName, long typeId, short methodId) { - return new DispatchCallResult(RpcException.unimplemented( + protected static CompletableFuture internalUnimplemented(String interfaceName, String methodName, long typeId, short methodId) { + return CompletableFuture.failedFuture(RpcException.unimplemented( "Method not implemented. " + interfaceName + " " + typeId + " " + methodName + " " + methodId)); } } diff --git a/runtime/src/main/java/org/capnproto/DispatchCallResult.java b/runtime/src/main/java/org/capnproto/DispatchCallResult.java new file mode 100644 index 0000000..de3fb4d --- /dev/null +++ b/runtime/src/main/java/org/capnproto/DispatchCallResult.java @@ -0,0 +1,26 @@ +package org.capnproto; + +import java.util.concurrent.CompletableFuture; + +public final class DispatchCallResult { + + private final CompletableFuture promise; + private final boolean streaming; + + public DispatchCallResult(CompletableFuture promise, boolean isStreaming) { + this.promise = promise; + this.streaming = isStreaming; + } + + public DispatchCallResult(Throwable exc) { + this(CompletableFuture.failedFuture(exc), false); + } + + public CompletableFuture getPromise() { + return promise; + } + + public boolean isStreaming() { + return streaming; + } +}