use java11 switches and completablefutures

This commit is contained in:
Vaci Koblizek 2021-03-18 09:57:08 +00:00
parent 1bf1228756
commit 950ba824b9
5 changed files with 92 additions and 82 deletions

View file

@ -58,8 +58,8 @@ public class RpcDumper {
} }
String dump(RpcProtocol.Message.Reader message, RpcTwoPartyProtocol.Side sender) { String dump(RpcProtocol.Message.Reader message, RpcTwoPartyProtocol.Side sender) {
return switch (message.which()) { switch (message.which()) {
case CALL -> { case CALL: {
var call = message.getCall(); var call = message.getCall();
var iface = call.getInterfaceId(); var iface = call.getInterfaceId();
@ -93,14 +93,14 @@ public class RpcDumper {
} }
}*/ }*/
yield sender.name() + "(" + call.getQuestionId() + "): call " + return sender.name() + "(" + call.getQuestionId() + "): call " +
call.getTarget() + " <- " + interfaceName + "." + call.getTarget() + " <- " + interfaceName + "." +
methodName + " " + params.getClass().getName() + " caps:[" + methodName + " " + params.getClass().getName() + " caps:[" +
dumpCaps(payload.getCapTable()) + "]" + dumpCaps(payload.getCapTable()) + "]" +
(sendResultsTo.isCaller() ? "" : (" sendResultsTo:" + sendResultsTo)); (sendResultsTo.isCaller() ? "" : (" sendResultsTo:" + sendResultsTo));
} }
case RETURN -> { case RETURN: {
var ret = message.getReturn(); var ret = message.getReturn();
var text = sender.name() + "(" + ret.getAnswerId() + "): "; var text = sender.name() + "(" + ret.getAnswerId() + "): ";
var returnType = getReturnType( var returnType = getReturnType(
@ -108,56 +108,60 @@ public class RpcDumper {
? RpcTwoPartyProtocol.Side.SERVER ? RpcTwoPartyProtocol.Side.SERVER
: RpcTwoPartyProtocol.Side.CLIENT, : RpcTwoPartyProtocol.Side.CLIENT,
ret.getAnswerId()); ret.getAnswerId());
yield switch (ret.which()) { switch (ret.which()) {
case RESULTS -> { case RESULTS: {
var payload = ret.getResults(); var payload = ret.getResults();
yield text + "return " + payload + return text + "return " + payload +
" caps:[" + dumpCaps(payload.getCapTable()) + "]"; " caps:[" + dumpCaps(payload.getCapTable()) + "]";
} }
case EXCEPTION -> { case EXCEPTION: {
var exc = ret.getException(); var exc = ret.getException();
yield text + "exception " return text + "exception "
+ exc.getType().toString() + + exc.getType().toString() +
" " + exc.getReason(); " " + exc.getReason();
} }
default -> { default: {
yield text + ret.which().name(); return text + ret.which().name();
} }
}; }
} }
case BOOTSTRAP -> { case BOOTSTRAP: {
var restore = message.getBootstrap(); var restore = message.getBootstrap();
setReturnType(sender, restore.getQuestionId(), 0); setReturnType(sender, restore.getQuestionId(), 0);
yield sender.name() + "(" + restore.getQuestionId() + "): bootstrap " + return sender.name() + "(" + restore.getQuestionId() + "): bootstrap " +
restore.getDeprecatedObjectId(); restore.getDeprecatedObjectId();
} }
case ABORT -> { case ABORT: {
var abort = message.getAbort(); var abort = message.getAbort();
yield sender.name() + ": abort " return sender.name() + ": abort "
+ abort.getType().toString() + abort.getType().toString()
+ " \"" + abort.getReason().toString() + "\""; + " \"" + abort.getReason().toString() + "\"";
} }
case RESOLVE -> { case RESOLVE: {
var resolve = message.getResolve(); var resolve = message.getResolve();
var id = resolve.getPromiseId(); var id = resolve.getPromiseId();
var text = switch (resolve.which()) { String text;
case CAP -> { switch (resolve.which()) {
case CAP: {
var cap = resolve.getCap(); var cap = resolve.getCap();
yield cap.which().toString(); text = cap.which().toString();
break;
} }
case EXCEPTION -> { case EXCEPTION: {
var exc = resolve.getException(); var exc = resolve.getException();
yield exc.getType().toString() + ": " + exc.getReason().toString(); text = exc.getType().toString() + ": " + exc.getReason().toString();
break;
} }
default -> resolve.which().toString(); default: text = resolve.which().toString(); break;
}; };
yield sender.name() + "(" + id + "): resolve " + text; return sender.name() + "(" + id + "): resolve " + text;
} }
default -> sender.name() + ": " + message.which().name(); default:
}; return sender.name() + ": " + message.which().name();
}
} }
} }

View file

@ -343,7 +343,7 @@ final class RpcState<VatId> {
} }
var shutdownPromise = this.connection.shutdown() var shutdownPromise = this.connection.shutdown()
.exceptionallyCompose(ioExc -> { .exceptionally(ioExc -> {
assert !(ioExc instanceof IOException); assert !(ioExc instanceof IOException);
@ -352,17 +352,18 @@ final class RpcState<VatId> {
// Don't report disconnects as an error // Don't report disconnects as an error
if (rpcExc.getType() == RpcException.Type.DISCONNECTED) { if (rpcExc.getType() == RpcException.Type.DISCONNECTED) {
return CompletableFuture.completedFuture(null); return null;
} }
} }
else if (ioExc instanceof CompletionException) { else if (ioExc instanceof CompletionException) {
var compExc = (CompletionException)ioExc; var compExc = (CompletionException)ioExc;
if (compExc.getCause() instanceof ClosedChannelException) { if (compExc.getCause() instanceof ClosedChannelException) {
return CompletableFuture.completedFuture(null); return null;
} }
} }
return CompletableFuture.failedFuture(ioExc); return null;
//return CompletableFuture.failedFuture(ioExc);
}); });
this.disconnected = networkExc; this.disconnected = networkExc;
@ -443,16 +444,16 @@ final class RpcState<VatId> {
var reader = message.getBody().getAs(RpcProtocol.Message.factory); var reader = message.getBody().getAs(RpcProtocol.Message.factory);
LOGGER.fine(() -> this.toString() + ": < RPC message: " + reader.which().toString()); LOGGER.fine(() -> this.toString() + ": < RPC message: " + reader.which().toString());
switch (reader.which()) { switch (reader.which()) {
case UNIMPLEMENTED -> handleUnimplemented(reader.getUnimplemented()); case UNIMPLEMENTED: handleUnimplemented(reader.getUnimplemented()); break;
case ABORT -> handleAbort(reader.getAbort()); case ABORT: handleAbort(reader.getAbort()); break;
case BOOTSTRAP -> handleBootstrap(reader.getBootstrap()); case BOOTSTRAP: handleBootstrap(reader.getBootstrap()); break;
case CALL -> handleCall(message, reader.getCall()); case CALL: handleCall(message, reader.getCall()); break;
case RETURN -> handleReturn(message, reader.getReturn()); case RETURN: handleReturn(message, reader.getReturn()); break;
case FINISH -> handleFinish(reader.getFinish()); case FINISH: handleFinish(reader.getFinish()); break;
case RESOLVE -> handleResolve(message, reader.getResolve()); case RESOLVE: handleResolve(message, reader.getResolve()); break;
case DISEMBARGO -> handleDisembargo(reader.getDisembargo()); case DISEMBARGO: handleDisembargo(reader.getDisembargo()); break;
case RELEASE -> handleRelease(reader.getRelease()); case RELEASE: handleRelease(reader.getRelease()); break;
default -> { default: {
LOGGER.warning(() -> this.toString() + ": < Unhandled RPC message: " + reader.which().toString()); LOGGER.warning(() -> this.toString() + ": < Unhandled RPC message: " + reader.which().toString());
if (!isDisconnected()) { if (!isDisconnected()) {
// boomin' back atcha // boomin' back atcha
@ -569,9 +570,9 @@ final class RpcState<VatId> {
boolean redirectResults; boolean redirectResults;
switch (call.getSendResultsTo().which()) { switch (call.getSendResultsTo().which()) {
case CALLER -> redirectResults = false; case CALLER: redirectResults = false; break;
case YOURSELF -> redirectResults = true; case YOURSELF: redirectResults = true; break;
default -> { default: {
assert false : "Unsupported 'Call.sendResultsTo'."; assert false : "Unsupported 'Call.sendResultsTo'.";
return; return;
} }
@ -785,15 +786,17 @@ final class RpcState<VatId> {
// This import is an unfulfilled promise. // This import is an unfulfilled promise.
switch (resolve.which()) { switch (resolve.which()) {
case CAP -> { case CAP:{
var cap = receiveCap(resolve.getCap(), message.getAttachedFds()); var cap = receiveCap(resolve.getCap(), message.getAttachedFds());
imp.promise.complete(cap); imp.promise.complete(cap);
break;
} }
case EXCEPTION -> { case EXCEPTION: {
var exc = ToException(resolve.getException()); var exc = ToException(resolve.getException());
imp.promise.completeExceptionally(exc); imp.promise.completeExceptionally(exc);
break;
} }
default -> { default: {
assert false : "Unknown 'Resolve' type."; assert false : "Unknown 'Resolve' type.";
} }
} }
@ -1157,7 +1160,7 @@ final class RpcState<VatId> {
ClientHook getMessageTarget(RpcProtocol.MessageTarget.Reader target) { ClientHook getMessageTarget(RpcProtocol.MessageTarget.Reader target) {
switch (target.which()) { switch (target.which()) {
case IMPORTED_CAP -> { case IMPORTED_CAP: {
var exp = exports.find(target.getImportedCap()); var exp = exports.find(target.getImportedCap());
if (exp != null) { if (exp != null) {
return exp.clientHook; return exp.clientHook;
@ -1167,7 +1170,7 @@ final class RpcState<VatId> {
return null; return null;
} }
} }
case PROMISED_ANSWER -> { case PROMISED_ANSWER: {
var promisedAnswer = target.getPromisedAnswer(); var promisedAnswer = target.getPromisedAnswer();
var questionId = promisedAnswer.getQuestionId(); var questionId = promisedAnswer.getQuestionId();
var base = answers.put(questionId); var base = answers.put(questionId);
@ -1186,7 +1189,7 @@ final class RpcState<VatId> {
} }
return pipeline.getPipelinedCap(ops); return pipeline.getPipelinedCap(ops);
} }
default -> { default: {
assert false: "Unknown message target type. " + target.which(); assert false: "Unknown message target type. " + target.which();
return null; return null;
} }
@ -1570,28 +1573,30 @@ final class RpcState<VatId> {
} }
return this.clientMap.computeIfAbsent(key, k -> { return this.clientMap.computeIfAbsent(key, k -> {
return switch (state) { switch (state) {
case WAITING -> { case WAITING: {
var pipelineClient = new PipelineClient(this.questionRef, ops); var pipelineClient = new PipelineClient(this.questionRef, ops);
if (this.redirectLater == null) { if (this.redirectLater == null) {
// This pipeline will never get redirected, so just return the PipelineClient. // This pipeline will never get redirected, so just return the PipelineClient.
yield pipelineClient; return pipelineClient;
} }
assert this.resolveSelf != null; assert this.resolveSelf != null;
var resolutionPromise = this.resolveSelf.thenApply( var resolutionPromise = this.resolveSelf.thenApply(
response -> response.getResults().getPipelinedCap(ops)); response -> response.getResults().getPipelinedCap(ops));
yield new PromiseClient(pipelineClient, resolutionPromise, null); return new PromiseClient(pipelineClient, resolutionPromise, null);
} }
case RESOLVED -> { case RESOLVED: {
assert this.resolved != null; assert this.resolved != null;
yield this.resolved.getResults().getPipelinedCap(ops); return this.resolved.getResults().getPipelinedCap(ops);
} }
case BROKEN -> { case BROKEN: {
assert this.broken != null; assert this.broken != null;
yield Capability.newBrokenCap(broken); return Capability.newBrokenCap(broken);
} }
}; };
assert false;
return null;
}); });
} }
@ -2066,11 +2071,11 @@ final class RpcState<VatId> {
var type = RpcProtocol.Exception.Type.FAILED; var type = RpcProtocol.Exception.Type.FAILED;
if (exc instanceof RpcException) { if (exc instanceof RpcException) {
var rpcExc = (RpcException) exc; var rpcExc = (RpcException) exc;
type = switch (rpcExc.getType()) { switch (rpcExc.getType()) {
case FAILED -> RpcProtocol.Exception.Type.FAILED; case FAILED: type = RpcProtocol.Exception.Type.FAILED; break;
case OVERLOADED -> RpcProtocol.Exception.Type.OVERLOADED; case OVERLOADED: type = RpcProtocol.Exception.Type.OVERLOADED; break;
case DISCONNECTED -> RpcProtocol.Exception.Type.DISCONNECTED; case DISCONNECTED: type = RpcProtocol.Exception.Type.DISCONNECTED; break;
case UNIMPLEMENTED -> RpcProtocol.Exception.Type.UNIMPLEMENTED; case UNIMPLEMENTED: type = RpcProtocol.Exception.Type.UNIMPLEMENTED; break;
}; };
} }
builder.setType(type); builder.setType(type);
@ -2081,11 +2086,12 @@ final class RpcState<VatId> {
} }
static RpcException ToException(RpcProtocol.Exception.Reader reader) { static RpcException ToException(RpcProtocol.Exception.Reader reader) {
var type = switch (reader.getType()) { var type = RpcException.Type.FAILED;
case OVERLOADED -> RpcException.Type.OVERLOADED; switch (reader.getType()) {
case DISCONNECTED -> RpcException.Type.DISCONNECTED; case OVERLOADED: type = RpcException.Type.OVERLOADED; break;
case UNIMPLEMENTED -> RpcException.Type.UNIMPLEMENTED; case DISCONNECTED: type = RpcException.Type.DISCONNECTED; break;
default -> RpcException.Type.FAILED; case UNIMPLEMENTED: type = RpcException.Type.UNIMPLEMENTED; break;
default: type = RpcException.Type.FAILED; break;
}; };
return new RpcException(type, reader.getReason().toString()); return new RpcException(type, reader.getReason().toString());
} }

View file

@ -537,9 +537,9 @@ public class RpcTest {
AtomicBoolean returned = new AtomicBoolean(false); AtomicBoolean returned = new AtomicBoolean(false);
var req = client.callHeldRequest().send().exceptionallyCompose(exc -> { var req = client.callHeldRequest().send().exceptionally(exc -> {
returned.set(true); returned.set(true);
return CompletableFuture.failedFuture(exc); return null;
}).thenAccept(results -> { }).thenAccept(results -> {
returned.set(true); returned.set(true);
}); });

View file

@ -306,10 +306,10 @@ public final class Capability {
} }
else { else {
this.blocked = true; this.blocked = true;
return result.promise.exceptionallyCompose(exc -> { return result.promise.whenComplete((void_, exc) -> {
this.brokenException = exc; if (exc != null) {
return CompletableFuture.failedFuture(exc); this.brokenException = exc;
}).whenComplete((void_, exc) -> { }
this.unblock(); this.unblock();
}); });
} }

View file

@ -21,15 +21,15 @@ public class RemotePromise<Results>
public RemotePromise(CompletableFuture<Response<Results>> promise, public RemotePromise(CompletableFuture<Response<Results>> promise,
AnyPointer.Pipeline pipeline) { AnyPointer.Pipeline pipeline) {
this.response = promise this.response = promise.whenComplete((response, exc) -> {
.thenApply(response -> { if (exc != null) {
this.complete(response.getResults()); this.completeExceptionally(exc);
return response; }
}) else {
.exceptionallyCompose(exc -> { this.complete(response.getResults());
this.completeExceptionally(exc); }
return CompletableFuture.failedFuture(exc); });
});
this.pipeline = pipeline; this.pipeline = pipeline;
} }