ByteChannelMessageReader

This commit is contained in:
David Renshaw 2014-06-30 11:45:51 -04:00
parent dbeb32dcca
commit e87a9df4de
2 changed files with 31 additions and 27 deletions

View file

@ -1,11 +1,12 @@
package org.capnproto.examples;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.FileDescriptor;
import org.capnproto.MessageBuilder;
import org.capnproto.MessageReader;
import org.capnproto.InputStreamMessageReader;
import org.capnproto.ByteChannelMessageReader;
import org.capnproto.Serialize;
import org.capnproto.StructList;
import org.capnproto.Text;
@ -47,7 +48,8 @@ public class AddressbookMain {
}
public static void printAddressBook() throws java.io.IOException {
MessageReader message = InputStreamMessageReader.create(System.in);
MessageReader message = ByteChannelMessageReader.create(
(new FileInputStream(FileDescriptor.in)).getChannel());
AddressBook.Reader addressbook = message.getRoot(AddressBook.factory);
for(Person.Reader person : addressbook.getPeople()) {
System.out.println(person.getName() + ": " + person.getEmail());

View file

@ -1,37 +1,33 @@
package org.capnproto;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
public final class InputStreamMessageReader {
public final class ByteChannelMessageReader {
static byte[] readExact(InputStream is, int length) throws IOException {
byte[] bytes = new byte[length];
int bytesRead = 0;
while (bytesRead < length) {
int r = is.read(bytes, bytesRead, length - bytesRead);
if (r < 0) {
throw new IOException("premature EOF");
}
bytesRead += r;
}
return bytes;
}
static ByteBuffer makeByteBuffer(byte[] bytes) {
ByteBuffer result = ByteBuffer.wrap(bytes);
static ByteBuffer makeByteBuffer(int bytes) {
ByteBuffer result = ByteBuffer.allocate(bytes);
result.order(ByteOrder.LITTLE_ENDIAN);
result.mark();
return result;
}
public static MessageReader create(InputStream is) throws IOException {
ByteBuffer firstWord = makeByteBuffer(readExact(is, 8));
static void fillBuffer(ByteBuffer buffer, ReadableByteChannel bc) throws IOException {
while(buffer.hasRemaining()) {
int r = bc.read(buffer);
if (r < 0) {
throw new IOException("premature EOF");
}
// TODO check for r == 0 ?.
}
}
public static MessageReader create(ReadableByteChannel bc) throws IOException {
ByteBuffer firstWord = makeByteBuffer(Constants.BYTES_PER_WORD);
fillBuffer(firstWord, bc);
int segmentCount = 1 + firstWord.getInt(0);
@ -50,7 +46,8 @@ public final class InputStreamMessageReader {
ArrayList<Integer> moreSizes = new ArrayList<Integer>();
if (segmentCount > 1) {
ByteBuffer moreSizesRaw = makeByteBuffer(readExact(is, 4 * (segmentCount & ~1)));
ByteBuffer moreSizesRaw = makeByteBuffer(4 * (segmentCount & ~1));
fillBuffer(moreSizesRaw, bc);
for (int ii = 0; ii < segmentCount - 1; ++ii) {
int size = moreSizesRaw.getInt(ii * 4);
moreSizes.add(size);
@ -60,18 +57,23 @@ public final class InputStreamMessageReader {
// TODO check that totalWords is reasonable
byte[] allSegments = readExact(is, totalWords * 8);
ByteBuffer allSegments = makeByteBuffer(totalWords * Constants.BYTES_PER_WORD);
fillBuffer(allSegments, bc);
ByteBuffer[] segmentSlices = new ByteBuffer[segmentCount];
segmentSlices[0] = ByteBuffer.wrap(allSegments, 0, segment0Size * 8);
segmentSlices[0] = ByteBuffer.wrap(allSegments.array(),
0,
segment0Size * Constants.BYTES_PER_WORD);
segmentSlices[0].order(ByteOrder.LITTLE_ENDIAN);
segmentSlices[0].mark();
int offset = segment0Size;
for (int ii = 1; ii < segmentCount; ++ii) {
segmentSlices[ii] = ByteBuffer.wrap(allSegments, offset * 8, moreSizes.get(ii - 1) * 8);
segmentSlices[ii] = ByteBuffer.wrap(allSegments.array(),
offset * Constants.BYTES_PER_WORD,
moreSizes.get(ii - 1) * Constants.BYTES_PER_WORD);
segmentSlices[ii].order(ByteOrder.LITTLE_ENDIAN);
segmentSlices[ii].mark();
offset += moreSizes.get(ii - 1);