add cleanup to questions and imports

This commit is contained in:
Vaci Koblizek 2020-10-14 15:54:44 +01:00
parent 89451874c3
commit c49221c2e9

View file

@ -7,7 +7,7 @@ import java.util.concurrent.CompletionStage;
final class RpcState { final class RpcState {
static final class Question { final class Question {
final int id; final int id;
CompletableFuture<RpcResponse> response = new CompletableFuture<>(); CompletableFuture<RpcResponse> response = new CompletableFuture<>();
List<Integer> paramExports; List<Integer> paramExports;
@ -21,10 +21,29 @@ final class RpcState {
void reject(Throwable exc) { void reject(Throwable exc) {
this.response.completeExceptionally(exc); this.response.completeExceptionally(exc);
this.finish();
} }
void answer(RpcResponse response) { void answer(RpcResponse response) {
this.response.complete(response); this.response.complete(response);
this.finish();
}
void finish() {
assert questions.find(this.id) != null : "Question ID no longer on table?";
if (isConnected() && !this.skipFinish) {
var message = connection.newOutgoingMessage(1024);
var builder = message.getBody().getAs(RpcProtocol.Message.factory).initFinish();
builder.setQuestionId(this.id);
builder.setReleaseResultCaps(this.isAwaitingReturn);
message.send();
}
// Check if the question has returned and, if so, remove it from the table.
// Remove question ID from the table. Must do this *after* sending `Finish` to ensure that
// the ID is not re-allocated before the `Finish` message can be sent.
assert !this.isAwaitingReturn;
questions.erase(id, this);
} }
} }
@ -411,7 +430,9 @@ final class RpcState {
} }
var payload = callReturn.getResults(); var payload = callReturn.getResults();
var capTable = receiveCaps(payload.getCapTable(), message.getAttachedFds()); var capTable = receiveCaps(payload.getCapTable(), message.getAttachedFds());
var response = new RpcResponseImpl(question, message, capTable, payload.getContent()); // TODO question, message unused in RpcResponseImpl
// var response = new RpcResponseImpl(question, message, capTable, payload.getContent());
var response = new RpcResponseImpl(capTable, payload.getContent());
question.answer(response); question.answer(response);
break; break;
@ -587,6 +608,7 @@ final class RpcState {
return; return;
} }
} }
private List<Integer> writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List<Integer> fds) { private List<Integer> writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List<Integer> fds) {
if (capTable.length == 0) { if (capTable.length == 0) {
return List.of(); return List.of();
@ -900,16 +922,17 @@ final class RpcState {
} }
static class RpcResponseImpl implements RpcResponse { static class RpcResponseImpl implements RpcResponse {
private final Question question; // TODO unused?
private final IncomingRpcMessage message; // private final Question question;
// private final IncomingRpcMessage message;
private final AnyPointer.Reader results; private final AnyPointer.Reader results;
RpcResponseImpl(Question question, RpcResponseImpl(/*Question question,
IncomingRpcMessage message, IncomingRpcMessage message,*/
List<ClientHook> capTable, List<ClientHook> capTable,
AnyPointer.Reader results) { AnyPointer.Reader results) {
this.question = question; // this.question = question;
this.message = message; // this.message = message;
this.results = results.imbue(new ReaderCapabilityTable(capTable)); this.results = results.imbue(new ReaderCapabilityTable(capTable));
} }
@ -1048,7 +1071,7 @@ final class RpcState {
return null; return null;
} }
RpcResponse consumeRedirectedResponse() { private RpcResponse consumeRedirectedResponse() {
assert this.redirectResults; assert this.redirectResults;
if (this.response == null) { if (this.response == null) {
@ -1058,11 +1081,11 @@ final class RpcState {
return ((LocallyRedirectedRpcResponse) this.response); return ((LocallyRedirectedRpcResponse) this.response);
} }
void sendReturn() { private void sendReturn() {
assert !redirectResults; assert !redirectResults;
if (!this.cancelRequested && isDisconnected()) { if (!this.cancelRequested && isDisconnected()) {
assert false: "Cancellation should have been requested on disconnect."; assert false : "Cancellation should have been requested on disconnect.";
return; return;
} }
@ -1073,17 +1096,20 @@ final class RpcState {
this.returnMessage.setAnswerId(this.answerId); this.returnMessage.setAnswerId(this.answerId);
this.returnMessage.setReleaseParamCaps(false); this.returnMessage.setReleaseParamCaps(false);
List<Integer> exports; List<Integer> exports = List.of();
try { try {
exports = ((RpcServerResponseImpl) response).send(); exports = ((RpcServerResponseImpl) response).send();
} } catch (Throwable exc) {
catch (Throwable exc) {
this.responseSent = false; this.responseSent = false;
sendErrorReturn(exc); sendErrorReturn(exc);
} }
// If no caps in the results, the pipeline is irrelevant.
boolean shouldFreePipeline = exports.isEmpty();
cleanupAnswerTable(exports, shouldFreePipeline);
} }
void sendErrorReturn(Throwable exc) { private void sendErrorReturn(Throwable exc) {
assert !redirectResults; assert !redirectResults;
if (!isFirstResponder()) { if (!isFirstResponder()) {
@ -1099,10 +1125,10 @@ final class RpcState {
message.send(); message.send();
} }
cleanupAnswerTable(null, false); cleanupAnswerTable(List.of(), false);
} }
boolean isFirstResponder() { private boolean isFirstResponder() {
if (this.responseSent) { if (this.responseSent) {
return false; return false;
} }
@ -1110,7 +1136,7 @@ final class RpcState {
return true; return true;
} }
void cleanupAnswerTable(List<Integer> resultExports, boolean shouldFreePipeline) { private void cleanupAnswerTable(List<Integer> resultExports, boolean shouldFreePipeline) {
if (this.cancelRequested) { if (this.cancelRequested) {
assert resultExports.size() == 0; assert resultExports.size() == 0;
answers.erase(this.answerId); answers.erase(this.answerId);
@ -1151,7 +1177,7 @@ final class RpcState {
WAITING, RESOLVED, BROKEN WAITING, RESOLVED, BROKEN
} }
class RpcPipeline implements PipelineHook { private class RpcPipeline implements PipelineHook {
private final Question question; private final Question question;
private PipelineState state = PipelineState.WAITING; private PipelineState state = PipelineState.WAITING;
@ -1188,7 +1214,7 @@ final class RpcState {
@Override @Override
public ClientHook getPipelinedCap(PipelineOp[] ops) { public ClientHook getPipelinedCap(PipelineOp[] ops) {
// TODO avoid conversion to/from ArrayList? // TODO avoid conversion to/from ArrayList?
var key = new ArrayList<>(Arrays.<PipelineOp>asList(ops)); var key = new ArrayList<>(Arrays.asList(ops));
var hook = this.clientMap.computeIfAbsent(key, k -> { var hook = this.clientMap.computeIfAbsent(key, k -> {
switch (state) { switch (state) {
case WAITING: case WAITING:
@ -1269,10 +1295,11 @@ final class RpcState {
this.paramsBuilder = callBuilder.getParams().getContent().imbue(this.capTable); this.paramsBuilder = callBuilder.getParams().getContent().imbue(this.capTable);
} }
AnyPointer.Builder getRoot() { private AnyPointer.Builder getRoot() {
return this.paramsBuilder; return this.paramsBuilder;
} }
RpcProtocol.Call.Builder getCall() {
private RpcProtocol.Call.Builder getCall() {
return this.callBuilder; return this.callBuilder;
} }
@ -1353,7 +1380,7 @@ final class RpcState {
} }
} }
class ImportClient extends RpcClient { private class ImportClient extends RpcClient {
final int importId; final int importId;
int remoteRefCount = 0; int remoteRefCount = 0;
@ -1374,8 +1401,8 @@ final class RpcState {
} }
} }
public void dispose() { public void remove() {
// TODO manage destruction... // Remove self from the import table.
var imp = imports.find(importId); var imp = imports.find(importId);
if (imp != null) { if (imp != null) {
if (imp.importClient == this) { if (imp.importClient == this) {
@ -1383,6 +1410,7 @@ final class RpcState {
} }
} }
// Send a message releasing our remote references.
if (remoteRefCount > 0 && !isDisconnected()) { if (remoteRefCount > 0 && !isDisconnected()) {
var message = connection.newOutgoingMessage(1024); var message = connection.newOutgoingMessage(1024);
var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease(); var builder = message.getBody().initAs(RpcProtocol.Message.factory).initRelease();
@ -1541,6 +1569,17 @@ final class RpcState {
public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) {
return null; return null;
} }
public void remove() {
if (this.importId != null) {
// This object represents an import promise. Clean that up.
var imp = imports.find(this.importId);
if (imp.appClient != null && imp.appClient == this) {
imp.appClient = null;
imp.importClient.remove();
}
}
}
} }
class PipelineClient extends RpcClient { class PipelineClient extends RpcClient {