Would it be possible to put the work-in-progress code in open source?

 

From: Gautam <gautamkows...@gmail.com>
Reply-To: "dev@iceberg.apache.org" <dev@iceberg.apache.org>
Date: Monday, July 22, 2019 at 9:46 AM
To: Daniel Weeks <dwe...@netflix.com>
Cc: Ryan Blue <rb...@netflix.com>, Iceberg Dev List <dev@iceberg.apache.org>
Subject: Re: Approaching Vectorized Reading in Iceberg ..

 

That would be great!

 

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dwe...@netflix.com> wrote:

Hey Gautam, 

 

We also have a couple people looking into vectorized reading (into Arrow 
memory).  I think it would be good for us to get together and see if we can 
collaborate on a common approach for this.

 

I'll reach out directly and see if we can get together.

 

-Dan

 

On Sun, Jul 21, 2019 at 10:35 PM Gautam <gautamkows...@gmail.com> wrote:

Figured this out. I'm returning the ColumnarBatch iterator directly, without the 
projection, with the schema set appropriately in `readSchema()`. The empty result 
was due to valuesRead not being set correctly on the FileIterator. Fixed that and 
things are working. Will circle back with numbers soon. 
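
For anyone following along, the shape of the fix is roughly this (just a sketch, 
not the exact code; the class and schema field are placeholders, using the Spark 
2.4 SupportsScanColumnarBatch mixin):

    import java.util.Collections;
    import java.util.List;

    import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    class BatchReaderSketch implements DataSourceReader, SupportsScanColumnarBatch {
      // placeholder: the projected Iceberg schema, converted to a Spark StructType
      private final StructType projectedSchema;

      BatchReaderSketch(StructType projectedSchema) {
        this.projectedSchema = projectedSchema;
      }

      @Override
      public StructType readSchema() {
        // Reporting the iterator's schema here lets Spark consume the ColumnarBatch
        // instances as-is, so no row-level UnsafeProjection is applied on top.
        return projectedSchema;
      }

      @Override
      public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
        // placeholder: the real reader returns one InputPartition<ColumnarBatch> per scan task
        return Collections.emptyList();
      }
    }

With the schema reported from readSchema(), Spark consumes the batches directly 
and the row-level projection isn't needed.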

 

On Fri, Jul 19, 2019 at 5:22 PM Gautam <gautamkows...@gmail.com> wrote:

Hey Guys, 

           Sorry about the delay on this. Just got back to getting a basic 
working implementation in Iceberg for vectorization on primitive types. 

 

Here's what I have so far:

 

I have added `ParquetValueReader` implementations for some basic primitive 
types that build the respective Arrow vector (`ValueVector`), viz. `IntVector` 
for int, `VarCharVector` for strings, and so on. Underneath, each value vector 
reader has column iterators that read from the Parquet page stores 
(row groups) in chunks. These `ValueVector`s are lined up as 
`ArrowColumnVector`s (the ColumnVector wrapper backed by Arrow) and 
stitched together using a `ColumnarBatchReader` (which, as the name suggests, 
wraps ColumnarBatches in the iterator). I've verified that these pieces work 
properly with the underlying interfaces. I've also made changes to Iceberg's 
`Reader` to implement `planBatchPartitions()` (to add the 
`SupportsScanColumnarBatch` mixin to the reader), so the reader now expects 
ColumnarBatch instances instead of InternalRow. The query planning runtime 
works fine with these changes.
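
To make the stitching concrete, it's roughly this shape (a sketch only, not the 
actual ColumnarBatchReader; the vectors and row count come from the per-column 
readers):

    import org.apache.arrow.vector.IntVector;
    import org.apache.arrow.vector.VarCharVector;
    import org.apache.spark.sql.vectorized.ArrowColumnVector;
    import org.apache.spark.sql.vectorized.ColumnVector;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    public class StitchSketch {
      public static ColumnarBatch stitch(IntVector idVector, VarCharVector nameVector, int numRows) {
        // Wrap each Arrow ValueVector in Spark's ArrowColumnVector adapter and
        // line them up in column order to form one batch.
        ColumnVector[] columns = new ColumnVector[] {
            new ArrowColumnVector(idVector),
            new ArrowColumnVector(nameVector)
        };
        ColumnarBatch batch = new ColumnarBatch(columns);
        batch.setNumRows(numRows);
        return batch;
      }
    }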

 

It fails during query execution, though. The bit it's currently failing at is 
this line of code: 
https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414

 

This code, I think, tries to apply the iterator's schema projection on the 
InternalRow instances. It seems to be tightly coupled to InternalRow, since 
Spark's catalyst expressions have implemented UnsafeProjection for 
InternalRow only. If I take this out and just return the 
`Iterator<ColumnarBatch>` I built, it returns an empty result on the 
client. I'm guessing this is because Spark is unaware of the iterator's schema? 
There's a TODO in the code that says "remove the projection by reporting the 
iterator's schema back to Spark". Is there a simple way to communicate that to 
Spark for my new iterator? Any pointers on how to get around this?
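
For context on why that projection is row-only: the catalyst projection is 
generated for a row schema and consumes InternalRow, so there's nothing 
equivalent to apply per ColumnarBatch. A minimal illustration (not Iceberg code):

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
    import org.apache.spark.sql.types.StructType;

    class ProjectionSketch {
      static UnsafeRow project(StructType readSchema, InternalRow row) {
        // UnsafeProjection is generated for a row schema and consumes InternalRow,
        // so there is no equivalent that can be applied to a ColumnarBatch.
        UnsafeProjection projection = UnsafeProjection.create(readSchema);
        return projection.apply(row);
      }
    }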

 

 

Thanks and Regards,

-Gautam. 

On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:

Replies inline.

 

On Fri, Jun 14, 2019 at 1:11 AM Gautam <gautamkows...@gmail.com> wrote:

Thanks for responding, Ryan.

 

A couple of follow-up questions on ParquetValueReader for Arrow...

 

I'd like to start by testing Arrow out with readers for primitive types and 
incrementally add Struct/Array support; also, ArrowWriter [1] currently 
doesn't have converters for the map type. How can I default these types to regular 
materialization while supporting Arrow-based reads for primitives? 

 

We should look at what Spark does to handle maps.

 

I think we should get the prototype working with test cases that don't have 
maps, structs, or lists. Just getting primitives working is a good start and 
won't hit these problems.
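
One way to handle the fallback (just a sketch of the idea, not anything that 
exists in Iceberg today): only report batch support when the projected schema is 
entirely primitive, and keep the row-based path otherwise. Something like this 
check, wired into the SupportsScanColumnarBatch mixin:

    import org.apache.spark.sql.types.ArrayType;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.MapType;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    class BatchSupportCheck {
      /** Returns true only when every top-level column is a primitive type. */
      static boolean allPrimitive(StructType schema) {
        for (StructField field : schema.fields()) {
          DataType type = field.dataType();
          if (type instanceof StructType || type instanceof ArrayType || type instanceof MapType) {
            return false;  // fall back to the InternalRow-based readers for these
          }
        }
        return true;
      }
    }

enableBatchRead() on the reader would then just return allPrimitive(readSchema()).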

 

Let me know if this makes sense...

 

- I extend PrimitiveReader (for Arrow) to load primitive types into 
ArrowColumnVectors of the corresponding column types by iterating over the underlying 
ColumnIterator n times, where n is the batch size.
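
Roughly what I have in mind for the primitive case (a sketch only; I'm using a 
plain java.util.Iterator as a stand-in for the ColumnIterator, whose real API 
would plug in here):

    import java.util.Iterator;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;
    import org.apache.spark.sql.vectorized.ArrowColumnVector;

    class ArrowIntBatchReader {
      private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

      /** Reads up to batchSize values from the column into a fresh IntVector. */
      ArrowColumnVector readBatch(Iterator<Integer> column, int batchSize) {
        IntVector vector = new IntVector("col", allocator);
        vector.allocateNew(batchSize);
        int rows = 0;
        while (rows < batchSize && column.hasNext()) {
          Integer value = column.next();
          if (value == null) {
            vector.setNull(rows);          // null according to the definition level
          } else {
            vector.setSafe(rows, value);   // copy the decoded value into Arrow memory
          }
          rows++;
        }
        vector.setValueCount(rows);
        return new ArrowColumnVector(vector);
      }
    }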

 

Sounds good to me. I'm not sure about extending vs wrapping because I'm not too 
familiar with the Arrow APIs.

 

- Reader.newParquetIterable() maps primitive column types to the newly added 
ArrowParquetValueReader, but for other types (nested types, etc.) uses the current 
InternalRow-based ValueReaders 

 

Sounds good for primitives, but I would just leave the nested types 
un-implemented for now.

 

- Stitch the column vectors together to create a ColumnarBatch (since the 
SupportsScanColumnarBatch mixin currently expects this), although I'm a bit 
lost on how the stitching of columns happens currently, and on how the 
ArrowColumnVectors could be stitched alongside regular columns that don't have 
Arrow-based support?

 

I don't think that you can mix regular columns and Arrow columns. It has to be 
all one or the other. That's why it's easier to start with primitives, then add 
structs, then lists, and finally maps.

 

- Reader returns readTasks as  InputPartition<ColumnarBatch> so that 
DataSourceV2ScanExec starts using ColumnarBatch scans
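
The read-task side would then look roughly like this (a sketch; openBatches() is 
a placeholder for however the task opens its Parquet files and produces batches):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    class BatchReadTask implements InputPartition<ColumnarBatch> {
      @Override
      public InputPartitionReader<ColumnarBatch> createPartitionReader() {
        return new InputPartitionReader<ColumnarBatch>() {
          private final Iterator<ColumnarBatch> batches = openBatches();
          private ColumnarBatch current = null;

          @Override
          public boolean next() {
            if (!batches.hasNext()) {
              return false;
            }
            current = batches.next();
            return true;
          }

          @Override
          public ColumnarBatch get() {
            return current;
          }

          @Override
          public void close() throws IOException {
            // release file handles and Arrow buffers here
          }
        };
      }

      private Iterator<ColumnarBatch> openBatches() {
        // placeholder: the real task would iterate row groups and build batches
        return java.util.Collections.emptyIterator();
      }
    }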

 

We will probably need two paths: one for columnar batches and one for row-based 
reads. That doesn't need to be done right away, and what you already have in 
your working copy makes sense as a start.

 

That's a lot of questions! :-) But I hope I'm making sense.

 

-Gautam.

[1] - 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala

 

-- 

Ryan Blue 

Software Engineer

Netflix
