Hi Gautam, this is very exciting to see. It would be great if this could be made available behind a flag.
Best,
Mouli

On Wed, Sep 4, 2019, 7:01 AM Gautam <gautamkows...@gmail.com> wrote:

> Hello Devs,
> As some of you know, there's been ongoing work as part of [1] to build
> Arrow based vectorization into Iceberg. There's a separate thread on this
> dev list where that is being discussed, and progress is being tracked in a
> separate branch [2]. The overall approach there is to build a ground up
> Arrow based implementation of vectorization within Iceberg so that any
> compute engine using Iceberg would benefit from those optimizations. We
> feel that is indeed the longer term solution and the best way forward.
>
> Meanwhile, Xabriel & I took to investigating an interim approach where
> Iceberg could use the current Vectorization code built into Spark Parquet
> reading, which I will refer to as "*V1 Vectorization*". This is the code
> path that Spark's DataSourceV1 readers use to read Parquet data. The result
> is that we have performance parity between Iceberg and Spark's Vanilla
> Parquet reader. We thought we should share this with the larger community
> so others can benefit from this gain.
>
> *What we did*:
> - Added a new reader, viz. *V1VectorizedReader*, that internally short
>   circuits to using the V1 codepath [3], which does most of the setup and
>   work to perform vectorization. It's exactly what Vanilla Spark's reader
>   does underneath the DSV1 implementation.
> - It builds an iterator which expects ColumnarBatches from the Objects
>   returned by the resolving iterator.
> - We re-organized and optimized code while building *ReadTask* instances,
>   which considerably improved task initiation and planning time.
> - Setting `iceberg.read.enableV1VectorizedReader` to true enables this
>   reader in IcebergSource.
> - The V1Vectorized reader is an independent class with copied code in some
>   methods, as we didn't want to degrade perf due to inheritance/virtual
>   method calls (we noticed degradation when we did try to re-use code).
> - I've pushed this code to a separate branch [4] in case others want to
>   give this a try.
>
> *The Numbers*:
>
> Flat Data: 10 files, 10M rows each
>
> Benchmark                                                              Mode  Cnt   Score    Error  Units
> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized    ss    5  63.631  ± 1.300   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized       ss    5  28.322  ± 2.400   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIceberg                    ss    5  65.862  ± 2.480   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized10k     ss    5  28.199  ± 1.255   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized20k     ss    5  29.822  ± 2.848   s/op
> IcebergSourceFlatParquetDataReadBenchmark.*readIcebergV1Vectorized5k*    ss    5  27.953  ± 0.949   s/op
>
> Flat Data Projections: 10 files, 10M rows each
>
> Benchmark                                                                            Mode  Cnt   Score    Error  Units
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5  11.307  ± 1.791   s/op
> IcebergSourceFlatParquetDataReadBenchmark.*readWithProjectionFileSourceVectorized*     ss    5   3.480  ± 0.087   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg                    ss    5  11.057  ± 0.236   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized10k     ss    5   3.953  ± 1.592   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized20k     ss    5   3.619  ± 1.305   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized5k      ss    5   4.109  ± 1.734   s/op
>
> Filtered Data: 500 files, 10k rows each
>
> Benchmark                                                                          Mode  Cnt   Score    Error  Units
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceNonVectorized    ss    5   2.139  ± 0.719   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceVectorized       ss    5   2.213  ± 0.598   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.*readWithFilterIcebergNonVectorized*     ss    5   0.144  ± 0.029   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized100k    ss    5   0.179  ± 0.019   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized10k     ss    5   0.189  ± 0.046   s/op
> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized5k      ss    5   0.195  ± 0.137   s/op
>
> *Perf Notes*:
> - Iceberg V1 Vectorization's real gain (over the current Iceberg impl) is
>   in flat data scans. Notice how it's almost exactly the same as vanilla
>   Spark vectorization.
> - Projections work equally well, although nested column projections still
>   do not perform as well, since we need to be able to push nested column
>   projections down to Iceberg.
> - We saw a slight overhead with Iceberg V1 Vectorization on smaller
>   workloads, but this goes away with larger data files.
>
> *Why we think this is useful*:
> - This approach allows users to benefit from both: 1) Iceberg's metadata
>   filtering and 2) Spark's Scan Vectorization. This should help with
>   Iceberg adoption.
> - We think this can be an interim solution (until the Arrow based impl is
>   fully performant) for those who are currently blocked by the performance
>   difference between Iceberg and Spark's native Vectorization for
>   interactive use cases. A lot of optimization work and testing has gone
>   into V1 vectorization that Iceberg can now benefit from.
> - In many cases companies have proprietary implementations of
>   *ParquetFileFormat* that could have extended features like complex type
>   support etc. Our code can use that at runtime as long as the
>   '*buildReaderWithPartitionValues()*' signature is consistent. If not,
>   the reader can be easily modified to plug their own vectorized reader in.
> - While profiling the Arrow implementation I found it difficult to compare
>   bottlenecks due to major differences between DSv1 and DSv2
>   client-to-source interface paths. This makes it easier to compare
>   numbers and profile code between V1 vectorization and Arrow
>   vectorization, as we now have both paths working behind a single
>   DataSourceV2 path (viz. IcebergSource).
>
> *Limitations*:
> - This implementation is specific to Spark, so other compute frameworks
>   like Presto won't benefit from this.
> - It doesn't use Iceberg's Value Reader interface, as it bypasses
>   everything under the Task Data Reading (added a separate
>   *V1VectorizedTaskDataReader*).
> - We need to maintain two readers, as adding any code to Reader.java might
>   need changes to the V1Vectorized Reader. Although, we could minimize
>   this with a *ReaderUtils* class.
>
> I have the code checked in at
> https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader .
> If folks think this is useful and we can keep this as an interim solution
> behind a feature flag, I can get a PR up with proper unit tests.
>
> thanks and regards,
> -Gautam.
>
> [1] - https://github.com/apache/incubator-iceberg/issues/9
> [2] - https://github.com/apache/incubator-iceberg/tree/vectorized-read
> [3] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L197
> [4] - https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader
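
For reference, the flag gating discussed in this thread could look roughly like the sketch below. It is only an illustration under assumptions: the option name `iceberg.read.enableV1VectorizedReader` comes from the quoted message, while `ReaderFlagSketch`, `ReaderKind` and `chooseReader` are hypothetical names that do not exist in Iceberg or Spark. Defaulting to the current reader when the flag is absent is also an assumption.

```java
import java.util.Map;

// Hypothetical sketch: none of these types are part of Iceberg or Spark.
public class ReaderFlagSketch {
  // Option name as given in the quoted proposal.
  static final String V1_VECTORIZED_FLAG = "iceberg.read.enableV1VectorizedReader";

  // Stand-ins for the two read paths; the real readers are far more involved.
  enum ReaderKind { ROW_BASED, V1_VECTORIZED }

  // Picks the read path from the read options.
  // Assumption: a missing or non-"true" value keeps the existing row-based reader.
  static ReaderKind chooseReader(Map<String, String> readOptions) {
    String value = readOptions.getOrDefault(V1_VECTORIZED_FLAG, "false");
    return Boolean.parseBoolean(value) ? ReaderKind.V1_VECTORIZED : ReaderKind.ROW_BASED;
  }

  public static void main(String[] args) {
    // Flag absent: falls back to the current reader.
    System.out.println(chooseReader(Map.of()));
    // Flag set to true: opts in to the V1 vectorized path.
    System.out.println(chooseReader(Map.of(V1_VECTORIZED_FLAG, "true")));
  }
}
```

Keeping the default off means existing jobs would see no behavior change unless they opt in, which is the main point of putting the interim reader behind a flag.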