tidy up response and pipeline construction

This commit is contained in:
Vaci Koblizek 2020-11-30 17:47:23 +00:00
parent c66250f492
commit d02e460f06
4 changed files with 19 additions and 17 deletions

View file

@ -1177,7 +1177,7 @@ final class RpcState<VatId> {
} }
var pipeline = base.pipeline; var pipeline = base.pipeline;
if (pipeline == null) { if (pipeline == null) {
pipeline = PipelineHook.newBrokenPipeline( pipeline = Capability.newBrokenPipeline(
RpcException.failed("Pipeline call on a request that returned no capabilities or was already closed.")); RpcException.failed("Pipeline call on a request that returned no capabilities or was already closed."));
} }
var ops = ToPipelineOps(promisedAnswer); var ops = ToPipelineOps(promisedAnswer);
@ -1681,7 +1681,8 @@ final class RpcState<VatId> {
@Override @Override
public RemotePromise<AnyPointer.Reader> send() { public RemotePromise<AnyPointer.Reader> send() {
if (isDisconnected()) { if (isDisconnected()) {
return new RemotePromise<>(CompletableFuture.failedFuture(disconnected), null); return new RemotePromise<>(CompletableFuture.failedFuture(disconnected),
Capability.newBrokenPipeline(disconnected));
} }
var redirect = this.target.writeTarget(this.callBuilder.getTarget()); var redirect = this.target.writeTarget(this.callBuilder.getTarget());
@ -1700,7 +1701,7 @@ final class RpcState<VatId> {
var appPromise = questionRef.response.thenApply( var appPromise = questionRef.response.thenApply(
hook -> new Response<>(hook.getResults(), hook)); hook -> new Response<>(hook.getResults(), hook));
return new RemotePromise<>(appPromise, new AnyPointer.Pipeline(pipeline)); return new RemotePromise<>(appPromise, pipeline);
} }
QuestionRef sendInternal(boolean isTailCall) { QuestionRef sendInternal(boolean isTailCall) {

View file

@ -442,7 +442,7 @@ public final class Capability {
this.callRelease.complete(null); this.callRelease.complete(null);
assert promiseAndPipeline.pipeline != null; assert promiseAndPipeline.pipeline != null;
return new RemotePromise<>(promise, new AnyPointer.Pipeline(promiseAndPipeline.pipeline)); return new RemotePromise<>(promise, promiseAndPipeline.pipeline);
} }
@Override @Override
@ -545,7 +545,7 @@ public final class Capability {
public ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request) { public ClientHook.VoidPromiseAndPipeline directTailCall(RequestHook request) {
assert this.response == null : "Can't call tailCall() after initializing the results struct."; assert this.response == null : "Can't call tailCall() after initializing the results struct.";
var promise = request.send(); var promise = request.send();
var voidPromise = promise._getResponse().thenAccept(tailResponse -> { var voidPromise = promise.response.thenAccept(tailResponse -> {
this.response = tailResponse; this.response = tailResponse;
}); });
return new ClientHook.VoidPromiseAndPipeline(voidPromise, promise.pipeline().hook); return new ClientHook.VoidPromiseAndPipeline(voidPromise, promise.pipeline().hook);
@ -619,6 +619,10 @@ public final class Capability {
}; };
} }
static PipelineHook newBrokenPipeline(Throwable exc) {
return ops -> newBrokenCap(exc);
}
static Request<AnyPointer.Builder> newBrokenRequest(Throwable exc) { static Request<AnyPointer.Builder> newBrokenRequest(Throwable exc) {
var message = new MessageBuilder(); var message = new MessageBuilder();
@ -628,7 +632,7 @@ public final class Capability {
@Override @Override
public RemotePromise<AnyPointer.Reader> send() { public RemotePromise<AnyPointer.Reader> send() {
return new RemotePromise<>(CompletableFuture.failedFuture(exc), return new RemotePromise<>(CompletableFuture.failedFuture(exc),
new AnyPointer.Pipeline(PipelineHook.newBrokenPipeline(exc))); newBrokenPipeline(exc));
} }
@Override @Override
@ -665,7 +669,7 @@ public final class Capability {
@Override @Override
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) {
return new VoidPromiseAndPipeline(CompletableFuture.failedFuture(exc), PipelineHook.newBrokenPipeline(exc)); return new VoidPromiseAndPipeline(CompletableFuture.failedFuture(exc), newBrokenPipeline(exc));
} }
@Override @Override
@ -696,7 +700,7 @@ public final class Capability {
this.promise = promise.whenComplete((pipeline, exc) -> { this.promise = promise.whenComplete((pipeline, exc) -> {
this.redirect = exc == null this.redirect = exc == null
? pipeline ? pipeline
: PipelineHook.newBrokenPipeline(exc); : newBrokenPipeline(exc);
}); });
} }

View file

@ -6,8 +6,4 @@ public interface PipelineHook {
default void cancel(Throwable exc) { default void cancel(Throwable exc) {
} }
static PipelineHook newBrokenPipeline(Throwable exc) {
return ops -> Capability.newBrokenCap(exc);
}
} }

View file

@ -6,7 +6,7 @@ public class RemotePromise<Results>
extends CompletableFutureWrapper<Results> extends CompletableFutureWrapper<Results>
implements AutoCloseable { implements AutoCloseable {
private final CompletableFuture<Response<Results>> response; final CompletableFuture<Response<Results>> response;
private final AnyPointer.Pipeline pipeline; private final AnyPointer.Pipeline pipeline;
public RemotePromise(FromPointerReader<Results> factory, public RemotePromise(FromPointerReader<Results> factory,
@ -19,6 +19,11 @@ public class RemotePromise<Results>
this.pipeline = other.pipeline; this.pipeline = other.pipeline;
} }
public RemotePromise(CompletableFuture<Response<Results>> promise,
PipelineHook pipeline) {
this(promise, new AnyPointer.Pipeline(pipeline));
}
public RemotePromise(CompletableFuture<Response<Results>> promise, public RemotePromise(CompletableFuture<Response<Results>> promise,
AnyPointer.Pipeline pipeline) { AnyPointer.Pipeline pipeline) {
super(promise.thenApply(Response::getResults)); super(promise.thenApply(Response::getResults));
@ -32,10 +37,6 @@ public class RemotePromise<Results>
this.join(); this.join();
} }
CompletableFuture<Response<Results>> _getResponse() {
return this.response;
}
public AnyPointer.Pipeline pipeline() { public AnyPointer.Pipeline pipeline() {
return this.pipeline; return this.pipeline;
} }