cleanup import lifecycle

This commit is contained in:
Vaci Koblizek 2020-11-24 15:23:37 +00:00
parent 594e5e3a28
commit fb5f1bf2ba
2 changed files with 89 additions and 47 deletions

View file

@ -161,9 +161,10 @@ final class RpcState<VatId> {
final class Import {
final int importId;
ImportRef importClient;
ImportDisposer disposer;
Integer fd;
int remoteRefCount;
WeakReference<RpcClient> appClient;
RpcClient appClient;
CompletableFuture<ClientHook> promise;
// If non-null, the import is a promise.
@ -175,6 +176,12 @@ final class RpcState<VatId> {
this.remoteRefCount++;
}
void setFdIfMissing(Integer fd) {
if (this.fd == null) {
this.fd = fd;
}
}
public void dispose() {
// Remove self from the import table.
var imp = imports.find(importId);
@ -246,7 +253,7 @@ final class RpcState<VatId> {
private final CompletableFuture<java.lang.Void> messageLoop = new CompletableFuture<>();
// completes when the message loop exits
private final ReferenceQueue<QuestionRef> questionRefs = new ReferenceQueue<>();
private final ReferenceQueue<ImportClient> importRefs = new ReferenceQueue<>();
private final ReferenceQueue<ImportRef> importRefs = new ReferenceQueue<>();
private final Queue<Callable<java.lang.Void>> lastEvals = new ArrayDeque<>();
RpcState(BootstrapFactory<? super VatId> bootstrapFactory,
@ -763,7 +770,7 @@ final class RpcState<VatId> {
}
if (imp.promise == null) {
assert imp.importClient == null : "Import already resolved.";
assert imp.disposer != null: "Import already resolved.";
// It appears this is a valid entry on the import table, but was not expected to be a
// promise.
return;
@ -1087,36 +1094,45 @@ final class RpcState<VatId> {
private ClientHook importCap(int importId, boolean isPromise, Integer fd) {
// Receive a new import.
var imp = imports.put(importId);
ImportClient importClient = null;
if (imp.importClient != null) {
importClient = imp.importClient.get();
}
if (importClient == null) {
importClient = new ImportClient(imp, fd);
imp.importClient = new ImportRef(importId, importClient);
ImportClient importClient;
// new import
if (imp.disposer == null) {
var importRef = new ImportRef(importId);
imp.disposer = new ImportDisposer(importRef);
importClient = new ImportClient(importRef);
}
else {
importClient.setFdIfMissing(fd);
var importRef = imp.disposer.get();
if (importRef == null) {
// Import still exists, but has no references. Resurrect it.
importRef = new ImportRef(importId);
imp.disposer = new ImportDisposer(importRef);
importClient = new ImportClient(importRef);
}
else {
importClient = new ImportClient(importRef);
}
}
imp.setFdIfMissing(fd);
imp.addRemoteRef();
if (!isPromise) {
imp.appClient = new WeakReference<>(importClient);
return importClient;
}
if (imp.appClient != null) {
var tmp = imp.appClient.get();
if (tmp != null) {
return tmp;
}
return imp.appClient;
}
imp.promise = new CompletableFuture<>();
var result = new PromiseClient(importClient, imp.promise, importId);
imp.appClient = new WeakReference<>(result);
var result = new PromiseClient(importClient, imp.promise, importClient.importRef);
imp.appClient = result;
return result;
}
@ -1736,41 +1752,49 @@ final class RpcState<VatId> {
}
}
private class ImportRef extends WeakReference<ImportClient> {
private class ImportDisposer extends WeakReference<ImportRef> {
private final int importId;
ImportDisposer(ImportRef importRef) {
super(importRef, importRefs);
this.importId = importRef.importId;
}
void dispose() {
var imp = imports.find(this.importId);
if (imp != null) {
imp.dispose();
}
}
}
private static class ImportRef {
final int importId;
ImportRef(int importId, ImportClient hook) {
super(hook, importRefs);
ImportRef(int importId) {
this.importId = importId;
}
}
private class ImportClient extends RpcClient {
final Import imp;
Integer fd;
private final ImportRef importRef;
ImportClient(Import imp, Integer fd) {
this.imp = imp;
this.fd = fd;
}
void setFdIfMissing(Integer fd) {
if (this.fd == null) {
this.fd = fd;
}
ImportClient(ImportRef importRef) {
this.importRef = importRef;
}
@Override
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) {
descriptor.setReceiverHosted(this.imp.importId);
descriptor.setReceiverHosted(this.importRef.importId);
return null;
}
@Override
public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) {
target.setImportedCap(this.imp.importId);
target.setImportedCap(this.importRef.importId);
return null;
}
@ -1778,19 +1802,21 @@ final class RpcState<VatId> {
public CompletableFuture<ClientHook> whenMoreResolved() {
return null;
}
@Override
public Integer getFd() {
var imp = imports.find(this.importRef.importId);
return imp != null ? imp.fd : null;
}
}
private void cleanupImports() {
while (true) {
var ref = (ImportRef)this.importRefs.poll();
if (ref == null) {
var disposer = (ImportDisposer)this.importRefs.poll();
if (disposer == null) {
return;
}
var imp = this.imports.find(ref.importId);
assert imp != null;
if (imp != null) {
imp.dispose();
}
disposer.dispose();
}
}
@ -1815,20 +1841,28 @@ final class RpcState<VatId> {
private class PromiseClient extends RpcClient {
private ClientHook cap;
private final Integer importId;
private final ImportRef importRef;
private boolean receivedCall = false;
private ResolutionType resolutionType = ResolutionType.UNRESOLVED;
private final CompletableFuture<ClientHook> eventual;
PromiseClient(RpcClient initial,
CompletableFuture<ClientHook> eventual,
Integer importId) {
ImportRef importRef) {
this.cap = initial;
this.importId = importId;
this.importRef = importRef;
this.eventual = eventual.handle((resolution, exc) -> {
this.cap = exc == null
? this.resolve(resolution)
: this.resolve(Capability.newBrokenCap(exc));
if (this.importRef != null) {
var imp = imports.find(this.importRef.importId);
if (imp != null && imp.appClient == this) {
imp.appClient = null;
}
}
return this.cap;
});
}

View file

@ -25,6 +25,7 @@ import org.capnproto.rpctest.Test;
import org.junit.Assert;
import java.lang.ref.WeakReference;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
@ -402,12 +403,19 @@ public class RpcTest {
var promise = client.getHandleRequest().send();
var handle2 = this.context.runUntil(promise).join().getHandle();
var handleRef1 = new WeakReference<>(handle1);
var handleRef2 = new WeakReference<>(handle2);
promise = null;
handle1 = null;
handle2 = null;
// TODO monitor the imported caps for release? close?
while (handleRef1.get() != null && handleRef2.get() != null) {
System.gc();
this.context.runUntil(client.echoRequest().send()).join();
}
}
@org.junit.Test
public void testPromiseResolve() {