diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index 220d559..1753374 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -161,9 +161,10 @@ final class RpcState { final class Import { final int importId; - ImportRef importClient; + ImportDisposer disposer; + Integer fd; int remoteRefCount; - WeakReference appClient; + RpcClient appClient; CompletableFuture promise; // If non-null, the import is a promise. @@ -175,6 +176,12 @@ final class RpcState { this.remoteRefCount++; } + void setFdIfMissing(Integer fd) { + if (this.fd == null) { + this.fd = fd; + } + } + public void dispose() { // Remove self from the import table. var imp = imports.find(importId); @@ -246,7 +253,7 @@ final class RpcState { private final CompletableFuture messageLoop = new CompletableFuture<>(); // completes when the message loop exits private final ReferenceQueue questionRefs = new ReferenceQueue<>(); - private final ReferenceQueue importRefs = new ReferenceQueue<>(); + private final ReferenceQueue importRefs = new ReferenceQueue<>(); private final Queue> lastEvals = new ArrayDeque<>(); RpcState(BootstrapFactory bootstrapFactory, @@ -763,7 +770,7 @@ final class RpcState { } if (imp.promise == null) { - assert imp.importClient == null : "Import already resolved."; + assert imp.disposer != null: "Import already resolved."; // It appears this is a valid entry on the import table, but was not expected to be a // promise. return; @@ -1087,36 +1094,45 @@ final class RpcState { private ClientHook importCap(int importId, boolean isPromise, Integer fd) { // Receive a new import. + var imp = imports.put(importId); - ImportClient importClient = null; - if (imp.importClient != null) { - importClient = imp.importClient.get(); - } - if (importClient == null) { - importClient = new ImportClient(imp, fd); - imp.importClient = new ImportRef(importId, importClient); + + ImportClient importClient; + + // new import + if (imp.disposer == null) { + var importRef = new ImportRef(importId); + imp.disposer = new ImportDisposer(importRef); + importClient = new ImportClient(importRef); } else { - importClient.setFdIfMissing(fd); + var importRef = imp.disposer.get(); + if (importRef == null) { + // Import still exists, but has no references. Resurrect it. + importRef = new ImportRef(importId); + imp.disposer = new ImportDisposer(importRef); + importClient = new ImportClient(importRef); + } + else { + importClient = new ImportClient(importRef); + } } + + imp.setFdIfMissing(fd); imp.addRemoteRef(); if (!isPromise) { - imp.appClient = new WeakReference<>(importClient); return importClient; } if (imp.appClient != null) { - var tmp = imp.appClient.get(); - if (tmp != null) { - return tmp; - } + return imp.appClient; } imp.promise = new CompletableFuture<>(); - var result = new PromiseClient(importClient, imp.promise, importId); - imp.appClient = new WeakReference<>(result); + var result = new PromiseClient(importClient, imp.promise, importClient.importRef); + imp.appClient = result; return result; } @@ -1736,41 +1752,49 @@ final class RpcState { } } - private class ImportRef extends WeakReference { + private class ImportDisposer extends WeakReference { + + private final int importId; + + ImportDisposer(ImportRef importRef) { + super(importRef, importRefs); + this.importId = importRef.importId; + } + + void dispose() { + var imp = imports.find(this.importId); + if (imp != null) { + imp.dispose(); + } + } + } + + private static class ImportRef { final int importId; - ImportRef(int importId, ImportClient hook) { - super(hook, importRefs); + ImportRef(int importId) { this.importId = importId; } } private class ImportClient extends RpcClient { - final Import imp; - Integer fd; + private final ImportRef importRef; - ImportClient(Import imp, Integer fd) { - this.imp = imp; - this.fd = fd; - } - - void setFdIfMissing(Integer fd) { - if (this.fd == null) { - this.fd = fd; - } + ImportClient(ImportRef importRef) { + this.importRef = importRef; } @Override public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds) { - descriptor.setReceiverHosted(this.imp.importId); + descriptor.setReceiverHosted(this.importRef.importId); return null; } @Override public ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target) { - target.setImportedCap(this.imp.importId); + target.setImportedCap(this.importRef.importId); return null; } @@ -1778,19 +1802,21 @@ final class RpcState { public CompletableFuture whenMoreResolved() { return null; } + + @Override + public Integer getFd() { + var imp = imports.find(this.importRef.importId); + return imp != null ? imp.fd : null; + } } private void cleanupImports() { while (true) { - var ref = (ImportRef)this.importRefs.poll(); - if (ref == null) { + var disposer = (ImportDisposer)this.importRefs.poll(); + if (disposer == null) { return; } - var imp = this.imports.find(ref.importId); - assert imp != null; - if (imp != null) { - imp.dispose(); - } + disposer.dispose(); } } @@ -1815,20 +1841,28 @@ final class RpcState { private class PromiseClient extends RpcClient { private ClientHook cap; - private final Integer importId; + private final ImportRef importRef; private boolean receivedCall = false; private ResolutionType resolutionType = ResolutionType.UNRESOLVED; private final CompletableFuture eventual; PromiseClient(RpcClient initial, CompletableFuture eventual, - Integer importId) { + ImportRef importRef) { this.cap = initial; - this.importId = importId; + this.importRef = importRef; this.eventual = eventual.handle((resolution, exc) -> { this.cap = exc == null ? this.resolve(resolution) : this.resolve(Capability.newBrokenCap(exc)); + + if (this.importRef != null) { + var imp = imports.find(this.importRef.importId); + if (imp != null && imp.appClient == this) { + imp.appClient = null; + } + } + return this.cap; }); } diff --git a/runtime-rpc/src/test/java/org/capnproto/RpcTest.java b/runtime-rpc/src/test/java/org/capnproto/RpcTest.java index 9751075..c054119 100644 --- a/runtime-rpc/src/test/java/org/capnproto/RpcTest.java +++ b/runtime-rpc/src/test/java/org/capnproto/RpcTest.java @@ -25,6 +25,7 @@ import org.capnproto.rpctest.Test; import org.junit.Assert; +import java.lang.ref.WeakReference; import java.util.ArrayDeque; import java.util.HashMap; import java.util.Map; @@ -402,11 +403,18 @@ public class RpcTest { var promise = client.getHandleRequest().send(); var handle2 = this.context.runUntil(promise).join().getHandle(); + var handleRef1 = new WeakReference<>(handle1); + var handleRef2 = new WeakReference<>(handle2); + + promise = null; handle1 = null; handle2 = null; - System.gc(); - this.context.runUntil(client.echoRequest().send()).join(); + // TODO monitor the imported caps for release? close? + while (handleRef1.get() != null && handleRef2.get() != null) { + System.gc(); + this.context.runUntil(client.echoRequest().send()).join(); + } } @org.junit.Test