Would it be possible to put the work-in-progress code up in open source?
From: Gautam <gautamkows...@gmail.com>
Reply-To: "dev@iceberg.apache.org" <dev@iceberg.apache.org>
Date: Monday, July 22, 2019 at 9:46 AM
To: Daniel Weeks <dwe...@netflix.com>
Cc: Ryan Blue <rb...@netflix.com>, Iceberg Dev List <dev@iceberg.apache.org>
Subject: Re: Approaching Vectorized Reading in Iceberg

That would be great!

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dwe...@netflix.com> wrote:

Hey Gautam,

We also have a couple of people looking into vectorized reading (into Arrow memory). I think it would be good for us to get together and see if we can collaborate on a common approach for this. I'll reach out directly and see if we can get together.

-Dan

On Sun, Jul 21, 2019 at 10:35 PM Gautam <gautamkows...@gmail.com> wrote:

Figured this out. I'm returning the ColumnarBatch iterator directly, without the projection, with the schema set appropriately in `readSchema()`. The empty result was due to valuesRead not being set correctly on the FileIterator. With that fixed, things are working. Will circle back with numbers soon.

On Fri, Jul 19, 2019 at 5:22 PM Gautam <gautamkows...@gmail.com> wrote:

Hey guys,

Sorry about the delay on this. I just got back to getting a basic working implementation in Iceberg for vectorization on primitive types. Here's what I have so far:

- I have added `ParquetValueReader` implementations for some basic primitive types that build the respective Arrow vector (`ValueVector`), viz. `IntVector` for ints, `VarCharVector` for strings, and so on. Underneath, each value vector reader has column iterators that read from the Parquet page stores (row groups) in chunks.
- These `ValueVector`s are lined up as `ArrowColumnVector`s (the ColumnVector wrapper backed by Arrow) and stitched together by a `ColumnarBatchReader` (which, as the name suggests, wraps ColumnarBatches in the iterator). I've verified that these pieces work properly with the underlying interfaces.
- I've also changed Iceberg's `Reader` to implement `planBatchPartitions()` (adding the `SupportsScanColumnarBatch` mixin to the reader), so the reader now produces ColumnarBatch instances instead of InternalRow.

Query planning works fine with these changes, but query execution fails. The bit it's currently failing at is this line of code: https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414

This code, I think, applies the iterator's schema projection to the InternalRow instances. It seems tightly coupled to InternalRow, since Spark's catalyst expressions implement UnsafeProjection only for InternalRow. If I take this out and just return the `Iterator<ColumnarBatch>` I built, the client gets an empty result. I'm guessing this is because Spark is unaware of the iterator's schema? There's a TODO in the code that says "remove the projection by reporting the iterator's schema back to Spark". Is there a simple way to communicate that to Spark for my new iterator? Any pointers on how to get around this?

Thanks and Regards,
-Gautam.
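To make the "report the iterator's schema back to Spark" idea concrete, here is a minimal sketch against the Spark 2.4 DataSourceV2 API, where the batch-scan mixin's method is `planBatchInputPartitions()`. The class and field names below are illustrative stand-ins, not Iceberg's actual `Reader`, and the Arrow-backed batch iterator is assumed to be built elsewhere.

```java
// Minimal sketch (not Iceberg's actual Reader): a Spark 2.4 DataSourceV2 reader
// that reports its scan schema via readSchema() and plans ColumnarBatch
// partitions through the SupportsScanColumnarBatch mixin.
import java.util.Iterator;
import java.util.List;

import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class VectorizedReaderSketch implements SupportsScanColumnarBatch {
  private final StructType projectedSchema;                      // schema of the batches we return
  private final List<InputPartition<ColumnarBatch>> batchTasks;  // assumed to come from the scan's read tasks

  public VectorizedReaderSketch(StructType projectedSchema,
                                List<InputPartition<ColumnarBatch>> batchTasks) {
    this.projectedSchema = projectedSchema;
    this.batchTasks = batchTasks;
  }

  @Override
  public StructType readSchema() {
    // Reporting the iterator's schema here lets Spark interpret the returned
    // batches directly, instead of projecting InternalRow instances.
    return projectedSchema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    return batchTasks;
  }

  // A partition reader that hands Spark whole ColumnarBatches; the `batches`
  // iterator stands in for the Arrow-backed ColumnarBatchReader described above.
  static class BatchPartitionReader implements InputPartitionReader<ColumnarBatch> {
    private final Iterator<ColumnarBatch> batches;
    private ColumnarBatch current = null;

    BatchPartitionReader(Iterator<ColumnarBatch> batches) {
      this.batches = batches;
    }

    @Override
    public boolean next() {
      if (!batches.hasNext()) {
        return false;
      }
      current = batches.next();
      return true;
    }

    @Override
    public ColumnarBatch get() {
      return current;
    }

    @Override
    public void close() {
      // nothing to release in this sketch
    }
  }
}
```

With `readSchema()` returning the projected schema and the scan planned as ColumnarBatch partitions, Spark can consume the batches without an InternalRow UnsafeProjection, which matches the fix described in the July 21 message above.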
On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:

Replies inline.

On Fri, Jun 14, 2019 at 1:11 AM Gautam <gautamkows...@gmail.com> wrote:

> Thanks for responding Ryan,
>
> A couple of follow-up questions on ParquetValueReader for Arrow. I'd like to start by testing Arrow out with readers for primitive types and incrementally add in struct/array support; also, ArrowWriter [1] currently doesn't have converters for map types. How can I default these types to regular materialization while supporting Arrow-based reads for primitives?

We should look at what Spark does to handle maps. I think we should get the prototype working with test cases that don't have maps, structs, or lists. Just getting primitives working is a good start and won't hit these problems.

> Let me know if this makes sense...
>
> - I extend PrimitiveReader (for Arrow) so that it loads primitive types into ArrowColumnVectors of the corresponding column types by iterating over the underlying ColumnIterator n times, where n is the batch size.

Sounds good to me. I'm not sure about extending vs. wrapping because I'm not too familiar with the Arrow APIs.

> - Reader.newParquetIterable() maps primitive column types to the newly added ArrowParquetValueReader, but for other types (nested types, etc.) uses the current InternalRow-based ValueReaders.

Sounds good for primitives, but I would just leave the nested types unimplemented for now.

> - Stitch the column vectors together to create a ColumnarBatch (since the SupportsScanColumnarBatch mixin currently expects this)... although I'm a bit lost on how the stitching of columns happens currently, and how the ArrowColumnVectors could be stitched alongside regular columns that don't have Arrow-based support?

I don't think that you can mix regular columns and Arrow columns. It has to be all one or the other. That's why it's easier to start with primitives, then add structs, then lists, and finally maps.

> - Reader returns readTasks as InputPartition<ColumnarBatch> so that DataSourceV2ScanExec starts using ColumnarBatch scans.

We will probably need two paths: one for columnar batches and one for row-based reads. That doesn't need to be done right away, and what you already have in your working copy makes sense as a start.

> That's a lot of questions! :-) But I hope I'm making sense.
>
> -Gautam.
>
> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala

--
Ryan Blue
Software Engineer
Netflix
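On the stitching question in the exchange above, here is a minimal sketch of the idea, assuming Spark 2.4's vectorized classes and Arrow's `IntVector`. The batch size and the plain `Iterator<Integer>` standing in for Iceberg's Parquet column iterator are illustrative assumptions, not the project's actual APIs.

```java
// Minimal sketch of batching one int column into Arrow and stitching columns
// into a ColumnarBatch. BATCH_SIZE and the Iterator<Integer> source are
// illustrative assumptions, not Iceberg's actual ColumnIterator API.
import java.util.Iterator;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class ArrowStitchSketch {
  private static final int BATCH_SIZE = 4096;  // assumed batch size

  // Fill one Arrow IntVector by pulling up to BATCH_SIZE values from the column
  // source, mirroring "iterate the underlying ColumnIterator n times" above.
  static IntVector readIntBatch(String name, BufferAllocator allocator, Iterator<Integer> column) {
    IntVector vector = new IntVector(name, allocator);
    vector.allocateNew(BATCH_SIZE);
    int rows = 0;
    while (rows < BATCH_SIZE && column.hasNext()) {
      Integer value = column.next();
      if (value == null) {
        vector.setNull(rows);       // mark the slot null in the validity buffer
      } else {
        vector.setSafe(rows, value);
      }
      rows++;
    }
    vector.setValueCount(rows);
    return vector;
  }

  // Stitch per-column Arrow vectors into a single ColumnarBatch: wrap each
  // ValueVector in an ArrowColumnVector and set the shared row count.
  static ColumnarBatch stitch(IntVector... vectors) {
    ColumnVector[] columns = new ColumnVector[vectors.length];
    for (int i = 0; i < vectors.length; i++) {
      columns[i] = new ArrowColumnVector(vectors[i]);
    }
    ColumnarBatch batch = new ColumnarBatch(columns);
    batch.setNumRows(vectors.length == 0 ? 0 : vectors[0].getValueCount());
    return batch;
  }
}
```

The key point, matching the reply above, is that every column in a batch has to come in as an Arrow-backed ColumnVector; regular row-based columns can't be mixed into the same ColumnarBatch.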