diff --git a/runtime/src/main/java/org/capnproto/BufferedInputStreamWrapper.java b/runtime/src/main/java/org/capnproto/BufferedInputStreamWrapper.java new file mode 100644 index 0000000..3a5bb68 --- /dev/null +++ b/runtime/src/main/java/org/capnproto/BufferedInputStreamWrapper.java @@ -0,0 +1,77 @@ +package org.capnproto; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; + +public final class BufferedInputStreamWrapper implements BufferedInputStream { + + private final ReadableByteChannel inner; + private final ByteBuffer buf; + private int cap = 0; + + public BufferedInputStreamWrapper(ReadableByteChannel chan) { + this.inner = chan; + this.buf = ByteBuffer.allocate(8192); + } + + public final int read(ByteBuffer dst) throws IOException { + int numBytes = dst.remaining(); + if (numBytes < cap - this.buf.position()) { + //# Serve from the current buffer. + ByteBuffer slice = this.buf.slice(); + slice.limit(numBytes); + dst.put(slice); + this.buf.position(this.buf.position() + numBytes); + return numBytes; + } else { + //# Copy current available into destination. + ByteBuffer slice = this.buf.slice(); + int fromFirstBuffer = cap - this.buf.position(); + slice.limit(fromFirstBuffer); + dst.put(slice); + + numBytes -= fromFirstBuffer; + if (numBytes <= this.buf.capacity()) { + //# Read the next buffer-full. + this.buf.rewind(); + int n = readAtLeast(this.inner, this.buf, numBytes); + + // ... + //ByteBuffer slice = + //dst.put( + + this.cap = n; + this.buf.position(numBytes); + return fromFirstBuffer + numBytes; + } else { + //# Forward large read to the underlying stream. + } + } + throw new Error("unimplemented"); + } + + public final ByteBuffer getReadBuffer() { + return this.buf; + } + + public final void close() throws IOException { + this.inner.close(); + } + + public final boolean isOpen() { + return this.inner.isOpen(); + } + + public static int readAtLeast(ReadableByteChannel reader, ByteBuffer buf, int minBytes) throws IOException { + int numRead = 0; + while (numRead < minBytes) { + int res = reader.read(buf); + if (res < 0) { + throw new Error("premature EOF"); + } + numRead += res; + } + return numRead; + } +} diff --git a/runtime/src/main/java/org/capnproto/BufferedOutputStreamWrapper.java b/runtime/src/main/java/org/capnproto/BufferedOutputStreamWrapper.java index 23713d1..36acdc4 100644 --- a/runtime/src/main/java/org/capnproto/BufferedOutputStreamWrapper.java +++ b/runtime/src/main/java/org/capnproto/BufferedOutputStreamWrapper.java @@ -6,8 +6,8 @@ import java.nio.channels.WritableByteChannel; public final class BufferedOutputStreamWrapper implements BufferedOutputStream { - public final WritableByteChannel inner; - public final ByteBuffer buf; + private final WritableByteChannel inner; + private final ByteBuffer buf; public BufferedOutputStreamWrapper(WritableByteChannel w) { this.inner = w; diff --git a/runtime/src/main/java/org/capnproto/ByteChannelMessageReader.java b/runtime/src/main/java/org/capnproto/ByteChannelMessageReader.java index 10eb9c3..330c3ec 100644 --- a/runtime/src/main/java/org/capnproto/ByteChannelMessageReader.java +++ b/runtime/src/main/java/org/capnproto/ByteChannelMessageReader.java @@ -15,7 +15,7 @@ public final class ByteChannelMessageReader { return result; } - static void fillBuffer(ByteBuffer buffer, ReadableByteChannel bc) throws IOException { + public static void fillBuffer(ByteBuffer buffer, ReadableByteChannel bc) throws IOException { while(buffer.hasRemaining()) { int r = bc.read(buffer); if (r < 0) {