remove baseAccept and tidy TowPartyVatNetwork

This commit is contained in:
Vaci Koblizek 2020-11-19 15:17:04 +00:00
parent 709751a885
commit 5e797d36a3
4 changed files with 39 additions and 49 deletions

View file

@ -82,7 +82,7 @@ public class RpcSystem<VatId extends StructReader> {
}
private void startAcceptLoop() {
this.network.baseAccept()
this.network.accept()
.thenAccept(this::accept)
.thenRunAsync(this::startAcceptLoop);
}

View file

@ -1,6 +1,5 @@
package org.capnproto;
import java.io.IOException;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.List;
@ -10,11 +9,6 @@ public class TwoPartyVatNetwork
implements VatNetwork<RpcTwoPartyProtocol.VatId.Reader>,
VatNetwork.Connection<RpcTwoPartyProtocol.VatId.Reader> {
@Override
public CompletableFuture<Connection<RpcTwoPartyProtocol.VatId.Reader>> baseAccept() {
return this.accept();
}
public interface MessageTap {
void incoming(IncomingRpcMessage message, RpcTwoPartyProtocol.Side side);
}
@ -49,23 +43,7 @@ public class TwoPartyVatNetwork
@Override
public String toString() {
return this.getSide().toString();
}
public RpcTwoPartyProtocol.Side getSide() {
return side;
}
public void setTap(MessageTap tap) {
this.tap = tap;
}
public Connection<RpcTwoPartyProtocol.VatId.Reader> asConnection() {
return this;
}
public CompletableFuture<java.lang.Void> onDisconnect() {
return this.disconnectPromise.copy();
return this.side.toString();
}
@Override
@ -75,17 +53,7 @@ public class TwoPartyVatNetwork
: null;
}
public CompletableFuture<Connection<RpcTwoPartyProtocol.VatId.Reader>> accept() {
if (side == RpcTwoPartyProtocol.Side.SERVER & !accepted) {
accepted = true;
return CompletableFuture.completedFuture(this.asConnection());
}
else {
// never completes
return new CompletableFuture<>();
}
}
@Override
public RpcTwoPartyProtocol.VatId.Reader getPeerVatId() {
return this.peerVatId.getRoot(RpcTwoPartyProtocol.VatId.factory).asReader();
}
@ -108,7 +76,7 @@ public class TwoPartyVatNetwork
return;
}
var side = this.getSide() == RpcTwoPartyProtocol.Side.CLIENT
var side = this.side == RpcTwoPartyProtocol.Side.CLIENT
? RpcTwoPartyProtocol.Side.SERVER
: RpcTwoPartyProtocol.Side.CLIENT;
@ -137,6 +105,34 @@ public class TwoPartyVatNetwork
return result;
}
public RpcTwoPartyProtocol.Side getSide() {
return side;
}
public void setTap(MessageTap tap) {
this.tap = tap;
}
public Connection<RpcTwoPartyProtocol.VatId.Reader> asConnection() {
return this;
}
public CompletableFuture<java.lang.Void> onDisconnect() {
return this.disconnectPromise.copy();
}
public CompletableFuture<Connection<RpcTwoPartyProtocol.VatId.Reader>> accept() {
if (side == RpcTwoPartyProtocol.Side.SERVER & !accepted) {
accepted = true;
return CompletableFuture.completedFuture(this.asConnection());
}
else {
// never completes
return new CompletableFuture<>();
}
}
final class OutgoingMessage implements OutgoingRpcMessage {
private final MessageBuilder message;
@ -160,7 +156,7 @@ public class TwoPartyVatNetwork
@Override
public void send() {
previousWrite = previousWrite.thenCompose(x -> Serialize.writeAsync(channel, message));
previousWrite = previousWrite.thenRun(() -> Serialize.writeAsync(channel, message));
}
@Override
@ -173,7 +169,7 @@ public class TwoPartyVatNetwork
}
}
final class IncomingMessage implements IncomingRpcMessage {
static final class IncomingMessage implements IncomingRpcMessage {
private final MessageReader message;
private final List<Integer> fds;

View file

@ -15,7 +15,6 @@ public interface VatNetwork<VatId>
void close();
}
CompletableFuture<Connection<VatId>> baseAccept();
CompletableFuture<Connection<VatId>> accept();
Connection<VatId> connect(VatId hostId);
}

View file

@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.capnproto.RpcState.FromException;
public class RpcTest {
@ -187,7 +186,7 @@ public class RpcTest {
int sent = 0;
int received = 0;
Map<TestNetworkAdapter, Connection> connections = new HashMap<>();
Queue<CompletableFuture<Connection>> fulfillerQueue = new ArrayDeque<>();
Queue<CompletableFuture<VatNetwork.Connection<Test.TestSturdyRef.Reader>>> fulfillerQueue = new ArrayDeque<>();
Queue<Connection> connectionQueue = new ArrayDeque<>();
TestNetworkAdapter(TestNetwork network, String self) {
@ -199,10 +198,6 @@ public class RpcTest {
return new Connection(isClient, peerId);
}
public CompletableFuture<VatNetwork.Connection<Test.TestSturdyRef.Reader>> baseAccept() {
return this.accept().thenApply(conn -> conn);
}
@Override
public void close() {
var exc = RpcException.failed("Network was destroyed");
@ -241,11 +236,11 @@ public class RpcTest {
return local;
}
public CompletableFuture<Connection> accept() {
public CompletableFuture<VatNetwork.Connection<Test.TestSturdyRef.Reader>> accept() {
if (this.connections.isEmpty()) {
var promise = new CompletableFuture<Connection>();
var promise = new CompletableFuture<VatNetwork.Connection<Test.TestSturdyRef.Reader>>();
this.fulfillerQueue.add(promise);
return promise.thenApply(conn -> conn);
return promise;
}
else {
return CompletableFuture.completedFuture(this.connectionQueue.remove());