fix rpcsystem generic params, and hide various fields
This commit is contained in:
parent
c49221c2e9
commit
caec63d68c
5 changed files with 32 additions and 31 deletions
|
@ -32,7 +32,7 @@ public class CallContext<Params, Results> {
|
|||
return this.hook.getResults().initAs(results);
|
||||
}
|
||||
|
||||
public final <SubParams, Results> CompletableFuture<java.lang.Void> tailCall(Request<SubParams, Results> tailRequest) {
|
||||
public final <SubParams, Results> CompletableFuture<?> tailCall(Request<SubParams, Results> tailRequest) {
|
||||
return hook.tailCall(tailRequest.getHook());
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ public interface CallContextHook {
|
|||
|
||||
AnyPointer.Builder getResults();
|
||||
|
||||
CompletableFuture<java.lang.Void> tailCall(RequestHook request);
|
||||
CompletableFuture<?> tailCall(RequestHook request);
|
||||
|
||||
void allowCancellation();
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import java.util.concurrent.CompletionStage;
|
|||
|
||||
final class RpcState {
|
||||
|
||||
|
||||
final class Question {
|
||||
final int id;
|
||||
CompletableFuture<RpcResponse> response = new CompletableFuture<>();
|
||||
|
@ -976,7 +977,7 @@ final class RpcState {
|
|||
}
|
||||
}
|
||||
|
||||
private static class LocallyRedirectedRpcResponse implements RpcServerResponse, RpcResponse {
|
||||
private static final class LocallyRedirectedRpcResponse implements RpcServerResponse, RpcResponse {
|
||||
|
||||
private final MessageBuilder message = new MessageBuilder();
|
||||
|
||||
|
@ -991,30 +992,30 @@ final class RpcState {
|
|||
}
|
||||
}
|
||||
|
||||
class RpcCallContext implements CallContextHook {
|
||||
private final class RpcCallContext implements CallContextHook {
|
||||
|
||||
final int answerId;
|
||||
final long interfaceId;
|
||||
final short methodId;
|
||||
private final int answerId;
|
||||
private final long interfaceId;
|
||||
private final short methodId;
|
||||
|
||||
// request
|
||||
IncomingRpcMessage request;
|
||||
final AnyPointer.Reader params;
|
||||
private IncomingRpcMessage request;
|
||||
private final AnyPointer.Reader params;
|
||||
|
||||
// response
|
||||
RpcServerResponse response;
|
||||
RpcProtocol.Return.Builder returnMessage;
|
||||
boolean redirectResults = false;
|
||||
boolean responseSent = false;
|
||||
private RpcServerResponse response;
|
||||
private RpcProtocol.Return.Builder returnMessage;
|
||||
private boolean redirectResults = false;
|
||||
private boolean responseSent = false;
|
||||
|
||||
boolean cancelRequested = false;
|
||||
boolean cancelAllowed = false;
|
||||
private boolean cancelRequested = false;
|
||||
private boolean cancelAllowed = false;
|
||||
|
||||
final CompletableFuture<java.lang.Void> cancelled;
|
||||
private final CompletableFuture<java.lang.Void> whenCancelled;
|
||||
|
||||
RpcCallContext(int answerId, IncomingRpcMessage request, List<ClientHook> capTable,
|
||||
AnyPointer.Reader params, boolean redirectResults,
|
||||
CompletableFuture<java.lang.Void> cancelled,
|
||||
CompletableFuture<java.lang.Void> whenCancelled,
|
||||
long interfaceId, short methodId) {
|
||||
this.answerId = answerId;
|
||||
this.interfaceId = interfaceId;
|
||||
|
@ -1022,7 +1023,7 @@ final class RpcState {
|
|||
this.request = request;
|
||||
this.params = params.imbue(new ReaderCapabilityTable(capTable));
|
||||
this.redirectResults = redirectResults;
|
||||
this.cancelled = cancelled;
|
||||
this.whenCancelled = whenCancelled;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1037,23 +1038,23 @@ final class RpcState {
|
|||
|
||||
@Override
|
||||
public AnyPointer.Builder getResults() {
|
||||
if (response == null) {
|
||||
if (this.response == null) {
|
||||
|
||||
if (redirectResults || isDisconnected()) {
|
||||
response = new LocallyRedirectedRpcResponse();
|
||||
if (this.redirectResults || isDisconnected()) {
|
||||
this.response = new LocallyRedirectedRpcResponse();
|
||||
}
|
||||
else {
|
||||
var message = connection.newOutgoingMessage(1024);
|
||||
returnMessage = message.getBody().initAs(RpcProtocol.Message.factory).initReturn();
|
||||
response = new RpcServerResponseImpl(message, returnMessage.getResults());
|
||||
this.returnMessage = message.getBody().initAs(RpcProtocol.Message.factory).initReturn();
|
||||
this.response = new RpcServerResponseImpl(message, returnMessage.getResults());
|
||||
}
|
||||
}
|
||||
|
||||
return response.getResultsBuilder();
|
||||
return this.response.getResultsBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<java.lang.Void> tailCall(RequestHook request) {
|
||||
public CompletableFuture<?> tailCall(RequestHook request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1167,7 +1168,7 @@ final class RpcState {
|
|||
if (previouslyAllowedButNotRequested) {
|
||||
// We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously. Initiate
|
||||
// the cancellation.
|
||||
this.cancelled.complete(null);
|
||||
this.whenCancelled.complete(null);
|
||||
}
|
||||
// TODO do we care about cancelRequested if further completions are effectively ignored?
|
||||
}
|
||||
|
|
|
@ -4,14 +4,14 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public abstract class RpcSystem<Network extends VatNetwork> {
|
||||
public abstract class RpcSystem<VatId> {
|
||||
|
||||
final Network network;
|
||||
final VatNetwork<VatId> network;
|
||||
final Capability.Client bootstrapInterface;
|
||||
final Map<VatNetwork.Connection, RpcState> connections = new HashMap<>();
|
||||
CompletableFuture<?> acceptCompleted = CompletableFuture.completedFuture(null);
|
||||
|
||||
public RpcSystem(Network network, Capability.Client bootstrapInterface) {
|
||||
public RpcSystem(VatNetwork<VatId> network, Capability.Client bootstrapInterface) {
|
||||
this.network = network;
|
||||
this.bootstrapInterface = bootstrapInterface;
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ public abstract class RpcSystem<Network extends VatNetwork> {
|
|||
|
||||
CompletableFuture<?> acceptLoop() {
|
||||
if (this.acceptCompleted.isDone()) {
|
||||
CompletableFuture<VatNetwork.Connection> accepted = this.network.baseAccept();
|
||||
var accepted = this.network.baseAccept();
|
||||
this.acceptCompleted = accepted.thenAccept(this::accept);
|
||||
}
|
||||
return this.acceptCompleted;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package org.capnproto;
|
||||
|
||||
public class TwoPartyRpcSystem
|
||||
extends RpcSystem<TwoPartyVatNetwork> {
|
||||
extends RpcSystem<RpcTwoPartyProtocol.VatId.Reader> {
|
||||
|
||||
public TwoPartyRpcSystem(TwoPartyVatNetwork network, Capability.Client bootstrapInterface) {
|
||||
super(network, bootstrapInterface);
|
||||
|
|
Loading…
Reference in a new issue