Just to follow up on this for anyone watching the dev list. I merged Gautam's fix for this. Thanks for finding and working on this!
On Thu, Feb 21, 2019 at 4:09 AM Gautam <gautamkows...@gmail.com> wrote:
>
> Hey Ryan,
>
> I found the root cause of the post scan filter not working over the
> Iceberg format.
>
> *The short explanation:* The Iceberg Parquet reader fails to scan rows
> when a complex column filter is used, so it returns no rows for the post
> scan filter to inspect.
>
> *More detail:*
> Although the complex filter isn't pushed down by Spark to the Iceberg
> scan, Spark does push down an implicit isNotNull(mapCol) filter. Before
> scanning begins, row groups are evaluated to check whether they can be
> skipped *[1]*. While doing so, ParquetMetricsRowGroupFilter rejects row
> groups when evaluating this isNotNull(mapCol) filter.
> ParquetMetricsRowGroupFilter implements a BoundExpressionVisitor whose
> notNull() evaluation method doesn't recognize complex types as being
> "present" *[2]*, leading the reader to believe the column is absent and
> all nulls.
>
> *In the map filter case:*
> ParquetMetricsRowGroupFilter keeps a `valueCounts` metric, a count
> statistic keyed by column id. It doesn't contain counts for the map
> column itself; instead it has value counts for the map keys and map
> values (which have different unique ids). So a lookup by the map
> column's id returns no counts.
>
> Proposed fix options:
> 1 - Reject the implicit isNotNull(mapCol) check in the Parquet reader
> for nested types, since we know nested types are not pushed down.
> 2 - Skip the stats-based check for nested types, since we know they need
> to be re-evaluated by post scan filters anyway.
>
> Let me know what you think,
>
> Cheers,
> -Gautam.
>
> [1] - https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetReader.java#L103-L112
> [2] - https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetricsRowGroupFilter.java#L159-L163
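A minimal sketch of what option 2 could look like, not the merged patch: the class below and its field-id maps are simplified stand-ins for Iceberg's internal metrics structures, and the boolean constants mirror the row-group filter's contract (true means the row group might contain matching rows).

    // Sketch of option 2: skip the stats-based notNull check for nested
    // types and let the post scan filter re-evaluate them. Everything
    // here is a simplified stand-in for Iceberg's internals.
    import java.util.Map;

    class NotNullRowGroupCheck {
      private static final boolean ROWS_MIGHT_MATCH = true;
      private static final boolean ROWS_CANNOT_MATCH = false;

      // valueCounts/nullCounts are keyed by leaf column id. A map column's
      // stats live under its key and value field ids, so a lookup by the
      // map column's own id finds nothing.
      static boolean notNull(int columnId, boolean isNestedType,
                             Map<Integer, Long> valueCounts,
                             Map<Integer, Long> nullCounts) {
        if (isNestedType) {
          // Nested predicates are never pushed down, so don't prune row
          // groups based on (missing) stats; defer to the post scan filter.
          return ROWS_MIGHT_MATCH;
        }

        Long valueCount = valueCounts.get(columnId);
        if (valueCount == null) {
          // The failure mode described above: no counts under the map
          // column's id made the reader treat the column as all nulls.
          return ROWS_CANNOT_MATCH;
        }

        Long nullCount = nullCounts.get(columnId);
        if (nullCount != null && nullCount.longValue() == valueCount.longValue()) {
          return ROWS_CANNOT_MATCH; // genuinely all nulls in this row group
        }
        return ROWS_MIGHT_MATCH;
      }
    }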
> On Wed, Feb 20, 2019 at 12:25 AM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Hi Gautam,
>>
>> Thanks for reporting this. I'll look into why Spark is filtering out
>> all of the Iceberg records. It should use the same filter in both
>> cases, so I'm surprised that this is happening.
>>
>> The problem with the complex predicate is that it is inside a map.
>> Parquet doesn't support push-down filters for nested columns.
>>
>> On Mon, Feb 18, 2019 at 11:03 PM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Hello Iceberg Devs,
>>> I've been tracking an issue with predicate pushdown in Iceberg on
>>> complex types, comparing the vanilla Spark Parquet reader against the
>>> Iceberg format reader. I have an example detailing it here:
>>> https://github.com/apache/incubator-iceberg/issues/99
>>>
>>> *Vanilla Spark Parquet reader plan*
>>> == Physical Plan ==
>>> *(1) Project [age#428, name#429, friends#430, location#431]
>>> +- *(1) Filter (isnotnull(friends#430) && (friends#430[Josh] = 10))
>>>    +- *(1) FileScan parquet [age#428,name#429,friends#430,location#431]
>>> Batched: false, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/usr/local/spark/test/parquet-people-complex],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(friends)], ReadSchema:
>>> struct<age:int,name:string,friends:map<string,int>,location:struct<lat:int,lon:int>>
>>>
>>> *Iceberg plan*
>>> == Physical Plan ==
>>> *(1) Project [age#33, name#34, friends#35]
>>> +- *(1) Filter ((friends#35[Josh] = 10) && isnotnull(friends#35))
>>>    +- *(1) ScanV2 iceberg[age#33, name#34, friends#35] (Filters:
>>> [isnotnull(friends#35)], Options: [path=iceberg-people-complex2,paths=[]])
>>>
>>> *A couple of points:*
>>> 1) The complex predicate is not pushed down to the scan level in either
>>> plan. It is deemed "non-translatable" by
>>> *DataSourceStrategy.translateFilter()* [1] when converting the Catalyst
>>> expression to a data source filter. Ryan and Xabriel had a discussion
>>> earlier on this list about Spark not passing expressions to data
>>> sources in certain cases; this might be related. One path forward is to
>>> fix that translation in Spark so that Iceberg's filter conversion has a
>>> chance to handle complex types. Currently the Iceberg reader code is
>>> unaware of that filter.
>>>
>>> 2) Although both vanilla Spark and Iceberg evaluate complex-type
>>> predicates after the scan, this regression is caused by the post scan
>>> filter returning no results in the Iceberg case. I think the post scan
>>> filter is unable to handle the Iceberg format. So if 1) is not the way
>>> forward, the alternative is to fix this in the post scan filtering.
>>>
>>> Looking forward to your guidance on the way forward.
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L450
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix

--
Ryan Blue
Software Engineer
Netflix
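A hypothetical end-to-end repro in Spark's Java API, matching the paths shown in the plans above; it assumes the Iceberg runtime is on the classpath and registered as the "iceberg" source, and that both tables were written with the schema shown in the FileScan's ReadSchema.

    // Runs the same map-value filter against both readers. Before the fix,
    // the Parquet query returned the matching rows while the Iceberg query
    // returned none, because the metrics filter pruned every row group.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class MapFilterRepro {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("map-filter-repro")
            .getOrCreate();

        // Vanilla Spark Parquet reader: rows survive the post scan filter.
        Dataset<Row> parquet = spark.read()
            .parquet("/usr/local/spark/test/parquet-people-complex")
            .filter("friends['Josh'] = 10");
        parquet.show();

        // Iceberg reader: same logical filter, but every row group was
        // skipped by the isNotNull(friends) metrics check, leaving nothing
        // for the post scan filter to evaluate.
        Dataset<Row> iceberg = spark.read()
            .format("iceberg")
            .load("iceberg-people-complex2")
            .filter("friends['Josh'] = 10");
        iceberg.show();

        spark.stop();
      }
    }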