disconnect test networks on close
This commit is contained in:
parent
6e066d43c2
commit
330eb50cf0
2 changed files with 25 additions and 12 deletions
|
@ -53,12 +53,8 @@ public class RpcTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class TestNetworkAdapter
|
static final class TestNetworkAdapter
|
||||||
implements VatNetwork<Test.TestSturdyRef.Reader> {
|
implements VatNetwork<Test.TestSturdyRef.Reader>,
|
||||||
|
AutoCloseable {
|
||||||
@Override
|
|
||||||
public CompletableFuture<VatNetwork.Connection<Test.TestSturdyRef.Reader>> baseAccept() {
|
|
||||||
return this.accept().thenApply(conn -> conn);
|
|
||||||
}
|
|
||||||
|
|
||||||
class Connection implements VatNetwork.Connection<Test.TestSturdyRef.Reader> {
|
class Connection implements VatNetwork.Connection<Test.TestSturdyRef.Reader> {
|
||||||
|
|
||||||
|
@ -82,6 +78,14 @@ public class RpcTest {
|
||||||
other.partner = this;
|
other.partner = this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void disconnect(Exception exc) {
|
||||||
|
while (!fulfillers.isEmpty()) {
|
||||||
|
fulfillers.remove().completeExceptionally(exc);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.networkException = exc;
|
||||||
|
}
|
||||||
|
|
||||||
TestNetwork getNetwork() {
|
TestNetwork getNetwork() {
|
||||||
return network;
|
return network;
|
||||||
}
|
}
|
||||||
|
@ -170,10 +174,6 @@ public class RpcTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
var msg = newOutgoingMessage(0);
|
|
||||||
var abort = msg.getBody().initAs(RpcProtocol.Message.factory).initAbort();
|
|
||||||
FromException(RpcException.disconnected(""), abort);
|
|
||||||
msg.send();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,6 +194,18 @@ public class RpcTest {
|
||||||
return new Connection(isClient, peerId);
|
return new Connection(isClient, peerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<VatNetwork.Connection<Test.TestSturdyRef.Reader>> baseAccept() {
|
||||||
|
return this.accept().thenApply(conn -> conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
var exc = RpcException.failed("Network was destroyed");
|
||||||
|
for (var conn: this.connections.values()) {
|
||||||
|
conn.disconnect(exc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VatNetwork.Connection<Test.TestSturdyRef.Reader> connect(Test.TestSturdyRef.Reader refId) {
|
public VatNetwork.Connection<Test.TestSturdyRef.Reader> connect(Test.TestSturdyRef.Reader refId) {
|
||||||
var hostId = refId.getHostId().getHost().toString();
|
var hostId = refId.getHostId().getHost().toString();
|
||||||
|
|
|
@ -12,6 +12,7 @@ import java.nio.channels.AsynchronousSocketChannel;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
@SuppressWarnings({"OverlyCoupledMethod", "OverlyLongMethod"})
|
||||||
public class TwoPartyTest {
|
public class TwoPartyTest {
|
||||||
|
|
||||||
static final class PipeThread {
|
static final class PipeThread {
|
||||||
|
@ -163,8 +164,8 @@ public class TwoPartyTest {
|
||||||
var pipelineRequest2 = new Test.TestExtends.Client(promise.getOutBox().getCap()).graultRequest();
|
var pipelineRequest2 = new Test.TestExtends.Client(promise.getOutBox().getCap()).graultRequest();
|
||||||
var pipelinePromise2 = pipelineRequest2.send();
|
var pipelinePromise2 = pipelineRequest2.send();
|
||||||
|
|
||||||
Assert.assertThrows(Exception.class, () -> pipelinePromise.join());
|
Assert.assertThrows(Exception.class, pipelinePromise::join);
|
||||||
Assert.assertThrows(Exception.class, () -> pipelinePromise2.join());
|
Assert.assertThrows(Exception.class, pipelinePromise2::join);
|
||||||
|
|
||||||
Assert.assertEquals(3, callCount.value());
|
Assert.assertEquals(3, callCount.value());
|
||||||
Assert.assertEquals(1, chainedCallCount.value());
|
Assert.assertEquals(1, chainedCallCount.value());
|
||||||
|
|
Loading…
Reference in a new issue