After some more digging, I think there is no solution yet for vectorized reads with deletes. The Spark read path seems to enable batch reads only when the scan has no delete files:

    this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles &&
        (allOrcFileScanTasks ||
         (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
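So until that changes, I think a non-Spark reader has to check for delete files up front and fall back to the row-based path when any are present. Roughly something like this, given a CombinedScanTask from the scan (a sketch only; readArrowBatches and readRowsWithDeleteFilter are placeholders for the two code paths, not existing APIs):

    // Pick the read path per combined scan task. FileScanTask.deletes() lists the
    // delete files that apply to a data file; if any are present, fall back to the
    // row-by-row DeleteFilter path quoted further down in this thread.
    boolean hasNoDeletes = combinedTask.files().stream()
        .allMatch(fileTask -> fileTask.deletes().isEmpty());

    if (hasNoDeletes) {
      readArrowBatches(combinedTask);            // vectorized / Arrow path
    } else {
      readRowsWithDeleteFilter(combinedTask);    // generic row path, deletes applied
    }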
Mayur Srivastava <mayur.srivast...@twosigma.com> wrote (on Tue, Mar 2, 2021, 15:48):

> Hi Peter,
>
> Good point.
>
> Most of the ArrowReader code is inspired by Spark's vectorized reader (src: https://github.com/apache/iceberg/blob/631efec2f9ce1f0526d6613c81ac8fd0ccb95b5e/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java#L68).
>
> We'll probably need some help from someone who understands the Spark vectorized read path.
>
> But I'll read the code to understand the deletes.
>
> Thanks,
> Mayur
>
> From: Peter Vary <pv...@cloudera.com.INVALID>
> Sent: Tuesday, March 2, 2021 8:51 AM
> To: Iceberg Dev List <dev@iceberg.apache.org>
> Cc: rb...@netflix.com
> Subject: Re: Reading data from Iceberg table into Apache Arrow in Java
>
> Hi Mayur,
>
> I am playing around with the idea of implementing vectorized reads for Hive, so your message came just in time :)
>
> I took a quick look at the code, but I do not really understand how vectorized reads handle deletes.
>
> In the non-vectorized code path I have found this, which filters the rows one by one:
> https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277
>
>     DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
>     Schema requiredSchema = deletes.requiredSchema();
>     return deletes.filter(openTask(currentTask, requiredSchema));
>
> In your code I have found that the delete files' encryption keys are collected, but I am not sure how they are used:
> https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59
>
>     task.files().stream()
>         .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
>         .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
>
> Could you please help me with some quick pointers?
>
> Thanks,
> Peter
>
> On Mar 1, 2021, at 16:17, Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:
>
> Hi Ryan,
>
> I've submitted a PR (https://github.com/apache/iceberg/pull/2286) for the vectorized Arrow reader.
>
> This is my first Iceberg pull request - I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the PR. I've refactored some code from the Spark vectorized reader and added an ArrowReader, which is a vectorized reader in Iceberg core.
>
> About the ArrowReader:
>
> 1. I've put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved.
>
> 2. I had to add a dependency on 'iceberg-arrow' for the iceberg-data module. Specifically for the ArrowReaderTest, I had to add the following. Let me know if there is a better way of doing this.
>
>     compileOnly("org.apache.arrow:arrow-vector") {
>       exclude group: 'io.netty', module: 'netty-buffer'
>       exclude group: 'com.google.code.findbugs', module: 'jsr305'
>     }
>
> 3. Most of the code in ArrowReader is taken from the Spark vectorized reader. I think there is potential to share 'BaseArrowFileScanTaskReader' in both versions, but I did not attempt to do it yet.
> 4. ArrowReader returns an iterator of VectorSchemaRoot; the behavior is explained in the Javadoc.
>
> 5. Some small changes were needed in IcebergGenerics to expose the table scan object, and in VectorizedArrowReader to allocate different timestamp Arrow vectors for with/without timezone.
>
> 6. All prepush gradle tests pass except one which is still running (and seems very slow - TestFlinkIcebergSink).
>
> 7. I've not run any performance tests on the implementation yet. I'm planning to do so this week.
>
> Following are some limitations/questions for this implementation:
>
> 1. The Arrow vector type is coupled with the physical data type in the Parquet file: when column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test, ArrowReaderTest.testReadAllWithConstantRecords(), that is currently ignored.
>
> 2. Type promotion does not work: in the ArrowReaderTest, the data for column 'int_promotion' was written as int and the type was then promoted from int to long, but the Arrow reader still returns an IntVector. I think the Arrow vector type should be consistent with the promoted logical Iceberg data type.
>
> 3. Data type limitations:
>
> a. Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, the Parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to Parquet?
>
> b. Types not supported: TimeType, ListType, MapType, StructType. What is the path to add Arrow support for these data types?
>
> Thanks,
> Mayur
>
> From: Mayur Srivastava <mayur.srivast...@twosigma.com>
> Sent: Friday, February 12, 2021 7:41 PM
> To: dev@iceberg.apache.org; rb...@netflix.com
> Subject: RE: Reading data from Iceberg table into Apache Arrow in Java
>
> Thank you, Ryan.
>
> I'll dig into the file scan plan and the Spark codebase to learn about the internals of the Iceberg vectorized read path. Then I'll try to implement the vectorized reader using core components only. I'll be happy to work with you to contribute it back upstream. I'll get back to you if I have any questions or need more pointers.
>
> Thanks,
> Mayur
>
> From: Ryan Blue <rb...@netflix.com.INVALID>
> Sent: Friday, February 12, 2021 2:26 PM
> To: Iceberg Dev List <dev@iceberg.apache.org>
> Subject: Re: Reading data from Iceberg table into Apache Arrow in Java
>
> Hi Mayur,
>
> We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark.
>
> The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches. You can do that by changing the `Parquet.read` call and passing a callback that creates an Arrow batch reader rather than a generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it we'd be happy to help you get it in.
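Interjecting with my reading of the above, in case it helps others following along: the change Ryan describes would look roughly like the sketch below. buildArrowBatchReader is a placeholder for an Arrow-producing VectorizedReader factory (e.g. built on top of the iceberg-arrow vectors), not an existing API, and the element type of the batched iterable depends on what that reader actually produces.

    // Row-based path, roughly what the generic reader sets up today:
    CloseableIterable<Record> rows = Parquet.read(inputFile)
        .project(readSchema)
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema))
        .build();

    // Hypothetical batched path: same builder, but with a vectorized reader factory.
    // buildArrowBatchReader is a placeholder that would return a VectorizedReader
    // producing Arrow-backed batches.
    CloseableIterable<VectorSchemaRoot> batches = Parquet.read(inputFile)
        .project(readSchema)
        .createBatchedReaderFunc(fileSchema -> buildArrowBatchReader(readSchema, fileSchema))
        .recordsPerBatch(10000)
        .build();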
> The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas, so you'd still need to fall back to the row-based path. (Side note: if you have code to convert generics to Arrow, that would be really useful to post somewhere.)
>
> I hope that helps. It would be great to work with you to improve this in a couple of PRs!
>
> rb
>
> On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:
>
> Hi,
>
> We have an existing time series data access service based on Arrow/Flight which uses Apache Arrow format data to perform writes and reads (using time range queries) against a bespoke table backend on top of an S3-compatible storage.
>
> We are trying to replace our bespoke table backend with Iceberg tables. For the integration, we are using the Iceberg core+data+parquet modules directly to write and read data. I would like to note that our service cannot use the Spark route to write or read the data. In our current Iceberg reader integration code, we use IcebergGenerics.read(table).select(...).where(...).build() to iterate through the data row by row. Instead of this (potentially slower) read path, which needs conversion between rows and an Arrow VectorSchemaRoot, we want a vectorized read path that directly returns an Arrow VectorSchemaRoot via a callback, or Arrow record batches as the result set.
>
> I have noticed that Iceberg already has an Arrow module: https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow. I have also looked into https://github.com/apache/iceberg/issues/9 and https://github.com/apache/iceberg/milestone/2. But I'm not sure about the current status of the vectorized reader support, and I'm also not sure how this Arrow module is used to perform a vectorized read for a query on an Iceberg table in the core/data/parquet library.
>
> I have a few questions regarding the vectorized reader / Arrow support:
>
> 1. Is it possible to run a vectorized read on an Iceberg table that returns data in Arrow format using a non-Spark reader in Java?
>
> 2. Is there an example of reading data in Arrow format from an Iceberg table?
>
> 3. Is the Spark read path completely vectorized? I ask to find out whether we can borrow from the vectorized Spark reader, or move code from the vectorized Spark reader into the Iceberg core library.
>
> Let me know if you have any questions for me.
>
> Thanks,
> Mayur
>
> --
> Ryan Blue
> Software Engineer
> Netflix
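PS, regarding Ryan's side note about converting generics to Arrow: until a proper vectorized path lands, copying generic Records into a VectorSchemaRoot might look roughly like this. This is only a sketch for a flat schema; the "id" and "value" field names are illustrative, arrowSchema is assumed to be an Arrow schema built to match the Iceberg read schema, and rows is the CloseableIterable<Record> returned by IcebergGenerics.

    // Copy Iceberg generic Records into Arrow vectors (flat schema only).
    try (BufferAllocator allocator = new RootAllocator();
         VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
      root.allocateNew();
      IntVector idVec = (IntVector) root.getVector("id");          // illustrative field
      VarCharVector valueVec = (VarCharVector) root.getVector("value");  // illustrative field
      int i = 0;
      for (Record rec : rows) {
        idVec.setSafe(i, (Integer) rec.getField("id"));
        valueVec.setSafe(i, ((String) rec.getField("value")).getBytes(StandardCharsets.UTF_8));
        i++;
      }
      root.setRowCount(i);
      // hand the populated root to the Arrow/Flight consumer here
    }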