improve question lifecycle handling

A specialised export table was a bad idea.
Stick more closely to C++ implentation of QuestionRef.
This commit is contained in:
Vaci Koblizek 2020-11-16 10:39:05 +00:00
parent ad17a4c148
commit c2423d453e
2 changed files with 129 additions and 136 deletions

View file

@ -11,7 +11,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
final class RpcState<VatId> {
@ -46,14 +45,20 @@ final class RpcState<VatId> {
}
}
private final class QuestionDisposer {
private final class Question {
final int id;
boolean skipFinish;
boolean isAwaitingReturn;
int[] paramExports = new int[0];
boolean isTailCall = false;
QuestionRef selfRef;
private final WeakReference<QuestionRef> disposer;
QuestionDisposer(int id) {
Question(int id) {
this.id = id;
this.selfRef = new QuestionRef(this.id);;
this.disposer = new QuestionDisposer(this.selfRef);
}
void finish() {
@ -66,109 +71,62 @@ final class RpcState<VatId> {
message.send();
}
this.skipFinish = true;
}
void dispose() {
this.finish();
// Check if the question has returned and, if so, remove it from the table.
// Remove question ID from the table. Must do this *after* sending `Finish` to ensure that
// the ID is not re-allocated before the `Finish` message can be sent.
if (!this.isAwaitingReturn) {
questions.erase(this.id);
questions.erase(this.id, this);
}
}
}
private final class QuestionRef extends WeakReference<Question> {
private final QuestionDisposer disposer;
/**
* A reference to an entry on the question table.
*/
private final class QuestionRef {
QuestionRef(Question question, ReferenceQueue<Question> queue) {
super(question, queue);
this.disposer = question.disposer;
private final int questionId;
CompletableFuture<RpcResponse> response = new CompletableFuture<>();
QuestionRef(int questionId) {
this.questionId = questionId;
}
void fulfill(Throwable exc) {
this.response.completeExceptionally(exc);
this.finish();
}
void fulfill(RpcResponse response) {
this.response.complete(response);
this.finish();
}
private void finish() {
// We no longer need access to the questionRef in order to complete it.
// Dropping the selfRef releases the question for disposal once all other
// references are gone.
var question = questions.find(this.questionId);
if (question != null) {
question.selfRef = null;
}
}
}
private final class QuestionDisposer extends WeakReference<QuestionRef> {
private final int questionId;
QuestionDisposer(QuestionRef questionRef) {
super(questionRef, questionRefs);
this.questionId = questionRef.questionId;
}
void dispose() {
this.disposer.dispose();
}
}
private class Question {
CompletableFuture<RpcResponse> response = new CompletableFuture<>();
int[] paramExports = new int[0];
private final QuestionDisposer disposer;
boolean isTailCall = false;
Question(int id) {
this.disposer = new QuestionDisposer(id);
}
int getId() {
return this.disposer.id;
}
boolean isAwaitingReturn() {
return this.disposer.isAwaitingReturn;
}
public void setAwaitingReturn(boolean value) {
this.disposer.isAwaitingReturn = value;
}
void reject(Throwable exc) {
this.response.completeExceptionally(exc);
}
void answer(RpcResponse response) {
this.response.complete(response);
}
void setSkipFinish(boolean value) {
this.disposer.skipFinish = value;
}
public void finish() {
this.disposer.finish();
}
}
class QuestionExportTable {
private final HashMap<Integer, WeakReference<Question>> slots = new HashMap<>();
private final Queue<Integer> freeIds = new PriorityQueue<>();
private int max = 0;
public Question find(int id) {
var ref = this.slots.get(id);
return ref == null ? null : ref.get();
}
public Question erase(int id) {
var value = this.slots.get(id);
if (value != null) {
freeIds.add(id);
this.slots.remove(id);
return value.get();
} else {
return null;
}
}
public Question next() {
int id = freeIds.isEmpty() ? max++ : freeIds.remove();
var value = new Question(id);
var prev = slots.put(id, new QuestionRef(value, questionRefs));
assert prev == null;
return value;
}
public void forEach(Consumer<? super Question> action) {
for (var entry: this.slots.values()) {
var question = entry.get();
var question = questions.find(this.questionId);
if (question != null) {
action.accept(question);
}
question.finish();
}
}
}
@ -247,7 +205,12 @@ final class RpcState<VatId> {
}
};
private final QuestionExportTable questions = new QuestionExportTable();
private final ExportTable<Question> questions = new ExportTable<>() {
@Override
Question newExportable(int id) {
return new Question(id);
}
};
private final ImportTable<Answer> answers = new ImportTable<>() {
@Override
@ -278,7 +241,7 @@ final class RpcState<VatId> {
private CompletableFuture<java.lang.Void> messageReady = CompletableFuture.completedFuture(null);
private final CompletableFuture<java.lang.Void> messageLoop = new CompletableFuture<>();
// completes when the message loop exits
private final ReferenceQueue<Question> questionRefs = new ReferenceQueue<>();
private final ReferenceQueue<QuestionRef> questionRefs = new ReferenceQueue<>();
private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
RpcState(BootstrapFactory<VatId> bootstrapFactory,
@ -303,7 +266,12 @@ final class RpcState<VatId> {
var networkExc = RpcException.disconnected(exc.getMessage());
// All current questions complete with exceptions.
questions.forEach(question -> question.reject(networkExc));
for (var question: questions) {
var questionRef = question.selfRef;
if (questionRef != null) {
questionRef.fulfill(networkExc);
}
}
List<PipelineHook> pipelinesToRelease = new ArrayList<>();
List<ClientHook> clientsToRelease = new ArrayList<>();
@ -406,14 +374,17 @@ final class RpcState<VatId> {
ClientHook restore() {
var question = questions.next();
question.setAwaitingReturn(true);
question.isAwaitingReturn = true;
var questionRef = question.selfRef;
var promise = new CompletableFuture<RpcResponse>();
var pipeline = new RpcPipeline(questionRef, promise);
int sizeHint = messageSizeHint(RpcProtocol.Bootstrap.factory);
var message = connection.newOutgoingMessage(sizeHint);
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initBootstrap();
builder.setQuestionId(question.getId());
builder.setQuestionId(question.id);
message.send();
var pipeline = new RpcPipeline(question, promise);
return pipeline.getPipelinedCap(new PipelineOp[0]);
}
@ -440,7 +411,8 @@ final class RpcState<VatId> {
});
messageReader.thenRunAsync(this::startMessageLoop).exceptionallyCompose(exc -> {
assert exc == null: "Exception in startMessageLoop!";
//System.out.println("Exception in startMessageLoop!");
//exc.printStackTrace();
return CompletableFuture.failedFuture(exc);
});
}
@ -660,11 +632,11 @@ final class RpcState<VatId> {
return;
}
if (!question.isAwaitingReturn()) {
if (!question.isAwaitingReturn) {
assert false: "Duplicate Return";
return;
}
question.setAwaitingReturn(false);
question.isAwaitingReturn = false;
int[] exportsToRelease = null;
if (callReturn.getReleaseParamCaps()) {
@ -672,11 +644,21 @@ final class RpcState<VatId> {
question.paramExports = null;
}
var questionRef = question.selfRef;
if (questionRef == null) {
if (callReturn.isTakeFromOtherQuestion()) {
var answer = this.answers.find(callReturn.getTakeFromOtherQuestion());
if (answer != null) {
answer.redirectedResults = null;
}
}
// Looks like this question was canceled earlier, so `Finish` was already sent, with
// `releaseResultCaps` set true so that we don't have to release them here. We can go
// ahead and delete it from the table.
// TODO Should we do this?
questions.erase(callReturn.getAnswerId(), question);
if (exportsToRelease != null) {
this.releaseExports(exportsToRelease);
}
@ -692,8 +674,8 @@ final class RpcState<VatId> {
var payload = callReturn.getResults();
var capTable = receiveCaps(payload.getCapTable(), message.getAttachedFds());
var response = new RpcResponseImpl(capTable, payload.getContent());
question.answer(response);
var response = new RpcResponseImpl(questionRef, message, capTable, payload.getContent());
questionRef.fulfill(response);
break;
case EXCEPTION:
@ -701,7 +683,7 @@ final class RpcState<VatId> {
assert false: "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.";
break;
}
question.reject(ToException(callReturn.getException()));
questionRef.fulfill(ToException(callReturn.getException()));
break;
case CANCELED:
@ -714,7 +696,7 @@ final class RpcState<VatId> {
break;
}
// Tail calls are fulfilled with a null pointer.
question.answer(() -> null);
questionRef.fulfill(() -> null);
break;
case TAKE_FROM_OTHER_QUESTION:
@ -728,7 +710,7 @@ final class RpcState<VatId> {
assert false: "`Return.takeFromOtherQuestion` referenced a call that did not use `sendResultsTo.yourself`.";
break;
}
question.response = answer.redirectedResults;
questionRef.response = answer.redirectedResults;
answer.redirectedResults = null;
break;
@ -1225,10 +1207,16 @@ final class RpcState<VatId> {
class RpcResponseImpl implements RpcResponse {
private final IncomingRpcMessage message;
private final QuestionRef questionRef;
private final AnyPointer.Reader results;
RpcResponseImpl(List<ClientHook> capTable,
RpcResponseImpl(QuestionRef questionRef,
IncomingRpcMessage message,
List<ClientHook> capTable,
AnyPointer.Reader results) {
this.questionRef = questionRef;
this.message = message;
this.results = results.imbue(new ReaderCapabilityTable(capTable));
}
@ -1521,20 +1509,20 @@ final class RpcState<VatId> {
private class RpcPipeline implements PipelineHook {
private final Question question;
private final QuestionRef questionRef;
final HashMap<List<PipelineOp>, ClientHook> clientMap = new HashMap<>();
final CompletableFuture<RpcResponse> redirectLater;
RpcPipeline(Question question,
RpcPipeline(QuestionRef questionRef,
CompletableFuture<RpcResponse> redirectLater) {
this.question = question;
this.questionRef = questionRef;
assert redirectLater != null;
this.redirectLater = redirectLater;
}
RpcPipeline(Question question) {
this(question, null);
RpcPipeline(QuestionRef questionRef) {
this(questionRef, null);
// never resolves
}
@ -1549,7 +1537,7 @@ final class RpcState<VatId> {
// TODO avoid conversion to/from ArrayList?
var key = new ArrayList<>(Arrays.asList(ops));
return this.clientMap.computeIfAbsent(key, k -> {
var pipelineClient = new PipelineClient(this.question, ops);
var pipelineClient = new PipelineClient(this.questionRef, ops);
if (this.redirectLater == null) {
// This pipeline will never get redirected, so just return the PipelineClient.
return pipelineClient;
@ -1563,7 +1551,7 @@ final class RpcState<VatId> {
@Override
public void cancel(Throwable exc) {
this.question.reject(exc);
this.questionRef.fulfill(exc);
}
}
@ -1658,12 +1646,12 @@ final class RpcState<VatId> {
return replacement.send();
}
final var question = sendInternal(false);
final var questionRef = sendInternal(false);
// The pipeline must get notified of resolution before the app does to maintain ordering.
var pipeline = new RpcPipeline(question, question.response);
var pipeline = new RpcPipeline(questionRef, questionRef.response);
var appPromise = question.response.thenApply(
var appPromise = questionRef.response.thenApply(
hook -> new Response<>(hook.getResults(), hook));
return new RemotePromise<>(appPromise, new AnyPointer.Pipeline(pipeline));
@ -1675,28 +1663,30 @@ final class RpcState<VatId> {
return send();
}
Question sendInternal(boolean isTailCall) {
QuestionRef sendInternal(boolean isTailCall) {
// TODO refactor
var fds = List.<Integer>of();
var exports = writeDescriptors(capTable.getTable(), callBuilder.getParams(), fds);
message.setFds(fds);
var question = questions.next();
question.setAwaitingReturn(true);
question.isAwaitingReturn = true;
question.isTailCall = isTailCall;
question.paramExports = exports;
callBuilder.setQuestionId(question.getId());
var questionRef = question.selfRef;
callBuilder.setQuestionId(question.id);
if (isTailCall) {
callBuilder.getSendResultsTo().getYourself();
}
try {
message.send();
} catch (Exception exc) {
question.setAwaitingReturn(false);
question.setSkipFinish(true);
question.reject(exc);
question.isAwaitingReturn = false;
question.skipFinish = true;
questionRef.fulfill(exc);
}
return question;
return questionRef;
}
@Override
@ -1781,11 +1771,11 @@ final class RpcState<VatId> {
private void cleanupQuestions() {
while (true) {
var ref = (QuestionRef)this.questionRefs.poll();
if (ref == null) {
var disposer = (QuestionDisposer)this.questionRefs.poll();
if (disposer == null) {
break;
}
ref.dispose();
disposer.dispose();
}
}
@ -1914,11 +1904,11 @@ final class RpcState<VatId> {
private class PipelineClient extends RpcClient {
private final Question question;
private final QuestionRef questionRef;
private final PipelineOp[] ops;
PipelineClient(Question question, PipelineOp[] ops) {
this.question = question;
PipelineClient(QuestionRef questionRef, PipelineOp[] ops) {
this.questionRef = questionRef;
this.ops = ops;
}
@ -1935,7 +1925,7 @@ final class RpcState<VatId> {
@Override
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) {
var promisedAnswer = descriptor.initReceiverAnswer();
promisedAnswer.setQuestionId(question.getId());
promisedAnswer.setQuestionId(questionRef.questionId);
FromPipelineOps(ops, promisedAnswer);
return null;
}
@ -1943,7 +1933,7 @@ final class RpcState<VatId> {
@Override
public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) {
var builder = target.initPromisedAnswer();
builder.setQuestionId(question.getId());
builder.setQuestionId(questionRef.questionId);
FromPipelineOps(ops, builder);
return null;
}

View file

@ -371,6 +371,9 @@ public class RpcTest {
handle1 = null;
handle2 = null;
System.gc();
client.echoRequest().send().join();
}
@org.junit.Test