Hi Gautam, this is very exciting to see. It would be great if this were
available behind a flag if possible.

Best,
Mouli

On Wed, Sep 4, 2019, 7:01 AM Gautam <gautamkows...@gmail.com> wrote:

> Hello Devs,
>                    As some of you know there's been ongoing work as part
> of [1] to build Arrow based vectorization into Iceberg. There's a separate
> thread on this dev list where that is being discussed and progress is being
> tracked in a separate branch [2]. The overall approach there is to build a
> ground up Arrow based implementation of vectorization within Iceberg so
> that any compute engine using Iceberg would benefit from those
> optimizations. We feel that is indeed the longer term solution and the best
> way forward.
>
> Meanwhile, Xabriel & I took to investigating an interim approach where
> Iceberg could use the current Vectorization code built into Spark Parquet
> reading, which I will refer to as "*V1 Vectorization*". This is the code
> path that Spark's DataSourceV1 readers use to read Parquet data. The result
> is that we have performance parity between Iceberg and Spark's Vanilla
> Parquet reader. We thought we should share this with the larger community
> so others can benefit from this gain.
>
> *What we did*:
> - Added a new reader, *V1VectorizedReader*, that internally short
> circuits to the V1 code path [3], which does most of the setup and
> work to perform vectorization. It's exactly what vanilla Spark's reader
> does underneath the DSV1 implementation.
> - It builds an iterator that expects ColumnarBatches from the objects
> returned by the resolving iterator.
> - We re-organized and optimized code while building *ReadTask* instances,
> which considerably improved task initiation and planning time.
> - Setting `*iceberg.read.enableV1VectorizedReader*` to true enables this
> reader in IcebergSource.
> - The V1Vectorized reader is an independent class with copied code in some
> methods, as we didn't want to degrade performance due to inheritance/virtual
> method calls (we noticed degradation when we did try to re-use code).
> - I've pushed this code to a separate branch [4] in case others want to
> give it a try.
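For readers who want to try the branch, a minimal PySpark sketch of how the flag might be passed. The option name comes from the email; passing it as a per-read DataSourceV2 option via `.option()`, and the table identifier used, are assumptions about how IcebergSource surfaces the setting.

```python
# Hypothetical usage sketch: enabling the V1 vectorized reader on the
# DataSourceV2 path. "iceberg.read.enableV1VectorizedReader" is the flag
# named in the email; supplying it via .option() (rather than, say, a
# table property or Hadoop conf) is an assumption, as is "db.table".
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iceberg-v1-vectorized").getOrCreate()

df = (
    spark.read
    .format("iceberg")
    .option("iceberg.read.enableV1VectorizedReader", "true")
    .load("db.table")
)
df.show()
```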
>
>
> *The Numbers*:
>
>
> Flat Data 10 files 10M rows each
>
> Benchmark                                                               Mode  Cnt   Score   Error  Units
> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized     ss    5  63.631 ± 1.300   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized        ss    5  28.322 ± 2.400   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIceberg                     ss    5  65.862 ± 2.480   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized10k      ss    5  28.199 ± 1.255   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized20k      ss    5  29.822 ± 2.848   s/op
> IcebergSourceFlatParquetDataReadBenchmark.*readIcebergV1Vectorized5k*     ss    5  27.953 ± 0.949   s/op
>
>
>
>
> Flat Data Projections 10 files 10M rows each
>
> Benchmark                                                                             Mode  Cnt   Score   Error  Units
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized     ss    5  11.307 ± 1.791   s/op
> IcebergSourceFlatParquetDataReadBenchmark.*readWithProjectionFileSourceVectorized*      ss    5   3.480 ± 0.087   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg                     ss    5  11.057 ± 0.236   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized10k      ss    5   3.953 ± 1.592   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized20k      ss    5   3.619 ± 1.305   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized5k       ss    5   4.109 ± 1.734   s/op
>
>
> Filtered Data 500 files 10k rows each
>
> Benchmark                                                                            Mode  Cnt  Score   Error  Units
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceNonVectorized      ss    5  2.139 ± 0.719   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceVectorized         ss    5  2.213 ± 0.598   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.*readWithFilterIcebergNonVectorized*       ss    5  0.144 ± 0.029   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized100k      ss    5  0.179 ± 0.019   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized10k       ss    5  0.189 ± 0.046   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized5k        ss    5  0.195 ± 0.137   s/op
>
>
> *Perf Notes*:
> - Iceberg V1 Vectorization's real gain (over the current Iceberg impl) is
> in flat data scans. Notice how it's almost exactly the same as vanilla
> Spark's vectorization.
> - Projections work equally well, although nested column projections still
> do not perform as well; we need the ability to push nested column
> projections down to Iceberg.
> - We saw a slight overhead with Iceberg V1 Vectorization on smaller
> workloads, but this goes away with larger data files.
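As a back-of-the-envelope check, the speedups implied by the reported means can be computed directly (a sketch only; JMH error bars are ignored, and the file sizes differ across tables):

```python
# Rough speedups from the single-shot ("ss") means reported above,
# in seconds/op.
def speedup(baseline_s, candidate_s):
    """How many times faster the candidate is than the baseline."""
    return baseline_s / candidate_s

# Flat scans: current Iceberg reader vs. Iceberg V1-vectorized (5k batch).
flat = speedup(65.862, 27.953)          # ~2.36x

# Flat scans: vanilla Spark vectorized vs. Iceberg V1-vectorized (5k batch)
# -- near parity, which is the headline claim.
parity = speedup(28.322, 27.953)        # ~1.01x

# Filtered scans over 500 small files: Iceberg's metadata filtering beats
# even vectorized file-source reads by an order of magnitude.
filtered = speedup(2.213, 0.144)        # ~15.4x

print(round(flat, 2), round(parity, 2), round(filtered, 1))
```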
>
> *Why we think this is useful*:
> - This approach allows users to benefit from both: 1) Iceberg's metadata
> filtering and 2) Spark's Scan Vectorization. This should help with Iceberg
> adoption.
> - We think this can be an interim solution (until the Arrow-based impl is
> fully performant) for those who are currently blocked by the performance
> difference between Iceberg and Spark's native vectorization for interactive
> use cases. A lot of optimization work and testing has gone into V1
> vectorization that Iceberg can now benefit from.
> - Many companies have proprietary implementations of *ParquetFileFormat*
> that may have extended features like complex type support. Our code can use
> those at runtime as long as the '*buildReaderWithPartitionValues()*'
> signature is consistent. If not, the reader can easily be modified to plug
> in their own vectorized reader.
> - While profiling the Arrow implementation, I found it difficult to compare
> bottlenecks due to major differences between the DSv1 and DSv2
> client-to-source interface paths. This work makes it easier to compare
> numbers and profile code between V1 vectorization and Arrow vectorization,
> as we now have both paths working behind a single DataSourceV2 path
> (viz. IcebergSource).
>
> *Limitations*:
> - This implementation is specific to Spark so other compute frameworks
> like Presto won't benefit from this.
> - It doesn't use Iceberg's value reader interface, as it bypasses
> everything under the task data reading (we added a separate
> *V1VectorizedTaskDataReader*).
> - We need to maintain two readers, as adding any code to Reader.java might
> require changes to the V1Vectorized reader, although we could minimize this
> with a *ReaderUtils* class.
>
>
> I have the code checked in at
> https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader .
> If folks think this is useful and we can keep this as an interim solution
> behind a feature flag, I can get a PR up with proper unit tests.
>
> thanks and regards,
> -Gautam.
>
>
> [1] - https://github.com/apache/incubator-iceberg/issues/9
> [2] - https://github.com/apache/incubator-iceberg/tree/vectorized-read
> [3] -
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L197
> [4] -
> https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader
>
>
