I've added unit tests and created a PR for the V1 vectorization work:
https://github.com/apache/incubator-iceberg/pull/452

I'm sure there's scope for further improvement, so let me know your feedback
on the PR and I'll sharpen it further.

Cheers,
-Gautam.


On Wed, Sep 4, 2019 at 10:33 PM Mouli Mukherjee <moulimukher...@gmail.com>
wrote:

> Hi Gautam, this is very exciting to see. It would be great if this were
> available behind a flag, if possible.
>
> Best,
> Mouli
>
> On Wed, Sep 4, 2019, 7:01 AM Gautam <gautamkows...@gmail.com> wrote:
>
>> Hello Devs,
>>
>> As some of you know, there's been ongoing work as part of [1] to build
>> Arrow-based vectorization into Iceberg. That work is being discussed in a
>> separate thread on this dev list, and progress is being tracked in a
>> separate branch [2]. The overall approach there is to build a ground-up,
>> Arrow-based implementation of vectorization within Iceberg so that any
>> compute engine using Iceberg would benefit from those optimizations. We
>> feel that is indeed the longer-term solution and the best way forward.
>>
>> Meanwhile, Xabriel and I investigated an interim approach in which Iceberg
>> uses the vectorization code already built into Spark's Parquet reader,
>> which I will refer to as "*V1 Vectorization*". This is the code path that
>> Spark's DataSourceV1 readers use to read Parquet data. The result is
>> performance parity between Iceberg and Spark's vanilla Parquet reader. We
>> thought we should share this with the larger community so others can
>> benefit from this gain.
>>
>> *What we did*:
>> - Added a new reader, *V1VectorizedReader*, that internally short-circuits
>> to the V1 code path [3], which does most of the setup and work to perform
>> vectorization. It's exactly what vanilla Spark's reader does underneath
>> the DSv1 implementation.
>> - It builds an iterator that expects ColumnarBatches from the Objects
>> returned by the resolving iterator.
>> - We reorganized and optimized the code that builds *ReadTask* instances,
>> which considerably improved task initiation and planning time.
>> - Setting `iceberg.read.enableV1VectorizedReader` to true enables this
>> reader in IcebergSource.
>> - The V1VectorizedReader is an independent class with some methods'
>> code copied, as we didn't want to degrade performance through
>> inheritance/virtual method calls (we noticed a degradation when we did
>> try to reuse code).
>> - I've pushed this code to a separate branch [4] in case others want to
>> give it a try.
>>
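The reader selection and batch iteration described in the list above can be sketched as follows. This is a minimal illustration in Python, not the Java code on the branch: it assumes the flag arrives as a string-valued option map, and `ColumnarBatch`, `v1_vectorized_enabled`, and `batch_iterator` are hypothetical stand-ins rather than the real Spark or Iceberg APIs.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, Mapping


@dataclass
class ColumnarBatch:
    """Hypothetical stand-in for Spark's ColumnarBatch: a block of rows."""
    num_rows: int


# Flag name taken from the email; treating it as a read option is an assumption.
V1_FLAG = "iceberg.read.enableV1VectorizedReader"


def v1_vectorized_enabled(options: Mapping[str, str]) -> bool:
    """Return True only when the flag is explicitly "true" (case-insensitive)."""
    return options.get(V1_FLAG, "false").strip().lower() == "true"


def batch_iterator(resolved: Iterable[Any]) -> Iterator[ColumnarBatch]:
    """Adapt an untyped iterator into one that must yield ColumnarBatch objects.

    The underlying reader hands back generic objects; in the vectorized path
    each one should be a batch, so anything else is reported as an error.
    """
    for obj in resolved:
        if not isinstance(obj, ColumnarBatch):
            raise TypeError(f"expected ColumnarBatch, got {type(obj).__name__}")
        yield obj
```

For example, `sum(b.num_rows for b in batch_iterator([ColumnarBatch(5000), ColumnarBatch(2500)]))` counts 7500 rows. Failing fast on a non-batch object makes it obvious when the non-vectorized path was taken by mistake, instead of silently treating rows as batches.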
>>
>> *The Numbers*:
>>
>>
>> Flat data, 10 files with 10M rows each
>> (benchmark class IcebergSourceFlatParquetDataReadBenchmark):
>>
>> Benchmark                    Mode  Cnt   Score   Error  Units
>> readFileSourceNonVectorized    ss    5  63.631 ± 1.300   s/op
>> readFileSourceVectorized       ss    5  28.322 ± 2.400   s/op
>> readIceberg                    ss    5  65.862 ± 2.480   s/op
>> readIcebergV1Vectorized10k     ss    5  28.199 ± 1.255   s/op
>> readIcebergV1Vectorized20k     ss    5  29.822 ± 2.848   s/op
>> readIcebergV1Vectorized5k      ss    5  27.953 ± 0.949   s/op
>>
>> Flat data projections, 10 files with 10M rows each
>> (benchmark class IcebergSourceFlatParquetDataReadBenchmark):
>>
>> Benchmark                                  Mode  Cnt   Score   Error  Units
>> readWithProjectionFileSourceNonVectorized    ss    5  11.307 ± 1.791   s/op
>> readWithProjectionFileSourceVectorized       ss    5   3.480 ± 0.087   s/op
>> readWithProjectionIceberg                    ss    5  11.057 ± 0.236   s/op
>> readWithProjectionIcebergV1Vectorized10k     ss    5   3.953 ± 1.592   s/op
>> readWithProjectionIcebergV1Vectorized20k     ss    5   3.619 ± 1.305   s/op
>> readWithProjectionIcebergV1Vectorized5k      ss    5   4.109 ± 1.734   s/op
>>
>> Filtered data, 500 files with 10k rows each
>> (benchmark class IcebergSourceFlatParquetDataFilterBenchmark):
>>
>> Benchmark                              Mode  Cnt   Score   Error  Units
>> readWithFilterFileSourceNonVectorized    ss    5   2.139 ± 0.719   s/op
>> readWithFilterFileSourceVectorized       ss    5   2.213 ± 0.598   s/op
>> readWithFilterIcebergNonVectorized       ss    5   0.144 ± 0.029   s/op
>> readWithFilterIcebergV1Vectorized100k    ss    5   0.179 ± 0.019   s/op
>> readWithFilterIcebergV1Vectorized10k     ss    5   0.189 ± 0.046   s/op
>> readWithFilterIcebergV1Vectorized5k      ss    5   0.195 ± 0.137   s/op
>>
>> *Perf Notes*:
>> - Iceberg V1 Vectorization's real gain (over the current Iceberg
>> implementation) is in flat data scans. Notice how it's almost exactly the
>> same as vanilla Spark vectorization.
>> - Projections work equally well, although nested column projections still
>> don't perform as well as they should; for that we need to be able to push
>> nested column projections down to Iceberg.
>> - We saw a slight overhead with Iceberg V1 Vectorization on smaller
>> workloads, but this goes away with larger data files.
>>
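The speedups described in the perf notes above can be checked directly from the mean scores in the benchmark tables. The sketch below copies a few of those scores into a hypothetical `Score` record and computes mean-over-mean ratios; it uses JMH's Error column only for a rough check of whether two error bars overlap, which is a heuristic and not a significance test.

```python
from typing import NamedTuple


class Score(NamedTuple):
    """A JMH single-shot result: mean seconds per op and its error half-width."""
    mean: float
    error: float


# Scores copied from the benchmark tables (s/op, lower is better).
FLAT = {
    "readFileSourceVectorized": Score(28.322, 2.400),
    "readIceberg": Score(65.862, 2.480),
    "readIcebergV1Vectorized5k": Score(27.953, 0.949),
}
PROJECTION = {
    "readWithProjectionIceberg": Score(11.057, 0.236),
    "readWithProjectionIcebergV1Vectorized20k": Score(3.619, 1.305),
}


def speedup(baseline: Score, candidate: Score) -> float:
    """How many times faster the candidate is, using mean scores only."""
    return baseline.mean / candidate.mean


def error_bars_overlap(a: Score, b: Score) -> bool:
    """Rough parity check: do the two mean +/- error intervals intersect?"""
    return abs(a.mean - b.mean) <= a.error + b.error
```

With these numbers, V1 vectorization is about 2.36x faster than the current Iceberg reader on flat scans and about 3.06x faster on projections (20k variant). Against vanilla Spark's vectorized reader the flat-scan ratio is about 1.01 and the error bars overlap, which is consistent with the parity claim.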
>> *Why we think this is useful*:
>> - This approach allows users to benefit from both: 1) Iceberg's metadata
>> filtering and 2) Spark's Scan Vectorization. This should help with Iceberg
>> adoption.
>> - We think this can be an interim solution (until the Arrow-based
>> implementation is fully performant) for those who are currently blocked by
>> the performance difference between Iceberg and Spark's native
>> vectorization for interactive use cases. A lot of optimization work and
>> testing has gone into V1 vectorization, and Iceberg can now benefit from it.
>> - In many cases, companies have proprietary implementations of
>> *ParquetFileFormat* that may have extended features such as complex type
>> support. Our code can use those at runtime as long as the
>> '*buildReaderWithPartitionValues()*' signature is consistent. If not, the
>> reader can easily be modified to plug in their own vectorized reader.
>> - While profiling the Arrow implementation, I found it difficult to
>> compare bottlenecks due to major differences between the DSv1 and DSv2
>> client-to-source interface paths. This makes it easier to compare numbers
>> and profile code between V1 vectorization and Arrow vectorization, as we
>> now have both paths working behind a single DataSourceV2 path (viz.
>> IcebergSource).
>>
>> *Limitations*:
>> - This implementation is specific to Spark, so other compute frameworks
>> like Presto won't benefit from it.
>> - It doesn't use Iceberg's Value Reader interface, as it bypasses
>> everything under task data reading (we added a separate
>> *V1VectorizedTaskDataReader*).
>> - We need to maintain two readers, since any change to Reader.java might
>> also require changes to the V1 vectorized reader, although we could
>> minimize this with a *ReaderUtils* class.
>>
>>
>> I have the code checked in at
>> https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader .
>> If folks think this is useful and we can keep this as an interim solution
>> behind a feature flag, I can get a PR up with proper unit tests.
>>
>> thanks and regards,
>> -Gautam.
>>
>>
>> [1] - https://github.com/apache/incubator-iceberg/issues/9
>> [2] - https://github.com/apache/incubator-iceberg/tree/vectorized-read
>> [3] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L197
>> [4] - https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader
>>
>>
