Gautam, could you also share the code for benchmarks and conversion? Thanks, Anton
> On 13 Jun 2019, at 19:38, Ryan Blue <rb...@netflix.com.INVALID> wrote:
>
> Sounds like a good start. I think the next step is to avoid using the ParquetReader.FileIterator and deserialize directly from TripleIterator <https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java>. I think the reason this is taking longer is that you’re doing all the work to materialize the data in rows, and then converting to vectors.
>
> To work on top of TripleIterator, I think you need to create a ParquetValueReader for Arrow batches. That would be configured with a batch size, so that when you pass it into ParquetReader, the FileIterator returns batches instead of individual rows.
>
> Does that make sense?
>
> rb
>
> On Wed, Jun 12, 2019 at 11:22 PM Gautam <gautamkows...@gmail.com> wrote:
>
> Hey Ryan and Anton,
>
> I wanted to circle back on some findings I had after taking a first stab at this ..
>
> > There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we want to take advantage of..
>
> This is how I went about it: I wrapped the ParquetReader.FileIterator with an iterator that builds an Arrow batch from the rows returned by FileIterator.next() (one batch for every 100 rows). I exposed each Arrow batch from this iterator as a ColumnarBatch, whose .rowIterator() reads it back as a sequence of InternalRow, and I return that in the Reader.open() call in Iceberg.
>
> Here are some microbenchmark numbers on flat parquet file scanning ...
>
> 1 warmup, 3 iterations, 10 columns per row, 100k records per file, 10 files
>
> Benchmark                     Mode  Cnt  Score(sec)    Error  Units
> readFileV1SourceVectorized    avgt    3       0.870  ± 1.883   s/op
> readIcebergNoBatching         avgt    3       1.854  ± 2.003   s/op
> readIcebergWithBatching100k   avgt    3       3.216  ± 0.520   s/op
> readIcebergWithBatching10k    avgt    3       8.763  ± 2.338   s/op
> readIcebergWithBatching5k     avgt    3      13.964  ± 6.328   s/op
>
> The batching doesn’t seem to add any benefit. I measured the conversion times and read this as the overhead of the extra copies to Arrow and then to ColumnarBatch again, although I was hoping that the materialization to Arrow would offset some of that overhead.
>
> Wondering what my next step should be:
> 1) Eliminate the extra conversion overhead by reading each column type directly into an ArrowColumnVector?
> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch mixin and expose the ColumnarBatch?
>
> Appreciate your guidance,
>
> -Gautam.
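For illustration, the row-batching wrapper Gautam describes above might look roughly like the sketch below. This is a minimal sketch only: the class name, the flat schema of non-null int columns, and the batch handling are assumptions for the example, not the actual conversion code being discussed.

// Sketch of the approach described above: copy InternalRows into Arrow vectors
// and expose each batch as a Spark ColumnarBatch (whose rowIterator() then
// yields InternalRows again). Assumes a flat schema of non-null int columns.
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class RowToArrowBatchIterator implements Iterator<ColumnarBatch> {
  private final Iterator<InternalRow> rows;   // e.g. the rows coming out of ParquetReader.FileIterator
  private final BufferAllocator allocator;
  private final int numColumns;
  private final int batchSize;

  public RowToArrowBatchIterator(Iterator<InternalRow> rows, BufferAllocator allocator,
                                 int numColumns, int batchSize) {
    this.rows = rows;
    this.allocator = allocator;
    this.numColumns = numColumns;
    this.batchSize = batchSize;
  }

  @Override
  public boolean hasNext() {
    return rows.hasNext();
  }

  @Override
  public ColumnarBatch next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // Allocate one Arrow vector per column for this batch.
    IntVector[] vectors = new IntVector[numColumns];
    for (int c = 0; c < numColumns; c++) {
      vectors[c] = new IntVector("col_" + c, allocator);
      vectors[c].allocateNew(batchSize);
    }
    // Copy up to batchSize already-materialized rows into the Arrow vectors.
    int rowCount = 0;
    while (rowCount < batchSize && rows.hasNext()) {
      InternalRow row = rows.next();
      for (int c = 0; c < numColumns; c++) {
        vectors[c].setSafe(rowCount, row.getInt(c));
      }
      rowCount++;
    }
    // Wrap the Arrow vectors so Spark can iterate the batch row-wise via rowIterator().
    ColumnVector[] columns = new ColumnVector[numColumns];
    for (int c = 0; c < numColumns; c++) {
      vectors[c].setValueCount(rowCount);
      columns[c] = new ArrowColumnVector(vectors[c]);
    }
    ColumnarBatch batch = new ColumnarBatch(columns);
    batch.setNumRows(rowCount);
    return batch;
  }
}

Note that every call to next() here pays a full row-to-vector copy on top of the row materialization, which is consistent with the numbers above and with Ryan's point that the batching only helps once the Arrow vectors are filled directly from the Parquet data rather than from already-materialized rows.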
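For reference, the quoted settings (1 warmup iteration, 3 measurement iterations, average time per operation in seconds) correspond to a JMH configuration of roughly the following shape; the class name and the empty benchmark bodies are placeholders, not the benchmark code Anton is asking for.

// Rough JMH harness shape matching the quoted settings. Benchmark bodies are placeholders.
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 1)
@Measurement(iterations = 3)
@Fork(1)
public class FlatParquetScanBenchmark {

  @Benchmark
  public void readFileV1SourceVectorized() {
    // placeholder: scan the 10 flat Parquet files through Spark's V1 vectorized path
  }

  @Benchmark
  public void readIcebergWithBatching100k() {
    // placeholder: scan the same files through the Iceberg reader with a 100k-row Arrow batch
  }
}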
> On Fri, May 24, 2019 at 5:28 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
> > if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an Iterator[InternalRow] interface, it would still not work right? Because it seems to me there is a lot more going on upstream in the operator execution path that would need to be done here.
>
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we want to take advantage of. You’re right that the first thing Spark does is to get each row as InternalRow. But we still get a benefit from vectorizing the data materialization to Arrow itself. Spark execution is not vectorized, but that can be updated in Spark later (I think there’s a proposal).
>
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark because it produces its own in-memory format and not Arrow. We want to take advantage of Arrow so that we can use dictionary-encoded columns in record batches. Spark’s vectorized Parquet reader also works directly on Parquet pages instead of a higher-level abstraction. I’m not sure we are going to want to do that right away instead of using the TripleIterator that Iceberg currently uses to abstract the Parquet page structure away.
>
> I would start by building converters that can build a column of Arrow data from a TripleIterator. Then we can stitch those columns together to get record batches and see how that performs. Then we can add complexity from there.
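To make the converter idea concrete, a per-column converter from a TripleIterator into an Arrow vector could look roughly like the sketch below. This is an illustration only: it assumes an optional int column (max definition level 1), and it assumes TripleIterator exposes a current definition level and typed next calls (currentDefinitionLevel(), nextInteger(), nextNull()); the actual method names in the iceberg-parquet module may differ.

// Illustrative only: drain one column's TripleIterator into an Arrow IntVector,
// up to batchSize values at a time. Assumes an optional int column and the
// TripleIterator methods named in the lead-in above.
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.parquet.TripleIterator;

public class IntColumnToArrowConverter {
  private final TripleIterator<?> column;
  private final BufferAllocator allocator;
  private final int maxDefinitionLevel;

  public IntColumnToArrowConverter(TripleIterator<?> column, BufferAllocator allocator,
                                   int maxDefinitionLevel) {
    this.column = column;
    this.allocator = allocator;
    this.maxDefinitionLevel = maxDefinitionLevel;
  }

  /** Reads up to batchSize values from the column into a freshly allocated Arrow vector. */
  public IntVector nextBatch(int batchSize) {
    IntVector vector = new IntVector("col", allocator);
    vector.allocateNew(batchSize);
    int count = 0;
    while (count < batchSize && column.hasNext()) {
      if (column.currentDefinitionLevel() >= maxDefinitionLevel) {
        vector.setSafe(count, column.nextInteger());  // defined value
      } else {
        column.nextNull();                            // advance past the null triple
        vector.setNull(count);
      }
      count++;
    }
    vector.setValueCount(count);
    return vector;
  }
}

Columns produced this way could then be wrapped with ArrowColumnVector and stitched into a ColumnarBatch exactly as in the earlier sketch, without materializing intermediate InternalRows.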
> On Fri, May 24, 2019 at 4:28 PM Gautam <gautamkows...@gmail.com> wrote:
>
> Hello devs,
>
> As a follow up to https://github.com/apache/incubator-iceberg/issues/9 I’ve been reading through how Spark does vectorized reading in its current implementation, which is in the DataSource V1 path, trying to see how we can achieve the same impact in Iceberg’s reading. To start with, I want to form a high-level understanding of the approach one would need to take to achieve this. Pardon my ignorance, as I’m equally new to the Spark codebase as I am to Iceberg. Please correct me if my understanding is wrong.
>
> So here’s what vectorization seems to be doing for Parquet reading:
> - The DataSource scan execution uses ParquetFileFormat to build a RecordReaderIterator [1], which underneath uses the VectorizedParquetRecordReader.
> - This record reader is used to iterate over entire batches of columns (ColumnarBatch). The iterator.next() call returns a batch and not just a row. The interfaces allow a ColumnarBatch to be passed around as a generic Object, as stated here [2].
> - On the scan execution side, whole-stage code generation compiles code that consumes entire batches at a time, so that physical operators take advantage of the vectorization feature. So the scanner code is aware that it’s reading columnar batches out of the iterator.
>
> I’m wondering how one should approach this if one is to achieve vectorization in the Iceberg Reader (DataSourceV2) path. For instance, if the Iceberg Reader was to wrap Arrow or ColumnarBatch behind an Iterator[InternalRow] interface, it would still not work right? Because it seems to me there is a lot more going on upstream in the operator execution path that would need to be done here. It would be great if folks who are more well-versed with the Spark codebase shed some light on this. In general, what is the contract needed between a V2 DataSourceReader (like Iceberg) and the operator execution?
>
> thank you,
> -Gautam.
>
> [1] - https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412
> [2] - https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L29
>
> --
> Ryan Blue
> Software Engineer
> Netflix
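On the question of the contract between a V2 DataSourceReader and operator execution (and option 2 above): in Spark 2.4 a DataSourceV2 reader signals columnar output through the SupportsScanColumnarBatch mixin, so each partition reader hands back ColumnarBatch instead of InternalRow. A minimal sketch of that shape follows; the class name and the empty partition list are placeholders, not Iceberg's actual Reader.

// Rough shape of a batch-producing DataSourceV2 reader under Spark 2.4.
// planBatchInputPartitions() replaces planInputPartitions() when enableBatchRead() is true.
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class BatchingIcebergReader implements SupportsScanColumnarBatch {
  private final StructType schema;

  public BatchingIcebergReader(StructType schema) {
    this.schema = schema;
  }

  @Override
  public StructType readSchema() {
    return schema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // Each partition would wrap one file scan task and return an
    // InputPartitionReader<ColumnarBatch> whose get() yields the Arrow-backed
    // batches built by the converters sketched earlier.
    return Collections.emptyList();  // placeholder
  }

  @Override
  public boolean enableBatchRead() {
    return true;  // the mixin's default; shown here for clarity
  }
}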