The feature could be useful for Spark as well. See:
https://github.com/apache/iceberg/pull/7636#pullrequestreview-1434981224

Maybe we should add this as a topic for the next Iceberg Community Sync.

Also, while trying out possible solutions, I found that some of the
statistics are modifiable. I think that is a mistake, so I created a PR:
https://github.com/apache/iceberg/pull/7643. Could someone please help
me with the review there?
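
To show the kind of issue I mean, here is a minimal sketch (not the actual
change in the PR) of the defensive-copy pattern I would expect the stats
accessors to follow:

---------
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class StatsHolder {
  private final Map<Integer, Long> valueCounts;

  StatsHolder(Map<Integer, Long> valueCounts) {
    // copy on construction, so later changes by the caller are not visible
    this.valueCounts = new HashMap<>(valueCounts);
  }

  Map<Integer, Long> valueCounts() {
    // hand out a read-only view instead of the internal, mutable map
    return Collections.unmodifiableMap(valueCounts);
  }
}
---------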



On Fri, May 19, 2023 at 19:59, Steven Wu <stevenz...@gmail.com> wrote:

> The proposal here is essentially column stats projection pushdown. For
> some Flink jobs with watermark alignment, the Flink source is only
> interested in the column stats (min/max) for one timestamp column. Hence
> column stats projection can really help reduce the memory footprint for
> wide tables (with hundreds or thousands of columns).
>
> Russell and Ryan suggested a work-around: apply a transformation on the
> CloseableIterable of FileScanTask and strip away the unneeded column stats.
> This has a couple of downsides. (1) Large temporary objects with all column
> stats are still created and add JVM GC overhead; Peter gave data points of
> about 100 KB per file. (2) The transformation implementation will probably
> look a little hacky.
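>
> For reference, a rough sketch of that work-around, assuming a table
> handle (dropping all of the stats is easy with the existing public API;
> keeping only some of them is where it gets hacky, because the task/file
> objects would need to be rebuilt):
>
> ---------
> CloseableIterable<FileScanTask> tasks =
>     table.newScan().includeColumnStats().planFiles();
>
> // downside (1): the large DataFile objects with the full stats are still
> // materialized before we strip them
> CloseableIterable<DataFile> slimFiles =
>     CloseableIterable.transform(tasks, task -> task.file().copyWithoutStats());
> ---------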
>
> The main question for adding this feature is probably whether column stats
> projection pushdown is more widely useful. Cost-based optimizers (e.g. in
> Trino) probably use column-level stats (and partition stats). Query
> engines know which column stats are needed and could push the stats
> projection down to Iceberg core during scan planning.
>
>
> On Tue, May 16, 2023 at 2:54 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Thanks Ryan, Russell,
>>
>> Let me explain the situation a bit further.
>>
>> We have time series data written to an Iceberg table, and a Flink job
>> which uses this Iceberg table as a source to read the incoming data
>> continuously.
>>
>>      *Downstream job -> Iceberg table -> Flink job *
>>
>> The Flink job (using the Iceberg Flink Source) uses windowing to
>> aggregate data based on a timestamp field and to separate out late data
>> for specific handling.
>> When the stream is running continuously, the Flink job has enough
>> resources to handle the incoming snapshots in order and run planning for
>> every new snapshot - the only source of late data is old data arriving
>> with a new snapshot.
>>
>>      *Snapshot1 -> Plan1 -> Snapshot2 -> Plan2 -> Snapshot3 -> Plan3 ->
>> Snapshot4 -> Plan4*
>>
>> When something blocks the stream (either the downstream job is down, or
>> the Flink job is temporarily unable to access the Iceberg table), then
>> multiple snapshots' worth of data arrive at the Flink job and planning
>> happens for multiple snapshots at once.
>>
>>      *Snapshot1 -> Snapshot2 -> Snapshot3 -> Plan1-3 -> Snapshot4 ->
>> Plan4*
>>
>> Snapshot1/Snapshot2/Snapshot3 might arrive in a single Iceberg snapshot
>> in case of a commit failure. I refer to them here as different Snapshots
>> only to describe the data inside the files.
>>
>> If, for whatever reason, the file order during execution becomes
>> something like Snapshot3 -> Snapshot1, then the event time extractor will
>> extract decreasing timestamps for the events. Based on the allowed
>> lateness specification, after reading data from Snapshot3, every event
>> from Snapshot1/Snapshot2 becomes late data and is thrown away. The
>> expected behaviour would be to only treat an event as 'late data' if it
>> was late at the time of its arrival.
>> To achieve this I see the following possibilities:
>>
>>    1. Define the allowed lateness big enough to encompass Snapshot1-3 -
>>    the issue here is that we have to keep all of the data in memory in
>>    these cases
>>    2. Do planning on every snapshot separately - only possible if the
>>    Downstream job is able to commit the data files in separate snapshots;
>>    hard to achieve in case of a Downstream job failure
>>    3. Order the files based on the statistics - here we need to keep
>>    every split in memory during the planning phase
>>    4. ??? - Do we have any other option to define the order of the files
>>    in an Iceberg plan?
>>
>> For a big enough dataset, the 1st is not an option.
>> The 2nd is also not an option in our case, as we have to be prepared for
>> downstream job failures.
>>
>> We were able to implement the 3rd option. We created a Scan where we
>> requested statistics for the data files, and used these statistics to
>> execute the read tasks in order. Since the timestamp ranges in the files
>> were correct, we were able to use the original allowed lateness settings to
>> read the data.
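>>
>> Roughly, the ordering looked like this (a simplified sketch, not our
>> actual code; the column name "event_ts" and the surrounding method are
>> just illustrative):
>>
>> ---------
>> import java.io.IOException;
>> import java.util.ArrayList;
>> import java.util.Comparator;
>> import java.util.List;
>> import org.apache.iceberg.FileScanTask;
>> import org.apache.iceberg.Table;
>> import org.apache.iceberg.io.CloseableIterable;
>> import org.apache.iceberg.types.Conversions;
>> import org.apache.iceberg.types.Types;
>>
>> List<FileScanTask> orderedTasks(Table table) throws IOException {
>>   List<FileScanTask> tasks = new ArrayList<>();
>>   try (CloseableIterable<FileScanTask> planned =
>>       table.newScan().includeColumnStats().planFiles()) {
>>     planned.forEach(tasks::add);
>>   }
>>
>>   // order the tasks by the lower bound of the timestamp column
>>   int tsFieldId = table.schema().findField("event_ts").fieldId();
>>   tasks.sort(Comparator.comparingLong(task ->
>>       (Long) Conversions.fromByteBuffer(
>>           Types.TimestampType.withZone(),
>>           task.file().lowerBounds().get(tsFieldId))));
>>   return tasks;
>> }
>> ---------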
>>
>> The issue with the 3rd option is that we are accepting higher memory
>> usage during planning.
>>
>> It turns out that for a big enough table (a few hundred columns) the
>> biggest part of this footprint is the per-column statistics
>> (columnSizes, valueCounts, nullValueCounts, nanValueCounts,
>> lowerBounds, upperBounds). They were ~100 KB for each GenericDataFile,
>> whereas a GenericDataFile without statistics is <1 KB. As a test we used
>> reflection to null out the statistics we do not need (columnSizes,
>> valueCounts, nullValueCounts, nanValueCounts, upperBounds) from the Tasks,
>> and the JobManager memory usage dropped to 10 percent of what it was when
>> we requested the full statistics for the files. If we could specify the
>> particular column that is needed, the memory usage would drop back to the
>> same level as before we requested any statistics.
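>>
>> For the curious, the reflection trick looks roughly like this (a sketch;
>> the internal field names live in BaseFile and my guesses here may differ
>> from the real ones):
>>
>> ---------
>> import java.lang.reflect.Field;
>> import org.apache.iceberg.DataFile;
>>
>> static void dropStat(DataFile file, String fieldName) throws Exception {
>>   // e.g. dropStat(task.file(), "valueCounts")
>>   Field field = file.getClass().getSuperclass().getDeclaredField(fieldName);
>>   field.setAccessible(true);
>>   field.set(file, null);
>> }
>> ---------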
>>
>> I think this would be useful for every Iceberg user who needs statistics
>> on the Tasks:
>>
>>    - Decreased memory usage
>>    - Decreased serialization cost
>>
>> To achieve this we would need:
>>
>>    - Scan.java
>>
>>      public BatchScan includeColumnStats(Collection<String> columns) {
>>        return new BatchScanAdapter(scan.includeColumnStats(columns));
>>      }
>>
>>    - ContentFile.java
>>
>>      F copyWithSpecificStats(Collection<Integer> statsToKeep);
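>>
>> A caller could then request stats for only the column it needs, something
>> like this (a sketch against the proposed API; "event_ts" and tsFieldId
>> are illustrative):
>>
>> ---------
>> CloseableIterable<ScanTask> tasks = table.newBatchScan()
>>     .includeColumnStats(Collections.singleton("event_ts"))
>>     .planFiles();
>>
>> // and on the file side, keep only the stats for the timestamp field
>> DataFile slim = dataFile.copyWithSpecificStats(Collections.singleton(tsFieldId));
>> ---------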
>>
>> I hope this helped to explain my point better.
>> If you have better ideas, I would be happy to examine those.
>>
>> Thanks,
>> Peter
>>
>> On Mon, May 15, 2023 at 18:52, Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Thanks Ryan, Russel for the quick response!
>>>
>>> In our Flink job we have a TumblingEventTimeWindow to filter out old
>>> data. There was a temporary issue with accessing the Catalog, and our
>>> Flink job was not able to read the data from the Iceberg table for a
>>> while.
>>>
>>> When the Flink job was able to access the Catalog again, it fetched all
>>> the data that had arrived in the table during the downtime. Since the
>>> planning does not guarantee the order of the Tasks, we ended up with
>>> out-of-order records, which is not desirable in our case.
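>>>
>>> For context, the windowing setup is roughly the following (a simplified
>>> sketch; Event, stream, MyAggregate and the durations are made-up
>>> placeholders, the real job is more involved):
>>>
>>> ---------
>>> import java.time.Duration;
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.util.OutputTag;
>>>
>>> OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};
>>>
>>> WatermarkStrategy<Event> watermarks = WatermarkStrategy
>>>     .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
>>>     .withTimestampAssigner((event, ts) -> event.timestampMillis());
>>>
>>> stream
>>>     .assignTimestampsAndWatermarks(watermarks)
>>>     .keyBy(Event::key)
>>>     .window(TumblingEventTimeWindows.of(Time.minutes(10)))
>>>     .allowedLateness(Time.minutes(5))  // old data within this bound is still aggregated
>>>     .sideOutputLateData(lateTag)       // anything later is separated out
>>>     .aggregate(new MyAggregate());
>>> ---------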
>>>
>>> We were able to fetch all of the splits (Flink does this currently) and
>>> sort them based on the stats. The Flink SplitAssigner interface allowed
>>> us to serve the splits in that order, and this way we no longer had late
>>> events (we needed to do extra work to provide the required watermarks,
>>> but that is not so relevant here).
>>>
>>> If I understand correctly, the ManifestGroups are good for filtering the
>>> plan results. Our requirement is not really filtering, but ordering the
>>> Tasks. Is there a way to do that?
>>>
>>> Thanks,
>>> Peter
>>>
>>> On Mon, May 15, 2023 at 18:07, Ryan Blue <b...@tabular.io> wrote:
>>>
>>>> Yes, I agree with Russell. You'd want to push the filter into planning
>>>> rather than returning stats. That's why we strip out stats when the file
>>>> metadata is copied. It also would be expensive to copy some, but not all of
>>>> the file stats. It's better not to store the stats you don't need.
>>>>
>>>> What about using the ManifestGroup interface to get finer-grained
>>>> control of the planning?
>>>>
>>>> Ryan
>>>>
>>>> On Mon, May 15, 2023 at 8:05 AM Russell Spitzer <
>>>> russell.spit...@gmail.com> wrote:
>>>>
>>>>> I think currently the recommendation would be to filter the iterator
>>>>> rather than pulling the whole object with stats into memory. Is there a
>>>>> requirement that all of the DataFiles be pulled into memory before
>>>>> filtering?
>>>>>
>>>>> On Mon, May 15, 2023 at 9:49 AM Péter Váry <
>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> We have a Flink job where we would like to use the Iceberg File
>>>>>> statistics (lowerBounds, upperBounds) during the planning phase.
>>>>>>
>>>>>> Currently it is possible to parameterize the Scan to include the
>>>>>> statistics using includeColumnStats [1]. This is an on/off switch, but
>>>>>> currently there is no way to configure it at a finer granularity.
>>>>>>
>>>>>> Sadly our table has plenty of columns, and requesting statistics for
>>>>>> every column results in GenericDataFile objects whose retained heap is
>>>>>> ~100 KB each. We have a few thousand data files, and requesting
>>>>>> statistics for all of them would add serious extra memory load to our
>>>>>> job.
>>>>>>
>>>>>> I was considering adding a new method to the Scan class like this:
>>>>>> ---------
>>>>>> ThisT includeColumnStats(Collection<String> columns);
>>>>>> ---------
>>>>>>
>>>>>> Would the community consider this a valuable addition to the Scan
>>>>>> API?
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/iceberg/blob/f536c840350bd5628d7c514d2a4719404c9b8ed1/api/src/main/java/org/apache/iceberg/Scan.java#L71-L78
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Tabular
>>>>
>>>
