defer disembargo

This commit is contained in:
Vaci Koblizek 2020-10-08 15:36:40 +01:00
parent 2ffdecbe41
commit 1abc975b8b

View file

@ -1,8 +1,11 @@
package org.capnproto; package org.capnproto;
import java.util.*; import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.FutureTask;
import java.util.function.Function;
final class RpcState { final class RpcState {
@ -117,6 +120,18 @@ final class RpcState {
return !isDisconnected(); return !isDisconnected();
} }
// Run func() before the next IO event.
private <T> CompletableFuture<T> evalLast(Callable<T> func) {
return this.messageReady.thenCompose(x -> {
try {
return CompletableFuture.completedFuture(func.call());
}
catch (java.lang.Exception exc) {
return CompletableFuture.failedFuture(exc);
}
});
}
ClientHook restore() { ClientHook restore() {
var question = questions.next(); var question = questions.next();
question.isAwaitingReturn = true; question.isAwaitingReturn = true;
@ -276,7 +291,7 @@ final class RpcState {
var payload = ret.initResults(); var payload = ret.initResults();
var content = payload.getContent().imbue(capTable); var content = payload.getContent().imbue(capTable);
content.setAsCapability(bootstrapInterface); content.setAsCap(bootstrapInterface);
var capTableArray = capTable.getTable(); var capTableArray = capTable.getTable();
assert capTableArray.length != 0; assert capTableArray.length != 0;
@ -531,28 +546,31 @@ final class RpcState {
return; return;
} }
var embargoId = ctx.getSenderLoopback(); final var embargoId = ctx.getSenderLoopback();
final var rpcTarget = (RpcClient) target;
// TODO run this later... Callable<?> sendDisembargo = () -> {
if (isDisconnected()) { if (isDisconnected()) {
return; return null;
} }
var rpcTarget = (RpcClient) target; var message = connection.newOutgoingMessage(1024);
var message = connection.newOutgoingMessage(1024); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initDisembargo();
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initDisembargo(); var redirect = rpcTarget.writeTarget(builder.initTarget());
var redirect = rpcTarget.writeTarget(builder.initTarget()); // Disembargoes should only be sent to capabilities that were previously the subject of
// Disembargoes should only be sent to capabilities that were previously the subject of // a `Resolve` message. But `writeTarget` only ever returns non-null when called on
// a `Resolve` message. But `writeTarget` only ever returns non-null when called on // a PromiseClient. The code which sends `Resolve` and `Return` should have replaced
// a PromiseClient. The code which sends `Resolve` and `Return` should have replaced // any promise with a direct node in order to solve the Tribble 4-way race condition.
// any promise with a direct node in order to solve the Tribble 4-way race condition. // See the documentation of Disembargo in rpc.capnp for more.
// See the documentation of Disembargo in rpc.capnp for more. if (redirect == null) {
if (redirect == null) { assert false : "'Disembargo' of type 'senderLoopback' sent to an object that does not appear to have been the subject of a previous 'Resolve' message.";
assert false: "'Disembargo' of type 'senderLoopback' sent to an object that does not appear to have been the subject of a previous 'Resolve' message."; return null;
return; }
} builder.getContext().setReceiverLoopback(embargoId);
builder.getContext().setReceiverLoopback(embargoId); message.send();
message.send(); return null;
};
evalLast(sendDisembargo);
break; break;
case RECEIVER_LOOPBACK: case RECEIVER_LOOPBACK:
@ -1274,12 +1292,18 @@ final class RpcState {
var appPromise = question.response.thenApply(response -> { var appPromise = question.response.thenApply(response -> {
var results = response.getResults(); var results = response.getResults();
return new Response(results, response); return new Response<>(AnyPointer.factory, results, response);
}); });
return new RemotePromise<>(appPromise, pipeline); return new RemotePromise<>(appPromise, pipeline);
} }
@Override
public CompletableFuture<?> sendStreaming() {
// TODO falling back to regular send for now...
return send().ignoreResult();
}
Question sendInternal(boolean isTailCall) { Question sendInternal(boolean isTailCall) {
// TODO refactor // TODO refactor
var fds = List.<Integer>of(); var fds = List.<Integer>of();