It's possible my attachment was rejected. I'm resending it without the .java extension, and as a failsafe I'm pasting the code inline below.
Also, there is one JIRA from earlier this year, still open, that could be
related: https://issues.apache.org/jira/browse/ARROW-522

Derek

// Begin source
package com.stitchfix.algorithms.arrowjava;

import io.netty.buffer.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullableIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
import org.apache.arrow.vector.complex.writer.BaseWriter;
import org.apache.arrow.vector.complex.writer.IntWriter;
import org.apache.arrow.vector.complex.writer.VarCharWriter;
import org.apache.arrow.vector.stream.ArrowStreamReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.IOException;
import java.nio.channels.Pipe;
import java.util.ArrayList;

import static java.util.Arrays.asList;

public class MapReadWrite {

  static final BufferAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);

  public void printAllocatorInfo(String message, BufferAllocator alloc) {
    printAllocatorInfo(message, alloc, false);
  }

  public void printAllocatorInfo(String message, BufferAllocator alloc, boolean isVerbose) {
    System.out.println(message);
    if (isVerbose) {
      System.out.println(alloc.toVerboseString());
    } else {
      System.out.println(alloc.toString());
    }
  }

  /**
   * Execute an example where we start with a schema, add "rows" of data to a record batch, then
   * simulate sending it and reading it back.
   */
  public void run() {
    try {
      BufferAllocator allocator =
          rootAllocator.newChildAllocator("test allocator", 0, Integer.MAX_VALUE);

      // Schema where one field is a map.
      Field f1 = new Field("index", FieldType.nullable(new ArrowType.Int(32, true)), null);

      // Define children, then add those to the map type. Map of string to int:
      Field mapf1 = new Field("map1_f1", FieldType.nullable(new ArrowType.Utf8()), null);
      Field mapf2 = new Field("map1_f2", FieldType.nullable(new ArrowType.Int(32, true)), null);
      Field f2 = new Field("map1", FieldType.nullable(new ArrowType.Struct()), asList(mapf1, mapf2));
      System.out.println("Map field: " + f2.toString());

      Schema schema = new Schema(asList(f1, f2));
      System.out.println("Schema : " + schema.toString());

      // Create "schema root".
      VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
      assert (root.getFieldVectors().size() == 2);

      // Create a stream writer with this pipe as a sink.
      Pipe initialPipe = Pipe.open();
      ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, null, initialPipe.sink());
      streamWriter.start();

      // Get vectors, a mutator for the simple field, and writers for the map.
      ArrayList<FieldVector> fieldVectors = new ArrayList<FieldVector>();
      for (FieldVector fv : root.getFieldVectors()) {
        fv.allocateNew();
        fieldVectors.add(fv);
      }
      NullableIntVector.Mutator m1 = (NullableIntVector.Mutator) fieldVectors.get(0).getMutator();
      MapVector mapVec = (MapVector) fieldVectors.get(1);
      BaseWriter.ComplexWriter cw = new ComplexWriterImpl("root", mapVec);
      BaseWriter.MapWriter writer = cw.rootAsMap();
      VarCharWriter writerKey = writer.varChar("map1_f1");
      IntWriter writerValue = writer.integer("map1_f2");

      int COUNT = 5;
      for (int i = 0; i < COUNT; i++) {
        m1.setSafe(i, 2 * i);
        writer.setPosition(i);
        // Maybe these are redundant - another attempt to get this to work.
        writerKey.setPosition(i);
        writerValue.setPosition(i);
        writer.start();

        // Write a string (UTF-8 bytes):
        String keyValue = "map key 1";
        byte[] keyValueBytes = keyValue.getBytes("UTF-8");
        ArrowBuf tempBuf = allocator.buffer(keyValueBytes.length);
        tempBuf.writeBytes(keyValueBytes, 0, keyValueBytes.length);
        writerKey.writeVarChar(0, keyValueBytes.length, tempBuf);
        tempBuf.close();

        writerValue.writeInt(101);
        writer.end();
      }
      m1.setValueCount(COUNT);
      mapVec.getMutator().setValueCount(COUNT);  // Redundant, I think.
      cw.setValueCount(COUNT);
      root.setRowCount(COUNT);
      printAllocatorInfo("After writing data and setting row counts", allocator);

      System.out.println("Write out the record batch to the stream");
      streamWriter.writeBatch();
      printAllocatorInfo("After writing one batch", allocator);

      streamWriter.close();
      System.out.println("Closed stream writer.");
      root.close();
      printAllocatorInfo("After complete close of root object used when writing", allocator);
      allocator.close();

      System.out.println("Setting up objects to read batches back");
      BufferAllocator valAllocator =
          rootAllocator.newChildAllocator("readback allocator", 0, Integer.MAX_VALUE);
      ArrowStreamReader streamReader = new ArrowStreamReader(initialPipe.source(), valAllocator);
      VectorSchemaRoot vsr = streamReader.getVectorSchemaRoot();
      assert (vsr.getSchema().equals(schema));

      while (streamReader.loadNextBatch()) {
        // Eventually read data here. But loadNextBatch throws an exception:
        // java.lang.IllegalArgumentException: not all nodes and buffers were consumed ...
        System.out.println("Read one batch back.");
      }
      streamReader.close();
      valAllocator.close();
      System.out.println("DONE");
    } catch (IOException ioe) {
      System.out.println("Ugly IOException : " + ioe.getMessage());
      ioe.printStackTrace();
    }
  }
  /**
   * Main method that performs one run.
   *
   * @param args
   */
  public static void main(String[] args) {
    MapReadWrite mrw = new MapReadWrite();
    mrw.run();
    System.out.println("DONE with MapReadWrite");
  }
}
// End source

*Derek Bennett*
*Software Engineer - Data Platform* | Stitch Fix

One Montgomery Tower Suite 1500
San Francisco, CA 94104

On Wed, Dec 6, 2017 at 12:16 PM, Timothy Chen <tnac...@gmail.com> wrote:

> Hey Derek,
>
> I don't think you have attached your example code.
>
> Tim
>
> On Wed, Dec 6, 2017 at 1:38 PM Derek Bennett <dbenn...@stitchfix.com>
> wrote:
>
> > Hello,
> >
> > I've been using the Java API to send and receive data in Arrow format.
> > I am having trouble in cases where the schema has map fields, and wanted
> > to see if this is a usage problem before logging an issue (most
> > everything else is working as expected so far).
> >
> > I have an attached example that creates a simple schema with 1 integer
> > field and 1 map field and writes one record batch. This part works
> > correctly. However, when trying to read back the record batch it always
> > fails with an exception (see stack trace below). This only happens when
> > I use map types -- I've done a similar thing with a list and it works
> > perfectly.
> >
> > The error makes me think more is being written out in the record batch
> > than should be, and there are unexpected buffers when the batch is being
> > read back. I have to admit I am a little confused by the MapWriter API,
> > so I may be doing something wrong here -- but again, everything is fine
> > with lists and ListWriter.
> >
> > I've attached the source example I used. If it's helpful I could rewrite
> > this as a unit test and submit a PR to include it, and can log an issue
> > in GitHub if this turns out to be a real error.
> >
> > Thanks!
> > Derek
> >
> >
> > Exception in thread "main" java.lang.IllegalArgumentException: not all
> > nodes and buffers were consumed. nodes: [ArrowFieldNode [length=5,
> > nullCount=0], ArrowFieldNode [length=5, nullCount=0], ArrowFieldNode
> > [length=5, nullCount=0]] buffers: [ArrowBuf[30], udle: [21 104..105],
> > ArrowBuf[31], udle: [21 112..113], ArrowBuf[32], udle: [21 120..144],
> > ArrowBuf[33], udle: [21 144..189], ArrowBuf[34], udle: [21 192..193],
> > ArrowBuf[35], udle: [21 200..220]]
> > at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:66)
> > at org.apache.arrow.vector.file.ArrowReader$1.visit(ArrowReader.java:109)
> > at org.apache.arrow.vector.file.ArrowReader$1.visit(ArrowReader.java:95)
> > at org.apache.arrow.vector.schema.ArrowRecordBatch.accepts(ArrowRecordBatch.java:128)
> > at org.apache.arrow.vector.file.ArrowReader.loadNextBatch(ArrowReader.java:121)
> > at com.stitchfix.algorithms.arrowjava.MapReadWrite.run(MapReadWrite.java:136)
> >
> >
> > *Derek Bennett*
> > *Software Engineer - Data Platform* | Stitch Fix
> >
> > One Montgomery Tower Suite 1500
> > San Francisco, CA 94104
> >
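
P.S. For comparison, below is roughly the list-based write pattern that, as
mentioned above, round-trips cleanly for me. This is a minimal sketch from
memory rather than a tested excerpt: the field and variable names are
illustrative, and I'm assuming ListVector#getWriter() / UnionListWriter from
the same API version (imports org.apache.arrow.vector.complex.ListVector and
org.apache.arrow.vector.complex.impl.UnionListWriter).

// Begin sketch
// Schema: replace the map field with a nullable list-of-int field.
Field itemf = new Field("list1_item", FieldType.nullable(new ArrowType.Int(32, true)), null);
Field f2 = new Field("list1", FieldType.nullable(new ArrowType.List()), asList(itemf));

// After VectorSchemaRoot.create(schema, allocator) and allocateNew(), write COUNT rows:
ListVector listVec = (ListVector) root.getFieldVectors().get(1);
UnionListWriter listWriter = listVec.getWriter();
for (int i = 0; i < COUNT; i++) {
  listWriter.setPosition(i);
  listWriter.startList();
  listWriter.integer().writeInt(2 * i);  // one element per row is enough to test the round trip
  listWriter.endList();
}
listVec.getMutator().setValueCount(COUNT);
root.setRowCount(COUNT);
// streamWriter.writeBatch() and the read-back loop both succeed with this schema.
// End sketch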