It's possible my attachment was rejected.  I'm sending it again without the
Java extension, and as a failsafe I'm posting the code inline below.

Also, there is one JIRA from earlier this year, still open, that could be
related:
https://issues.apache.org/jira/browse/ARROW-522

Derek

// Begin source
package com.stitchfix.algorithms.arrowjava;

import io.netty.buffer.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullableIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
import org.apache.arrow.vector.complex.writer.BaseWriter;
import org.apache.arrow.vector.complex.writer.IntWriter;
import org.apache.arrow.vector.complex.writer.VarCharWriter;
import org.apache.arrow.vector.stream.ArrowStreamReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.stream.ArrowStreamWriter;

import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

import static java.util.Arrays.asList;

public class MapReadWrite {
    static final BufferAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);

    public void printAllocatorInfo(String message, BufferAllocator alloc) {
        printAllocatorInfo(message, alloc, false);
    }

    public void printAllocatorInfo(String message, BufferAllocator alloc, boolean isVerbose) {
        System.out.println(message);
        if (isVerbose) {
            System.out.println(alloc.toVerboseString());
        }
        else {
            System.out.println(alloc.toString());
        }
    }

    /**
     * Execute an example where we start with a schema, add "rows" of data to a
     * record batch, then simulate sending it and reading it back.
     */
    public void run() {
        try {
            BufferAllocator allocator = rootAllocator.newChildAllocator("test allocator", 0, Integer.MAX_VALUE);
            // Schema where one field is a map
            Field f1 = new Field("index", FieldType.nullable(new ArrowType.Int(32, true)), null);

            // Define children then add those to the map type.  Map of string to int:
            Field mapf1 = new Field("map1_f1", FieldType.nullable(new ArrowType.Utf8()), null);
            Field mapf2 = new Field("map1_f2", FieldType.nullable(new ArrowType.Int(32, true)), null);
            Field f2 = new Field("map1", FieldType.nullable(new ArrowType.Struct()), asList(mapf1, mapf2));
            System.out.println("Map field: " + f2.toString());

            Schema schema = new Schema(asList(f1, f2));

            // Create "schema root"
            System.out.println("Schema : " + schema.toString());
            VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
            assert (root.getFieldVectors().size() == 2);

            // Create a stream writer with this pipe as a sink
            Pipe initialPipe = Pipe.open();
            ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, null, initialPipe.sink());
            streamWriter.start();

            // Get vectors, mutator for simple field, writers for map
            ArrayList<FieldVector> fieldVectors = new ArrayList<FieldVector>();
            for (FieldVector fv : root.getFieldVectors()) {
                fv.allocateNew();
                fieldVectors.add(fv);
            }

            NullableIntVector.Mutator m1 = (NullableIntVector.Mutator) fieldVectors.get(0).getMutator();
            MapVector mapVec = (MapVector) fieldVectors.get(1);
            BaseWriter.ComplexWriter cw = new ComplexWriterImpl("root", mapVec);
            BaseWriter.MapWriter writer = cw.rootAsMap();

            VarCharWriter writerKey = writer.varChar("map1_f1");
            IntWriter writerValue = writer.integer("map1_f2");

            int COUNT = 5;
            for (int i = 0; i < COUNT; i++) {
                m1.setSafe(i, 2 * i);

                writer.setPosition(i);
                writerKey.setPosition(i);  // Maybe these are redundant - another attempt to get this to work
                writerValue.setPosition(i);
                writer.start();

                // Write a string (UTF-8 bytes):
                String keyValue = "map key 1";
                byte[] keyValueBytes = keyValue.getBytes("UTF-8");
                ArrowBuf tempBuf = allocator.buffer(keyValueBytes.length);
                tempBuf.writeBytes(keyValueBytes, 0, keyValueBytes.length);
                writerKey.writeVarChar(0, keyValueBytes.length, tempBuf);
                tempBuf.close();

                writerValue.writeInt(101);

                writer.end();
            }

            m1.setValueCount(COUNT);
            mapVec.getMutator().setValueCount(COUNT);  // Redundant I think
            cw.setValueCount(COUNT);
            root.setRowCount(COUNT);
            printAllocatorInfo("After writing data and setting row counts",
allocator);

            System.out.println("Write out the record batch to the stream");
            streamWriter.writeBatch();
            printAllocatorInfo("After writing one batch", allocator);
            streamWriter.close();
            System.out.println("Closed stream writer.");

            root.close();
            printAllocatorInfo("After complete close of root object used
when writing", allocator);
            allocator.close();

            System.out.println("Setting up objects to read batches back");
            BufferAllocator valAllocator = rootAllocator.newChildAllocator("readback allocator", 0, Integer.MAX_VALUE);
            ArrowStreamReader streamReader = new ArrowStreamReader(initialPipe.source(), valAllocator);
            VectorSchemaRoot vsr = streamReader.getVectorSchemaRoot();
            assert(vsr.getSchema().equals(schema));

            while (streamReader.loadNextBatch()) {
                // Eventually read data here.  But loadNextBatch throws an exception:
                //   java.lang.IllegalArgumentException: not all nodes and buffers were consumed ...
                System.out.println("Read one batch back.");
            }

            streamReader.close();
            valAllocator.close();
            System.out.println("DONE");
        }
        catch (IOException ioe) {
            System.out.println("Ugly IOException : " + ioe.getMessage());
            ioe.printStackTrace();
        }
    }

    /**
     * Main method that performs one run.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        MapReadWrite mrw = new MapReadWrite();
        mrw.run();
        System.out.println("DONE with MapReadWrite");
    }
}
// End source
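
For reference, here is roughly what the working list version of the write
loop looks like.  This is a minimal sketch from memory rather than the
attached program -- the names ("elem", "list1", listWriter) are illustrative,
and it assumes two extra imports (org.apache.arrow.vector.complex.ListVector
and org.apache.arrow.vector.complex.impl.UnionListWriter).

// Begin list sketch (illustrative, not from the attached program)
Field elem = new Field("elem", FieldType.nullable(new ArrowType.Int(32, true)), null);
Field listField = new Field("list1", FieldType.nullable(new ArrowType.List()), asList(elem));
Schema listSchema = new Schema(asList(listField));
VectorSchemaRoot listRoot = VectorSchemaRoot.create(listSchema, allocator);

ListVector listVec = (ListVector) listRoot.getFieldVectors().get(0);
listVec.allocateNew();
UnionListWriter listWriter = listVec.getWriter();

for (int i = 0; i < COUNT; i++) {
    listWriter.setPosition(i);
    listWriter.startList();
    listWriter.integer().writeInt(2 * i);  // one element per row
    listWriter.endList();
}
listVec.getMutator().setValueCount(COUNT);
listRoot.setRowCount(COUNT);
// Writing this root with ArrowStreamWriter and reading it back with
// ArrowStreamReader, exactly as above, succeeds -- no "not all nodes and
// buffers were consumed" exception.
// End list sketch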


*Derek Bennett*
*Software Engineer - Data Platform* | Stitch Fix

One Montgomery Tower Suite 1500
San Francisco, CA 94104

On Wed, Dec 6, 2017 at 12:16 PM, Timothy Chen <tnac...@gmail.com> wrote:

> Hey Derek,
>
> I don’t think you have attached your example code.
>
> Tim
> On Wed, Dec 6, 2017 at 1:38 PM Derek Bennett <dbenn...@stitchfix.com>
> wrote:
>
> > Hello,
> >
> > I've been using the Java API to send and receive data in Arrow format.  I
> > am having trouble in cases where the schema has map fields, and wanted to
> > see if this is a usage problem before logging an issue (most everything
> > else is working as expected so far).
> >
> > I have an attached example that creates a simple schema with 1 integer
> > field and 1 map field and writes one record batch.  This part works
> > correctly.  However, when trying to read back the record batch, it always
> > fails with an exception (see stack trace below).  This only happens when
> > I use map types -- I've done a similar thing with a list and it works
> > perfectly.
> >
> > The error makes me think more is being written out in the record batch
> > than should be, and there are unexpected buffers when the batch is being
> > read back.  I have to admit I am a little confused by the MapWriter API,
> > so I may be doing something wrong here -- but again everything is fine
> > with lists and ListWriter.
> >
> > I've attached the source example I used.  If it's helpful, I could rewrite
> > this as a unit test and submit a PR to include it, and I can log an issue
> > in GitHub if this turns out to be a real error.
> >
> > Thanks!
> > Derek
> >
> >
> >
> > Exception in thread "main" java.lang.IllegalArgumentException: not all
> > nodes and buffers were consumed. nodes: [ArrowFieldNode [length=5,
> > nullCount=0], ArrowFieldNode [length=5, nullCount=0], ArrowFieldNode
> > [length=5, nullCount=0]] buffers: [ArrowBuf[30], udle: [21 104..105],
> > ArrowBuf[31], udle: [21 112..113], ArrowBuf[32], udle: [21 120..144],
> > ArrowBuf[33], udle: [21 144..189], ArrowBuf[34], udle: [21 192..193],
> > ArrowBuf[35], udle: [21 200..220]]
> > at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:66)
> > at org.apache.arrow.vector.file.ArrowReader$1.visit(ArrowReader.java:109)
> > at org.apache.arrow.vector.file.ArrowReader$1.visit(ArrowReader.java:95)
> > at org.apache.arrow.vector.schema.ArrowRecordBatch.accepts(ArrowRecordBatch.java:128)
> > at org.apache.arrow.vector.file.ArrowReader.loadNextBatch(ArrowReader.java:121)
> > at com.stitchfix.algorithms.arrowjava.MapReadWrite.run(MapReadWrite.java:136)
> >
> >
> >
> > *Derek Bennett*
> > *Software Engineer - Data Platform* | Stitch Fix
> >
> > One Montgomery Tower Suite 1500
> > San Francisco, CA 94104
> >
>
