[ https://issues.apache.org/jira/browse/ARROW-5230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17662253#comment-17662253 ]
Rok Mihevc commented on ARROW-5230: ----------------------------------- This issue has been migrated to [issue #21703|https://github.com/apache/arrow/issues/21703] on GitHub. Please see the [migration documentation|https://github.com/apache/arrow/issues/14542] for further details. > [Java] Read Struct Vector from ArrowStreamReader bugs > ----------------------------------------------------- > > Key: ARROW-5230 > URL: https://issues.apache.org/jira/browse/ARROW-5230 > Project: Apache Arrow > Issue Type: Bug > Components: Java > Environment: Mac OS 10.13.6, Arrow 0.13.0, JDK8 > Reporter: Shawn Yang > Priority: Major > > After writing struct vector using ArrowStreamWriter to a file, read it back > using ArrowStreamReader throws exception: > {quote}Exception in thread "main" java.lang.IllegalArgumentException: not all > nodes and buffers were consumed. nodes: [ArrowFieldNode [length=100, > nullCount=0], ArrowFieldNode [length=100, nullCount=0]] buffers: > [ArrowBuf[26], udle: [11 16..29], ArrowBuf[27], udle: [11 32..432], > ArrowBuf[28], udle: [11 432..445], ArrowBuf[29], udle: [11 448..848]] > at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:64) > at > org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:219) > at > org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:121) > {quote} > Here's the code to reproduce this exception: > {code:java} > import org.apache.arrow.memory.RootAllocator; > import org.apache.arrow.vector.FieldVector; > import org.apache.arrow.vector.IntVector; > import org.apache.arrow.vector.VectorSchemaRoot; > import org.apache.arrow.vector.complex.StructVector; > import org.apache.arrow.vector.dictionary.DictionaryProvider; > import org.apache.arrow.vector.ipc.ArrowStreamReader; > import org.apache.arrow.vector.ipc.ArrowStreamWriter; > import org.apache.arrow.vector.types.pojo.ArrowType; > import org.apache.arrow.vector.types.pojo.Field; > import 
org.apache.arrow.vector.types.pojo.FieldType; > import org.apache.arrow.vector.types.pojo.Schema; > import java.io.ByteArrayInputStream; > import java.io.FileOutputStream; > import java.io.IOException; > import java.io.OutputStream; > import java.nio.file.Files; > import java.nio.file.Paths; > import java.util.Collections; > import java.util.List; > import java.util.concurrent.ThreadLocalRandom; > public class StructTest { > public static void writeBatch(OutputStream os) throws IOException { > List<Field> fields = Collections.singletonList(new > Field("f-Struct(Int, Int)", FieldType.nullable(ArrowType.Struct.INSTANCE), > null)); > Schema schema = new Schema(fields); > VectorSchemaRoot root = VectorSchemaRoot.create(schema, new > RootAllocator(Integer.MAX_VALUE)); > DictionaryProvider.MapDictionaryProvider provider = new > DictionaryProvider.MapDictionaryProvider(); > ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, os); > writer.start(); > for (int i = 0; i < 2; i++) { > root.setRowCount(100); > List<FieldVector> vectors = root.getFieldVectors(); > StructVector vector = (StructVector) vectors.get(0); > fillVector(vector, 100); > for (int j = 0; j < 100; j++) { > if (!vector.isNull(j)) { > System.out.println(vector.getObject(j)); > } > } > writer.writeBatch(); > } > writer.end(); > writer.close(); > } > public static void fillVector(StructVector vector, int batchSize) { > vector.setInitialCapacity(batchSize); > vector.allocateNew(); > vector.addOrGet("s1", FieldType.nullable(new ArrowType.Int(32, > true)), IntVector.class); > vector.addOrGet("s2", FieldType.nullable(new ArrowType.Int(32, > true)), IntVector.class); > fillVector((IntVector)(vector.getChild("s1")), batchSize); > fillVector((IntVector) (vector.getChild("s2")), batchSize); > for (int i = 0; i < batchSize; i++) { > vector.setIndexDefined(i); > } > vector.setValueCount(batchSize); > } > public static void fillVector(IntVector vector, int batchSize) { > vector.setInitialCapacity(batchSize); 
> vector.allocateNew(); > for (int i = 0; i < batchSize; i++) { > vector.setSafe(i, 1, ThreadLocalRandom.current().nextInt()); > } > vector.setValueCount(batchSize); > } > public static void main(String[] args) throws IOException { > try (FileOutputStream fos = new > FileOutputStream("result/struct.arrow")) { > writeBatch(fos); > System.out.println("write succeed"); > fos.flush(); > } > RootAllocator allocator = new RootAllocator(1000000000); > ByteArrayInputStream in = new > ByteArrayInputStream(Files.readAllBytes(Paths.get("result/struct.arrow"))); > ArrowStreamReader reader = new ArrowStreamReader(in, allocator); > reader.loadNextBatch(); > } > } > {code} > Even though the Arrow Java writer appears to succeed for union data, Python reads nothing: > > {code:java} > def read_batch(path, number=100): > with open(path, 'rb') as f: > data = f.read() > print("size: ", len(data)) > batches = list(pyarrow.ipc.open_stream(data)) > print(batches) > print(batches[0].schema) > print(batches[0].slice(0, 8).to_pydict()) > return > if __name__ == "__main__": > read_batch('../result/struct.arrow') > {code} > The result is: > OrderedDict([('f-Struct(Int, Int)', [{}, {}, {}, {}, {}, {}, {}, {}])]) > > If I make struct record batches in Python, Java can read them back: > Write data: > {code:java} > def make_struct(path, batch_size=200, num_batch=2): > obj = get_struct_obj(batch_size) > batch = pa.RecordBatch.from_arrays([obj], ['fo']) > writer = pa.RecordBatchStreamWriter(path, batch.schema) > for _ in range(num_batch): > writer.write_batch(batch) > writer.close() > make_struct("struct.arrow") > {code} > Read back: > {code:java} > RootAllocator allocator = new RootAllocator(1000000000); > ByteArrayInputStream in = new > ByteArrayInputStream(Files.readAllBytes(Paths.get("struct.arrow"))); > ArrowStreamReader reader = new ArrowStreamReader(in, allocator); > reader.loadNextBatch(); > {code} > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)