Hey Anton,
I'm curious how you are using the Struct metrics in your company,
are you planning to use it for predicate pushdowns or something else
entirely?
Regarding timeline, that's fine, we can wait a week or two for your changes
on collecting metrics. If I can assume that your changes will add the
struct metrics, I could open a separate Iceberg issue about the struct
expression handling. If Ryan and you agree on allowing struct based
filtering in Iceberg as long as we avoid mixed filtering (map<struct<int>>
, array<struct<Int>> , etc.) I can go ahead and work on it.
Cheers,
-Gautam.
On Tue, Mar 5, 2019 at 10:30 PM Anton Okolnychyi
<[email protected]> wrote:
> Sorry for my late reply and thanks for testing Gautam!
>
> I had a local prototype that only collected metrics for nested structs and
> stored them. I haven’t checked if Iceberg can make use of that right now.
> As I understand Ryan’s comment and Gautam’s observations, we will need
> changes to make it work even if we have proper min/max statistics. So, we
> have two independent issues then. I was planning to add tests and submit
> the collection upstream. However, open source approval within my company
> might easily take another week or more. So, if we need this change earlier,
> someone can implement it. Just let me know, I can help to review then.
>
> Thanks,
> Anton
>
>
> On 5 Mar 2019, at 09:51, Gautam <[email protected]> wrote:
>
> Thanks for the response Ryan, comments in line ...
>
> > Iceberg doesn't support binding expressions in sub-structs yet. So the
> fix on the Iceberg side requires a few steps. First, collecting the metrics
> from Parquet with Anton's PR, and second, updating expression binding to
> work with structs.
>
> I don't think there is a PR up yet on collecting metrics on struct fields,
> I could work on one if Anton isn't already on it (thanks for calling it out
> in the issue Anton!).
>
> > The reason why binding doesn't work with structs yet it that we don't
> want to bind structs that are within maps or arrays because those will
> change the semantics of the expression. For example, a.b = 5 can be run on
> a: struct<b: int> but can't be run on a: list<struct<b: int>>.
>
> From the discussion on said issue [1] seems like we are ok with structs
> being filtered on. About structs inside maps or arrays, can we not reject
> the invalid cases in the expression evaluation? As in, detect of what
> nested type field 'a' is and allow or disallow appropriately? Having
> support for just structs is a good incremental feature methinks. Especially
> coz, as Anton pointed out, Spark has a PR up [2] on pushing down
> struct-based filters which one can cherry pick locally.
>
> > Also, the Avro problem wasn't because the manifests are stored as Avro.
> Avro doesn't collect metrics about the data that is stored, but the
> manifests have the metrics that were added with each file, so the problem
> is not adding the metrics when you added the files. I think you've solved
> the problem and correctly built your table metadata using the metrics from
> the Parquet footers, but I still want to note the distinction: Avro
> manifests store metrics correctly. Avro data files don't generate metrics.
>
> Gotcha!
>
> Cheers,
> -Gautam.
>
> [1] - https://github.com/apache/incubator-iceberg/issues/78
> [2] - https://github.com/apache/spark/pull/22573
>
>
> On Sat, Mar 2, 2019 at 6:47 AM Ryan Blue <[email protected]>
> wrote:
>
>> Iceberg doesn't support binding expressions in sub-structs yet. So the
>> fix on the Iceberg side requires a few steps. First, collecting the metrics
>> from Parquet with Anton's PR, and second, updating expression binding to
>> work with structs.
>>
>> The reason why binding doesn't work with structs yet it that we don't
>> want to bind structs that are within maps or arrays because those will
>> change the semantics of the expression. For example, a.b = 5 can be run on
>> a: struct<b: int> but can't be run on a: list<struct<b: int>>.
>>
>> Also, the Avro problem wasn't because the manifests are stored as Avro.
>> Avro doesn't collect metrics about the data that is stored, but the
>> manifests have the metrics that were added with each file, so the problem
>> is not adding the metrics when you added the files. I think you've solved
>> the problem and correctly built your table metadata using the metrics from
>> the Parquet footers, but I still want to note the distinction: Avro
>> manifests store metrics correctly. Avro data files don't generate metrics.
>>
>> On Thu, Feb 28, 2019 at 1:32 AM Gautam <[email protected]> wrote:
>>
>>> Hey Anton,
>>> Wanted to circle back on the Spark PR [1] to add support for
>>> nested fields .. I tried applying it, tested it. With this change Spark
>>> pushes filters on structs down to Iceberg, but Iceberg expression handling
>>> seems to fail in validation ..
>>>
>>>
>>> Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot
>>> find field 'location.lat' in struct: struct<1: age: optional int, 2: name:
>>> optional string, 3: friends: optional map<string, int>, 4: location:
>>> optional struct<7: lat: optional double, 8: lon: optional double>>
>>> at
>>> com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
>>> at
>>> com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
>>> at
>>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
>>> at
>>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
>>> at
>>> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
>>> at
>>> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
>>> at
>>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
>>> at
>>> com.netflix.iceberg.expressions.InclusiveManifestEvaluator.<init>(InclusiveManifestEvaluator.java:57)
>>> at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
>>> at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
>>> at
>>> com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>>> at
>>> com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>>> at
>>> com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>> at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>>
>>>
>>> I think this should be handled in Iceberg as struct filters like a.b.c
>>> = "blah" is a legit way to query in SQL. If you feel this is a valid
>>> assumption I can work on a fix. Thoughts?
>>>
>>>
>>> *Test Table Schema:*
>>> scala> iceDf.printSchema
>>> root
>>> |-- age: integer (nullable = true)
>>> |-- name: string (nullable = true)
>>> |-- friends: map (nullable = true)
>>> | |-- key: string
>>> | |-- value: integer (valueContainsNull = true)
>>> |-- location: struct (nullable = true)
>>> | |-- lat: double (nullable = true)
>>> | |-- lon: double (nullable = true)
>>>
>>>
>>> *Gist to recreate issue:*
>>> https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac
>>>
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>> [1] - https://github.com/apache/spark/pull/22573
>>>
>>> On Tue, Feb 26, 2019 at 10:35 PM Anton Okolnychyi <[email protected]>
>>> wrote:
>>>
>>>> Unfortunately, Spark doesn’t push down filters for nested columns. I
>>>> remember an effort to implement it [1]. However, it is not merged.
>>>> So, even if we have proper statistics in Iceberg, we cannot leverage it
>>>> from Spark.
>>>>
>>>> [1] - https://github.com/apache/spark/pull/22573
>>>>
>>>>
>>>> On 26 Feb 2019, at 16:52, Gautam <[email protected]> wrote:
>>>>
>>>> Thanks Anton, this is very helpful! I will apply the patch from
>>>> pull#63 and give it a shot.
>>>>
>>>> Re: Collecting min/max stas on nested structures (
>>>> *https://github.com/apache/incubator-iceberg/issues/78
>>>> <https://github.com/apache/incubator-iceberg/issues/78>* ) ...
>>>>
>>>> We have the exact same use case for skipping files on nested field
>>>> filters. I was intrigued by your comment on enabling stats on nested
>>>> structures by replacing `fileSchema.asStruct().field(fieldId)` with `
>>>> fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata` ..
>>>> Have you had success with this? If so, I can try it out on our data as
>>>> well.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Gautam,
>>>>>
>>>>> I believe you see this behaviour because SparkAppenderFactory is
>>>>> configured to use ParquetWriteAdapter. It only tracks the number of
>>>>> records
>>>>> and uses ParquetWriteSupport from Spark. This means that the statistics is
>>>>> not collected on writes and cannot be used on reads.
>>>>>
>>>>> Once [1] is merged, proper statistics will be fetched from the footer
>>>>> and persisted in the manifests. The statistics is collected when writing
>>>>> data files not manifests. See [2] for more info. Also, [3] contains an
>>>>> example that filters out files (it requires [1] to be cherry-picked
>>>>> locally).
>>>>>
>>>>> Hope that helps,
>>>>> Anton
>>>>>
>>>>> [1] - https://github.com/apache/incubator-iceberg/pull/63
>>>>> [2] -
>>>>> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
>>>>> [3] - https://github.com/apache/incubator-iceberg/pull/105
>>>>>
>>>>>
>>>>> On 26 Feb 2019, at 13:58, Gautam <[email protected]> wrote:
>>>>>
>>>>> .. Just to be clear my concern is around Iceberg not skipping files.
>>>>> Iceberg does skip rowGroups when scanning files as
>>>>> *iceberg.parquet.ParquetReader* uses the parquet stats under it while
>>>>> skipping, albeit none of these stats come from the manifests.
>>>>>
>>>>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello Devs,
>>>>>> I am looking into leveraging Iceberg to speed up
>>>>>> split generation and to minimize file scans. My understanding was that
>>>>>> Iceberg keeps key statistics as listed under Metrics.java [1] viz. column
>>>>>> lower/upper bounds, nullValues, distinct value counts, etc. and that
>>>>>> table
>>>>>> scanning leverages these to skip partitions, files & row-groups (in the
>>>>>> Parquet context).
>>>>>>
>>>>>> What I found is files aren't skipped when a predicate applies only to
>>>>>> a subset of the table's files. Within a partition it will scan all files
>>>>>> as
>>>>>> manifests only keep record counts but the rest of the metrics (lower,
>>>>>> upper, distinct value counts, null values) are null / empty. This is coz
>>>>>> AvroFileAppender only keeps `recordCounts` as metrics [2].. And currently
>>>>>> that is the only appender supported for writing manifest files.
>>>>>>
>>>>>>
>>>>>> *Example :*
>>>>>>
>>>>>> In following example iceTable was generated by iteratively adding two
>>>>>> files so it has two separate parquet files under it ..
>>>>>>
>>>>>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>>>>>>
>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>>>>> partition_data=PartitionData{}, residual=true}
>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>>>>> partition_data=PartitionData{}, residual=true}
>>>>>>
>>>>>>
>>>>>> *Only one file contains row with age = null .. *
>>>>>>
>>>>>> scala> iceDf.show()
>>>>>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a
>>>>>> task of very large size (113 KB). The maximum recommended task size is
>>>>>> 100
>>>>>> KB.
>>>>>> +----+-------+--------------------+
>>>>>> | age| name| friends|
>>>>>> +----+-------+--------------------+
>>>>>> | 60| Kannan| [Justin -> 19]|
>>>>>> | 75| Sharon|[Michael -> 30, J...|
>>>>>> |null|Michael| null|
>>>>>> | 30| Andy|[Josh -> 10, Bisw...|
>>>>>> | 19| Justin|[Kannan -> 75, Sa...|
>>>>>> +----+-------+--------------------+
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Running filter on isNull(age) scans both files .. *
>>>>>>
>>>>>> val isNullExp = Expressions.isNull("age")
>>>>>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>>>>>
>>>>>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>>>>>
>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>>>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>>>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>>>>
>>>>>>
>>>>>>
>>>>>> I would expect only one file to be scanned as Iceberg should track
>>>>>> nullValueCounts as per Metrics.java [1] .. The same issue holds for
>>>>>> integer
>>>>>> comparison filters scanning too many files.
>>>>>>
>>>>>> When I looked through the code, there is provision for using Parquet
>>>>>> file footer stats to populate Manifest Metrics [3] but this is never used
>>>>>> as Iceberg currently only allows AvroFileAppender for creating manifest
>>>>>> files.
>>>>>>
>>>>>> What's the plan around using Parquet footer stats in manifests which
>>>>>> can be very useful during split generation? I saw some discussions around
>>>>>> this in the Iceberg Spec document [4] but couldn't glean if any of those
>>>>>> are actually implemented yet.
>>>>>>
>>>>>> I can work on a proposal PR for adding these in but wanted to know
>>>>>> the current thoughts around this.
>>>>>>
>>>>>>
>>>>>> *Gist for above example *:
>>>>>> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>>>>>>
>>>>>>
>>>>>> Looking forward to your feedback,
>>>>>>
>>>>>> Cheers,
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] -
>>>>>> https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>>>>>> [2] -
>>>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>>>>>> [3] -
>>>>>> https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>>>>>> [4] -
>>>>>> https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>