That would be great!

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dwe...@netflix.com> wrote:
> Hey Gautam,
>
> We also have a couple of people looking into vectorized reading (into Arrow memory). I think it would be good for us to get together and see if we can collaborate on a common approach for this.
>
> I'll reach out directly and see if we can get together.
>
> -Dan
>
> On Sun, Jul 21, 2019 at 10:35 PM Gautam <gautamkows...@gmail.com> wrote:
>
>> Figured this out. I'm returning the ColumnarBatch iterator directly, without projection, with the schema set appropriately in `readSchema()`. The empty result was due to valuesRead not being set correctly on FileIterator. Fixed that and things are working. Will circle back with numbers soon.
>>
>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Hey Guys,
>>> Sorry about the delay on this. I just got back to getting a basic working implementation in Iceberg for vectorization on primitive types.
>>>
>>> *Here's what I have so far:*
>>>
>>> I have added `ParquetValueReader` implementations for some basic primitive types that build the respective Arrow vector (`ValueVector`), viz. `IntVector` for int, `VarCharVector` for strings, and so on. Underneath each value vector reader there are column iterators that read from the Parquet page stores (row groups) in chunks. These `ValueVector`s are lined up as `ArrowColumnVector`s (a ColumnVector wrapper backed by Arrow) and stitched together using a `ColumnarBatchReader` (which, as the name suggests, wraps ColumnarBatches in the iterator). I've verified that these pieces work properly with the underlying interfaces. I've also made changes to Iceberg's `Reader` to implement `planBatchPartitions()` (to add the `SupportsScanColumnarBatch` mixin to the reader), so the reader now expects ColumnarBatch instances (instead of InternalRow). The query planning runtime works fine with these changes.
>>>
>>> It fails during query execution, though; the bit it's currently failing at is this line of code: https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>
>>> This code, I think, tries to apply the iterator's schema projection to the InternalRow instances. This seems to be tightly coupled to InternalRow, as Spark's Catalyst expressions have implemented UnsafeProjection for InternalRow only. If I take this out and just return the `Iterator<ColumnarBatch>` iterator I built, it returns an empty result on the client. I'm guessing this is because Spark is unaware of the iterator's schema? There's a TODO in the code that says "*remove the projection by reporting the iterator's schema back to Spark*". Is there a simple way to communicate that to Spark for my new iterator? Any pointers on how to get around this?
>>>
>>> Thanks and Regards,
>>> -Gautam.
>>>
>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> Replies inline.
>>>>
>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <gautamkows...@gmail.com> wrote:
>>>>
>>>>> Thanks for responding, Ryan.
>>>>>
>>>>> A couple of follow-up questions on ParquetValueReader for Arrow:
>>>>>
>>>>> I'd like to start by testing Arrow out with readers for primitive types and incrementally add in Struct/Array support; also, ArrowWriter [1] currently doesn't have converters for map type. How can I default these types to regular materialization while supporting Arrow-based reads for primitives?
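For readers following the thread: the "stitching" described in the Jul 19 message above roughly corresponds to wrapping each populated Arrow `ValueVector` in Spark's `ArrowColumnVector` adapter and assembling a `ColumnarBatch` from the wrapped columns. A minimal sketch against Spark 2.4's public vectorized API (the class and method names below are illustrative, not actual Iceberg code):

```java
import org.apache.arrow.vector.ValueVector;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class ArrowBatchStitcher {
  // Wrap each populated Arrow ValueVector in Spark's ArrowColumnVector adapter
  // and stitch the columns into a single ColumnarBatch that the scan can return.
  public static ColumnarBatch stitch(ValueVector[] vectors, int numRows) {
    ColumnVector[] columns = new ColumnVector[vectors.length];
    for (int i = 0; i < vectors.length; i++) {
      columns[i] = new ArrowColumnVector(vectors[i]);
    }
    ColumnarBatch batch = new ColumnarBatch(columns);
    batch.setNumRows(numRows);
    return batch;
  }
}
```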
>>>> We should look at what Spark does to handle maps.
>>>>
>>>> I think we should get the prototype working with test cases that don't have maps, structs, or lists. Just getting primitives working is a good start and won't hit these problems.
>>>>
>>>>> Let me know if this makes sense...
>>>>>
>>>>> - I extend PrimitiveReader (for Arrow) so that it loads primitive types into ArrowColumnVectors of the corresponding column types by iterating over the underlying ColumnIterator *n times*, where n is the size of the batch.
>>>>
>>>> Sounds good to me. I'm not sure about extending vs. wrapping because I'm not too familiar with the Arrow APIs.
>>>>
>>>>> - Reader.newParquetIterable() maps primitive column types to the newly added ArrowParquetValueReader, but for other types (nested types, etc.) uses the current *InternalRow*-based ValueReaders.
>>>>
>>>> Sounds good for primitives, but I would just leave the nested types unimplemented for now.
>>>>
>>>>> - Stitch the column vectors together to create a ColumnarBatch (since the *SupportsScanColumnarBatch* mixin currently expects this), *although I'm a bit lost on how the stitching of columns happens currently*. And how could ArrowColumnVectors be stitched alongside regular columns that don't have Arrow-based support?
>>>>
>>>> I don't think that you can mix regular columns and Arrow columns. It has to be all one or the other. That's why it's easier to start with primitives, then add structs, then lists, and finally maps.
>>>>
>>>>> - Reader returns readTasks as *InputPartition<ColumnarBatch>* so that DataSourceV2ScanExec starts using ColumnarBatch scans.
>>>>
>>>> We will probably need two paths: one for columnar batches and one for row-based reads. That doesn't need to be done right away, and what you already have in your working copy makes sense as a start.
>>>>
>>>>> That's a lot of questions! :-) But I hope I'm making sense.
>>>>>
>>>>> -Gautam.
>>>>>
>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
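To make the first bullet in Gautam's list above concrete: a batched primitive reader could drain the per-column iterator up to the batch size and materialize the values into an Arrow vector. A rough sketch for ints, where the plain `Iterator<Integer>` stands in for Iceberg's `ColumnIterator` and all class and field names are hypothetical:

```java
import java.util.Iterator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;

// Hypothetical batched reader: pulls up to batchSize values from a per-column
// iterator and fills an Arrow IntVector, one vector per batch.
public class IntBatchReader {
  private final Iterator<Integer> columnIterator;  // stands in for Iceberg's ColumnIterator
  private final BufferAllocator allocator;
  private final int batchSize;

  public IntBatchReader(Iterator<Integer> columnIterator, BufferAllocator allocator, int batchSize) {
    this.columnIterator = columnIterator;
    this.allocator = allocator;
    this.batchSize = batchSize;
  }

  public IntVector nextBatch() {
    IntVector vector = new IntVector("col", allocator);
    vector.allocateNew(batchSize);
    int rows = 0;
    // Iterate the underlying column iterator up to batchSize times.
    while (rows < batchSize && columnIterator.hasNext()) {
      Integer value = columnIterator.next();
      if (value == null) {
        vector.setNull(rows);        // nulls come from Parquet definition levels
      } else {
        vector.setSafe(rows, value);
      }
      rows++;
    }
    vector.setValueCount(rows);
    return vector;
  }
}
```

The number of rows written to the last batch of a row group is typically smaller than batchSize, which is why the row count is tracked and set on the vector rather than assumed.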