fix getNetwork bug, remove unnecessary TwoPartVatNetwork specialisation
This commit is contained in:
parent
1f004779d4
commit
7d2e541603
6 changed files with 19 additions and 36 deletions
|
@ -23,6 +23,11 @@ public class RpcSystem<VatId extends StructReader> {
|
|||
return this.network;
|
||||
}
|
||||
|
||||
public RpcSystem(VatNetwork<VatId> network,
|
||||
Capability.Server bootstrapInterface) {
|
||||
this(network, new Capability.Client(bootstrapInterface));
|
||||
}
|
||||
|
||||
public RpcSystem(VatNetwork<VatId> network,
|
||||
Capability.Client bootstrapInterface) {
|
||||
this(network, new BootstrapFactory<VatId>() {
|
||||
|
@ -78,7 +83,7 @@ public class RpcSystem<VatId extends StructReader> {
|
|||
}
|
||||
|
||||
private CompletableFuture<java.lang.Void> doAcceptLoop() {
|
||||
return this.getNetwork().baseAccept().thenCompose(connection -> {
|
||||
return this.network.baseAccept().thenCompose(connection -> {
|
||||
this.accept(connection);
|
||||
return this.doAcceptLoop();
|
||||
});
|
||||
|
|
|
@ -6,7 +6,7 @@ import java.util.concurrent.CompletableFuture;
|
|||
public class TwoPartyClient {
|
||||
|
||||
private final TwoPartyVatNetwork network;
|
||||
private final TwoPartyRpcSystem rpcSystem;
|
||||
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
|
||||
|
||||
public TwoPartyClient(AsynchronousSocketChannel channel) {
|
||||
this(channel, null);
|
||||
|
@ -20,7 +20,7 @@ public class TwoPartyClient {
|
|||
Capability.Client bootstrapInterface,
|
||||
RpcTwoPartyProtocol.Side side) {
|
||||
this.network = new TwoPartyVatNetwork(channel, side);
|
||||
this.rpcSystem = new TwoPartyRpcSystem(network, bootstrapInterface);
|
||||
this.rpcSystem = new RpcSystem<RpcTwoPartyProtocol.VatId.Reader>(network, bootstrapInterface);
|
||||
}
|
||||
|
||||
public Capability.Client bootstrap() {
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
package org.capnproto;
|
||||
|
||||
public class TwoPartyRpcSystem
|
||||
extends RpcSystem<RpcTwoPartyProtocol.VatId.Reader> {
|
||||
|
||||
private TwoPartyVatNetwork network;
|
||||
|
||||
public TwoPartyRpcSystem(TwoPartyVatNetwork network, Capability.Client bootstrapInterface) {
|
||||
super(network, bootstrapInterface);
|
||||
this.network = network;
|
||||
}
|
||||
|
||||
public TwoPartyRpcSystem(TwoPartyVatNetwork network, Capability.Server bootstrapInterface) {
|
||||
super(network, new Capability.Client(bootstrapInterface));
|
||||
this.network = network;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VatNetwork<RpcTwoPartyProtocol.VatId.Reader> getNetwork() {
|
||||
return this.network;
|
||||
}
|
||||
}
|
|
@ -13,13 +13,13 @@ public class TwoPartyServer {
|
|||
private class AcceptedConnection {
|
||||
final AsynchronousSocketChannel channel;
|
||||
final TwoPartyVatNetwork network;
|
||||
final TwoPartyRpcSystem rpcSystem;
|
||||
final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
|
||||
private final CompletableFuture<?> messageLoop;
|
||||
|
||||
AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousSocketChannel channel) {
|
||||
this.channel = channel;
|
||||
this.network = new TwoPartyVatNetwork(channel, RpcTwoPartyProtocol.Side.SERVER);
|
||||
this.rpcSystem = new TwoPartyRpcSystem(network, bootstrapInterface);
|
||||
this.rpcSystem = new RpcSystem<>(network, bootstrapInterface);
|
||||
this.messageLoop = this.rpcSystem.getMessageLoop().exceptionally(exc -> {
|
||||
connections.remove(this);
|
||||
return null;
|
||||
|
|
|
@ -43,12 +43,12 @@ public class TwoPartyVatNetwork
|
|||
this.tap = tap;
|
||||
}
|
||||
|
||||
public Connection asConnection() {
|
||||
public Connection<RpcTwoPartyProtocol.VatId.Reader> asConnection() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection connect(RpcTwoPartyProtocol.VatId.Reader vatId) {
|
||||
public Connection<RpcTwoPartyProtocol.VatId.Reader> connect(RpcTwoPartyProtocol.VatId.Reader vatId) {
|
||||
return vatId.getSide() != side
|
||||
? this.asConnection()
|
||||
: null;
|
||||
|
@ -60,7 +60,7 @@ public class TwoPartyVatNetwork
|
|||
return CompletableFuture.completedFuture(this.asConnection());
|
||||
}
|
||||
else {
|
||||
// never completes
|
||||
// never /home/vaci/g/capnproto-java/compilercompletes
|
||||
return new CompletableFuture<>();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ public class TwoPartyTest {
|
|||
|
||||
@Test
|
||||
public void testNullCap() throws ExecutionException, InterruptedException {
|
||||
var server = new TwoPartyRpcSystem(this.serverNetwork, new Capability.Client());
|
||||
var server = new RpcSystem<>(this.serverNetwork, new Capability.Client());
|
||||
var cap = this.client.bootstrap();
|
||||
var resolved = cap.whenResolved().toCompletableFuture();
|
||||
resolved.get();
|
||||
|
@ -113,7 +113,7 @@ public class TwoPartyTest {
|
|||
|
||||
@Test
|
||||
public void testBasic() throws ExecutionException, InterruptedException, IOException {
|
||||
var server = new TwoPartyRpcSystem(this.serverNetwork, new TestCap0Impl());
|
||||
var server = new RpcSystem<>(this.serverNetwork, new TestCap0Impl());
|
||||
|
||||
var demo = new Demo.TestCap0.Client(this.client.bootstrap());
|
||||
var request = demo.testMethod0Request();
|
||||
|
@ -130,7 +130,7 @@ public class TwoPartyTest {
|
|||
|
||||
@Test
|
||||
public void testBasicCleanup() throws ExecutionException, InterruptedException, TimeoutException {
|
||||
var server = new TwoPartyRpcSystem(this.serverNetwork, new TestCap0Impl());
|
||||
var server = new RpcSystem<>(this.serverNetwork, new TestCap0Impl());
|
||||
var demo = new Demo.TestCap0.Client(this.client.bootstrap());
|
||||
var request = demo.testMethod0Request();
|
||||
var params = request.getParams();
|
||||
|
@ -146,7 +146,7 @@ public class TwoPartyTest {
|
|||
|
||||
@Test
|
||||
public void testShutdown() throws InterruptedException, IOException {
|
||||
var server = new TwoPartyRpcSystem(this.serverNetwork, new TestCap0Impl());
|
||||
var server = new RpcSystem<>(this.serverNetwork, new TestCap0Impl());
|
||||
var demo = new Demo.TestCap0.Client(this.client.bootstrap());
|
||||
this.clientSocket.shutdownOutput();
|
||||
serverThread.join();
|
||||
|
@ -163,7 +163,7 @@ public class TwoPartyTest {
|
|||
}
|
||||
};
|
||||
|
||||
var rpcSystem = new TwoPartyRpcSystem(this.serverNetwork, impl);
|
||||
var rpcSystem = new RpcSystem<>(this.serverNetwork, impl);
|
||||
|
||||
var demoClient = new Demo.TestCap0.Client(this.client.bootstrap());
|
||||
{
|
||||
|
@ -188,7 +188,7 @@ public class TwoPartyTest {
|
|||
public void testReturnCap() throws ExecutionException, InterruptedException {
|
||||
// send a capability back from the server to the client
|
||||
var capServer = new TestCap0Impl();
|
||||
var rpcSystem = new TwoPartyRpcSystem(this.serverNetwork, capServer);
|
||||
var rpcSystem = new RpcSystem<>(this.serverNetwork, capServer);
|
||||
var demoClient = new Demo.TestCap0.Client(this.client.bootstrap());
|
||||
var request = demoClient.testMethod1Request();
|
||||
var response = request.send();
|
||||
|
|
Loading…
Reference in a new issue