Merge pull request #18 from vaci/vaci/merge-0110

Merge from upstream 0.1.10
This commit is contained in:
Vaci 2021-08-21 09:38:31 +01:00 committed by GitHub
commit eab88298e2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 256 additions and 52 deletions

2
CHANGELOG.md Normal file
View file

@ -0,0 +1,2 @@
## v0.1.9
- Add `<maven.compiler.release>8</maven.compiler.release>` to pom.xml, to increase compability with Java 8.

View file

@ -1,4 +1,5 @@
CAPNP_CXX_FLAGS=$(shell pkg-config capnp --cflags --libs)
PKG_CONFIG ?= pkg-config
CAPNP_CXX_FLAGS=$(shell $(PKG_CONFIG) capnp --cflags --libs)
ifeq ($(CAPNP_CXX_FLAGS),)
$(warning "Warning: pkg-config failed to find compilation configuration for capnp.")

View file

@ -8,3 +8,16 @@ and capabilities, and capnproto-java is a pure Java implementation.
[Read more here.](https://dwrensha.github.io/capnproto-java/index.html)
This repository clone adds an implementation of the RPC framework for Java.
Promise pipelining is provided via java.util.concurrent.CompletableFuture. Unlike the KJ asynchronous model, which completes promises
only when they are waited upon, a CompletableFuture can complete immediately. This may break E-ordering, as the C++ implementation
relies on kj::evalLater() to defer method calls, and there is no obvious (to me, anyway) way to replicate the behaviour of
kj::evalLater() with CompletableFutures.
Most of the C++ RPC test cases have been ported to this implementation, which gives me some comfort that the implementation logic is
correct, but more extensive testing is required.
This implementation does not support generic interfaces. Extending the schema compiler to output code for generic interfaces is an
exercise I leave to the reader.

View file

@ -5,7 +5,7 @@
<artifactId>benchmark</artifactId>
<packaging>jar</packaging>
<description>capnproto-java benchmark</description>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
<name>capnproto-java benchmark</name>
<organization>
<name>org.capnproto</name>
@ -33,12 +33,13 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>14</maven.compiler.source>
<maven.compiler.target>14</maven.compiler.target>
<maven.compiler.release>14</maven.compiler.release>
</properties>
<dependencies>
<dependency>
<groupId>org.capnproto</groupId>
<artifactId>runtime</artifactId>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
</dependency>
</dependencies>

View file

@ -5,7 +5,7 @@
<artifactId>compiler</artifactId>
<packaging>jar</packaging>
<description>schema compiler plugin for java</description>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
<name>capnpc-java</name>
<organization>
<name>org.capnproto</name>
@ -31,6 +31,8 @@
</developers>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>14</maven.compiler.source>
<maven.compiler.target>14</maven.compiler.target>
</properties>
<dependencies>
<dependency>
@ -42,7 +44,7 @@
<dependency>
<groupId>org.capnproto</groupId>
<artifactId>runtime</artifactId>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
</dependency>
</dependencies>

View file

@ -5,7 +5,7 @@
<artifactId>examples</artifactId>
<packaging>jar</packaging>
<description>capnproto-java examples</description>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
<name>capnproto-java examples</name>
<organization>
<name>org.capnproto</name>
@ -38,16 +38,16 @@
<dependency>
<groupId>org.capnproto</groupId>
<artifactId>runtime</artifactId>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.capnproto</groupId>
<artifactId>runtime-rpc</artifactId>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
</dependency>
<dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>

View file

@ -4,7 +4,7 @@
<groupId>org.capnproto</groupId>
<artifactId>capnproto-java</artifactId>
<packaging>pom</packaging>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
<name>Cap'n Proto for Java</name>
<url>https://capnproto.org/</url>
<modules>

View file

@ -5,7 +5,7 @@
<artifactId>runtime-rpc</artifactId>
<packaging>jar</packaging>
<description>runtime-rpc</description>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
<name>Cap'n Proto RPC runtime library</name>
<organization>
<name>org.capnproto</name>
@ -49,12 +49,12 @@
<dependency>
<groupId>org.capnproto</groupId>
<artifactId>runtime</artifactId>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.capnproto</groupId>
<artifactId>compiler</artifactId>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
</dependency>
</dependencies>

View file

@ -5,7 +5,7 @@
<artifactId>runtime</artifactId>
<packaging>jar</packaging>
<description>runtime</description>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.10-SNAPSHOT</version>
<name>Cap'n Proto runtime library</name>
<organization>
<name>org.capnproto</name>
@ -64,7 +64,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<version>3.6.2</version>
<configuration>
<compilerArgument>-Xlint:unchecked</compilerArgument>
<source>11</source>
@ -75,6 +75,15 @@
</build>
<profiles>
<profile>
<id>jdk11FF</id>
<activation>
<jdk>(11,)</jdk>
</activation>
<properties>
<maven.compiler.release>11</maven.compiler.release>
</properties>
</profile>
<profile>
<id>release</id>
<build>

View file

@ -23,5 +23,5 @@ package org.capnproto;
public interface Arena {
public SegmentReader tryGetSegment(int id);
public void checkReadLimit(int numBytes);
public void checkReadLimit(int numWords);
}

View file

@ -103,6 +103,30 @@ public final class BuilderArena implements Arena {
return this.localCapTable;
}
/**
* Constructs a BuilderArena from a ReaderArena and uses the size of the largest segment
* as the next allocation size.
*/
BuilderArena(ReaderArena arena) {
this.segments = new ArrayList<SegmentBuilder>();
int largestSegment = SUGGESTED_FIRST_SEGMENT_WORDS*Constants.BYTES_PER_WORD;
for (int ii = 0; ii < arena.segments.size(); ++ii) {
SegmentReader segment = arena.segments.get(ii);
SegmentBuilder segmentBuilder = new SegmentBuilder(segment.buffer, this);
segmentBuilder.id = ii;
segmentBuilder.pos = segmentBuilder.capacity(); // buffer is pre-filled
segments.add(segmentBuilder);
// Find the largest segment for the allocation strategy.
largestSegment = Math.max(largestSegment, segment.buffer.capacity());
}
DefaultAllocator defaultAllocator = new DefaultAllocator(SUGGESTED_ALLOCATION_STRATEGY);
// Use largest segment as next size.
defaultAllocator.setNextAllocationSizeBytes(largestSegment);
this.allocator = defaultAllocator;
}
@Override
public final SegmentReader tryGetSegment(int id) {
return this.segments.get(id);
@ -126,6 +150,10 @@ public final class BuilderArena implements Arena {
}
}
/**
* Allocates `amount` words in an existing segment or, if no suitable segment
* exists, in a new segment.
*/
public AllocateResult allocate(int amount) {
int len = this.segments.size();
@ -136,6 +164,10 @@ public final class BuilderArena implements Arena {
return new AllocateResult(this.segments.get(len - 1), result);
}
}
if (amount >= 1 << 28) {
// Computing `amount * Constants.BYTES_PER_WORD` would overflow.
throw new RuntimeException("Too many words to allocate: " + amount);
}
SegmentBuilder newSegment = new SegmentBuilder(
this.allocator.allocateSegment(amount * Constants.BYTES_PER_WORD),
this);

View file

@ -18,6 +18,17 @@ public class DefaultAllocator implements Allocator {
public AllocationStrategy allocationStrategy =
AllocationStrategy.GROW_HEURISTICALLY;
/**
The largest number of bytes to try allocating when using `GROW_HEURISTICALLY`.
Set this value smaller if you get the error:
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
Experimentally, `Integer.MAX_VALUE - 2` seems to work on most systems.
*/
public int maxSegmentBytes = Integer.MAX_VALUE - 2;
public DefaultAllocator() {}
public DefaultAllocator(AllocationStrategy allocationStrategy) {
@ -52,13 +63,16 @@ public class DefaultAllocator implements Allocator {
switch (this.allocationStrategy) {
case GROW_HEURISTICALLY:
this.nextSize += size;
if (size < this.maxSegmentBytes - this.nextSize) {
this.nextSize += size;
} else {
this.nextSize = maxSegmentBytes;
}
break;
case FIXED_SIZE:
break;
}
this.nextSize += size;
return result;
}
}

View file

@ -71,6 +71,22 @@ public final class MessageBuilder {
this.arena = new BuilderArena(new DefaultAllocator(), firstSegment);
}
/**
* Constructs a MessageBuilder from a MessageReader. This constructor is private
* because it is unsafe. To use it, you must call `unsafeConstructFromMessageReader()`.
*/
private MessageBuilder(MessageReader messageReader) {
this.arena = new BuilderArena(messageReader.arena);
}
/**
* Constructs a MessageBuilder from a MessageReader without copying the segments.
* This method should only be used on trusted data. Otherwise you may observe infinite
* loops or large memory allocations or index-out-of-bounds errors.
*/
public static MessageBuilder unsafeConstructFromMessageReader(MessageReader messageReader) {
return new MessageBuilder(messageReader);
}
private AnyPointer.Builder getRootInternal() {
if (this.arena.segments.isEmpty()) {

View file

@ -26,8 +26,8 @@ import java.nio.ByteBuffer;
public final class ReaderArena implements Arena {
// Current limit. -1 means no limit.
public long limit;
// current limit
public final ArrayList<SegmentReader> segments;
@ -45,11 +45,14 @@ public final class ReaderArena implements Arena {
}
@Override
public final void checkReadLimit(int numBytes) {
if (numBytes > limit) {
public final void checkReadLimit(int numWords) {
if (limit == -1) {
// No limit.
return;
} else if (numWords > limit) {
throw new DecodeException("Read limit exceeded.");
} else {
limit -= numBytes;
limit -= numWords;
}
}
}

View file

@ -22,7 +22,18 @@
package org.capnproto;
public final class ReaderOptions {
/**
How many words are allowed to be read before an exception is thrown,
to protect against denial of service attacks.
-1 means "no limit".
*/
public final long traversalLimitInWords;
/**
How many pointer indirections deep a message may be before an exception
is thrown.
*/
public final int nestingLimit;
public ReaderOptions(long traversalLimitInWords, int nestingLimit) {

View file

@ -35,7 +35,7 @@ public final class SegmentBuilder extends SegmentReader {
}
// the total number of words the buffer can hold
private final int capacity() {
final int capacity() {
this.buffer.rewind();
return this.buffer.remaining() / 8;
}

View file

@ -30,6 +30,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Serialization using the standard (unpacked) stream encoding:
* https://capnproto.org/encoding.html#serialization-over-a-stream
*/
public final class Serialize {
static ByteBuffer makeByteBuffer(int bytes) {
@ -39,6 +43,16 @@ public final class Serialize {
return result;
}
static int MAX_SEGMENT_WORDS = (1 << 28) - 1;
static ByteBuffer makeByteBufferForWords(int words) throws IOException {
if (words > MAX_SEGMENT_WORDS) {
// Trying to construct the segment would cause overflow.
throw new DecodeException("segment has too many words (" + words + ")");
}
return makeByteBuffer(words * Constants.BYTES_PER_WORD);
}
public static void fillBuffer(ByteBuffer buffer, ReadableByteChannel bc) throws IOException {
while(buffer.hasRemaining()) {
int r = bc.read(buffer);
@ -55,21 +69,22 @@ public final class Serialize {
}
public static MessageReader read(ReadableByteChannel bc, ReaderOptions options) throws IOException {
ByteBuffer firstWord = makeByteBuffer(Constants.BYTES_PER_WORD);
ByteBuffer firstWord = makeByteBufferForWords(1);
fillBuffer(firstWord, bc);
int segmentCount = 1 + firstWord.getInt(0);
int rawSegmentCount = firstWord.getInt(0);
if (rawSegmentCount < 0 || rawSegmentCount > 511) {
throw new DecodeException("segment count must be between 0 and 512");
}
int segmentCount = 1 + rawSegmentCount;
int segment0Size = 0;
if (segmentCount > 0) {
segment0Size = firstWord.getInt(4);
}
int totalWords = segment0Size;
if (segmentCount > 512) {
throw new IOException("too many segments");
}
long totalWords = segment0Size;
// in words
ArrayList<Integer> moreSizes = new ArrayList<Integer>();
@ -88,23 +103,17 @@ public final class Serialize {
throw new DecodeException("Message size exceeds traversal limit.");
}
ByteBuffer allSegments = makeByteBuffer(totalWords * Constants.BYTES_PER_WORD);
fillBuffer(allSegments, bc);
ByteBuffer[] segmentSlices = new ByteBuffer[segmentCount];
allSegments.rewind();
segmentSlices[0] = allSegments.slice();
segmentSlices[0].limit(segment0Size * Constants.BYTES_PER_WORD);
segmentSlices[0].order(ByteOrder.LITTLE_ENDIAN);
segmentSlices[0] = makeByteBufferForWords(segment0Size);
fillBuffer(segmentSlices[0], bc);
segmentSlices[0].rewind();
int offset = segment0Size;
for (int ii = 1; ii < segmentCount; ++ii) {
allSegments.position(offset * Constants.BYTES_PER_WORD);
segmentSlices[ii] = allSegments.slice();
segmentSlices[ii].limit(moreSizes.get(ii - 1) * Constants.BYTES_PER_WORD);
segmentSlices[ii].order(ByteOrder.LITTLE_ENDIAN);
offset += moreSizes.get(ii - 1);
segmentSlices[ii] = makeByteBufferForWords(moreSizes.get(ii - 1));
fillBuffer(segmentSlices[ii], bc);
segmentSlices[ii].rewind();
}
return new MessageReader(segmentSlices, options);
@ -114,15 +123,16 @@ public final class Serialize {
return read(bb, ReaderOptions.DEFAULT_READER_OPTIONS);
}
/*
/**
* Upon return, `bb.position()` will be at the end of the message.
*/
public static MessageReader read(ByteBuffer bb, ReaderOptions options) throws IOException {
bb.order(ByteOrder.LITTLE_ENDIAN);
int segmentCount = 1 + bb.getInt();
if (segmentCount > 512) {
throw new IOException("too many segments");
int rawSegmentCount = bb.getInt();
int segmentCount = 1 + rawSegmentCount;
if (rawSegmentCount < 0 || rawSegmentCount > 511) {
throw new DecodeException("segment count must be between 0 and 512");
}
ByteBuffer[] segmentSlices = new ByteBuffer[segmentCount];
@ -137,6 +147,10 @@ public final class Serialize {
for (int ii = 0; ii < segmentCount; ++ii) {
int segmentSize = bb.getInt(segmentSizesBase + ii * 4);
if (segmentSize > MAX_SEGMENT_WORDS -
(totalWords + segmentBase / Constants.BYTES_PER_WORD)) {
throw new DecodeException("segment size is too large");
}
bb.position(segmentBase + totalWords * Constants.BYTES_PER_WORD);
segmentSlices[ii] = bb.slice();
@ -147,7 +161,7 @@ public final class Serialize {
}
bb.position(segmentBase + totalWords * Constants.BYTES_PER_WORD);
if (totalWords > options.traversalLimitInWords) {
if (options.traversalLimitInWords != -1 && totalWords > options.traversalLimitInWords) {
throw new DecodeException("Message size exceeds traversal limit.");
}
@ -178,9 +192,8 @@ public final class Serialize {
return bytes / Constants.BYTES_PER_WORD;
}
public static void write(WritableByteChannel outputChannel,
MessageBuilder message) throws IOException {
ByteBuffer[] segments = message.getSegmentsForOutput();
private static void writeSegmentTable(WritableByteChannel outputChannel,
ByteBuffer[] segments) throws IOException {
int tableSize = (segments.length + 2) & (~1);
ByteBuffer table = ByteBuffer.allocate(4 * tableSize);
@ -196,6 +209,34 @@ public final class Serialize {
while (table.hasRemaining()) {
outputChannel.write(table);
}
}
/**
* Serializes a MessageBuilder to a WritableByteChannel.
*/
public static void write(WritableByteChannel outputChannel,
MessageBuilder message) throws IOException {
ByteBuffer[] segments = message.getSegmentsForOutput();
writeSegmentTable(outputChannel, segments);
for (ByteBuffer buffer : segments) {
while(buffer.hasRemaining()) {
outputChannel.write(buffer);
}
}
}
/**
* Serializes a MessageReader to a WritableByteChannel.
*/
public static void write(WritableByteChannel outputChannel,
MessageReader message) throws IOException {
ByteBuffer[] segments = new ByteBuffer[message.arena.segments.size()];
for (int ii = 0 ; ii < message.arena.segments.size(); ++ii) {
segments[ii] = message.arena.segments.get(ii).buffer.duplicate();
}
writeSegmentTable(outputChannel, segments);
for (ByteBuffer buffer : segments) {
while(buffer.hasRemaining()) {

View file

@ -21,6 +21,9 @@
package org.capnproto;
/**
* Serialization using the packed encoding: https://capnproto.org/encoding.html#packing
*/
public final class SerializePacked {
public static MessageReader read(BufferedInputStream input) throws java.io.IOException {
@ -42,16 +45,42 @@ public final class SerializePacked {
return Serialize.read(packedInput, options);
}
/**
* Serializes a MessageBuilder to a BufferedOutputStream.
*/
public static void write(BufferedOutputStream output,
MessageBuilder message) throws java.io.IOException {
PackedOutputStream packedOutputStream = new PackedOutputStream(output);
Serialize.write(packedOutputStream, message);
}
/**
* Serializes a MessageReader to a BufferedOutputStream.
*/
public static void write(BufferedOutputStream output,
MessageReader message) throws java.io.IOException {
PackedOutputStream packedOutputStream = new PackedOutputStream(output);
Serialize.write(packedOutputStream, message);
}
/**
* Serializes a MessageBuilder to an unbuffered output stream.
*/
public static void writeToUnbuffered(java.nio.channels.WritableByteChannel output,
MessageBuilder message) throws java.io.IOException {
BufferedOutputStreamWrapper buffered = new BufferedOutputStreamWrapper(output);
write(buffered, message);
buffered.flush();
}
/**
* Serializes a MessageReader to an unbuffered output stream.
*/
public static void writeToUnbuffered(java.nio.channels.WritableByteChannel output,
MessageReader message) throws java.io.IOException {
BufferedOutputStreamWrapper buffered = new BufferedOutputStreamWrapper(output);
write(buffered, message);
buffered.flush();
}
}

View file

@ -0,0 +1,26 @@
package org.capnproto;
import org.junit.Assert;
import org.junit.Test;
public class DefaultAllocatorTest {
@Test
public void maxSegmentBytes() {
DefaultAllocator allocator = new DefaultAllocator();
Assert.assertEquals(allocator.allocationStrategy,
BuilderArena.AllocationStrategy.GROW_HEURISTICALLY);
allocator.maxSegmentBytes = (1 << 25) - 1;
int allocationSize = 1 << 24;
allocator.setNextAllocationSizeBytes(allocationSize);
Assert.assertEquals(allocationSize,
allocator.allocateSegment(allocationSize).capacity());
Assert.assertEquals(allocator.maxSegmentBytes,
allocator.allocateSegment(allocationSize).capacity());
Assert.assertEquals(allocator.maxSegmentBytes,
allocator.allocateSegment(allocationSize).capacity());
}
}

View file

@ -61,6 +61,10 @@ public class SerializeTest {
{
MessageReader messageReader = Serialize.read(new ArrayInputStream(ByteBuffer.wrap(exampleBytes)));
checkSegmentContents(exampleSegmentCount, messageReader.arena);
byte[] outputBytes = new byte[exampleBytes.length];
Serialize.write(new ArrayOutputStream(ByteBuffer.wrap(outputBytes)), messageReader);
Assert.assertArrayEquals(exampleBytes, outputBytes);
}
// ------