diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..a42245c
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,2 @@
+## v0.1.9
+- Add `8` to pom.xml, to increase compability with Java 8.
diff --git a/Makefile b/Makefile
index 4882d9c..493539b 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,5 @@
-CAPNP_CXX_FLAGS=$(shell pkg-config capnp --cflags --libs)
+PKG_CONFIG ?= pkg-config
+CAPNP_CXX_FLAGS=$(shell $(PKG_CONFIG) capnp --cflags --libs)
ifeq ($(CAPNP_CXX_FLAGS),)
$(warning "Warning: pkg-config failed to find compilation configuration for capnp.")
diff --git a/README.md b/README.md
index e74d51a..2384c00 100644
--- a/README.md
+++ b/README.md
@@ -8,3 +8,16 @@ and capabilities, and capnproto-java is a pure Java implementation.
[Read more here.](https://dwrensha.github.io/capnproto-java/index.html)
This repository clone adds an implementation of the RPC framework for Java.
+
+Promise pipelining is provided via java.util.concurrent.CompletableFuture. Unlike the KJ asynchronous model, which completes promises
+only when they are waited upon, a CompletableFuture can complete immediately. This may break E-ordering, as the C++ implementation
+relies on kj::evalLater() to defer method calls, and there is no obvious (to me, anyway) way to replicate the behaviour of
+kj::evalLater() with CompletableFutures.
+
+Most of the C++ RPC test cases have been ported to this implementation, which gives me some comfort that the implementation logic is
+correct, but more extensive testing is required.
+
+This implementation does not support generic interfaces. Extending the schema compiler to output code for generic interfaces is an
+exercise I leave to the reader.
+
+
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index 9426d98..3914d48 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -5,7 +5,7 @@
benchmark
jar
capnproto-java benchmark
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
capnproto-java benchmark
org.capnproto
@@ -33,12 +33,13 @@
UTF-8
14
14
+ 14
org.capnproto
runtime
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
diff --git a/compiler/pom.xml b/compiler/pom.xml
index bab021c..5bd6837 100644
--- a/compiler/pom.xml
+++ b/compiler/pom.xml
@@ -5,7 +5,7 @@
compiler
jar
schema compiler plugin for java
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
capnpc-java
org.capnproto
@@ -31,6 +31,8 @@
UTF-8
+ 14
+ 14
@@ -42,7 +44,7 @@
org.capnproto
runtime
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
diff --git a/examples/pom.xml b/examples/pom.xml
index 7462ccd..793edc7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -5,7 +5,7 @@
examples
jar
capnproto-java examples
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
capnproto-java examples
org.capnproto
@@ -38,16 +38,16 @@
org.capnproto
runtime
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
org.capnproto
runtime-rpc
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
-
-
+
+
junit
junit
4.13.1
diff --git a/pom.xml b/pom.xml
index 10d5c2b..f33578d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
org.capnproto
capnproto-java
pom
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
Cap'n Proto for Java
https://capnproto.org/
diff --git a/runtime-rpc/pom.xml b/runtime-rpc/pom.xml
index 7677eeb..1d84354 100644
--- a/runtime-rpc/pom.xml
+++ b/runtime-rpc/pom.xml
@@ -5,7 +5,7 @@
runtime-rpc
jar
runtime-rpc
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
Cap'n Proto RPC runtime library
org.capnproto
@@ -49,12 +49,12 @@
org.capnproto
runtime
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
org.capnproto
compiler
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
@@ -227,4 +227,4 @@
-
\ No newline at end of file
+
diff --git a/runtime/pom.xml b/runtime/pom.xml
index af83e63..b2a38d5 100644
--- a/runtime/pom.xml
+++ b/runtime/pom.xml
@@ -5,7 +5,7 @@
runtime
jar
runtime
- 0.1.6-SNAPSHOT
+ 0.1.10-SNAPSHOT
Cap'n Proto runtime library
org.capnproto
@@ -64,7 +64,7 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.3
+ 3.6.2
-Xlint:unchecked
@@ -75,6 +75,15 @@
+
+ jdk11FF
+
+ (11,)
+
+
+ 11
+
+
release
diff --git a/runtime/src/main/java/org/capnproto/Arena.java b/runtime/src/main/java/org/capnproto/Arena.java
index bb6c42e..2866f61 100644
--- a/runtime/src/main/java/org/capnproto/Arena.java
+++ b/runtime/src/main/java/org/capnproto/Arena.java
@@ -23,5 +23,5 @@ package org.capnproto;
public interface Arena {
public SegmentReader tryGetSegment(int id);
- public void checkReadLimit(int numBytes);
+ public void checkReadLimit(int numWords);
}
diff --git a/runtime/src/main/java/org/capnproto/BuilderArena.java b/runtime/src/main/java/org/capnproto/BuilderArena.java
index a05f341..1ae46d0 100644
--- a/runtime/src/main/java/org/capnproto/BuilderArena.java
+++ b/runtime/src/main/java/org/capnproto/BuilderArena.java
@@ -103,6 +103,30 @@ public final class BuilderArena implements Arena {
return this.localCapTable;
}
+ /**
+ * Constructs a BuilderArena from a ReaderArena and uses the size of the largest segment
+ * as the next allocation size.
+ */
+ BuilderArena(ReaderArena arena) {
+ this.segments = new ArrayList();
+ int largestSegment = SUGGESTED_FIRST_SEGMENT_WORDS*Constants.BYTES_PER_WORD;
+ for (int ii = 0; ii < arena.segments.size(); ++ii) {
+ SegmentReader segment = arena.segments.get(ii);
+ SegmentBuilder segmentBuilder = new SegmentBuilder(segment.buffer, this);
+ segmentBuilder.id = ii;
+ segmentBuilder.pos = segmentBuilder.capacity(); // buffer is pre-filled
+ segments.add(segmentBuilder);
+
+ // Find the largest segment for the allocation strategy.
+ largestSegment = Math.max(largestSegment, segment.buffer.capacity());
+ }
+ DefaultAllocator defaultAllocator = new DefaultAllocator(SUGGESTED_ALLOCATION_STRATEGY);
+
+ // Use largest segment as next size.
+ defaultAllocator.setNextAllocationSizeBytes(largestSegment);
+ this.allocator = defaultAllocator;
+ }
+
@Override
public final SegmentReader tryGetSegment(int id) {
return this.segments.get(id);
@@ -126,6 +150,10 @@ public final class BuilderArena implements Arena {
}
}
+ /**
+ * Allocates `amount` words in an existing segment or, if no suitable segment
+ * exists, in a new segment.
+ */
public AllocateResult allocate(int amount) {
int len = this.segments.size();
@@ -136,6 +164,10 @@ public final class BuilderArena implements Arena {
return new AllocateResult(this.segments.get(len - 1), result);
}
}
+ if (amount >= 1 << 28) {
+ // Computing `amount * Constants.BYTES_PER_WORD` would overflow.
+ throw new RuntimeException("Too many words to allocate: " + amount);
+ }
SegmentBuilder newSegment = new SegmentBuilder(
this.allocator.allocateSegment(amount * Constants.BYTES_PER_WORD),
this);
diff --git a/runtime/src/main/java/org/capnproto/DefaultAllocator.java b/runtime/src/main/java/org/capnproto/DefaultAllocator.java
index d136356..1d1d1bb 100644
--- a/runtime/src/main/java/org/capnproto/DefaultAllocator.java
+++ b/runtime/src/main/java/org/capnproto/DefaultAllocator.java
@@ -18,6 +18,17 @@ public class DefaultAllocator implements Allocator {
public AllocationStrategy allocationStrategy =
AllocationStrategy.GROW_HEURISTICALLY;
+ /**
+ The largest number of bytes to try allocating when using `GROW_HEURISTICALLY`.
+
+ Set this value smaller if you get the error:
+
+ java.lang.OutOfMemoryError: Requested array size exceeds VM limit
+
+ Experimentally, `Integer.MAX_VALUE - 2` seems to work on most systems.
+ */
+ public int maxSegmentBytes = Integer.MAX_VALUE - 2;
+
public DefaultAllocator() {}
public DefaultAllocator(AllocationStrategy allocationStrategy) {
@@ -52,13 +63,16 @@ public class DefaultAllocator implements Allocator {
switch (this.allocationStrategy) {
case GROW_HEURISTICALLY:
- this.nextSize += size;
+ if (size < this.maxSegmentBytes - this.nextSize) {
+ this.nextSize += size;
+ } else {
+ this.nextSize = maxSegmentBytes;
+ }
break;
case FIXED_SIZE:
break;
}
- this.nextSize += size;
return result;
}
}
diff --git a/runtime/src/main/java/org/capnproto/MessageBuilder.java b/runtime/src/main/java/org/capnproto/MessageBuilder.java
index 98a5560..fbfe082 100644
--- a/runtime/src/main/java/org/capnproto/MessageBuilder.java
+++ b/runtime/src/main/java/org/capnproto/MessageBuilder.java
@@ -71,6 +71,22 @@ public final class MessageBuilder {
this.arena = new BuilderArena(new DefaultAllocator(), firstSegment);
}
+ /**
+ * Constructs a MessageBuilder from a MessageReader. This constructor is private
+ * because it is unsafe. To use it, you must call `unsafeConstructFromMessageReader()`.
+ */
+ private MessageBuilder(MessageReader messageReader) {
+ this.arena = new BuilderArena(messageReader.arena);
+ }
+
+ /**
+ * Constructs a MessageBuilder from a MessageReader without copying the segments.
+ * This method should only be used on trusted data. Otherwise you may observe infinite
+ * loops or large memory allocations or index-out-of-bounds errors.
+ */
+ public static MessageBuilder unsafeConstructFromMessageReader(MessageReader messageReader) {
+ return new MessageBuilder(messageReader);
+ }
private AnyPointer.Builder getRootInternal() {
if (this.arena.segments.isEmpty()) {
diff --git a/runtime/src/main/java/org/capnproto/ReaderArena.java b/runtime/src/main/java/org/capnproto/ReaderArena.java
index 13c9c87..c499401 100644
--- a/runtime/src/main/java/org/capnproto/ReaderArena.java
+++ b/runtime/src/main/java/org/capnproto/ReaderArena.java
@@ -26,8 +26,8 @@ import java.nio.ByteBuffer;
public final class ReaderArena implements Arena {
+ // Current limit. -1 means no limit.
public long limit;
- // current limit
public final ArrayList segments;
@@ -45,11 +45,14 @@ public final class ReaderArena implements Arena {
}
@Override
- public final void checkReadLimit(int numBytes) {
- if (numBytes > limit) {
+ public final void checkReadLimit(int numWords) {
+ if (limit == -1) {
+ // No limit.
+ return;
+ } else if (numWords > limit) {
throw new DecodeException("Read limit exceeded.");
} else {
- limit -= numBytes;
+ limit -= numWords;
}
}
}
diff --git a/runtime/src/main/java/org/capnproto/ReaderOptions.java b/runtime/src/main/java/org/capnproto/ReaderOptions.java
index fa22b40..21cee6d 100644
--- a/runtime/src/main/java/org/capnproto/ReaderOptions.java
+++ b/runtime/src/main/java/org/capnproto/ReaderOptions.java
@@ -22,7 +22,18 @@
package org.capnproto;
public final class ReaderOptions {
+ /**
+ How many words are allowed to be read before an exception is thrown,
+ to protect against denial of service attacks.
+
+ -1 means "no limit".
+ */
public final long traversalLimitInWords;
+
+ /**
+ How many pointer indirections deep a message may be before an exception
+ is thrown.
+ */
public final int nestingLimit;
public ReaderOptions(long traversalLimitInWords, int nestingLimit) {
diff --git a/runtime/src/main/java/org/capnproto/SegmentBuilder.java b/runtime/src/main/java/org/capnproto/SegmentBuilder.java
index e9ac57a..9727b58 100644
--- a/runtime/src/main/java/org/capnproto/SegmentBuilder.java
+++ b/runtime/src/main/java/org/capnproto/SegmentBuilder.java
@@ -35,7 +35,7 @@ public final class SegmentBuilder extends SegmentReader {
}
// the total number of words the buffer can hold
- private final int capacity() {
+ final int capacity() {
this.buffer.rewind();
return this.buffer.remaining() / 8;
}
diff --git a/runtime/src/main/java/org/capnproto/Serialize.java b/runtime/src/main/java/org/capnproto/Serialize.java
index 8b51963..9a8406c 100644
--- a/runtime/src/main/java/org/capnproto/Serialize.java
+++ b/runtime/src/main/java/org/capnproto/Serialize.java
@@ -30,6 +30,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+/**
+ * Serialization using the standard (unpacked) stream encoding:
+ * https://capnproto.org/encoding.html#serialization-over-a-stream
+ */
public final class Serialize {
static ByteBuffer makeByteBuffer(int bytes) {
@@ -39,6 +43,16 @@ public final class Serialize {
return result;
}
+ static int MAX_SEGMENT_WORDS = (1 << 28) - 1;
+
+ static ByteBuffer makeByteBufferForWords(int words) throws IOException {
+ if (words > MAX_SEGMENT_WORDS) {
+ // Trying to construct the segment would cause overflow.
+ throw new DecodeException("segment has too many words (" + words + ")");
+ }
+ return makeByteBuffer(words * Constants.BYTES_PER_WORD);
+ }
+
public static void fillBuffer(ByteBuffer buffer, ReadableByteChannel bc) throws IOException {
while(buffer.hasRemaining()) {
int r = bc.read(buffer);
@@ -55,21 +69,22 @@ public final class Serialize {
}
public static MessageReader read(ReadableByteChannel bc, ReaderOptions options) throws IOException {
- ByteBuffer firstWord = makeByteBuffer(Constants.BYTES_PER_WORD);
+ ByteBuffer firstWord = makeByteBufferForWords(1);
fillBuffer(firstWord, bc);
- int segmentCount = 1 + firstWord.getInt(0);
+ int rawSegmentCount = firstWord.getInt(0);
+ if (rawSegmentCount < 0 || rawSegmentCount > 511) {
+ throw new DecodeException("segment count must be between 0 and 512");
+ }
+
+ int segmentCount = 1 + rawSegmentCount;
int segment0Size = 0;
if (segmentCount > 0) {
segment0Size = firstWord.getInt(4);
}
- int totalWords = segment0Size;
-
- if (segmentCount > 512) {
- throw new IOException("too many segments");
- }
+ long totalWords = segment0Size;
// in words
ArrayList moreSizes = new ArrayList();
@@ -88,23 +103,17 @@ public final class Serialize {
throw new DecodeException("Message size exceeds traversal limit.");
}
- ByteBuffer allSegments = makeByteBuffer(totalWords * Constants.BYTES_PER_WORD);
- fillBuffer(allSegments, bc);
-
ByteBuffer[] segmentSlices = new ByteBuffer[segmentCount];
- allSegments.rewind();
- segmentSlices[0] = allSegments.slice();
- segmentSlices[0].limit(segment0Size * Constants.BYTES_PER_WORD);
- segmentSlices[0].order(ByteOrder.LITTLE_ENDIAN);
+ segmentSlices[0] = makeByteBufferForWords(segment0Size);
+ fillBuffer(segmentSlices[0], bc);
+ segmentSlices[0].rewind();
int offset = segment0Size;
for (int ii = 1; ii < segmentCount; ++ii) {
- allSegments.position(offset * Constants.BYTES_PER_WORD);
- segmentSlices[ii] = allSegments.slice();
- segmentSlices[ii].limit(moreSizes.get(ii - 1) * Constants.BYTES_PER_WORD);
- segmentSlices[ii].order(ByteOrder.LITTLE_ENDIAN);
- offset += moreSizes.get(ii - 1);
+ segmentSlices[ii] = makeByteBufferForWords(moreSizes.get(ii - 1));
+ fillBuffer(segmentSlices[ii], bc);
+ segmentSlices[ii].rewind();
}
return new MessageReader(segmentSlices, options);
@@ -114,15 +123,16 @@ public final class Serialize {
return read(bb, ReaderOptions.DEFAULT_READER_OPTIONS);
}
- /*
+ /**
* Upon return, `bb.position()` will be at the end of the message.
*/
public static MessageReader read(ByteBuffer bb, ReaderOptions options) throws IOException {
bb.order(ByteOrder.LITTLE_ENDIAN);
- int segmentCount = 1 + bb.getInt();
- if (segmentCount > 512) {
- throw new IOException("too many segments");
+ int rawSegmentCount = bb.getInt();
+ int segmentCount = 1 + rawSegmentCount;
+ if (rawSegmentCount < 0 || rawSegmentCount > 511) {
+ throw new DecodeException("segment count must be between 0 and 512");
}
ByteBuffer[] segmentSlices = new ByteBuffer[segmentCount];
@@ -137,6 +147,10 @@ public final class Serialize {
for (int ii = 0; ii < segmentCount; ++ii) {
int segmentSize = bb.getInt(segmentSizesBase + ii * 4);
+ if (segmentSize > MAX_SEGMENT_WORDS -
+ (totalWords + segmentBase / Constants.BYTES_PER_WORD)) {
+ throw new DecodeException("segment size is too large");
+ }
bb.position(segmentBase + totalWords * Constants.BYTES_PER_WORD);
segmentSlices[ii] = bb.slice();
@@ -147,7 +161,7 @@ public final class Serialize {
}
bb.position(segmentBase + totalWords * Constants.BYTES_PER_WORD);
- if (totalWords > options.traversalLimitInWords) {
+ if (options.traversalLimitInWords != -1 && totalWords > options.traversalLimitInWords) {
throw new DecodeException("Message size exceeds traversal limit.");
}
@@ -178,9 +192,8 @@ public final class Serialize {
return bytes / Constants.BYTES_PER_WORD;
}
- public static void write(WritableByteChannel outputChannel,
- MessageBuilder message) throws IOException {
- ByteBuffer[] segments = message.getSegmentsForOutput();
+ private static void writeSegmentTable(WritableByteChannel outputChannel,
+ ByteBuffer[] segments) throws IOException {
int tableSize = (segments.length + 2) & (~1);
ByteBuffer table = ByteBuffer.allocate(4 * tableSize);
@@ -196,6 +209,34 @@ public final class Serialize {
while (table.hasRemaining()) {
outputChannel.write(table);
}
+ }
+
+ /**
+ * Serializes a MessageBuilder to a WritableByteChannel.
+ */
+ public static void write(WritableByteChannel outputChannel,
+ MessageBuilder message) throws IOException {
+ ByteBuffer[] segments = message.getSegmentsForOutput();
+ writeSegmentTable(outputChannel, segments);
+
+ for (ByteBuffer buffer : segments) {
+ while(buffer.hasRemaining()) {
+ outputChannel.write(buffer);
+ }
+ }
+ }
+
+ /**
+ * Serializes a MessageReader to a WritableByteChannel.
+ */
+ public static void write(WritableByteChannel outputChannel,
+ MessageReader message) throws IOException {
+ ByteBuffer[] segments = new ByteBuffer[message.arena.segments.size()];
+ for (int ii = 0 ; ii < message.arena.segments.size(); ++ii) {
+ segments[ii] = message.arena.segments.get(ii).buffer.duplicate();
+ }
+
+ writeSegmentTable(outputChannel, segments);
for (ByteBuffer buffer : segments) {
while(buffer.hasRemaining()) {
diff --git a/runtime/src/main/java/org/capnproto/SerializePacked.java b/runtime/src/main/java/org/capnproto/SerializePacked.java
index 1a107eb..88b31a9 100644
--- a/runtime/src/main/java/org/capnproto/SerializePacked.java
+++ b/runtime/src/main/java/org/capnproto/SerializePacked.java
@@ -21,6 +21,9 @@
package org.capnproto;
+/**
+ * Serialization using the packed encoding: https://capnproto.org/encoding.html#packing
+ */
public final class SerializePacked {
public static MessageReader read(BufferedInputStream input) throws java.io.IOException {
@@ -42,16 +45,42 @@ public final class SerializePacked {
return Serialize.read(packedInput, options);
}
+ /**
+ * Serializes a MessageBuilder to a BufferedOutputStream.
+ */
public static void write(BufferedOutputStream output,
MessageBuilder message) throws java.io.IOException {
PackedOutputStream packedOutputStream = new PackedOutputStream(output);
Serialize.write(packedOutputStream, message);
}
+ /**
+ * Serializes a MessageReader to a BufferedOutputStream.
+ */
+ public static void write(BufferedOutputStream output,
+ MessageReader message) throws java.io.IOException {
+ PackedOutputStream packedOutputStream = new PackedOutputStream(output);
+ Serialize.write(packedOutputStream, message);
+ }
+
+ /**
+ * Serializes a MessageBuilder to an unbuffered output stream.
+ */
public static void writeToUnbuffered(java.nio.channels.WritableByteChannel output,
MessageBuilder message) throws java.io.IOException {
BufferedOutputStreamWrapper buffered = new BufferedOutputStreamWrapper(output);
write(buffered, message);
buffered.flush();
}
+
+ /**
+ * Serializes a MessageReader to an unbuffered output stream.
+ */
+ public static void writeToUnbuffered(java.nio.channels.WritableByteChannel output,
+ MessageReader message) throws java.io.IOException {
+ BufferedOutputStreamWrapper buffered = new BufferedOutputStreamWrapper(output);
+ write(buffered, message);
+ buffered.flush();
+ }
+
}
diff --git a/runtime/src/test/java/org/capnproto/DefaultAllocatorTest.java b/runtime/src/test/java/org/capnproto/DefaultAllocatorTest.java
new file mode 100644
index 0000000..3770a7b
--- /dev/null
+++ b/runtime/src/test/java/org/capnproto/DefaultAllocatorTest.java
@@ -0,0 +1,26 @@
+package org.capnproto;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DefaultAllocatorTest {
+ @Test
+ public void maxSegmentBytes() {
+ DefaultAllocator allocator = new DefaultAllocator();
+ Assert.assertEquals(allocator.allocationStrategy,
+ BuilderArena.AllocationStrategy.GROW_HEURISTICALLY);
+ allocator.maxSegmentBytes = (1 << 25) - 1;
+
+ int allocationSize = 1 << 24;
+ allocator.setNextAllocationSizeBytes(allocationSize);
+
+ Assert.assertEquals(allocationSize,
+ allocator.allocateSegment(allocationSize).capacity());
+
+ Assert.assertEquals(allocator.maxSegmentBytes,
+ allocator.allocateSegment(allocationSize).capacity());
+
+ Assert.assertEquals(allocator.maxSegmentBytes,
+ allocator.allocateSegment(allocationSize).capacity());
+ }
+}
diff --git a/runtime/src/test/java/org/capnproto/SerializeTest.java b/runtime/src/test/java/org/capnproto/SerializeTest.java
index d2245bb..9ee4db9 100644
--- a/runtime/src/test/java/org/capnproto/SerializeTest.java
+++ b/runtime/src/test/java/org/capnproto/SerializeTest.java
@@ -61,6 +61,10 @@ public class SerializeTest {
{
MessageReader messageReader = Serialize.read(new ArrayInputStream(ByteBuffer.wrap(exampleBytes)));
checkSegmentContents(exampleSegmentCount, messageReader.arena);
+
+ byte[] outputBytes = new byte[exampleBytes.length];
+ Serialize.write(new ArrayOutputStream(ByteBuffer.wrap(outputBytes)), messageReader);
+ Assert.assertArrayEquals(exampleBytes, outputBytes);
}
// ------