diff --git a/runtime-rpc/src/main/java/org/capnproto/RpcState.java b/runtime-rpc/src/main/java/org/capnproto/RpcState.java index a0ea905..0211aa0 100644 --- a/runtime-rpc/src/main/java/org/capnproto/RpcState.java +++ b/runtime-rpc/src/main/java/org/capnproto/RpcState.java @@ -1,5 +1,6 @@ package org.capnproto; +import java.io.FileDescriptor; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; @@ -162,7 +163,7 @@ final class RpcState { final class Import { final int importId; ImportDisposer disposer; - Integer fd; + FileDescriptor fd; int remoteRefCount; RpcClient appClient; CompletableFuture promise; @@ -176,7 +177,7 @@ final class RpcState { this.remoteRefCount++; } - void setFdIfMissing(Integer fd) { + void setFdIfMissing(FileDescriptor fd) { if (this.fd == null) { this.fd = fd; } @@ -541,7 +542,7 @@ final class RpcState { ? caps[0] : Capability.newNullCap(); - var fds = List.of(); + var fds = List.of(); response.setFds(List.of()); answer.resultExports = writeDescriptors(caps, payload, fds); @@ -872,7 +873,7 @@ final class RpcState { } } - private int[] writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List fds) { + private int[] writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List fds) { if (capTable.length == 0) { return new int[0]; } @@ -897,7 +898,7 @@ final class RpcState { .toArray(); } - private Integer writeDescriptor(ClientHook cap, RpcProtocol.CapDescriptor.Builder descriptor, List fds) { + private Integer writeDescriptor(ClientHook cap, RpcProtocol.CapDescriptor.Builder descriptor, List fds) { ClientHook inner = cap; for (;;) { var resolved = inner.getResolved(); @@ -982,7 +983,7 @@ final class RpcState { var message = connection.newOutgoingMessage(sizeHint); var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve(); resolve.setPromiseId(exportId); - var fds = List.of(); + var fds = List.of(); writeDescriptor(exp.clientHook, resolve.initCap(), fds); message.setFds(fds); LOGGER.fine(() -> this.toString() + ": > RESOLVE export=" + exportId); @@ -1027,7 +1028,7 @@ final class RpcState { } } - private List receiveCaps(StructList.Reader capTable, List fds) { + private List receiveCaps(StructList.Reader capTable, List fds) { var result = new ArrayList(); for (var cap: capTable) { result.add(receiveCap(cap, fds)); @@ -1035,10 +1036,8 @@ final class RpcState { return result; } - private ClientHook receiveCap(RpcProtocol.CapDescriptor.Reader descriptor, List fds) { - // TODO AutoCloseFd - Integer fd = null; - + private ClientHook receiveCap(RpcProtocol.CapDescriptor.Reader descriptor, List fds) { + FileDescriptor fd = null; int fdIndex = descriptor.getAttachedFd(); if (fdIndex >= 0 && fdIndex < fds.size()) { fd = fds.get(fdIndex); @@ -1097,10 +1096,8 @@ final class RpcState { } } - private ClientHook importCap(int importId, boolean isPromise, Integer fd) { + private ClientHook importCap(int importId, boolean isPromise, FileDescriptor fd) { // Receive a new import. - - var imp = imports.put(importId); ImportClient importClient; @@ -1260,7 +1257,7 @@ final class RpcState { int[] send() { var capTable = this.capTable.getTable(); - var fds = List.of(); + var fds = List.of(); var exports = writeDescriptors(capTable, payload, fds); // TODO process FDs message.setFds(fds); @@ -1606,7 +1603,7 @@ final class RpcState { abstract class RpcClient implements ClientHook { - public abstract Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds); + public abstract Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds); public abstract ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target); @@ -1708,7 +1705,7 @@ final class RpcState { QuestionRef sendInternal(boolean isTailCall) { // TODO refactor - var fds = List.of(); + var fds = List.of(); var exports = writeDescriptors(capTable.getTable(), callBuilder.getParams(), fds); message.setFds(fds); var question = questions.next(); @@ -1790,7 +1787,7 @@ final class RpcState { } @Override - public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds) { + public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds) { descriptor.setReceiverHosted(this.importRef.importId); return null; } @@ -1807,7 +1804,7 @@ final class RpcState { } @Override - public Integer getFd() { + public FileDescriptor getFd() { var imp = imports.find(this.importRef.importId); return imp != null ? imp.fd : null; } @@ -1871,7 +1868,7 @@ final class RpcState { } @Override - public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder target, List fds) { + public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder target, List fds) { this.receivedCall = true; return RpcState.this.writeDescriptor(this.cap, target, fds); } @@ -1913,7 +1910,7 @@ final class RpcState { } @Override - public Integer getFd() { + public FileDescriptor getFd() { if (this.isResolved()) { return this.cap.getFd(); } @@ -2023,7 +2020,7 @@ final class RpcState { } @Override - public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds) { + public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List fds) { var promisedAnswer = descriptor.initReceiverAnswer(); promisedAnswer.setQuestionId(questionRef.questionId); FromPipelineOps(ops, promisedAnswer); @@ -2134,7 +2131,7 @@ final class RpcState { } @Override - public Integer getFd() { + public FileDescriptor getFd() { return this.inner.getFd(); } } diff --git a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java index 9af830c..9dbfa37 100644 --- a/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java +++ b/runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java @@ -1,5 +1,6 @@ package org.capnproto; +import java.io.FileDescriptor; import java.nio.channels.AsynchronousSocketChannel; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -110,7 +111,7 @@ public class TwoPartyVatNetwork final class OutgoingMessage implements OutgoingRpcMessage { private final MessageBuilder message; - private List fds = List.of(); + private List fds = List.of(); OutgoingMessage(int firstSegmentWordSize) { this.message = new MessageBuilder(firstSegmentWordSize == 0 @@ -124,7 +125,7 @@ public class TwoPartyVatNetwork } @Override - public void setFds(List fds) { + public void setFds(List fds) { this.fds = fds; } @@ -146,13 +147,13 @@ public class TwoPartyVatNetwork static final class IncomingMessage implements IncomingRpcMessage { private final MessageReader message; - private final List fds; + private final List fds; IncomingMessage(MessageReader message) { this(message, List.of()); } - IncomingMessage(MessageReader message, List fds) { + IncomingMessage(MessageReader message, List fds) { this.message = message; this.fds = fds; } @@ -163,7 +164,7 @@ public class TwoPartyVatNetwork } @Override - public List getAttachedFds() { + public List getAttachedFds() { return this.fds; } } diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index 8115b71..43eeaf6 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -1,5 +1,6 @@ package org.capnproto; +import java.io.FileDescriptor; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -75,7 +76,7 @@ public final class Capability { * The file descriptor will remain open at least as long as the {@link Client} remains alive. * If you need it to last longer, you will need to `dup()` it. */ - default CompletableFuture getFd() { + default CompletableFuture getFd() { var fd = this.getHook().getFd(); if (fd != null) { return CompletableFuture.completedFuture(fd); diff --git a/runtime/src/main/java/org/capnproto/ClientHook.java b/runtime/src/main/java/org/capnproto/ClientHook.java index 2d4842d..bb81550 100644 --- a/runtime/src/main/java/org/capnproto/ClientHook.java +++ b/runtime/src/main/java/org/capnproto/ClientHook.java @@ -1,5 +1,6 @@ package org.capnproto; +import java.io.FileDescriptor; import java.util.concurrent.CompletableFuture; public interface ClientHook { @@ -71,7 +72,7 @@ public interface ClientHook { * Implements {@link Capability.Client.getFd}. If this returns null but whenMoreResolved() returns * non-null, then Capability::Client::getFd() waits for resolution and tries again. */ - default Integer getFd() { + default FileDescriptor getFd() { return null; } diff --git a/runtime/src/main/java/org/capnproto/IncomingRpcMessage.java b/runtime/src/main/java/org/capnproto/IncomingRpcMessage.java index 60b4c5d..eb5692d 100644 --- a/runtime/src/main/java/org/capnproto/IncomingRpcMessage.java +++ b/runtime/src/main/java/org/capnproto/IncomingRpcMessage.java @@ -1,12 +1,13 @@ package org.capnproto; +import java.io.FileDescriptor; import java.util.List; public interface IncomingRpcMessage { AnyPointer.Reader getBody(); - default List getAttachedFds() { + default List getAttachedFds() { return List.of(); } } diff --git a/runtime/src/main/java/org/capnproto/OutgoingRpcMessage.java b/runtime/src/main/java/org/capnproto/OutgoingRpcMessage.java index 86b7a1e..e91f529 100644 --- a/runtime/src/main/java/org/capnproto/OutgoingRpcMessage.java +++ b/runtime/src/main/java/org/capnproto/OutgoingRpcMessage.java @@ -1,12 +1,13 @@ package org.capnproto; +import java.io.FileDescriptor; import java.util.List; public interface OutgoingRpcMessage { AnyPointer.Builder getBody(); - default void setFds(List fds) { + default void setFds(List fds) { } default List getFds() {