Just to follow up on this for anyone watching the dev list. I merged
Gautam's fix for this. Thanks for finding and working on this!

On Thu, Feb 21, 2019 at 4:09 AM Gautam <gautamkows...@gmail.com> wrote:

>
> Hey Ryan,
>
> I found the root cause of the post scan filter not working over Iceberg
> format.
>
> *The short explanation: *The Iceberg Parquet reader fails to return any
> rows when a complex-column filter is involved, so the post scan filter
> has nothing left to inspect.
>
> *More Detail: *
> Although the complex filter isn't pushed down by Spark to the Iceberg
> scan, Spark does push down an implicit isNotNull(mapCol) filter. Before
> scanning begins, row groups are evaluated to check whether they can be
> skipped *[1]*. While doing so, ParquetMetricsRowGroupFilter rejects row
> groups when evaluating this isNotNull(mapCol) filter.
> ParquetMetricsRowGroupFilter implements a BoundExpressionVisitor whose
> notNull() evaluation method doesn't recognize complex types as being
> "present" *[2]*, leading the reader to believe the column is absent and
> all nulls.
>
> *In the map filter case: *
> ParquetMetricsRowGroupFilter keeps a `valueCounts` metric, a count
> statistic keyed by column id. It doesn't contain counts for the map
> column itself; instead it has value counts for the map keys and map
> values (which have different unique ids). So a lookup by the map
> column's id returns no counts.
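> For illustration, here's a minimal self-contained sketch of that lookup
> miss. This is not Iceberg's actual code; the class, column ids, and
> method names below are made up for the example:

```java
import java.util.HashMap;
import java.util.Map;

public class ValueCountsSketch {
    // Hypothetical ids: the map column has id 3, but Parquet metrics are
    // recorded against its leaf fields: key (id 4) and value (id 5).
    static final int MAP_COL_ID = 3, KEY_ID = 4, VALUE_ID = 5;

    // Simplified stand-in for a metrics-based notNull() check: a column
    // with no recorded value count looks as if it were all nulls.
    static boolean mayContainNonNull(Map<Integer, Long> valueCounts, int columnId) {
        Long count = valueCounts.get(columnId);
        return count != null && count > 0;
    }

    public static void main(String[] args) {
        Map<Integer, Long> valueCounts = new HashMap<>();
        valueCounts.put(KEY_ID, 100L);    // counts exist for the leaves...
        valueCounts.put(VALUE_ID, 100L);
        // ...but not for the map column id, so the row group is wrongly skipped
        System.out.println(mayContainNonNull(valueCounts, MAP_COL_ID));  // false
        System.out.println(mayContainNonNull(valueCounts, KEY_ID));      // true
    }
}
```

> Because the footer records statistics only for the leaf fields (map key
> and value), the container column's id has no entry, and the check reads
> that as "all nulls".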
>
> Proposed fix options:
> 1 - Skip the implicit isNotNull(mapCol) check in the Parquet reader for
> nested types, since we know nested types are not pushed down.
> 2 - Skip the stats-based check for nested types altogether, since they
> need to be re-evaluated by post scan filters anyway.
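> A rough sketch of what option 2 could look like. Again, this is a
> hypothetical illustration, not the actual ParquetMetricsRowGroupFilter
> code; the enum and constant names are invented:

```java
public class NotNullSketch {
    enum TypeKind { PRIMITIVE, STRUCT, LIST, MAP }

    static final boolean ROWS_MIGHT_MATCH = true;
    static final boolean ROWS_CANNOT_MATCH = false;

    // Simplified notNull() evaluation over row-group stats.
    static boolean notNull(TypeKind type, Long valueCount) {
        if (type != TypeKind.PRIMITIVE) {
            // Proposed fix: stats are kept per leaf field, not for the
            // container column, so never skip the row group for nested
            // types; post scan filters re-evaluate the predicate anyway.
            return ROWS_MIGHT_MATCH;
        }
        if (valueCount == null || valueCount == 0L) {
            return ROWS_CANNOT_MATCH; // no non-null values recorded
        }
        return ROWS_MIGHT_MATCH;
    }

    public static void main(String[] args) {
        System.out.println(notNull(TypeKind.MAP, null));      // true: can't skip
        System.out.println(notNull(TypeKind.PRIMITIVE, 0L));  // false: skip
    }
}
```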
>
> Let me know what you think,
>
> Cheers,
> -Gautam.
>
> [1] -
> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetReader.java#L103-L112
> [2] -
> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetricsRowGroupFilter.java#L159-L163
>
>
> On Wed, Feb 20, 2019 at 12:25 AM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Hi Gautam,
>>
>> Thanks for reporting this. I'll look into why Spark is filtering out all
>> of the Iceberg records. It should use the same filter in both cases, so I'm
>> surprised that this is happening.
>>
>> The problem with the complex predicate is that it is inside a map.
>> Parquet doesn't support push-down filters for nested columns.
>>
>> On Mon, Feb 18, 2019 at 11:03 PM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Hello Iceberg Devs,
>>>                I've been tracking an issue with predicate pushdowns in
>>> Iceberg on complex types. I have compared the vanilla Spark reader over
>>> Parquet vs. the Iceberg format reader. I have an example detailing it here:
>>> https://github.com/apache/incubator-iceberg/issues/99
>>>
>>> *Vanilla Spark Parquet reader plan*
>>> == Physical Plan ==
>>> *(1) Project [age#428, name#429, friends#430, location#431]
>>> +- *(1) Filter (isnotnull(friends#430) && (friends#430[Josh] = 10))
>>>    +- *(1) FileScan parquet [age#428,name#429,friends#430,location#431]
>>> Batched: false, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/usr/local/spark/test/parquet-people-complex],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(friends)], ReadSchema:
>>> struct<age:int,name:string,friends:map<string,int>,location:struct<lat:int,lon:int>>
>>>
>>> * Iceberg Plan*
>>> == Physical Plan ==
>>> *(1) Project [age#33, name#34, friends#35]
>>> +- *(1) Filter ((friends#35[Josh] = 10) && isnotnull(friends#35))
>>>    +- *(1) ScanV2 iceberg[age#33, name#34, friends#35] (Filters:
>>> [isnotnull(friends#35)], Options: [path=iceberg-people-complex2,paths=[]])
>>>
>>>
>>> *Couple of points :*
>>> 1) The complex predicate is not passed down to the scan in either
>>> plan. It is deemed "non-translatable" by
>>> *DataSourceStrategy.translateFilter() *[1] when converting the Catalyst
>>> expression to a data source filter. Ryan & Xabriel had a discussion
>>> earlier on this list about Spark not passing expressions to the data
>>> source in certain cases; this might be related. Maybe a path forward is
>>> to fix that translation in Spark so that Iceberg's filter conversion
>>> has a chance to handle complex types. Currently the Iceberg reader code
>>> is unaware of that filter.
>>>
>>> 2) Although both vanilla Spark and Iceberg evaluate complex type
>>> predicates after the scan, this regression comes from post scan
>>> filtering returning no results in the Iceberg case. I think the post
>>> scan filter is unable to handle the Iceberg format. So if 1) is not the
>>> way forward, the alternative is to fix this in the post scan filtering.
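>>> To make point 1) concrete, here is a heavily simplified, hypothetical
>>> stand-in for that translation step (not Spark's actual code): a
>>> predicate over a plain attribute reference translates to a data source
>>> filter, while anything built on a nested extraction such as
>>> friends['Josh'] does not, so the source never sees it:

```java
import java.util.Optional;

public class TranslateSketch {
    interface Expr {}
    record Attribute(String name) implements Expr {}
    // e.g. friends['Josh']: a value extracted from a map column
    record MapValue(Attribute map, String key) implements Expr {}
    record EqualTo(Expr left, int right) implements Expr {}

    // Only equality over a top-level attribute translates; nested
    // extractions are "non-translatable" and stay as post scan filters.
    static Optional<String> translateFilter(Expr predicate) {
        if (predicate instanceof EqualTo eq && eq.left() instanceof Attribute a) {
            return Optional.of("EqualTo(" + a.name() + ")");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Expr nested = new EqualTo(new MapValue(new Attribute("friends"), "Josh"), 10);
        Expr flat = new EqualTo(new Attribute("age"), 10);
        System.out.println(translateFilter(nested));  // Optional.empty
        System.out.println(translateFilter(flat));    // Optional[EqualTo(age)]
    }
}
```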
>>>
>>>
>>> Looking forward to your guidance on the way forward.
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>> [1] -
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L450
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix
