Hey Gautam,

We also have a couple people looking into vectorized reading (into Arrow
memory).  I think it would be good for us to get together and see if we can
collaborate on a common approach for this.

I'll reach out directly and see if we can get together.

-Dan

On Sun, Jul 21, 2019 at 10:35 PM Gautam <gautamkows...@gmail.com> wrote:

> Figured this out. I'm returning ColumnarBatch iterator directly without
> projection with schema set appropriately in `readSchema() `.. the empty
> result was due to valuesRead not being set correctly on FileIterator. Did
> that and things are working. Will circle back with numbers soon.
>
> On Fri, Jul 19, 2019 at 5:22 PM Gautam <gautamkows...@gmail.com> wrote:
>
>> Hey Guys,
>>            Sorry bout the delay on this. Just got back on getting a basic
>> working implementation in Iceberg for Vectorization on primitive types.
>>
>> *Here's what I have so far :  *
>>
>> I have added `ParquetValueReader` implementations for some basic
>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>> value vector reader there are column iterators that read from the parquet
>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>> work properly with the underlying interfaces.  I'v also made changes to
>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>> expects ColumnarBatch instances (instead of InternalRow). The query
>> planning runtime works fine with these changes.
>>
>> Although it fails during query execution, the bit it's  currently failing
>> at is this line of code :
>> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>
>> This code, I think,  tries to apply the iterator's schema projection on
>> the InternalRow instances. This seems to be tightly coupled to InternalRow
>> as Spark's catalyst expressions have implemented the UnsafeProjection for
>> InternalRow only. If I take this out and just return the
>> `Iterator<ColumnarBatch>` iterator I built it returns empty result on the
>> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
>> There's a Todo in the code that says "*remove the projection by
>> reporting the iterator's schema back to Spark*".  Is there a simple way
>> to communicate that to Spark for my new iterator? Any pointers on how to
>> get around this?
>>
>>
>> Thanks and Regards,
>> -Gautam.
>>
>>
>>
>>
>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Replies inline.
>>>
>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>>> Thanks for responding Ryan,
>>>>
>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>
>>>> I'd like to start with testing Arrow out with readers for primitive
>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>> currently doesn't have converters for map type. How can I default these
>>>> types to regular materialization whilst supporting Arrow based support for
>>>> primitives?
>>>>
>>>
>>> We should look at what Spark does to handle maps.
>>>
>>> I think we should get the prototype working with test cases that don't
>>> have maps, structs, or lists. Just getting primitives working is a good
>>> start and just won't hit these problems.
>>>
>>>
>>>> Lemme know if this makes sense...
>>>>
>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>>>> ArrowColumnVectors of corresponding column types by iterating over
>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>
>>>
>>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>>> not too familiar with the Arrow APIs.
>>>
>>>
>>>> - Reader.newParquetIterable()  maps primitive column types to the newly
>>>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>>>> current *InternalRow* based ValueReaders
>>>>
>>>
>>> Sounds good for primitives, but I would just leave the nested types
>>> un-implemented for now.
>>>
>>>
>>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>>> *SupportsScanColumnarBatch* mixin currently expects this ) ..
>>>> *although* *I'm a bit lost on how the stitching of columns happens
>>>> currently*? .. and how the ArrowColumnVectors could  be stitched
>>>> alongside regular columns that don't have arrow based support ?
>>>>
>>>
>>> I don't think that you can mix regular columns and Arrow columns. It has
>>> to be all one or the other. That's why it's easier to start with
>>> primitives, then add structs, then lists, and finally maps.
>>>
>>>
>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>
>>>
>>> We will probably need two paths. One for columnar batches and one for
>>> row-based reads. That doesn't need to be done right away and what you
>>> already have in your working copy makes sense as a start.
>>>
>>>
>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>> [1] -
>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>

Reply via email to