diff --git a/runtime/src/main/java/org/capnproto/BufferedInputStream.java b/runtime/src/main/java/org/capnproto/BufferedInputStream.java index 9e1b0a3..13c8f8c 100644 --- a/runtime/src/main/java/org/capnproto/BufferedInputStream.java +++ b/runtime/src/main/java/org/capnproto/BufferedInputStream.java @@ -4,5 +4,5 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; public interface BufferedInputStream extends ReadableByteChannel { - public ByteBuffer getReadBuffer(); + public ByteBuffer getReadBuffer() throws java.io.IOException; } diff --git a/runtime/src/main/java/org/capnproto/BufferedInputStreamWrapper.java b/runtime/src/main/java/org/capnproto/BufferedInputStreamWrapper.java index 69d6851..2c40dfb 100644 --- a/runtime/src/main/java/org/capnproto/BufferedInputStreamWrapper.java +++ b/runtime/src/main/java/org/capnproto/BufferedInputStreamWrapper.java @@ -57,7 +57,11 @@ public final class BufferedInputStreamWrapper implements BufferedInputStream { } } - public final ByteBuffer getReadBuffer() { + public final ByteBuffer getReadBuffer() throws IOException { + if (this.cap - this.buf.position() == 0) { + this.buf.rewind(); + this.cap = readAtLeast(this.inner, this.buf, 1); + } return this.buf; } diff --git a/runtime/src/main/java/org/capnproto/PackedInputStream.java b/runtime/src/main/java/org/capnproto/PackedInputStream.java index f0f73e9..6d14eae 100644 --- a/runtime/src/main/java/org/capnproto/PackedInputStream.java +++ b/runtime/src/main/java/org/capnproto/PackedInputStream.java @@ -20,8 +20,8 @@ public final class PackedInputStream implements ReadableByteChannel { throw new Error("PackedInputStream reads must be word-aligned"); } - int out = outBuf.position(); - int outEnd = out + len; + int outPtr = outBuf.position(); + int outEnd = outPtr + len; ByteBuffer inBuf = this.inner.getReadBuffer(); @@ -33,27 +33,42 @@ public final class PackedInputStream implements ReadableByteChannel { //if (outBuf if (inBuf.remaining() < 10) { - if (out >= outEnd) { + if (outBuf.remaining() == 0) { return len; } if (inBuf.remaining() == 0) { - // refresh buffer... + inBuf = this.inner.getReadBuffer(); continue; } //# We have at least 1, but not 10, bytes available. We need to read //# slowly, doing a bounds check on each byte. - // TODO + tag = inBuf.get(); + + for (int i = 0; i < 8; ++i) { + if ((tag & (1 << i)) != 0) { + if (inBuf.remaining() == 0) { + inBuf = this.inner.getReadBuffer(); + } + outBuf.put(inBuf.get()); + } else { + outBuf.put((byte)0); + } + } + + if (inBuf.remaining() ==0 && (tag == 0 || tag == (byte)0xff)) { + inBuf = this.inner.getReadBuffer(); + } } else { tag = inBuf.get(); for (int n = 0; n < 8; ++n) { boolean isNonzero = (tag & (1 << n)) != 0; - // ... + outBuf.put((byte)(inBuf.get() & (isNonzero ? -1 : 0))); + inBuf.position(inBuf.position() + (isNonzero ? 0 : -1)); } - } if (tag == 0) { @@ -63,11 +78,13 @@ public final class PackedInputStream implements ReadableByteChannel { int runLength = inBuf.get() * 8; - if (runLength > outEnd - out) { + if (runLength > outEnd - outPtr) { throw new Error("Packed input did not end cleanly on a segment boundary"); } - + for (int i = 0; i < runLength; ++i) { + outBuf.put((byte) 0); + } } else if (tag == (byte)0xff) { int runLength = inBuf.get() * 8; @@ -81,7 +98,7 @@ public final class PackedInputStream implements ReadableByteChannel { } - if (out == outEnd) { + if (outBuf.remaining() == 0) { return len; } } diff --git a/runtime/src/test/scala/org/capnproto/SerializePackedTest.scala b/runtime/src/test/scala/org/capnproto/SerializePackedTest.scala index 483dd53..bcca6ea 100644 --- a/runtime/src/test/scala/org/capnproto/SerializePackedTest.scala +++ b/runtime/src/test/scala/org/capnproto/SerializePackedTest.scala @@ -9,13 +9,28 @@ class SerializePackedSuite extends FunSuite { def expectPacksTo(unpacked : Array[Byte], packed : Array[Byte]) { // ---- // write + { + val bytes = new Array[Byte](packed.length); + val writer = new ArrayOutputStream(ByteBuffer.wrap(bytes)); + val packedOutputStream = new PackedOutputStream(writer); + packedOutputStream.write(ByteBuffer.wrap(unpacked)); - val bytes = new Array[Byte](packed.length); - val writer = new ArrayOutputStream(ByteBuffer.wrap(bytes)); - val packedOutputStream = new PackedOutputStream (writer); - packedOutputStream.write(ByteBuffer.wrap(unpacked)); + (bytes) should equal (packed); + } + + // ------ + // read + { + val reader = new ArrayInputStream(ByteBuffer.wrap(packed)); + val packedInputStream = new PackedInputStream(reader); + val bytes = new Array[Byte](unpacked.length); + val n = packedInputStream.read(ByteBuffer.wrap(bytes)); + + //(n) should equal (unpacked.length); + + //(bytes) should equal (unpacked); + } - (bytes) should equal (packed); }