replace Integers with FileDescriptors
This commit is contained in:
parent
ab44843b12
commit
66ee9471f9
6 changed files with 34 additions and 32 deletions
|
@ -1,5 +1,6 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
|
@ -162,7 +163,7 @@ final class RpcState<VatId> {
|
||||||
final class Import {
|
final class Import {
|
||||||
final int importId;
|
final int importId;
|
||||||
ImportDisposer disposer;
|
ImportDisposer disposer;
|
||||||
Integer fd;
|
FileDescriptor fd;
|
||||||
int remoteRefCount;
|
int remoteRefCount;
|
||||||
RpcClient appClient;
|
RpcClient appClient;
|
||||||
CompletableFuture<ClientHook> promise;
|
CompletableFuture<ClientHook> promise;
|
||||||
|
@ -176,7 +177,7 @@ final class RpcState<VatId> {
|
||||||
this.remoteRefCount++;
|
this.remoteRefCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setFdIfMissing(Integer fd) {
|
void setFdIfMissing(FileDescriptor fd) {
|
||||||
if (this.fd == null) {
|
if (this.fd == null) {
|
||||||
this.fd = fd;
|
this.fd = fd;
|
||||||
}
|
}
|
||||||
|
@ -541,7 +542,7 @@ final class RpcState<VatId> {
|
||||||
? caps[0]
|
? caps[0]
|
||||||
: Capability.newNullCap();
|
: Capability.newNullCap();
|
||||||
|
|
||||||
var fds = List.<Integer>of();
|
var fds = List.<FileDescriptor>of();
|
||||||
response.setFds(List.of());
|
response.setFds(List.of());
|
||||||
|
|
||||||
answer.resultExports = writeDescriptors(caps, payload, fds);
|
answer.resultExports = writeDescriptors(caps, payload, fds);
|
||||||
|
@ -872,7 +873,7 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int[] writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List<Integer> fds) {
|
private int[] writeDescriptors(ClientHook[] capTable, RpcProtocol.Payload.Builder payload, List<FileDescriptor> fds) {
|
||||||
if (capTable.length == 0) {
|
if (capTable.length == 0) {
|
||||||
return new int[0];
|
return new int[0];
|
||||||
}
|
}
|
||||||
|
@ -897,7 +898,7 @@ final class RpcState<VatId> {
|
||||||
.toArray();
|
.toArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Integer writeDescriptor(ClientHook cap, RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) {
|
private Integer writeDescriptor(ClientHook cap, RpcProtocol.CapDescriptor.Builder descriptor, List<FileDescriptor> fds) {
|
||||||
ClientHook inner = cap;
|
ClientHook inner = cap;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
var resolved = inner.getResolved();
|
var resolved = inner.getResolved();
|
||||||
|
@ -982,7 +983,7 @@ final class RpcState<VatId> {
|
||||||
var message = connection.newOutgoingMessage(sizeHint);
|
var message = connection.newOutgoingMessage(sizeHint);
|
||||||
var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve();
|
var resolve = message.getBody().initAs(RpcProtocol.Message.factory).initResolve();
|
||||||
resolve.setPromiseId(exportId);
|
resolve.setPromiseId(exportId);
|
||||||
var fds = List.<Integer>of();
|
var fds = List.<FileDescriptor>of();
|
||||||
writeDescriptor(exp.clientHook, resolve.initCap(), fds);
|
writeDescriptor(exp.clientHook, resolve.initCap(), fds);
|
||||||
message.setFds(fds);
|
message.setFds(fds);
|
||||||
LOGGER.fine(() -> this.toString() + ": > RESOLVE export=" + exportId);
|
LOGGER.fine(() -> this.toString() + ": > RESOLVE export=" + exportId);
|
||||||
|
@ -1027,7 +1028,7 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ClientHook> receiveCaps(StructList.Reader<RpcProtocol.CapDescriptor.Reader> capTable, List<Integer> fds) {
|
private List<ClientHook> receiveCaps(StructList.Reader<RpcProtocol.CapDescriptor.Reader> capTable, List<FileDescriptor> fds) {
|
||||||
var result = new ArrayList<ClientHook>();
|
var result = new ArrayList<ClientHook>();
|
||||||
for (var cap: capTable) {
|
for (var cap: capTable) {
|
||||||
result.add(receiveCap(cap, fds));
|
result.add(receiveCap(cap, fds));
|
||||||
|
@ -1035,10 +1036,8 @@ final class RpcState<VatId> {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClientHook receiveCap(RpcProtocol.CapDescriptor.Reader descriptor, List<Integer> fds) {
|
private ClientHook receiveCap(RpcProtocol.CapDescriptor.Reader descriptor, List<FileDescriptor> fds) {
|
||||||
// TODO AutoCloseFd
|
FileDescriptor fd = null;
|
||||||
Integer fd = null;
|
|
||||||
|
|
||||||
int fdIndex = descriptor.getAttachedFd();
|
int fdIndex = descriptor.getAttachedFd();
|
||||||
if (fdIndex >= 0 && fdIndex < fds.size()) {
|
if (fdIndex >= 0 && fdIndex < fds.size()) {
|
||||||
fd = fds.get(fdIndex);
|
fd = fds.get(fdIndex);
|
||||||
|
@ -1097,10 +1096,8 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClientHook importCap(int importId, boolean isPromise, Integer fd) {
|
private ClientHook importCap(int importId, boolean isPromise, FileDescriptor fd) {
|
||||||
// Receive a new import.
|
// Receive a new import.
|
||||||
|
|
||||||
|
|
||||||
var imp = imports.put(importId);
|
var imp = imports.put(importId);
|
||||||
|
|
||||||
ImportClient importClient;
|
ImportClient importClient;
|
||||||
|
@ -1260,7 +1257,7 @@ final class RpcState<VatId> {
|
||||||
|
|
||||||
int[] send() {
|
int[] send() {
|
||||||
var capTable = this.capTable.getTable();
|
var capTable = this.capTable.getTable();
|
||||||
var fds = List.<Integer>of();
|
var fds = List.<FileDescriptor>of();
|
||||||
var exports = writeDescriptors(capTable, payload, fds);
|
var exports = writeDescriptors(capTable, payload, fds);
|
||||||
// TODO process FDs
|
// TODO process FDs
|
||||||
message.setFds(fds);
|
message.setFds(fds);
|
||||||
|
@ -1606,7 +1603,7 @@ final class RpcState<VatId> {
|
||||||
|
|
||||||
abstract class RpcClient implements ClientHook {
|
abstract class RpcClient implements ClientHook {
|
||||||
|
|
||||||
public abstract Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds);
|
public abstract Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<FileDescriptor> fds);
|
||||||
|
|
||||||
public abstract ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target);
|
public abstract ClientHook writeTarget(RpcProtocol.MessageTarget.Builder target);
|
||||||
|
|
||||||
|
@ -1708,7 +1705,7 @@ final class RpcState<VatId> {
|
||||||
|
|
||||||
QuestionRef sendInternal(boolean isTailCall) {
|
QuestionRef sendInternal(boolean isTailCall) {
|
||||||
// TODO refactor
|
// TODO refactor
|
||||||
var fds = List.<Integer>of();
|
var fds = List.<FileDescriptor>of();
|
||||||
var exports = writeDescriptors(capTable.getTable(), callBuilder.getParams(), fds);
|
var exports = writeDescriptors(capTable.getTable(), callBuilder.getParams(), fds);
|
||||||
message.setFds(fds);
|
message.setFds(fds);
|
||||||
var question = questions.next();
|
var question = questions.next();
|
||||||
|
@ -1790,7 +1787,7 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) {
|
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<FileDescriptor> fds) {
|
||||||
descriptor.setReceiverHosted(this.importRef.importId);
|
descriptor.setReceiverHosted(this.importRef.importId);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -1807,7 +1804,7 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer getFd() {
|
public FileDescriptor getFd() {
|
||||||
var imp = imports.find(this.importRef.importId);
|
var imp = imports.find(this.importRef.importId);
|
||||||
return imp != null ? imp.fd : null;
|
return imp != null ? imp.fd : null;
|
||||||
}
|
}
|
||||||
|
@ -1871,7 +1868,7 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder target, List<Integer> fds) {
|
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder target, List<FileDescriptor> fds) {
|
||||||
this.receivedCall = true;
|
this.receivedCall = true;
|
||||||
return RpcState.this.writeDescriptor(this.cap, target, fds);
|
return RpcState.this.writeDescriptor(this.cap, target, fds);
|
||||||
}
|
}
|
||||||
|
@ -1913,7 +1910,7 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer getFd() {
|
public FileDescriptor getFd() {
|
||||||
if (this.isResolved()) {
|
if (this.isResolved()) {
|
||||||
return this.cap.getFd();
|
return this.cap.getFd();
|
||||||
}
|
}
|
||||||
|
@ -2023,7 +2020,7 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<Integer> fds) {
|
public Integer writeDescriptor(RpcProtocol.CapDescriptor.Builder descriptor, List<FileDescriptor> fds) {
|
||||||
var promisedAnswer = descriptor.initReceiverAnswer();
|
var promisedAnswer = descriptor.initReceiverAnswer();
|
||||||
promisedAnswer.setQuestionId(questionRef.questionId);
|
promisedAnswer.setQuestionId(questionRef.questionId);
|
||||||
FromPipelineOps(ops, promisedAnswer);
|
FromPipelineOps(ops, promisedAnswer);
|
||||||
|
@ -2134,7 +2131,7 @@ final class RpcState<VatId> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer getFd() {
|
public FileDescriptor getFd() {
|
||||||
return this.inner.getFd();
|
return this.inner.getFd();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.nio.channels.AsynchronousSocketChannel;
|
import java.nio.channels.AsynchronousSocketChannel;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
@ -110,7 +111,7 @@ public class TwoPartyVatNetwork
|
||||||
final class OutgoingMessage implements OutgoingRpcMessage {
|
final class OutgoingMessage implements OutgoingRpcMessage {
|
||||||
|
|
||||||
private final MessageBuilder message;
|
private final MessageBuilder message;
|
||||||
private List<Integer> fds = List.of();
|
private List<FileDescriptor> fds = List.of();
|
||||||
|
|
||||||
OutgoingMessage(int firstSegmentWordSize) {
|
OutgoingMessage(int firstSegmentWordSize) {
|
||||||
this.message = new MessageBuilder(firstSegmentWordSize == 0
|
this.message = new MessageBuilder(firstSegmentWordSize == 0
|
||||||
|
@ -124,7 +125,7 @@ public class TwoPartyVatNetwork
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setFds(List<Integer> fds) {
|
public void setFds(List<FileDescriptor> fds) {
|
||||||
this.fds = fds;
|
this.fds = fds;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,13 +147,13 @@ public class TwoPartyVatNetwork
|
||||||
static final class IncomingMessage implements IncomingRpcMessage {
|
static final class IncomingMessage implements IncomingRpcMessage {
|
||||||
|
|
||||||
private final MessageReader message;
|
private final MessageReader message;
|
||||||
private final List<Integer> fds;
|
private final List<FileDescriptor> fds;
|
||||||
|
|
||||||
IncomingMessage(MessageReader message) {
|
IncomingMessage(MessageReader message) {
|
||||||
this(message, List.of());
|
this(message, List.of());
|
||||||
}
|
}
|
||||||
|
|
||||||
IncomingMessage(MessageReader message, List<Integer> fds) {
|
IncomingMessage(MessageReader message, List<FileDescriptor> fds) {
|
||||||
this.message = message;
|
this.message = message;
|
||||||
this.fds = fds;
|
this.fds = fds;
|
||||||
}
|
}
|
||||||
|
@ -163,7 +164,7 @@ public class TwoPartyVatNetwork
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Integer> getAttachedFds() {
|
public List<FileDescriptor> getAttachedFds() {
|
||||||
return this.fds;
|
return this.fds;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
|
@ -75,7 +76,7 @@ public final class Capability {
|
||||||
* The file descriptor will remain open at least as long as the {@link Client} remains alive.
|
* The file descriptor will remain open at least as long as the {@link Client} remains alive.
|
||||||
* If you need it to last longer, you will need to `dup()` it.
|
* If you need it to last longer, you will need to `dup()` it.
|
||||||
*/
|
*/
|
||||||
default CompletableFuture<Integer> getFd() {
|
default CompletableFuture<FileDescriptor> getFd() {
|
||||||
var fd = this.getHook().getFd();
|
var fd = this.getHook().getFd();
|
||||||
if (fd != null) {
|
if (fd != null) {
|
||||||
return CompletableFuture.completedFuture(fd);
|
return CompletableFuture.completedFuture(fd);
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
public interface ClientHook {
|
public interface ClientHook {
|
||||||
|
@ -71,7 +72,7 @@ public interface ClientHook {
|
||||||
* Implements {@link Capability.Client.getFd}. If this returns null but whenMoreResolved() returns
|
* Implements {@link Capability.Client.getFd}. If this returns null but whenMoreResolved() returns
|
||||||
* non-null, then Capability::Client::getFd() waits for resolution and tries again.
|
* non-null, then Capability::Client::getFd() waits for resolution and tries again.
|
||||||
*/
|
*/
|
||||||
default Integer getFd() {
|
default FileDescriptor getFd() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public interface IncomingRpcMessage {
|
public interface IncomingRpcMessage {
|
||||||
|
|
||||||
AnyPointer.Reader getBody();
|
AnyPointer.Reader getBody();
|
||||||
|
|
||||||
default List<Integer> getAttachedFds() {
|
default List<FileDescriptor> getAttachedFds() {
|
||||||
return List.of();
|
return List.of();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
package org.capnproto;
|
package org.capnproto;
|
||||||
|
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public interface OutgoingRpcMessage {
|
public interface OutgoingRpcMessage {
|
||||||
|
|
||||||
AnyPointer.Builder getBody();
|
AnyPointer.Builder getBody();
|
||||||
|
|
||||||
default void setFds(List<Integer> fds) {
|
default void setFds(List<FileDescriptor> fds) {
|
||||||
}
|
}
|
||||||
|
|
||||||
default List<Integer> getFds() {
|
default List<Integer> getFds() {
|
||||||
|
|
Loading…
Reference in a new issue