I've added unit tests and created a PR for the v1 vectorization work: https://github.com/apache/incubator-iceberg/pull/452
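For anyone who wants to try it out, the reader sits behind a flag. A minimal usage sketch (assuming a Spark session with the PR's build on the classpath; the table path is illustrative, and depending on the final PR the flag may need to be wired differently, e.g. as a Hadoop conf rather than a read option):

```java
// Sketch only: enabling the V1-vectorized read path through IcebergSource.
// Assumes an existing SparkSession ("spark") and the PR's jars on the
// classpath; not runnable standalone. The option name is the flag
// described in the thread below.
Dataset<Row> df = spark.read()
    .format("iceberg")
    .option("iceberg.read.enableV1VectorizedReader", "true")
    .load("hdfs:///path/to/iceberg/table");  // illustrative path
df.show();
```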
I'm sure there's scope for further improvement, so let me know your feedback on the PR and I can sharpen it further. Cheers, -Gautam.

On Wed, Sep 4, 2019 at 10:33 PM Mouli Mukherjee <moulimukher...@gmail.com> wrote:
> Hi Gautam, this is very exciting to see. It would be great if this was
> available behind a flag if possible.
>
> Best,
> Mouli
>
> On Wed, Sep 4, 2019, 7:01 AM Gautam <gautamkows...@gmail.com> wrote:
>
>> Hello Devs,
>> As some of you know, there's been ongoing work as part of [1] to build
>> Arrow-based vectorization into Iceberg. There's a separate thread on this
>> dev list where that is being discussed, and progress is being tracked in
>> a separate branch [2]. The overall approach there is to build a ground-up
>> Arrow-based implementation of vectorization within Iceberg, so that any
>> compute engine using Iceberg would benefit from those optimizations. We
>> feel that is indeed the longer-term solution and the best way forward.
>>
>> Meanwhile, Xabriel and I investigated an interim approach where Iceberg
>> could use the vectorization code already built into Spark's Parquet
>> reading, which I will refer to as "*V1 Vectorization*". This is the code
>> path that Spark's DataSourceV1 readers use to read Parquet data. The
>> result is that we now have performance parity between Iceberg and Spark's
>> vanilla Parquet reader. We thought we should share this with the larger
>> community so others can benefit from this gain.
>>
>> *What we did*:
>> - Added a new reader, *V1VectorizedReader*, that internally
>> short-circuits to the V1 code path [3], which does most of the setup and
>> work to perform vectorization. It is exactly what vanilla Spark's reader
>> does underneath the DSV1 implementation.
>> - It builds an iterator that expects ColumnarBatches from the objects
>> returned by the resolving iterator.
>> - We re-organized and optimized code while building *ReadTask* instances,
>> which considerably improved task initiation and planning time.
>> - Setting `*iceberg.read.enableV1VectorizedReader*` to true enables this
>> reader in IcebergSource.
>> - The V1Vectorized reader is an independent class with copied code in
>> some methods, as we didn't want to degrade perf due to inheritance/virtual
>> method calls (we noticed degradation when we did try to re-use code).
>> - I've pushed this code to a separate branch [4] in case others want to
>> give this a try.
>>
>> *The Numbers*:
>>
>> Flat Data 10 files 10M rows each
>>
>> Benchmark                                                              Mode  Cnt   Score   Error  Units
>> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized   ss    5  63.631 ± 1.300   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized      ss    5  28.322 ± 2.400   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readIceberg                   ss    5  65.862 ± 2.480   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized10k    ss    5  28.199 ± 1.255   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized20k    ss    5  29.822 ± 2.848   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized5k     ss    5  27.953 ± 0.949   s/op
>>
>> Flat Data Projections 10 files 10M rows each
>>
>> Benchmark                                                                            Mode  Cnt   Score   Error  Units
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized   ss    5  11.307 ± 1.791   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceVectorized      ss    5   3.480 ± 0.087   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg                   ss    5  11.057 ± 0.236   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized10k    ss    5   3.953 ± 1.592   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized20k    ss    5   3.619 ± 1.305   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized5k     ss    5   4.109 ± 1.734   s/op
>>
>> Filtered Data 500 files 10k rows each
>>
>> Benchmark                                                                            Mode  Cnt  Score   Error  Units
>> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceNonVectorized     ss    5  2.139 ± 0.719   s/op
>> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceVectorized        ss    5  2.213 ± 0.598   s/op
>> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergNonVectorized        ss    5  0.144 ± 0.029   s/op
>> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized100k     ss    5  0.179 ± 0.019   s/op
>> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized10k      ss    5  0.189 ± 0.046   s/op
>> IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized5k       ss    5  0.195 ± 0.137   s/op
>>
>> *Perf Notes*:
>> - Iceberg V1 Vectorization's real gain (over the current Iceberg impl)
>> is in flat data scans. Notice how it's almost exactly the same as vanilla
>> Spark vectorization.
>> - Projections work equally well, although nested-column projections are
>> still not performing as well; we need to be able to push nested-column
>> projections down to Iceberg.
>> - We saw a slight overhead with Iceberg V1 Vectorization on smaller
>> workloads, but this goes away with larger data files.
>>
>> *Why we think this is useful*:
>> - This approach allows users to benefit from both: 1) Iceberg's metadata
>> filtering and 2) Spark's scan vectorization.
>> This should help with Iceberg adoption.
>> - We think this can be an interim solution (until the Arrow-based impl
>> is fully performant) for those who are currently blocked by the
>> performance difference between Iceberg and Spark's native vectorization
>> for interactive use cases. A lot of optimization work and testing has
>> gone into V1 vectorization that Iceberg can now benefit from.
>> - In many cases companies have proprietary implementations of
>> *ParquetFileFormat* that may have extended features like complex type
>> support. Our code can use those at runtime as long as the
>> '*buildReaderWithPartitionValues()*' signature is consistent; if not, the
>> reader can easily be modified to plug in their own vectorized reader.
>> - While profiling the Arrow implementation I found it difficult to
>> compare bottlenecks due to major differences between the DSv1 and DSv2
>> client-to-source interface paths. This change makes it easier to compare
>> numbers and profile code between V1 vectorization and Arrow
>> vectorization, as we now have both paths working behind a single
>> DataSourceV2 path (viz. IcebergSource).
>>
>> *Limitations*:
>> - This implementation is specific to Spark, so other compute frameworks
>> like Presto won't benefit from it.
>> - It doesn't use Iceberg's Value Reader interface, as it bypasses
>> everything under the task data reading (we added a separate
>> *V1VectorizedTaskDataReader*).
>> - We need to maintain two readers, as adding any code to Reader.java may
>> require changes to the V1Vectorized reader; although we could minimize
>> this with a *ReaderUtils* class.
>>
>> I have the code checked in at
>> https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader .
>> If folks think this is useful and we can keep this as an interim solution
>> behind a feature flag, I can get a PR up with proper unit tests.
>>
>> thanks and regards,
>> -Gautam.
>>
>> [1] - https://github.com/apache/incubator-iceberg/issues/9
>> [2] - https://github.com/apache/incubator-iceberg/tree/vectorized-read
>> [3] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L197
>> [4] - https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader
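As a quick sanity check of the headline claim above (parity with Spark's vanilla vectorized reader, and roughly a 2.4x gain over the current Iceberg reader on flat scans), the flat-data means can be compared directly. The class name here is mine, purely for illustration; the numbers are copied from the first benchmark table:

```java
// Arithmetic check of the flat-data scan means quoted in the thread (s/op).
public class VectorizationSpeedupCheck {
    public static void main(String[] args) {
        double icebergNonVectorized = 65.862;  // readIceberg
        double icebergV1Vectorized  = 27.953;  // readIcebergV1Vectorized5k
        double sparkVectorized      = 28.322;  // readFileSourceVectorized

        // Speedup of V1-vectorized Iceberg over the current Iceberg reader.
        double speedup = icebergNonVectorized / icebergV1Vectorized;
        // Ratio vs. vanilla Spark's vectorized reader (1.0 would be exact parity).
        double parity = icebergV1Vectorized / sparkVectorized;

        System.out.printf("speedup=%.2fx, parity=%.2f%n", speedup, parity);
        // prints: speedup=2.36x, parity=0.99
    }
}
```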