From 8eacc8cadacf4ab5710a3c85162ba4a5a018c712 Mon Sep 17 00:00:00 2001 From: Vaci Koblizek Date: Fri, 30 Oct 2020 18:16:49 +0000 Subject: [PATCH] major refactor of RemotePromise and Pipeline --- compiler/src/main/cpp/capnpc-java.c++ | 418 ++++++++++++------ .../main/java/org/capnproto/AnyPointer.java | 61 ++- .../main/java/org/capnproto/CallContext.java | 4 +- .../main/java/org/capnproto/Capability.java | 180 +++++--- .../main/java/org/capnproto/ClientHook.java | 9 +- .../src/main/java/org/capnproto/Pipeline.java | 41 -- .../main/java/org/capnproto/PipelineBase.java | 5 + .../java/org/capnproto/PipelineFactory.java | 2 +- .../main/java/org/capnproto/PipelineHook.java | 1 + .../main/java/org/capnproto/PipelineImpl.java | 32 ++ .../java/org/capnproto/RemotePromise.java | 32 +- .../src/main/java/org/capnproto/Request.java | 58 ++- .../src/main/java/org/capnproto/Response.java | 10 +- .../main/java/org/capnproto/RpcDumper.java | 10 +- .../src/main/java/org/capnproto/RpcState.java | 19 +- 15 files changed, 580 insertions(+), 302 deletions(-) delete mode 100644 runtime/src/main/java/org/capnproto/Pipeline.java create mode 100644 runtime/src/main/java/org/capnproto/PipelineBase.java create mode 100644 runtime/src/main/java/org/capnproto/PipelineImpl.java diff --git a/compiler/src/main/cpp/capnpc-java.c++ b/compiler/src/main/cpp/capnpc-java.c++ index 723901d..8b2387d 100644 --- a/compiler/src/main/cpp/capnpc-java.c++ +++ b/compiler/src/main/cpp/capnpc-java.c++ @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -334,9 +335,23 @@ private: return kj::strTree(javaFullName(type.asStruct()), ".", suffix); } } - case schema::Type::INTERFACE: - return javaFullName(type.asInterface()); - + + case schema::Type::INTERFACE: { + auto interfaceSchema = type.asInterface(); + if (interfaceSchema.getProto().getIsGeneric()) { + auto typeArgs = getTypeArguments(interfaceSchema, interfaceSchema, kj::str(suffix)); + return kj::strTree( + javaFullName(interfaceSchema), ".", suffix, "<", + kj::StringTree(KJ_MAP(arg, typeArgs){ + return kj::strTree(arg); + }, ", "), + ">" + ); + } else { + return kj::strTree(javaFullName(type.asInterface()), ".", suffix); + } + } + case schema::Type::LIST: { auto elementType = type.asList().getElementType(); @@ -394,7 +409,12 @@ private: "_", kj::hex(brandParam->scopeId), "_", suffix); } else { - return kj::strTree("org.capnproto.AnyPointer.", suffix); + switch (type.whichAnyPointerKind()) { + case schema::Type::AnyPointer::Unconstrained::CAPABILITY: + return kj::strTree("org.capnproto.Capability.", suffix); + default: + return kj::strTree("org.capnproto.AnyPointer.", suffix); + } } } } @@ -721,7 +741,8 @@ private: STRUCT, LIST, INTERFACE, - ANY_POINTER + ANY_POINTER, + BRAND_PARAMETER }; kj::StringTree makeEnumGetter(EnumSchema schema, uint offset, kj::String defaultMaskParam, int indent) { @@ -753,7 +774,12 @@ private: "_", kj::hex(brandParam->scopeId), "_Factory"); } else { - return kj::str("org.capnproto.AnyPointer.factory"); + switch (type.whichAnyPointerKind()) { + case schema::Type::AnyPointer::Unconstrained::CAPABILITY: + return kj::str("org.capnproto.Capability.factory"); + default: + return kj::str("org.capnproto.AnyPointer.factory"); + } } } case schema::Type::STRUCT : { @@ -908,6 +934,7 @@ private: auto typeBody = slot.getType(); auto defaultBody = slot.getDefaultValue(); + switch (typeBody.which()) { case schema::Type::VOID: kind = FieldKind::PRIMITIVE; @@ -992,10 +1019,27 @@ private: kind = FieldKind::INTERFACE; break; case schema::Type::ANY_POINTER: - kind = FieldKind::ANY_POINTER; if (defaultBody.hasAnyPointer()) { defaultOffset = field.getDefaultValueSchemaOffset(); } + if (field.getType().getBrandParameter() != nullptr && false) { + kind = FieldKind::BRAND_PARAMETER; + } else { + kind = FieldKind::ANY_POINTER; + switch (field.getType().whichAnyPointerKind()) { + case schema::Type::AnyPointer::Unconstrained::ANY_KIND: + kind = FieldKind::ANY_POINTER; + break; + case schema::Type::AnyPointer::Unconstrained::STRUCT: + kind = FieldKind::STRUCT; + break; + case schema::Type::AnyPointer::Unconstrained::LIST: + kind = FieldKind::LIST; + case schema::Type::AnyPointer::Unconstrained::CAPABILITY: + kind = FieldKind::INTERFACE; + break; + } + } break; } @@ -1050,7 +1094,8 @@ private: } else if (kind == FieldKind::INTERFACE) { auto factoryArg = makeFactoryArg(field.getType()); - auto clientType = kj::str(typeName(field.getType()), ".Client"); + auto clientType = typeName(field.getType(), kj::str("Client")).flatten(); + auto serverType = typeName(field.getType(), kj::str("Server")).flatten(); return FieldText { kj::strTree( @@ -1080,11 +1125,16 @@ private: unionDiscrim.set, spaces(indent), " _setPointerField(", factoryArg, ", ", offset, ", value);\n", spaces(indent), " }\n", - "\n"), + spaces(indent), " public void set", titleCase, "(", serverType, " value) {\n", + spaces(indent), " this.set", titleCase, "(new ", clientType, "(value));\n", + spaces(indent), " }\n" + ), kj::strTree( - spaces(indent), " public ", clientType, " get", titleCase, "() {\n", - spaces(indent), " return new ", clientType, "(this.getPointerField((short)", offset, ").asCap());\n", + spaces(indent), " default ", clientType, " get", titleCase, "() {\n", + spaces(indent), " return new ", clientType, "(\n", + spaces(indent), " this.typelessPipeline().getPointerField((short)", offset, ").asCap()\n", + spaces(indent), " );\n", spaces(indent), " }\n" ) }; @@ -1145,7 +1195,8 @@ private: auto typeParamVec = getTypeParameters(field.getContainingStruct()); auto factoryArg = makeFactoryArg(field.getType()); - + auto pipelineType = typeName(field.getType(), kj::str("Pipeline")).flatten(); + return FieldText { kj::strTree( kj::mv(unionDiscrim.readerIsDef), @@ -1194,6 +1245,12 @@ private: spaces(indent), " return ", "_initPointerField(", factoryArg, ",", offset, ", 0);\n", spaces(indent), " }\n"), + + kj::strTree( + spaces(indent), " default ", pipelineType, " get", titleCase, "() {\n", + spaces(indent), " var pipeline = this.typelessPipeline().getPointerField((short)", offset, ");\n", + spaces(indent), " return () -> pipeline;\n", + spaces(indent), " }\n") }; } else if (kind == FieldKind::BLOB) { @@ -1374,8 +1431,6 @@ private: spaces(indent), " }\n") ) ), - - }; } else { KJ_UNREACHABLE; @@ -1479,8 +1534,8 @@ private: ",(short)", structNode.getPointerCount(), ");\n"), spaces(indent), " public static final class Factory", factoryTypeParams, "\n", - spaces(indent), " extends org.capnproto.StructFactory\n", - spaces(indent), " implements org.capnproto.PipelineFactory {\n", + spaces(indent), " extends org.capnproto.StructFactory {\n", + //spaces(indent), " implements org.capnproto.PipelineFactory {\n", factoryMembers.flatten(), spaces(indent), " public Factory(", factoryArgs.flatten(), @@ -1515,13 +1570,13 @@ private: (hasTypeParams ? kj::strTree("this") : kj::strTree()), ");\n", spaces(indent), " }\n", - spaces(indent), " public Pipeline", readerTypeParams, " newPipeline(org.capnproto.RemotePromise promise) {\n", - spaces(indent), " return new Pipeline", readerTypeParamsInferred, "(", - kj::StringTree(KJ_MAP(p, typeParamVec) { - return kj::strTree(p, "_Factory"); - }, ", "), - (hasTypeParams ? ", ": ""), "promise);\n", - spaces(indent), " }\n", + //spaces(indent), " public Pipeline", readerTypeParams, " newPipeline(org.capnproto.RemotePromise promise, org.capnproto.PipelineImpl typeless) {\n", + //spaces(indent), " return new Pipeline", readerTypeParamsInferred, "(", + //kj::StringTree(KJ_MAP(p, typeParamVec) { + // return kj::strTree(p, "_Factory"); + // }, ", "), + //(hasTypeParams ? ", ": ""), "promise, typeless);\n", + //spaces(indent), " }\n", spaces(indent), " }\n", (hasTypeParams ? @@ -1609,17 +1664,23 @@ private: spaces(indent), " }\n"), KJ_MAP(n, nestedTypeDecls) { return kj::mv(n); }, - spaces(indent), " public static class Pipeline", readerTypeParams, " extends org.capnproto.Pipeline {\n", - spaces(indent), " public Pipeline(", - KJ_MAP(p, typeParamVec) { - return kj::strTree("org.capnproto.PointerFactory ", p, "_Factory,"); - }, " org.capnproto.RemotePromise remotePromise) {\n", - spaces(indent), " super(org.capnproto.RemotePromise.fromTypeless(", factoryRef, ", remotePromise));\n", - spaces(indent), " }\n", - KJ_MAP(f, fieldTexts) { return kj::mv(f.pipelineMethodDecls); }, + //spaces(indent), " public interface Pipeline", readerTypeParams, " extends org.capnproto.RemotePromise {\n", + spaces(indent), " public interface Pipeline", readerTypeParams, " extends org.capnproto.PipelineBase {\n", + //spaces(indent), " private final org.capnproto.PipelineImpl typeless;\n", + //spaces(indent), " org.capnproto.AnyPointer.Pipeline getTypeless();\n", + //spaces(indent), " public Pipeline(", + //#KJ_MAP(p, typeParamVec) { + // return kj::strTree("org.capnproto.PointerFactory ", p, "_Factory,"); + //}, " org.capnproto.RemotePromise remotePromise,\n", + //spaces(indent), " org.capnproto.PipelineImpl typelessPipeline) {\n", + //spaces(indent), " super(org.capnproto.RemotePromise.fromTypeless(", factoryRef, ", remotePromise));\n", + //spaces(indent), " this.typeless = typelessPipeline;\n", + //spaces(indent), " }\n", + KJ_MAP(f, fieldTexts) { + return kj::mv(f.pipelineMethodDecls); + }, spaces(indent), " }\n", - spaces(indent), "}\n", - "\n"), + spaces(indent), "}\n"), kj::strTree(), kj::strTree() @@ -1633,36 +1694,64 @@ private: kj::StringTree clientServerDefs; }; + struct ExtendInfo { + kj::String typeName; + uint64_t id; + }; + + + void getTransitiveSuperclasses(InterfaceSchema schema, std::map& map) { + if (map.insert(std::make_pair(schema.getProto().getId(), schema)).second) { + for (auto sup: schema.getSuperclasses()) { + getTransitiveSuperclasses(sup, map); + } + } + } + InterfaceText makeInterfaceText(kj::StringPtr scope, kj::StringPtr name, InterfaceSchema schema, kj::Array nestedTypeDecls, int indent) { + + auto sp = spaces(indent); auto fullName = kj::str(scope, name); auto methods = KJ_MAP(m, schema.getMethods()) { - return makeMethodText(fullName, m); + return makeMethodText(fullName, m, indent+2); }; auto proto = schema.getProto(); auto hexId = kj::hex(proto.getId()); - auto typeName = javaFullName(schema); + auto superclasses = KJ_MAP(superclass, schema.getSuperclasses()) { + return ExtendInfo { + kj::str(javaFullName(superclass, nullptr)), + superclass.getProto().getId() + }; + }; - - kj::String genericParamTypes; - if (proto.getIsGeneric()) { - auto typeParams = getTypeParameters(schema); - genericParamTypes = kj::strTree( - "<", - kj::StringTree( - KJ_MAP(arg, typeParams) { - return kj::strTree(arg); - }, ", "), - ">").flatten(); + kj::Array transitiveSuperclasses; + { + std::map map; + getTransitiveSuperclasses(schema, map); + map.erase(schema.getProto().getId()); + transitiveSuperclasses = KJ_MAP(entry, map) { + return ExtendInfo { + kj::str(javaFullName(entry.second, nullptr)), + entry.second.getProto().getId() + }; + }; } - else { - genericParamTypes = kj::str(""); - } - + + auto typeNameVec = javaFullName(schema); auto typeParamVec = getTypeParameters(schema); bool hasTypeParams = typeParamVec.size() > 0; + + kj::String genericParamTypes = proto.getIsGeneric() + ? kj::strTree("<", + kj::StringTree( + KJ_MAP(arg, typeParamVec) { + return kj::strTree(arg); + }, ", "), + ">").flatten() + : kj::str(); kj::StringTree readerTypeParamsTree; kj::StringTree builderTypeParamsTree; @@ -1701,6 +1790,10 @@ private: return kj::strTree(spaces(indent), " final org.capnproto.PointerFactory<", p, "_Builder, ", p, "_Reader> ", p, "_Factory;\n"); }); + auto factoryConstructorParams = kj::StringTree(KJ_MAP(p, typeParamVec) { + return kj::strTree(p, "_Factory"); + }, ", ").flatten(); + kj::String factoryRef = hasTypeParams ? kj::str(kj::strTree("newFactory(", kj::StringTree(KJ_MAP(p, typeParamVec) { @@ -1712,65 +1805,99 @@ private: return InterfaceText { kj::strTree( - spaces(indent), "public static class ", name, genericParamTypes, " {\n", - - spaces(indent), " public static final class Factory", factoryTypeParams, "\n", - spaces(indent), " extends org.capnproto.Capability.Factory {\n", + sp, "public static class ", name, genericParamTypes, " {\n", + sp, " public static final class Factory", factoryTypeParams, "\n", + sp, " extends org.capnproto.Capability.Factory {\n", factoryMembers.flatten(), - spaces(indent), " public Factory(", + sp, " public Factory(", factoryArgs.flatten(), ") {\n", KJ_MAP(p, typeParamVec) { - return kj::strTree(spaces(indent), " this.", p, "_Factory = ", p, "_Factory;\n"); + return kj::strTree(sp, " this.", p, "_Factory = ", p, "_Factory;\n"); }, - spaces(indent), " }\n", - - - //spaces(indent), " public static final class Factory extends org.capnproto.Capability.Factory {\n", - spaces(indent), " public final Client newClient(org.capnproto.ClientHook hook) {\n", - spaces(indent), " return new Client(hook);\n", - spaces(indent), " }\n", - spaces(indent), " }\n", + sp, " }\n", + sp, " public final Client newClient(org.capnproto.ClientHook hook) {\n", + sp, " return new Client(hook);\n", + sp, " }\n", + sp, " }\n", "\n", - spaces(indent), " public static final Factory factory = new Factory();\n", + (hasTypeParams + ? kj::strTree( + sp, " public static final ", factoryTypeParams, "Factory", factoryTypeParams, "\n", + sp, " newFactory(", factoryArgs.flatten(), ") {\n", + sp, " return new Factory<>(", factoryConstructorParams, ");\n", + sp, " }\n" + ) + : kj::strTree( + sp, " public static final Factory factory = new Factory();\n" + ) + ), "\n", - spaces(indent), " public static class Client extends org.capnproto.Capability.Client {\n", - spaces(indent), " public Client() {}\n", - spaces(indent), " public Client(org.capnproto.ClientHook hook) { super(hook); }\n", - spaces(indent), " public Client(org.capnproto.Capability.Client cap) { super(cap); }\n", - spaces(indent), " public Client(Server server) { super(server); }\n", - spaces(indent), " public Client(java.util.concurrent.CompletionStage promise) {\n", - spaces(indent), " super(promise);\n", - spaces(indent), " }\n", + sp, " public static class Client\n", + (superclasses.size() == 0 + ? kj::str(sp, " extends org.capnproto.Capability.Client ") + : kj::str( + KJ_MAP(s, superclasses) { + return kj::strTree(sp, " extends ", s.typeName, ".Client "); + }) + ), + "{\n", + sp, " public Client() {}\n", + sp, " public Client(org.capnproto.ClientHook hook) { super(hook); }\n", + sp, " public Client(org.capnproto.Capability.Client cap) { super(cap); }\n", + sp, " public Client(Server server) { super(server); }\n", + sp, " public Client(java.util.concurrent.CompletionStage promise) {\n", + sp, " super(promise);\n", + sp, " }\n", "\n", - KJ_MAP(m, methods) { return kj::mv(m.clientDefs); }, - spaces(indent), " }\n", + sp, " public static final class Methods {\n", + KJ_MAP(m, methods) { return kj::mv(m.clientMethodDefs); }, + sp, " }\n", "\n", - spaces(indent), " public static abstract class Server extends org.capnproto.Capability.Server {\n", - spaces(indent), " protected org.capnproto.DispatchCallResult dispatchCall(\n", - spaces(indent), " long interfaceId, short methodId,\n", - spaces(indent), " org.capnproto.CallContext context) {\n", - spaces(indent), " if (interfaceId == 0x", hexId, "L) {\n", - spaces(indent), " return dispatchCallInternal(methodId, context);\n", - spaces(indent), " }\n", - spaces(indent), " return org.capnproto.Capability.Server.result(\n", - spaces(indent), " org.capnproto.Capability.Server.internalUnimplemented(\"", name, "\", interfaceId));\n", - spaces(indent), " }\n", + KJ_MAP(m, methods) { return kj::mv(m.clientCalls); }, + sp, " }\n", "\n", - spaces(indent), " protected org.capnproto.DispatchCallResult dispatchCallInternal(short methodId, org.capnproto.CallContext context) {\n", - spaces(indent), " switch (methodId) {\n", + sp, " public static abstract class Server\n", + (superclasses.size() == 0 + ? kj::str(sp, " extends org.capnproto.Capability.Server ") + : kj::str( + KJ_MAP(s, superclasses) { + return kj::strTree(sp, " extends ", s.typeName, ".Server "); + }) + ), + "{\n", + sp, " protected org.capnproto.DispatchCallResult dispatchCall(\n", + sp, " long interfaceId, short methodId,\n", + sp, " org.capnproto.CallContext context) {\n", + sp, " if (interfaceId == 0x", hexId, "L) {\n", + sp, " return this.dispatchCallInternal(methodId, context);\n", + sp, " }\n", + KJ_MAP(s, transitiveSuperclasses) { + return kj::strTree( + sp, " else if (interfaceId == 0x", kj::hex(s.id), "L) {\n", + sp, " return super.dispatchCall(interfaceId, methodId, context);\n", + sp, " }\n"); + }, + sp, " else {\n", + sp, " return org.capnproto.Capability.Server.result(\n", + sp, " org.capnproto.Capability.Server.internalUnimplemented(\"", name, "\", interfaceId));\n", + sp, " }\n", + sp, " }\n", + "\n", + sp, " private org.capnproto.DispatchCallResult dispatchCallInternal(short methodId, org.capnproto.CallContext context) {\n", + sp, " switch (methodId) {\n", KJ_MAP(m, methods) { return kj::mv(m.dispatchCase); }, - spaces(indent), " default:\n", - spaces(indent), " return org.capnproto.Capability.Server.result(\n", - spaces(indent), " org.capnproto.Capability.Server.internalUnimplemented(\"", name, "\", 0x", hexId, "L, methodId));\n", - spaces(indent), " }\n", - spaces(indent), " }\n\n", + sp, " default:\n", + sp, " return org.capnproto.Capability.Server.result(\n", + sp, " org.capnproto.Capability.Server.internalUnimplemented(\"", name, "\", 0x", hexId, "L, methodId));\n", + sp, " }\n", + sp, " }\n\n", KJ_MAP(m, methods) { return kj::mv(m.serverDefs); }, - spaces(indent), " }\n", - spaces(indent), "\n", + sp, " }\n", + sp, "\n", KJ_MAP(n, nestedTypeDecls) { return kj::mv(n); }, "\n", - spaces(indent), "}\n", + sp, "}\n", "\n") }; } @@ -1778,12 +1905,15 @@ private: // ----------------------------------------------------------------- struct MethodText { - kj::StringTree clientDefs; + kj::StringTree clientMethodDefs; + kj::StringTree clientCalls; kj::StringTree serverDefs; kj::StringTree dispatchCase; }; - MethodText makeMethodText(kj::StringPtr interfaceName, InterfaceSchema::Method method) { + MethodText makeMethodText(kj::StringPtr interfaceName, InterfaceSchema::Method method, int indent = 0) { + + auto sp = spaces(indent); auto proto = method.getProto(); auto methodName = proto.getName(); auto titleCase = toTitleCase(methodName); @@ -1876,7 +2006,10 @@ private: }, ", "), ")").flatten(); } - + + auto paramBuilder = kj::str(shortParamType, ".Builder"); + auto resultReader = kj::str(shortResultType, ".Reader"); + auto interfaceProto = method.getContainingInterface().getProto(); uint64_t interfaceId = interfaceProto.getId(); auto interfaceIdHex = kj::hex(interfaceId); @@ -1884,45 +2017,80 @@ private: if (isStreaming) { return MethodText { - kj::strTree( - " public org.capnproto.StreamingRequest<", shortParamType, ".Builder> ", methodName, "Request() {\n", - " return newStreamingCall(", paramFactory, ", 0x", interfaceIdHex, "L, (short)", methodId, ");\n" - " }\n"), + // client method defs + kj::strTree(), + // client call kj::strTree( - " protected java.util.concurrent.CompletableFuture ", identifierName, "(org.capnproto.StreamingCallContext<", shortParamType, ".Reader> context) {\n" - " return org.capnproto.Capability.Server.internalUnimplemented(\n" - " \"", interfaceProto.getDisplayName(), "\", \"", methodName, "\",\n" - " 0x", interfaceIdHex, "L, (short)", methodId, ");\n" - " }\n\n"), + sp, "public org.capnproto.StreamingRequest<", shortParamType, ".Builder> ", methodName, "Request() {\n", + sp, " return newStreamingCall(", paramFactory, ", 0x", interfaceIdHex, "L, (short)", methodId, ");\n", + sp, "}\n" + ), + // server defs kj::strTree( - " case ", methodId, ":\n", - " return org.capnproto.Capability.Server.streamResult(\n", - " this.", identifierName, "(org.capnproto.Capability.Server.internalGetTypedStreamingContext(\n" - " ", paramFactory, ", context)));\n") + sp, "protected java.util.concurrent.CompletableFuture ", identifierName, "(org.capnproto.StreamingCallContext<", shortParamType, ".Reader> context) {\n", + sp, " return org.capnproto.Capability.Server.internalUnimplemented(\n", + sp, " \"", interfaceProto.getDisplayName(), "\", \"", methodName, "\",\n", + sp, " 0x", interfaceIdHex, "L, (short)", methodId, ");\n", + sp, "}\n" + ), + + // dispatch + kj::strTree( + sp, "case ", methodId, ":\n", + sp, " return org.capnproto.Capability.Server.streamResult(\n", + sp, " this.", identifierName, "(org.capnproto.Capability.Server.internalGetTypedStreamingContext(\n", + sp, " ", paramFactory, ", context)));\n" + ) }; - } else { return MethodText { - + // client method defs kj::strTree( - " public org.capnproto.Request<", shortParamType, ".Builder, ", shortResultType, ".Pipeline> ", methodName, "Request() {\n", - " return newCall(", paramFactory, ", ", shortResultType, ".factory, 0x", interfaceIdHex, "L, (short)", methodId, ");\n" - " }\n"), + sp, " public static final class ", methodName, " {\n", + sp, " public interface Request extends org.capnproto.Request<", paramBuilder, "> {\n", + sp, " default org.capnproto.FromPointerBuilder<", paramBuilder, "> getParamsFactory() {\n", + sp, " return ", paramFactory, ";\n", + sp, " }\n", + sp, " default Response send() {\n", + sp, " return new Response(this.getHook().send());\n", + sp, " }\n", + sp, " }\n", + sp, " public static final class Response\n", + sp, " extends org.capnproto.RemotePromise<", resultReader, ">\n", + sp, " implements ", shortResultType, ".Pipeline {\n", + sp, " public Response(org.capnproto.RemotePromise response) {\n", + sp, " super(", resultFactory, ", response);\n", + sp, " }\n", + sp, " public org.capnproto.AnyPointer.Pipeline typelessPipeline() {\n", + sp, " return this.pipeline();\n", + sp, " }\n", + sp, " }\n", + sp, " }\n" + ), + // client call kj::strTree( - " protected java.util.concurrent.CompletableFuture ", identifierName, "(org.capnproto.CallContext<", shortParamType, ".Reader, ", shortResultType, ".Builder> context) {\n" - " return org.capnproto.Capability.Server.internalUnimplemented(\n" - " \"", interfaceProto.getDisplayName(), "\", \"", methodName, "\",\n" - " 0x", interfaceIdHex, "L, (short)", methodId, ");\n" - " }\n\n"), + sp, "public Methods.", methodName, ".Request ", methodName, "Request() {\n", + sp, " var result = newCall(0x", interfaceIdHex, "L, (short)", methodId, ");\n", + sp, " return () -> result;\n", + sp, "}\n" + ), + // server defs kj::strTree( - " case ", methodId, ":\n", - " return org.capnproto.Capability.Server.result (\n", - " this.", identifierName, "(org.capnproto.Capability.Server.internalGetTypedContext(\n" - " ", paramFactory, ", ", resultFactory, ", context)));\n") + sp, "protected java.util.concurrent.CompletableFuture ", identifierName, "(org.capnproto.CallContext<", shortParamType, ".Reader, ", shortResultType, ".Builder> context) {\n", + sp, " return org.capnproto.Capability.Server.internalUnimplemented(\n", + sp, " \"", interfaceProto.getDisplayName(), "\", \"", methodName, "\", 0x", interfaceIdHex, "L, (short)", methodId, ");\n", + sp, "}\n"), + + // dispatch + kj::strTree( + sp, "case ", methodId, ":\n", + sp, " return org.capnproto.Capability.Server.result(\n", + sp, " this.", identifierName, "(org.capnproto.Capability.Server.internalGetTypedContext(\n", + sp, " ", paramFactory, ", ", resultFactory, ", context)));\n") }; } } diff --git a/runtime/src/main/java/org/capnproto/AnyPointer.java b/runtime/src/main/java/org/capnproto/AnyPointer.java index 5f8f398..105c66d 100644 --- a/runtime/src/main/java/org/capnproto/AnyPointer.java +++ b/runtime/src/main/java/org/capnproto/AnyPointer.java @@ -23,8 +23,7 @@ package org.capnproto; public final class AnyPointer { public static final class Factory - implements PointerFactory, - PipelineFactory { + implements PointerFactory { public final Reader fromPointerReader(SegmentReader segment, CapTableReader capTable, int pointer, int nestingLimit) { return new Reader(segment, capTable, pointer, nestingLimit); } @@ -36,8 +35,8 @@ public final class AnyPointer { result.clear(); return result; } - public Pipeline newPipeline(RemotePromise promise) { - return new AnyPointer.Pipeline(promise); + public Pipeline newPipeline(PipelineImpl typeless) { + return new AnyPointer.Pipeline(typeless.hook, typeless.ops); } } public static final Factory factory = new Factory(); @@ -133,7 +132,7 @@ public final class AnyPointer { } final void setAsCap(Capability.Client cap) { - WireHelpers.setCapabilityPointer(this.segment, capTable, this.pointer, cap.hook); + WireHelpers.setCapabilityPointer(this.segment, capTable, this.pointer, cap.getHook()); } public final Reader asReader() { @@ -146,15 +145,55 @@ public final class AnyPointer { } } - public static final class Pipeline - extends org.capnproto.Pipeline { + public static class Pipeline extends PipelineImpl implements PipelineBase { - public Pipeline(RemotePromise promise) { - super(promise); + public Pipeline(PipelineHook hook) { + this(hook, new PipelineOp[0]); } - public Pipeline(RemotePromise promise, PipelineOp[] ops) { - super(promise, ops); + Pipeline(PipelineHook hook, PipelineOp[] ops) { + super(hook, ops); + } + + @Override + public Pipeline typelessPipeline() { + return this; + } + } + + public static final class Request + implements org.capnproto.Request { + + private final AnyPointer.Builder params; + private final RequestHook requestHook; + + Request(AnyPointer.Builder params, RequestHook requestHook) { + this.params = params; + this.requestHook = requestHook; + } + + @Override + public AnyPointer.Builder getParams() { + return this.params; + } + + @Override + public org.capnproto.Request getTypelessRequest() { + return this; + } + + @Override + public RequestHook getHook() { + return this.requestHook; + } + + @Override + public FromPointerBuilder getParamsFactory() { + return AnyPointer.factory; + } + + public RemotePromise send() { + return this.getHook().send(); } } } diff --git a/runtime/src/main/java/org/capnproto/CallContext.java b/runtime/src/main/java/org/capnproto/CallContext.java index af044c1..553d4a7 100644 --- a/runtime/src/main/java/org/capnproto/CallContext.java +++ b/runtime/src/main/java/org/capnproto/CallContext.java @@ -32,8 +32,8 @@ public class CallContext { return this.hook.getResults().initAs(results); } - public final CompletableFuture tailCall(Request tailRequest) { - return this.hook.tailCall(tailRequest.hook); + public final CompletableFuture tailCall(Request tailRequest) { + return this.hook.tailCall(tailRequest.getHook()); } public final void allowCancellation() { diff --git a/runtime/src/main/java/org/capnproto/Capability.java b/runtime/src/main/java/org/capnproto/Capability.java index e0b4f22..5f9da11 100644 --- a/runtime/src/main/java/org/capnproto/Capability.java +++ b/runtime/src/main/java/org/capnproto/Capability.java @@ -2,6 +2,7 @@ package org.capnproto; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; public final class Capability { @@ -13,7 +14,7 @@ public final class Capability { CapTableReader capTable; } - public static abstract class Factory + public static abstract class Factory implements FromPointerReader, FromPointerBuilder, SetPointerBuilder { @@ -44,37 +45,21 @@ public final class Capability { } } - public static class Client { - - final ClientHook hook; - - public Client() { - this.hook = null; + public static class CapabilityFactory extends Factory { + @Override + public Client newClient(ClientHook hook) { + return new Client(hook); } + } - public Client(Client other) { - this.hook = other.hook; - } + public static final CapabilityFactory factory = new CapabilityFactory(); - public Client(ClientHook hook) { - this.hook = hook; - } + public interface ClientBase { - public Client(Server server) { - this(makeLocalClient(server)); - } + ClientHook getHook(); - public Client(CompletionStage promise) { - this(Capability.newLocalPromiseClient( - promise.thenApply(client -> client.getHook()))); - } - - public Client(Throwable exc) { - this(newBrokenCap(exc)); - } - - ClientHook getHook() { - return this.hook; + default CompletionStage whenResolved() { + return this.getHook().whenResolved(); } /** @@ -90,37 +75,72 @@ public final class Capability { * The file descriptor will remain open at least as long as the {@link Client} remains alive. * If you need it to last longer, you will need to `dup()` it. */ - public CompletableFuture getFd() { - var fd = this.hook.getFd(); + default CompletableFuture getFd() { + var fd = this.getHook().getFd(); if (fd != null) { return CompletableFuture.completedFuture(fd); } - var promise = this.hook.whenMoreResolved(); + var promise = this.getHook().whenMoreResolved(); if (promise != null) { return promise.thenCompose(newHook -> new Client(newHook).getFd()); } return CompletableFuture.completedFuture(null); } +/* + default + Request newCall(FromPointerBuilder paramsFactory, + FromPointerReader resultsFactory, + long interfaceId, short methodId) { + return Request.fromTypeless(paramsFactory, resultsFactory, this.getHook().newCall(interfaceId, methodId)); + }*/ + + default Request newCall(long interfaceId, short methodId) { + return this.getHook().newCall(interfaceId, methodId); + } + + default StreamingRequest newStreamingCall(FromPointerBuilder paramsBuilder, + long interfaceId, short methodId) { + var request = getHook().newCall(interfaceId, methodId); + return new StreamingRequest<> (paramsBuilder, request.getParams(), request.getHook()); + } + } + + public static class Client implements ClientBase { + + private final ClientHook hook; + + public Client() { + this(newNullCap()); + } + + public Client(Client other) { + this(other.hook); + } + + public Client(Server server) { + this(makeLocalClient(server)); + } + + public Client(ClientHook hook) { + this.hook = hook; + } + + public Client(CompletionStage promise) { + this(Capability.newLocalPromiseClient( + promise.thenApply(client -> client.getHook()))); + } + + public Client(Throwable exc) { + this(newBrokenCap(exc)); + } + + public ClientHook getHook() { + return this.hook; + } private static ClientHook makeLocalClient(Server server) { return server.makeLocalClient(); } - - CompletionStage whenResolved() { - return this.hook.whenResolved(); - } - - protected Request newCall(FromPointerBuilder

paramsFactory, - PipelineFactory pipelineFactory, - long interfaceId, short methodId) { - return Request.fromTypeless(paramsFactory, pipelineFactory, hook.newCall(interfaceId, methodId)); - } - - protected StreamingRequest newStreamingCall(FromPointerBuilder paramsBuilder, - long interfaceId, short methodId) { - var request = hook.newCall(interfaceId, methodId); - return new StreamingRequest<> (paramsBuilder, request.getParams(), request.hook); - } } public abstract static class Server { @@ -136,12 +156,10 @@ public final class Capability { return new LocalClient(capServerSet); } - private final class LocalClient implements ClientHook { private final CompletableFuture resolveTask; private ClientHook resolved; private boolean blocked = false; - private Exception brokenException; private final CapabilityServerSetBase capServerSet; LocalClient() { @@ -159,10 +177,10 @@ public final class Capability { } @Override - public Request newCall(long interfaceId, short methodId) { + public AnyPointer.Request newCall(long interfaceId, short methodId) { var hook = new LocalRequest(interfaceId, methodId, this); var root = hook.message.getRoot(AnyPointer.factory); - return new Request<>(root, AnyPointer.factory, hook); + return new AnyPointer.Request(root, hook); } @Override @@ -173,7 +191,8 @@ public final class Capability { return null; } - var promise = callInternal(interfaceId, methodId, ctx); + var promise = this.whenResolved().thenCompose( + x -> this.callInternal(interfaceId, methodId, ctx)); CompletableFuture pipelinePromise = promise.thenApply(x -> { ctx.releaseParams(); @@ -186,7 +205,7 @@ public final class Capability { } return new VoidPromiseAndPipeline( - promise.copy(), + promise, new QueuedPipeline(pipelinePromise)); } @@ -197,9 +216,15 @@ public final class Capability { @Override public CompletableFuture whenMoreResolved() { - return this.resolved != null - ? CompletableFuture.completedFuture(this.resolved) - : this.resolveTask.thenApply(x -> this.resolved); + if (this.resolved != null) { + return CompletableFuture.completedFuture(this.resolved); + } + else if (this.resolveTask != null) { + return this.resolveTask.thenApply(x -> this.resolved); + } + else { + return null; + } } @Override @@ -207,11 +232,10 @@ public final class Capability { return BRAND; } - CompletableFuture callInternal(long interfaceId, short methodId, CallContextHook context) { + CompletableFuture callInternal(long interfaceId, short methodId, CallContextHook ctx) { var result = dispatchCall( - interfaceId, - methodId, - new CallContext<>(AnyPointer.factory, AnyPointer.factory, context)); + interfaceId, methodId, + new CallContext<>(AnyPointer.factory, AnyPointer.factory, ctx)); if (result.isStreaming()) { // TODO streaming return null; @@ -233,6 +257,23 @@ public final class Capability { } } + /** + * If this returns non-null, then it is a promise which, when resolved, points to a new + * capability to which future calls can be sent. Use this in cases where an object implementation + * might discover a more-optimized path some time after it starts. + * + * Implementing this (and returning non-null) will cause the capability to be advertised as a + * promise at the RPC protocol level. Once the promise returned by shortenPath() resolves, the + * remote client will receive a `Resolve` message updating it to point at the new destination. + * + * `shortenPath()` can also be used as a hack to shut up the client. If shortenPath() returns + * a promise that resolves to an exception, then the client will be notified that the capability + * is now broken. Assuming the client is using a correct RPC implemnetation, this should cause + * all further calls initiated by the client to this capability to immediately fail client-side, + * sparing the server's bandwidth. + * + * The default implementation always returns null. + */ public CompletableFuture shortenPath() { return null; } @@ -320,7 +361,7 @@ public final class Capability { }); assert promiseAndPipeline.pipeline != null; - return new RemotePromise<>(promise, promiseAndPipeline.pipeline); + return new RemotePromise<>(promise, new AnyPointer.Pipeline(promiseAndPipeline.pipeline)); } @Override @@ -337,12 +378,12 @@ public final class Capability { } private static final class LocalPipeline implements PipelineHook { - private final CallContextHook context; + private final CallContextHook ctx; private final AnyPointer.Reader results; - LocalPipeline(CallContextHook context) { - this.context = context; - this.results = context.getResults().asReader(); + LocalPipeline(CallContextHook ctx) { + this.ctx = ctx; + this.results = ctx.getResults().asReader(); } @Override @@ -439,13 +480,14 @@ public final class Capability { static private ClientHook newBrokenClient(Throwable exc, boolean resolved, Object brand) { return new ClientHook() { @Override - public Request newCall(long interfaceId, short methodId) { - return Request.newBrokenRequest(exc); + public AnyPointer.Request newCall(long interfaceId, short methodId) { + var broken = Request.newBrokenRequest(AnyPointer.factory, exc); + return new AnyPointer.Request(broken.getParams(), broken.getHook()); } @Override public VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context) { - return new VoidPromiseAndPipeline(CompletableFuture.failedFuture(exc), null); + return new VoidPromiseAndPipeline(CompletableFuture.failedFuture(exc), PipelineHook.newBrokenPipeline(exc)); } @Override @@ -514,10 +556,10 @@ public final class Capability { } @Override - public Request newCall(long interfaceId, short methodId) { + public AnyPointer.Request newCall(long interfaceId, short methodId) { var hook = new LocalRequest(interfaceId, methodId, this); var root = hook.message.getRoot(AnyPointer.factory); - return new Request<>(root, AnyPointer.factory, hook); + return new AnyPointer.Request(root, hook); } @Override diff --git a/runtime/src/main/java/org/capnproto/ClientHook.java b/runtime/src/main/java/org/capnproto/ClientHook.java index 573c072..7dd34de 100644 --- a/runtime/src/main/java/org/capnproto/ClientHook.java +++ b/runtime/src/main/java/org/capnproto/ClientHook.java @@ -8,7 +8,7 @@ public interface ClientHook { Object NULL_CAPABILITY_BRAND = new Object(); Object BROKEN_CAPABILITY_BRAND = new Object(); - Request newCall(long interfaceId, short methodId); + Request newCall(long interfaceId, short methodId); VoidPromiseAndPipeline call(long interfaceId, short methodId, CallContextHook context); @@ -46,7 +46,7 @@ public interface ClientHook { /** * Repeatedly calls whenMoreResolved() until it returns nullptr. */ - default CompletionStage whenResolved() { + default CompletableFuture whenResolved() { var promise = whenMoreResolved(); return promise != null ? promise.thenCompose(ClientHook::whenResolved) @@ -77,13 +77,14 @@ public interface ClientHook { } final class VoidPromiseAndPipeline { + public final CompletableFuture promise; public final PipelineHook pipeline; - VoidPromiseAndPipeline(CompletableFuture promise, PipelineHook pipeline) { + VoidPromiseAndPipeline(CompletableFuture promise, + PipelineHook pipeline) { this.promise = promise; this.pipeline = pipeline; } } - } diff --git a/runtime/src/main/java/org/capnproto/Pipeline.java b/runtime/src/main/java/org/capnproto/Pipeline.java deleted file mode 100644 index 0d06cfc..0000000 --- a/runtime/src/main/java/org/capnproto/Pipeline.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.capnproto; - -public class Pipeline - extends RemotePromise { - - protected final PipelineOp[] ops; - protected PipelineHook hook; - - public Pipeline(RemotePromise remotePromise) { - this(remotePromise, new PipelineOp[0]); - } - - public Pipeline(RemotePromise remotePromise, PipelineOp[] ops) { - super(remotePromise.response, remotePromise.hook); - this.ops = ops; - this.hook = remotePromise.hook; - } - - public PipelineHook getHook() { - return hook; - } - - Pipeline noop() { - return new Pipeline<>(this, this.ops.clone()); - } - - public ClientHook asCap() { - return this.hook.getPipelinedCap(this.ops); - } - - public Pipeline getPointerField(short pointerIndex) { - var newOps = new PipelineOp[this.ops.length+1]; - for (int ii = 0; ii < this.ops.length; ++ii) { - newOps[ii] = this.ops[ii]; - } - newOps[this.ops.length] = PipelineOp.PointerField(pointerIndex); - return new Pipeline<>(this, newOps); - } -} - - diff --git a/runtime/src/main/java/org/capnproto/PipelineBase.java b/runtime/src/main/java/org/capnproto/PipelineBase.java new file mode 100644 index 0000000..03cc4d5 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/PipelineBase.java @@ -0,0 +1,5 @@ +package org.capnproto; + +public interface PipelineBase { + AnyPointer.Pipeline typelessPipeline(); +} diff --git a/runtime/src/main/java/org/capnproto/PipelineFactory.java b/runtime/src/main/java/org/capnproto/PipelineFactory.java index fb817ec..0adad03 100644 --- a/runtime/src/main/java/org/capnproto/PipelineFactory.java +++ b/runtime/src/main/java/org/capnproto/PipelineFactory.java @@ -1,5 +1,5 @@ package org.capnproto; public interface PipelineFactory { - Pipeline newPipeline(RemotePromise promise); + Pipeline newPipeline(RemotePromise promise, PipelineImpl typeless); } diff --git a/runtime/src/main/java/org/capnproto/PipelineHook.java b/runtime/src/main/java/org/capnproto/PipelineHook.java index 50c1ac3..e42a5db 100644 --- a/runtime/src/main/java/org/capnproto/PipelineHook.java +++ b/runtime/src/main/java/org/capnproto/PipelineHook.java @@ -1,6 +1,7 @@ package org.capnproto; public interface PipelineHook { + ClientHook getPipelinedCap(PipelineOp[] ops); static PipelineHook newBrokenPipeline(Throwable exc) { diff --git a/runtime/src/main/java/org/capnproto/PipelineImpl.java b/runtime/src/main/java/org/capnproto/PipelineImpl.java new file mode 100644 index 0000000..de97fff --- /dev/null +++ b/runtime/src/main/java/org/capnproto/PipelineImpl.java @@ -0,0 +1,32 @@ +package org.capnproto; + +public class PipelineImpl { + protected final PipelineHook hook; + protected final PipelineOp[] ops; + + public PipelineImpl(PipelineHook hook) { + this(hook, new PipelineOp[0]); + } + + public PipelineImpl(PipelineHook hook, PipelineOp[] ops) { + this.hook = hook; + this.ops = ops; + } + + PipelineImpl noop() { + return new PipelineImpl(this.hook, this.ops.clone()); + } + + public ClientHook asCap() { + return this.hook.getPipelinedCap(ops); + } + + public AnyPointer.Pipeline getPointerField(short pointerIndex) { + var newOps = new PipelineOp[this.ops.length+1]; + for (int ii = 0; ii < this.ops.length; ++ii) { + newOps[ii] = this.ops[ii]; + } + newOps[this.ops.length] = PipelineOp.PointerField(pointerIndex); + return new AnyPointer.Pipeline(this.hook, newOps); + } +} diff --git a/runtime/src/main/java/org/capnproto/RemotePromise.java b/runtime/src/main/java/org/capnproto/RemotePromise.java index 8cf7bb9..9b60a34 100644 --- a/runtime/src/main/java/org/capnproto/RemotePromise.java +++ b/runtime/src/main/java/org/capnproto/RemotePromise.java @@ -1,19 +1,35 @@ package org.capnproto; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; public class RemotePromise extends CompletableFutureWrapper { - final CompletableFuture> response; - final PipelineHook hook; + private final CompletableFuture> response; + private final AnyPointer.Pipeline pipeline; - RemotePromise(CompletableFuture> promise, - PipelineHook hook) { - super(promise.thenApply(response -> response.getResults())); + public RemotePromise(FromPointerReader factory, + RemotePromise other) { + super(other.thenApply(response -> response.getAs(factory))); + this.response = other.response.thenApply( + response -> new Response<>( + response.getResults().getAs(factory), + response.getHook())); + this.pipeline = other.pipeline; + } + + public RemotePromise(CompletableFuture> promise, + AnyPointer.Pipeline pipeline) { + super(promise.thenApply(response -> { + //System.out.println("Got a response for remote promise " + promise.toString()); + return response.getResults(); + })); this.response = promise; - this.hook = hook; + this.pipeline = pipeline; + } + + public AnyPointer.Pipeline pipeline() { + return this.pipeline; } public static RemotePromise fromTypeless( @@ -21,7 +37,7 @@ public class RemotePromise RemotePromise typeless) { var promise = typeless.response.thenApply( response -> Response.fromTypeless(resultsFactory, response)); - return new RemotePromise<>(promise, typeless.hook); + return new RemotePromise<>(promise, typeless.pipeline); } } diff --git a/runtime/src/main/java/org/capnproto/Request.java b/runtime/src/main/java/org/capnproto/Request.java index 6d501a5..6bde1ff 100644 --- a/runtime/src/main/java/org/capnproto/Request.java +++ b/runtime/src/main/java/org/capnproto/Request.java @@ -2,31 +2,22 @@ package org.capnproto; import java.util.concurrent.CompletableFuture; -public class Request { +public interface Request { - protected Params params; - private PipelineFactory pipelineFactory; - RequestHook hook; + FromPointerBuilder getParamsFactory(); - public Request(Params params, - PipelineFactory pipelineFactory, - RequestHook hook) { - this.params = params; - this.pipelineFactory = pipelineFactory; - this.hook = hook; + default Params getParams() { + return this.getTypelessRequest().getParams().getAs(this.getParamsFactory()); } - public Params getParams() { - return params; + default RequestHook getHook() { + return this.getTypelessRequest().getHook(); } - public Results send() { - var typelessPromise = this.hook.send(); - this.hook = null; // prevent reuse - return pipelineFactory.newPipeline(typelessPromise); - } + Request getTypelessRequest(); - static Request newBrokenRequest(Throwable exc) { + static Request newBrokenRequest(FromPointerBuilder paramsFactory, + Throwable exc) { final MessageBuilder message = new MessageBuilder(); @@ -48,13 +39,32 @@ public class Request { }; var root = message.getRoot(AnyPointer.factory); - return new Request(null, null, hook); + return new Request<>() { + @Override + public FromPointerBuilder getParamsFactory() { + return paramsFactory; + } + + @Override + public Request getTypelessRequest() { + return null; + } + }; } - static Request fromTypeless( - FromPointerBuilder

paramsFactory, - PipelineFactory pipelineFactory, - Request typeless) { - return new Request<>(typeless.params.getAs(paramsFactory), pipelineFactory, typeless.hook); + static Request fromTypeless( + FromPointerBuilder paramsFactory, + Request typeless) { + return new Request<>() { + @Override + public FromPointerBuilder getParamsFactory() { + return paramsFactory; + } + + @Override + public Request getTypelessRequest() { + return typeless; + } + }; } } diff --git a/runtime/src/main/java/org/capnproto/Response.java b/runtime/src/main/java/org/capnproto/Response.java index c08944f..3d5ab0d 100644 --- a/runtime/src/main/java/org/capnproto/Response.java +++ b/runtime/src/main/java/org/capnproto/Response.java @@ -1,9 +1,9 @@ package org.capnproto; -public class Response { +public final class Response { - private Results results; - private ResponseHook hook; + private final Results results; + private final ResponseHook hook; public Response(Results results, ResponseHook hook) { @@ -15,6 +15,10 @@ public class Response { return this.results; } + public ResponseHook getHook() { + return this.hook; + } + static Response fromTypeless(FromPointerReader resultsFactory, Response typeless) { return new Response<>(typeless.getResults().getAs(resultsFactory), typeless.hook); diff --git a/runtime/src/main/java/org/capnproto/RpcDumper.java b/runtime/src/main/java/org/capnproto/RpcDumper.java index 5f3a81f..220ff23 100644 --- a/runtime/src/main/java/org/capnproto/RpcDumper.java +++ b/runtime/src/main/java/org/capnproto/RpcDumper.java @@ -5,13 +5,13 @@ import java.util.Map; public class RpcDumper { - private final Map schemas = new HashMap<>(); + //private final Map schemas = new HashMap<>(); private final Map clientReturnTypes = new HashMap<>(); private final Map serverReturnTypes = new HashMap<>(); - void addSchema(long schemaId, Schema.Node.Reader node) { + /*void addSchema(long schemaId, Schema.Node.Reader node) { this.schemas.put(schemaId, node); - } + }*/ private void setReturnType(RpcTwoPartyProtocol.Side side, int schemaId, long schema) { switch (side) { @@ -68,7 +68,7 @@ public class RpcDumper { var payload = call.getParams(); var params = payload.getContent(); var sendResultsTo = call.getSendResultsTo(); - +/* var schema = this.schemas.get(iface); if (schema != null) { interfaceName = schema.getDisplayName().toString(); @@ -91,7 +91,7 @@ public class RpcDumper { } } - } + }*/ yield sender.name() + "(" + call.getQuestionId() + "): call " + call.getTarget() + " <- " + interfaceName + "." + diff --git a/runtime/src/main/java/org/capnproto/RpcState.java b/runtime/src/main/java/org/capnproto/RpcState.java index becaed4..af704f2 100644 --- a/runtime/src/main/java/org/capnproto/RpcState.java +++ b/runtime/src/main/java/org/capnproto/RpcState.java @@ -1532,7 +1532,7 @@ final class RpcState { } @Override - public Request newCall(long interfaceId, short methodId) { + public Request newCall(long interfaceId, short methodId) { return newCallNoIntercept(interfaceId, methodId); } @@ -1545,7 +1545,7 @@ final class RpcState { var params = context.getParams(); var request = newCallNoIntercept(interfaceId, methodId); context.allowCancellation(); - return context.directTailCall(request.hook); + return context.directTailCall(request.getHook()); } @Override @@ -1553,9 +1553,9 @@ final class RpcState { return RpcState.this; } - private Request newCallNoIntercept(long interfaceId, short methodId) { + private Request newCallNoIntercept(long interfaceId, short methodId) { if (isDisconnected()) { - return Request.newBrokenRequest(disconnected); + return Request.newBrokenRequest(AnyPointer.factory, disconnected); } var request = new RpcRequest(this); @@ -1563,7 +1563,7 @@ final class RpcState { callBuilder.setInterfaceId(interfaceId); callBuilder.setMethodId(methodId); var root = request.getRoot(); - return new Request<>(root, AnyPointer.factory, request); + return new AnyPointer.Request(root, request); } } @@ -1605,10 +1605,11 @@ final class RpcState { var redirect = this.target.writeTarget(this.callBuilder.getTarget()); if (redirect != null) { - var replacement = redirect.newCall( + var redirected = redirect.newCall( this.callBuilder.getInterfaceId(), this.callBuilder.getMethodId()); - replacement.params = paramsBuilder; - return replacement.hook.send(); + //replacement.params = paramsBuilder; + var replacement = new AnyPointer.Request(paramsBuilder, redirected.getHook()); + return replacement.send(); } final var question = sendInternal(false); @@ -1624,7 +1625,7 @@ final class RpcState { var loop = CompletableFuture.anyOf( getMessageLoop(), appPromise).thenCompose(x -> appPromise); - return new RemotePromise<>(loop, pipeline); + return new RemotePromise<>(loop, new AnyPointer.Pipeline(pipeline)); } @Override