After some more digging, I think there is no solution yet for vectorized
reads with deletes. Batch reads are only enabled when the scan has no
delete files:

    this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
        (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
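
To make the consequence concrete, here is a minimal, self-contained sketch of
that condition. Only the boolean names come from the line above; the class
and method names are made up for illustration.

```java
// Hypothetical stand-in for the gating condition quoted above.
// Only the parameter names are taken from the thread; the class is illustrative.
final class BatchReadDecision {
  private BatchReadDecision() {}

  static boolean readUsingBatch(
      boolean batchReadsEnabled,
      boolean hasNoDeleteFiles,
      boolean allOrcFileScanTasks,
      boolean allParquetFileScanTasks,
      boolean atLeastOneColumn,
      boolean onlyPrimitives) {
    // Any delete file makes hasNoDeleteFiles false, which disables batch reads
    // regardless of the file format checks on the right-hand side.
    return batchReadsEnabled
        && hasNoDeleteFiles
        && (allOrcFileScanTasks
            || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
  }
}
```

So as soon as a scan contains a single delete file, the reader falls back to
the row-based path, whatever the file format is.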


On Tue, Mar 2, 2021 at 15:48, Mayur Srivastava <mayur.srivast...@twosigma.com>
wrote:

> Hi Peter,
>
>
>
> Good point.
>
>
>
> Most of the ArrowReader code is inspired by Spark’s vectorized
> reader (src:
> https://github.com/apache/iceberg/blob/631efec2f9ce1f0526d6613c81ac8fd0ccb95b5e/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java#L68
> ).
>
>
>
>
>
> We’ll probably need some help from someone who understands the Spark
> vectorized read path.
>
>
>
> But, I’ll read the code to understand the deletes.
>
>
>
> Thanks,
>
> Mayur
>
>
>
> *From:* Peter Vary <pv...@cloudera.com.INVALID>
> *Sent:* Tuesday, March 2, 2021 8:51 AM
> *To:* Iceberg Dev List <dev@iceberg.apache.org>
> *Cc:* rb...@netflix.com
> *Subject:* Re: Reading data from Iceberg table into Apache Arrow in Java
>
>
>
> Hi Mayur,
>
>
>
> I have been playing around with the idea of implementing vectorized reads
> for Hive, so your message came just in time :)
>
>
>
> I took a quick look at the code, but I do not really understand how
> vectorized reads handle deletes.
>
>
>
> In the non-vectorized code path I found this, which filters the rows
> one by one:
>
>
> https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277
>
>
>
>
>
>   DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
>   Schema requiredSchema = deletes.requiredSchema();
>   return deletes.filter(openTask(currentTask, requiredSchema));
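
The row-by-row idea can be sketched without any Iceberg classes. The
following only illustrates dropping rows whose position appears in a set of
deleted positions; `RowDeleteSketch`, `filterDeleted`, and the position set
are hypothetical names, and this is not how `DeleteFilter` itself is
implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of row-by-row delete filtering; not Iceberg code.
final class RowDeleteSketch {
  private RowDeleteSketch() {}

  // Returns the rows whose 0-based position is not in deletedPositions.
  static <T> List<T> filterDeleted(List<T> rows, Set<Long> deletedPositions) {
    List<T> live = new ArrayList<>();
    long position = 0;
    for (T row : rows) {
      // Each row is checked individually, which is why this path is not vectorized.
      if (!deletedPositions.contains(position)) {
        live.add(row);
      }
      position++;
    }
    return live;
  }
}
```

A vectorized equivalent would have to apply the same check to a whole batch
at once, which is the part that seems to be missing.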
>
>
>
> In your code I found that the encryption keys of the delete files are
> collected, but I am not sure how they are used:
>
>
> https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59
>
>
>
>     task.files().stream()
>         .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()),
>             fileScanTask.deletes().stream()))
>         .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
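
The snippet above builds a map from file path to key metadata for each data
file and its delete files alike. A self-contained sketch of the same stream
pipeline follows, with hypothetical `FileRef` and `ScanTask` classes standing
in for the Iceberg types. It shows only the collection step; how the map is
consumed afterwards is the open question here.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical stand-ins for the Iceberg file and scan task types.
final class KeyMetadataSketch {
  private KeyMetadataSketch() {}

  static final class FileRef {
    final String path;
    final ByteBuffer keyMetadata;

    FileRef(String path, ByteBuffer keyMetadata) {
      this.path = path;
      this.keyMetadata = keyMetadata;
    }
  }

  static final class ScanTask {
    final FileRef dataFile;
    final List<FileRef> deletes;

    ScanTask(FileRef dataFile, List<FileRef> deletes) {
      this.dataFile = dataFile;
      this.deletes = deletes;
    }
  }

  // Collects key metadata for every data file and every delete file, keyed by path.
  static Map<String, ByteBuffer> collectKeyMetadata(List<ScanTask> tasks) {
    Map<String, ByteBuffer> keyMetadata = new HashMap<>();
    tasks.stream()
        .flatMap(task -> Stream.concat(Stream.of(task.dataFile), task.deletes.stream()))
        .forEach(file -> keyMetadata.put(file.path, file.keyMetadata));
    return keyMetadata;
  }
}
```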
>
>
>
> Could you please help me with some quick pointers?
>
>
>
> Thanks,
>
> Peter
>
>
>
> On Mar 1, 2021, at 16:17, Mayur Srivastava <mayur.srivast...@twosigma.com>
> wrote:
>
>
>
> Hi Ryan,
>
>
>
> I’ve submitted a PR (https://github.com/apache/iceberg/pull/2286) for the
> vectorized Arrow reader.
>
>
>
> This is my first Iceberg pull request - I'm not fully aware of the
> contributing conventions of this repo, so let me know if any changes are
> needed in the PR.
>
> I've refactored some code from the Spark vectorized reader and added an
> ArrowReader which is a vectorized reader in Iceberg core.
>
>
>
> About the ArrowReader:
>
> 1.      I’ve put the ArrowReader in the iceberg-data module because it
> needed to access the Iceberg table scan. Let me know if the reader needs to
> be moved.
>
> 2.      I had to add a dependency on ‘iceberg-arrow’ to the iceberg-data
> module. Specifically for the ArrowReaderTest, I had to add the following.
> Let me know if there is a better way of doing this.
>
> compileOnly("org.apache.arrow:arrow-vector") {
>   exclude group: 'io.netty', module: 'netty-buffer'
>   exclude group: 'com.google.code.findbugs', module: 'jsr305'
> }
>
> 3.      Most of the code in ArrowReader is taken from the Spark
> vectorized reader. I think there is a potential to share
> ‘BaseArrowFileScanTaskReader’ in both versions, but I did not attempt to do
> it yet.
>
> 4.      ArrowReader returns an iterator of VectorSchemaRoot and the
> behavior is explained in the Javadoc.
>
> 5.      Some small changes were needed in IcebergGenerics to expose the
> table scan object and VectorizedArrowReader to allocate different timestamp
> Arrow vectors based on with/without timezone.
>
> 6.      All prepush gradle tests pass except one which is still running
> (and it seems very slow - TestFlinkIcebergSink).
>
> 7.      I've not performed any performance tests with the implementation
> yet. I'm planning to do so this week.
>
>
>
> Following are some limitations/questions for this implementation:
>
> 1.      The arrow vector type is coupled with the physical data type in
> the parquet file: When column data contains a constant value, the column is
> dictionary encoded and the returned Arrow type is int32 irrespective of the
> Iceberg data type. I think that the Arrow vector type should be consistent
> with the logical Iceberg data type (and not change due to the physical data
> type). There is a test ArrowReaderTest.testReadAllWithConstantRecords()
> that is currently ignored.
>
> 2.      Type promotion does not work: In the ArrowReaderTest, the data
> for column ‘int_promotion’ was written as int, and then type was promoted
> from int to long, but the Arrow reader still returns IntVector. I think
> that the Arrow vector type should be consistent with the promoted logical
> Iceberg data type.
>
> 3.      Data type limitations:
>
> a.      Types not tested: UUID, FixedType, DecimalType. In the
> ArrowReaderTest, the parquet write was failing for these data types (due to
> a null pointer exception in ParquetMetadataConverter.addRowGroup:
> columnMetadata.getStatistics() was null). Are there unit tests with these
> types that write to parquet?
>
> b.      Types not supported: TimeType, ListType, MapType, StructType.
> What is the path to add Arrow support for these data types?
>
>
>
> Thanks,
>
> Mayur
>
>
>
> *From:* Mayur Srivastava <mayur.srivast...@twosigma.com>
> *Sent:* Friday, February 12, 2021 7:41 PM
> *To:* dev@iceberg.apache.org; rb...@netflix.com
> *Subject:* RE: Reading data from Iceberg table into Apache Arrow in Java
>
>
>
> Thank you Ryan.
>
>
>
> I’ll dig into the file scan plan and Spark codebase to learn about the
> internals of Iceberg vectorized read path. Then, I’ll try to implement the
> vectorized reader using core components only. I’ll be happy to work with
> you to contribute it back upstream. I’ll get back to you if I have any
> questions or need any more pointers.
>
>
>
> Thanks,
>
> Mayur
>
>
>
> *From:* Ryan Blue <rb...@netflix.com.INVALID>
> *Sent:* Friday, February 12, 2021 2:26 PM
> *To:* Iceberg Dev List <dev@iceberg.apache.org>
> *Subject:* Re: Reading data from Iceberg table into Apache Arrow in Java
>
>
>
> Hi Mayur,
>
>
>
> We built the Arrow support with Spark as the first use case, so the best
> examples of how to use it are in Spark.
>
>
>
> The generic reader does two things: it plans a scan and sets up an
> iterator of file readers to produce generic records. What you want to do is
> the same thing, but set up the file readers to produce Arrow batches. You
> can do that by changing the `Parquet.read` call and passing the callback to
> create an Arrow batch reader rather than generic row reader. I don't think
> there is a public example of this, but maybe someone else knows about one.
> This isn't available in Iceberg yet, but if you want to add it we'd be
> happy to help you get it in.
>
>
>
> The Spark read path has a good example, but it also wraps the Arrow
> batches so Spark can read them. Also, keep in mind that the Arrow
> integration only supports flat schemas right now, not fully nested schemas.
> So you'd still need to fall back to the row-based path. (Side note, if you
> have code to convert generics to Arrow, that's really useful to post
> somewhere.)
>
>
>
> I hope that helps. It would be great to work with you to improve this in a
> couple of PRs!
>
>
>
> rb
>
>
>
> On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <
> mayur.srivast...@twosigma.com> wrote:
>
> Hi,
>
>
>
> We have an existing time series data access service based on Arrow/Flight
> which uses Apache Arrow format data to perform writes and reads (using time
> range queries) from a bespoke table backend based on S3-compatible
> storage.
>
>
>
> We are trying to replace our bespoke table-backend with Iceberg tables.
> For integrating with Iceberg, we are using Iceberg core+data+parquet
> modules directly to write and read data. I would like to note that our
> service cannot use the Spark route to write or read the data. In our
> current Iceberg reader integration code, we are using
> IcebergGenerics.read(table).select(...).where(...).build() to iterate
> through the data row-by-row. Instead of this (potentially slower) read path
> which needs conversion between rows and Arrow VectorSchemaRoot, we want to
> use a vectorized read path which directly returns an Arrow VectorSchemaRoot
> as a callback or Arrow record batches as the result set.
>
>
>
> I have noticed that Iceberg already has an Arrow module
> https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow.
> I have also looked into https://github.com/apache/iceberg/issues/9 and
> https://github.com/apache/iceberg/milestone/2. But, I’m not sure about
> the current status of the vectorized reader support. I’m also not sure how
> this Arrow module is being used to perform a vectorized read to execute a
> query on an Iceberg table in the core/data/parquet library.
>
>
>
> I have a few questions regarding the Vectorized reader/Arrow support:
>
> 1.      Is it possible to run a vectorized read on an Iceberg table to
> return data in Arrow format using a non-Spark reader in Java?
>
> 2.      Is there an example of reading data in Arrow format from an
> Iceberg table?
>
> 3.      Is the Spark read path completely vectorized? I ask this question
> to find out if we can borrow from the vectorized Spark reader or we can
> move code from vectorized Spark reader to the Iceberg core library.
>
>
>
> Let me know if you have any questions for me.
>
>
>
> Thanks,
>
> Mayur
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>
>
>
