Hey Gautam,

We also have a couple of people looking into vectorized reading (into Arrow
memory). I think it would be good for us to get together and see if we can
collaborate on a common approach for this.

I'll reach out directly and see if we can get together.

-Dan

On Sun, Jul 21, 2019 at 10:35 PM Gautam <gautamkows...@gmail.com> wrote:

> Figured this out. I'm returning the ColumnarBatch iterator directly, without
> the projection, with the schema set appropriately in `readSchema()`. The
> empty result was due to valuesRead not being set correctly on the
> FileIterator. Once I fixed that, things started working. Will circle back
> with numbers soon.
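>
> For reference, the reader side now looks roughly like this (a simplified
> sketch; `tasks` and `newBatchTask` are stand-ins for the real plumbing, and
> I'm assuming Spark 2.4's DataSourceV2 API):
>
> ```java
> import java.util.List;
> import java.util.stream.Collectors;
> import org.apache.iceberg.CombinedScanTask;
> import org.apache.iceberg.Schema;
> import org.apache.iceberg.spark.SparkSchemaUtil;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
>
> abstract class VectorizedReader implements DataSourceReader, SupportsScanColumnarBatch {
>   private final Schema schema; // projected Iceberg schema
>
>   VectorizedReader(Schema schema) {
>     this.schema = schema;
>   }
>
>   @Override
>   public StructType readSchema() {
>     // Reporting the batch iterator's schema here is what lets Spark drop
>     // the per-row UnsafeProjection, which only works on InternalRow.
>     return SparkSchemaUtil.convert(schema);
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     return tasks().stream()
>         .map(task -> newBatchTask(task, schema)) // stand-in factory
>         .collect(Collectors.toList());
>   }
>
>   abstract List<CombinedScanTask> tasks(); // stand-in
>
>   abstract InputPartition<ColumnarBatch> newBatchTask(CombinedScanTask task, Schema schema);
> }
> ```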
> On Fri, Jul 19, 2019 at 5:22 PM Gautam <gautamkows...@gmail.com> wrote:
>
>> Hey Guys,
>>          Sorry about the delay on this. I just got back to getting a basic
>> working implementation of vectorization on primitive types into Iceberg.
>>
>> *Here's what I have so far:*
>>
>> I have added `ParquetValueReader` implementations for some basic primitive
>> types that build the respective Arrow vector (`ValueVector`), viz.
>> `IntVector` for int, `VarCharVector` for strings, and so on. Underneath
>> each value vector reader there are column iterators that read from the
>> Parquet page stores (row groups) in chunks. These `ValueVector`s are lined
>> up as `ArrowColumnVector`s (a ColumnVector wrapper backed by Arrow) and
>> stitched together using a `ColumnarBatchReader` (which, as the name
>> suggests, wraps the ColumnarBatches in an iterator).
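>>
>> In sketch form, the int reader and the stitching look roughly like this
>> (simplified; the `PrimitiveReader` constructor and the `ColumnIterator`
>> calls are approximate, and allocator lifecycle is elided):
>>
>> ```java
>> import java.util.List;
>> import org.apache.arrow.memory.BufferAllocator;
>> import org.apache.arrow.memory.RootAllocator;
>> import org.apache.arrow.vector.FieldVector;
>> import org.apache.arrow.vector.IntVector;
>> import org.apache.iceberg.parquet.ParquetValueReaders;
>> import org.apache.parquet.column.ColumnDescriptor;
>> import org.apache.spark.sql.vectorized.ArrowColumnVector;
>> import org.apache.spark.sql.vectorized.ColumnVector;
>> import org.apache.spark.sql.vectorized.ColumnarBatch;
>>
>> class IntVectorReader extends ParquetValueReaders.PrimitiveReader<FieldVector> {
>>   private static final BufferAllocator ALLOCATOR = new RootAllocator(Long.MAX_VALUE);
>>   private final int batchSize;
>>
>>   IntVectorReader(ColumnDescriptor desc, int batchSize) {
>>     super(desc);
>>     this.batchSize = batchSize;
>>   }
>>
>>   @Override
>>   public FieldVector read(FieldVector reuse) {
>>     IntVector vec = new IntVector("value", ALLOCATOR);
>>     vec.allocateNew(batchSize);
>>     int rows = 0;
>>     // `column` is the underlying page/column iterator from PrimitiveReader.
>>     while (rows < batchSize && column.hasNext()) {
>>       vec.setSafe(rows, column.nextInteger());
>>       rows += 1;
>>     }
>>     vec.setValueCount(rows);
>>     return vec;
>>   }
>>
>>   // Stitching: wrap each filled vector and assemble one ColumnarBatch.
>>   static ColumnarBatch stitch(List<FieldVector> vectors, int rowCount) {
>>     ColumnVector[] columns = new ColumnVector[vectors.size()];
>>     for (int i = 0; i < columns.length; i += 1) {
>>       columns[i] = new ArrowColumnVector(vectors.get(i));
>>     }
>>     ColumnarBatch batch = new ColumnarBatch(columns);
>>     batch.setNumRows(rowCount);
>>     return batch;
>>   }
>> }
>> ```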
>> I've verified that these pieces work properly with the underlying
>> interfaces. I've also made changes to Iceberg's `Reader` to implement
>> `planBatchInputPartitions()` (to add the `SupportsScanColumnarBatch` mixin
>> to the reader), so the reader now expects ColumnarBatch instances (instead
>> of InternalRow). The query planning runtime works fine with these changes.
>>
>> It fails during query execution, though; the bit that's currently failing
>> is this line of code:
>> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>
>> This code, I think, tries to apply the iterator's schema projection on the
>> InternalRow instances. This seems to be tightly coupled to InternalRow, as
>> Spark's catalyst expressions have implemented UnsafeProjection for
>> InternalRow only. If I take this out and just return the
>> `Iterator<ColumnarBatch>` iterator I built, it returns an empty result on
>> the client. I'm guessing this is because Spark is unaware of the iterator's
>> schema? There's a TODO in the code that says "*remove the projection by
>> reporting the iterator's schema back to Spark*". Is there a simple way to
>> communicate that to Spark for my new iterator? Any pointers on how to get
>> around this?
>>
>> Thanks and Regards,
>> -Gautam.
>>
>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Replies inline.
>>>
>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>>> Thanks for responding Ryan,
>>>>
>>>> A couple of follow-up questions on ParquetValueReader for Arrow..
>>>>
>>>> I'd like to start with testing Arrow out with readers for primitive
>>>> types and incrementally add Struct/Array support; ArrowWriter [1] also
>>>> currently doesn't have converters for the map type. How can I fall back
>>>> to regular materialization for these types while supporting Arrow for
>>>> primitives?
>>>
>>> We should look at what Spark does to handle maps.
>>>
>>> I think we should get the prototype working with test cases that don't
>>> have maps, structs, or lists. Just getting primitives working is a good
>>> start and won't hit these problems.
>>>
>>>> Lemme know if this makes sense...
>>>>
>>>> - I extend PrimitiveReader (for Arrow) to load primitive types into
>>>> ArrowColumnVectors of the corresponding column types by iterating over
>>>> the underlying ColumnIterator *n times*, where n is the size of the
>>>> batch.
>>>
>>> Sounds good to me. I'm not sure about extending vs. wrapping because I'm
>>> not too familiar with the Arrow APIs.
>>>
>>>> - Reader.newParquetIterable() maps primitive column types to the newly
>>>> added ArrowParquetValueReader, but for other types (nested types, etc.)
>>>> it uses the current *InternalRow*-based ValueReaders.
>>>
>>> Sounds good for primitives, but I would just leave the nested types
>>> un-implemented for now.
>>>
>>>> - Stitch the column vectors together to create a ColumnarBatch (since
>>>> the *SupportsScanColumnarBatch* mixin currently expects this)...
>>>> *although I'm a bit lost on how the stitching of columns happens
>>>> currently*. And how could the ArrowColumnVectors be stitched alongside
>>>> regular columns that don't have Arrow-based support?
>>>
>>> I don't think you can mix regular columns and Arrow columns. It has to
>>> be all one or the other. That's why it's easier to start with
>>> primitives, then add structs, then lists, and finally maps.
>>>
>>>> - Reader returns readTasks as *InputPartition<ColumnarBatch>* so that
>>>> DataSourceV2ScanExec starts using ColumnarBatch scans.
>>>
>>> We will probably need two paths: one for columnar batches and one for
>>> row-based reads. That doesn't need to be done right away, and what you
>>> already have in your working copy makes sense as a start.
>>>
>>>> That's a lot of questions! :-) but I hope I'm making sense.
>>>>
>>>> -Gautam.
>>>>
>>>> [1] -
>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
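
For the two read paths Ryan mentions above, a minimal sketch (assuming Spark
2.4's DataSourceV2 API; the task-producing methods are stand-ins):

```java
import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class TwoPathReader implements DataSourceReader, SupportsScanColumnarBatch {

  @Override
  public boolean enableBatchRead() {
    // Claim columnar support only when every projected column is a
    // primitive type with an Arrow reader; otherwise Spark uses the
    // row-based planInputPartitions() path.
    return allProjectedColumnsArePrimitive();
  }

  @Override
  public List<InputPartition<InternalRow>> planInputPartitions() {
    return rowTasks(); // existing InternalRow-based readers
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    return batchTasks(); // Arrow-backed ColumnarBatch readers
  }

  abstract boolean allProjectedColumnsArePrimitive(); // stand-in

  abstract List<InputPartition<InternalRow>> rowTasks(); // stand-in

  abstract List<InputPartition<ColumnarBatch>> batchTasks(); // stand-in
}
```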