Gautam, could you also share the code for benchmarks and conversion?

Thanks,
Anton

> On 13 Jun 2019, at 19:38, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> 
> Sounds like a good start. I think the next step is to avoid using the 
> ParquetReader.FileIterator and deserialize directly from TripleIterator 
> <https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java>.
> I think the reason this is taking longer is that you're doing all the work to 
> materialize the data in rows, and then converting to vectors.
> 
> To work on top of TripleIterator, I think you need to create a 
> ParquetValueReader for Arrow batches. That would be configured with a batch 
> size, so that when you pass it into ParquetReader, the FileIterator returns 
> batches instead of individual rows.
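> 
> Roughly this shape, just to sketch it (untested; ArrowBatchReader and 
> ArrowColumnLoader are placeholder names, and the real version would implement 
> the ParquetValueReader interface so it plugs into ParquetReader):
> 
> import java.util.List;
> import org.apache.arrow.vector.FieldVector;
> import org.apache.spark.sql.vectorized.ArrowColumnVector;
> import org.apache.spark.sql.vectorized.ColumnVector;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
> 
> // Placeholder for a per-column reader that materializes up to numRows values
> // from the underlying Parquet triples straight into an Arrow vector.
> interface ArrowColumnLoader {
>   FieldVector loadBatch(int numRows);
> }
> 
> // Batch-producing reader: configured with a batch size, so the FileIterator
> // can hand back one ColumnarBatch per call instead of one row.
> class ArrowBatchReader {
>   private final List<ArrowColumnLoader> columns;
>   private final int batchSize;
> 
>   ArrowBatchReader(List<ArrowColumnLoader> columns, int batchSize) {
>     this.columns = columns;
>     this.batchSize = batchSize;
>   }
> 
>   ColumnarBatch read() {
>     ColumnVector[] vectors = new ColumnVector[columns.size()];
>     for (int i = 0; i < columns.size(); i++) {
>       // Each column is materialized directly into Arrow, no row objects.
>       vectors[i] = new ArrowColumnVector(columns.get(i).loadBatch(batchSize));
>     }
>     // (A real implementation would track how many rows are actually left in
>     // the row group, so the last batch can be shorter.)
>     ColumnarBatch batch = new ColumnarBatch(vectors);
>     batch.setNumRows(batchSize);
>     return batch;
>   }
> }
> 
> The batch size is then just a knob on the reader, and the last batch can 
> simply be shorter.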
> 
> Does that make sense?
> 
> rb
> 
> 
> On Wed, Jun 12, 2019 at 11:22 PM Gautam <gautamkows...@gmail.com> wrote:
> Hey Ryan and Anton,
> 
> I wanted to circle back with some findings after taking a first stab at 
> this.
>  
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we 
> want to take advantage of..
> 
> This is how I went about it: I wrapped the ParquetReader.FileIterator with an 
> iterator that builds an Arrow batch from the rows returned by 
> FileIterator.next() (one batch for every 100 rows). I expose each Arrow batch 
> from this iterator as a ColumnarBatch, whose .rowIterator() reads it back as a 
> sequence of InternalRow, and that is what I return from the Reader.open() call 
> in Iceberg.
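> 
> In outline, the wrapper looks something like this (simplified here to a single 
> int column; the real code walks the full schema and column types, and the 
> class name is just a placeholder):
> 
> import java.util.Collections;
> import java.util.Iterator;
> import org.apache.arrow.memory.BufferAllocator;
> import org.apache.arrow.memory.RootAllocator;
> import org.apache.arrow.vector.IntVector;
> import org.apache.spark.sql.catalyst.InternalRow;
> import org.apache.spark.sql.vectorized.ArrowColumnVector;
> import org.apache.spark.sql.vectorized.ColumnVector;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
> 
> // Wraps a row iterator (the ParquetReader.FileIterator) and re-exposes it as
> // rows again, but batched through Arrow / ColumnarBatch underneath.
> class BatchingRowIterator implements Iterator<InternalRow> {
>   private final Iterator<InternalRow> rows;   // underlying FileIterator
>   private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
>   private final int batchSize;
>   private Iterator<InternalRow> currentBatch = Collections.emptyIterator();
> 
>   BatchingRowIterator(Iterator<InternalRow> rows, int batchSize) {
>     this.rows = rows;
>     this.batchSize = batchSize;
>   }
> 
>   @Override
>   public boolean hasNext() {
>     return currentBatch.hasNext() || rows.hasNext();
>   }
> 
>   @Override
>   public InternalRow next() {
>     if (!currentBatch.hasNext()) {
>       currentBatch = nextBatch().rowIterator();  // rows -> Arrow -> rows again
>     }
>     return currentBatch.next();
>   }
> 
>   private ColumnarBatch nextBatch() {
>     // Copy #1: row values into an Arrow vector.
>     IntVector vector = new IntVector("col0", allocator);
>     vector.allocateNew(batchSize);
>     int count = 0;
>     while (count < batchSize && rows.hasNext()) {
>       vector.setSafe(count, rows.next().getInt(0));
>       count++;
>     }
>     vector.setValueCount(count);
>     // Wrap the Arrow vector as a Spark ColumnVector and expose it as a batch.
>     // (The real code also releases the previous batch's Arrow buffers.)
>     ColumnarBatch batch =
>         new ColumnarBatch(new ColumnVector[] { new ArrowColumnVector(vector) });
>     batch.setNumRows(count);
>     return batch;
>   }
> }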
> 
> Here are some microbenchmark numbers for flat Parquet file scanning:
> 
> 1 warmup, 3 iterations, 10 columns per row, 100k records per file, 10 files
> 
> Benchmark                    Mode  Cnt   Score(sec)   Error     Units
> readFileV1SourceVectorized   avgt    3   0.870        ± 1.883   s/op
> readIcebergNoBatching        avgt    3   1.854        ± 2.003   s/op
> readIcebergWithBatching100k  avgt    3   3.216        ± 0.520   s/op
> readIcebergWithBatching10k   avgt    3   8.763        ± 2.338   s/op
> readIcebergWithBatching5k    avgt    3  13.964        ± 6.328   s/op
> 
> 
> The batching doesn't seem to add any benefit. I measured the conversion times 
> and read this as the overhead of the extra copies into Arrow and then into 
> ColumnarBatch again, although I was hoping that the materialization to Arrow 
> would offset some of that overhead.
> 
> Wondering what my next step should be:
> 1) Eliminate the extra conversion overhead by reading each column type 
> directly into an ArrowColumnVector? 
> 2) Extend IcebergSource to support the SupportsScanColumnarBatch mixin and 
> expose ColumnarBatch directly (skeleton sketched below)?
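> 
> If I went with (2), I believe the Spark 2.4 skeleton would be something like 
> the following (untested, and IcebergBatchReader is just a placeholder name for 
> wherever this would live in IcebergSource):
> 
> import java.util.List;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
> 
> // Skeleton only: a batch-capable DataSourceV2 reader. When enableBatchRead()
> // (default true in the mixin) is on, Spark calls planBatchInputPartitions()
> // instead of planInputPartitions() and consumes ColumnarBatch directly.
> class IcebergBatchReader implements SupportsScanColumnarBatch {
> 
>   @Override
>   public StructType readSchema() {
>     return new StructType();  // the projected schema would go here
>   }
> 
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     // Each partition's InputPartitionReader.get() would return a ColumnarBatch
>     // built from Arrow vectors rather than individual InternalRows.
>     throw new UnsupportedOperationException("sketch only");
>   }
> }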
> 
> 
> Appreciate your guidance,
> 
> -Gautam.
> 
> 
> 
> On Fri, May 24, 2019 at 5:28 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
> if the Iceberg reader were to wrap Arrow or ColumnarBatch behind an 
> Iterator[InternalRow] interface, it would still not work, right? Because it 
> seems to me there is a lot more going on upstream in the operator execution 
> path that would also need to change.
> 
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we 
> want to take advantage of. You’re right that the first thing that Spark does 
> it to get each row as InternalRow. But we still get a benefit from 
> vectorizing the data materialization to Arrow itself. Spark execution is not 
> vectorized, but that can be updated in Spark later (I think there’s a 
> proposal).
> 
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark 
> because it produces its own in-memory format and not Arrow. We want to take 
> advantage of Arrow so that we can use dictionary-encoded columns in record 
> batches. Spark’s vectorized Parquet reader also works directly on Parquet 
> pages instead of a higher-level abstraction. I’m not sure we are going to 
> want to do that right away instead of using the TripleIterator that Iceberg 
> currently uses to abstract the Parquet page structure away.
> 
> I would start by writing converters that build a column of Arrow data 
> from a TripleIterator. Then we can stitch those columns together to get 
> record batches and see how that performs. Then we can add complexity from 
> there.
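> 
> For a single column, I'm picturing something along these lines (untested; the 
> TripleIterator method names are from memory and may not match the class 
> exactly, and this assumes a flat schema, so repetition levels are ignored):
> 
> import org.apache.arrow.memory.BufferAllocator;
> import org.apache.arrow.vector.IntVector;
> 
> // Sketch of a per-column converter; assumes it sits in org.apache.iceberg.parquet
> // next to TripleIterator. maxDefinitionLevel comes from the Parquet column
> // descriptor and tells us whether the current triple is a null.
> class IntColumnConverter {
>   private final TripleIterator<?> column;
>   private final int maxDefinitionLevel;
>   private final BufferAllocator allocator;
> 
>   IntColumnConverter(TripleIterator<?> column, int maxDefinitionLevel,
>                      BufferAllocator allocator) {
>     this.column = column;
>     this.maxDefinitionLevel = maxDefinitionLevel;
>     this.allocator = allocator;
>   }
> 
>   // Materializes up to numRows values from the triple stream into Arrow.
>   IntVector convert(int numRows) {
>     IntVector vector = new IntVector("col", allocator);
>     vector.allocateNew(numRows);
>     int row = 0;
>     while (row < numRows && column.hasNext()) {
>       if (column.currentDefinitionLevel() < maxDefinitionLevel) {
>         column.nextNull();                         // advance past the null triple
>         vector.setNull(row);
>       } else {
>         vector.setSafe(row, column.nextInteger()); // non-null int value
>       }
>       row++;
>     }
>     vector.setValueCount(row);
>     return vector;
>   }
> }
> 
> Stitching record batches together would then just be wrapping one such vector 
> per column in a ColumnarBatch.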
> 
> 
> On Fri, May 24, 2019 at 4:28 PM Gautam <gautamkows...@gmail.com> wrote:
> Hello devs,
>        As a follow up to https://github.com/apache/incubator-iceberg/issues/9, 
> I've been reading through how Spark does vectorized reading in its current 
> implementation, which is in the DataSource V1 path, to see how we can achieve 
> the same impact in Iceberg's read path. To start with, I want to form a 
> high-level understanding of the approach one would need to take to achieve 
> this. Pardon my ignorance, as I'm equally new to the Spark codebase as I am to 
> Iceberg. Please correct me if my understanding is wrong.
> 
> So here's what vectorization seems to be doing for Parquet reading: 
> - The DataSource scan execution uses ParquetFileFormat to build a 
> RecordReaderIterator [1], which underneath uses the 
> VectorizedParquetRecordReader.
> - This record reader iterates over entire batches of columns (ColumnarBatch). 
> The iterator.next() call returns a batch and not just a row. The interfaces 
> allow a ColumnarBatch to be passed around as a generic Object, as stated 
> here [2].
> - On the scan execution side, whole-stage code generation compiles code that 
> consumes entire batches at a time, so that physical operators take advantage 
> of the vectorization feature. In other words, the scan code is aware that it 
> is reading columnar batches out of the iterator.
> 
> 
> I'm wondering how one should approach this to achieve vectorization in the 
> Iceberg reader (DataSourceV2) path. For instance, if the Iceberg reader were 
> to wrap Arrow or ColumnarBatch behind an Iterator[InternalRow] interface, it 
> would still not work, right? Because it seems to me there is a lot more going 
> on upstream in the operator execution path that would also need to change. It 
> would be great if folks who are more well-versed with the Spark codebase could 
> shed some light on this. In general, what is the contract needed between a V2 
> DataSourceReader (like Iceberg's) and the operator execution? 
> 
> thank you,
> -Gautam.
> 
> 
> [1] - https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412
> [2] - https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L29
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix
