Yeah, it does seem like we may have more use cases for this. The more Peter
and I discuss this, the more I think it makes sense to add it in.

On Mon, May 22, 2023 at 8:24 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> The feature could be useful for Spark as well. See:
> https://github.com/apache/iceberg/pull/7636#pullrequestreview-1434981224
>
> Maybe we should add this as a topic for the next Iceberg Community Sync.
>
> Also when trying out possible solutions, I have found that some of the
> statistics are modifiable. I think that is a mistake, so I created a PR:
> https://github.com/apache/iceberg/pull/7643. Could someone please help
> me with the review there?
>
>
>
> Steven Wu <stevenz...@gmail.com> wrote (on Fri, May 19, 2023 at
> 19:59):
>
>> The proposal here is essentially column stats projection pushdown. For
>> some Flink jobs with watermark alignment, the Flink source is only
>> interested in the column stats (min-max) for one timestamp column. Hence
>> column stats projection can really help reduce the memory footprint for
>> wide tables (with hundreds or thousands of columns).
>>
>> Russell and Ryan suggested a work-around: apply a transformation on the
>> CloseableIterable of FileScanTask and strip away the unneeded column stats.
>> This has a couple of downsides: (1) large temporary objects with all column
>> stats are still created and add JVM GC overhead - Peter gave data points of
>> roughly 100 KB per file; (2) the transformation implementation will
>> probably look a little hacky.
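>>
>> For illustration, a rough sketch of that work-around (stripUnneededStats is
>> a hypothetical helper here - FileScanTask does not expose a public way to
>> rebuild itself around a stripped DataFile, which is where the hackiness
>> comes from):
>>
>>   // plan with full stats, then drop the stats we do not need per task
>>   CloseableIterable<FileScanTask> tasks = table.newScan()
>>       .includeColumnStats()
>>       .planFiles();
>>   CloseableIterable<FileScanTask> slimTasks =
>>       CloseableIterable.transform(tasks, task -> stripUnneededStats(task));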
>>
>> The main question for adding this feature is probably whether column stats
>> projection pushdown is more widely useful. Cost-based optimizers (e.g. in
>> Trino) probably use column-level stats (and partition stats). Query engines
>> know which column stats are needed and could push the stats projection down
>> to Iceberg core during scan planning.
>>
>>
>> On Tue, May 16, 2023 at 2:54 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Thanks Ryan, Russell,
>>>
>>> Let me explain the situation a bit further.
>>>
>>> We have time-series data written to an Iceberg table, and a Flink job
>>> that uses this Iceberg table as a source to read the incoming data
>>> continuously.
>>>
>>>      *Downstream job -> Iceberg table -> Flink job *
>>>
>>> The Flink job (using the Iceberg Flink Source) uses windowing to
>>> aggregate data based on a timestamp field and to separate out late data
>>> for specific handling.
>>> When the stream is running continuously, the Flink job has enough
>>> resources to handle the incoming snapshots in order and to run planning
>>> for every new snapshot - the only source of late data is old data
>>> arriving with a new snapshot.
>>>
>>>      *Snapshot1 -> Plan1 -> Snapshot2 -> Plan2 -> Snapshot3 -> Plan3 ->
>>> Snapshot4 -> Plan4*
>>>
>>> When something blocks the stream (either the downstream job is down, or
>>> the Flink job temporarily cannot access the Iceberg table), multiple
>>> snapshots' worth of data arrive at the Flink job and planning happens for
>>> multiple snapshots at once.
>>>
>>>      *Snapshot1 -> Snapshot2 -> Snapshot3 -> Plan1-3 -> Snapshot4 ->
>>> Plan4*
>>>
>>> Snapshot1/Snapshot2/Snapshot3 might arrive in a single Iceberg snapshot
>>> in case of a commit failure. Here I refer to them as different snapshots
>>> only to describe the data inside the files.
>>>
>>> If - for whatever reason - the file order during execution becomes
>>> Snapshot3 -> Snapshot1, then the event time extractor will extract
>>> decreasing timestamps for the events. Based on the allowed lateness
>>> specification, after reading data from Snapshot3, every event from
>>> Snapshot1/Snapshot2 becomes late data and is thrown away. The expected
>>> behaviour would be to only call something 'late data' if it was late at
>>> the time of arrival.
>>> To achieve this I see the following possibilities:
>>>
>>>    1. Define an allowed lateness big enough to encompass Snapshot1-3 -
>>>    The issue here is that we have to keep all of the data in memory in
>>>    these cases
>>>    2. Do planning on every snapshot separately - Only possible if the
>>>    downstream job is able to commit the data files in separate snapshots;
>>>    hard to achieve in case of a downstream job failure
>>>    3. Order the files based on the statistics - Here we need to keep
>>>    every split in memory in the planning phase
>>>    4. ??? - Do we have any other option to define the order of the
>>>    files in an Iceberg plan?
>>>
>>> For a big enough dataset the 1st is not an option.
>>> The 2nd is also not an option in our case, as we have to be prepared for
>>> downstream job failures.
>>>
>>> We were able to implement the 3rd option. We created a Scan where we
>>> requested statistics for the data files, and used these statistics to
>>> execute the read tasks in order. Since the timestamp ranges in the files
>>> were correct, we were able to use the original allowed lateness settings to
>>> read the data.
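>>>
>>> As an illustration, a minimal sketch of that ordering, assuming tasks is
>>> the iterable returned by planFiles() on a scan with column stats, and
>>> TS_FIELD_ID is the field id of our timestamp column (specific to our
>>> table); Conversions.fromByteBuffer decodes the serialized lower bound:
>>>
>>>   // collect the planned tasks and sort them by the timestamp lower bound
>>>   List<FileScanTask> ordered = new ArrayList<>();
>>>   tasks.forEach(ordered::add);
>>>   ordered.sort(Comparator.comparingLong(task -> {
>>>     ByteBuffer bound = task.file().lowerBounds().get(TS_FIELD_ID);
>>>     return Conversions.<Long>fromByteBuffer(Types.TimestampType.withZone(), bound);
>>>   }));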
>>>
>>> The issue with the 3rd option is that we are accepting higher memory
>>> usage during planning.
>>>
>>> It turns out that for a big enough table (a few hundred columns) the
>>> biggest part of this footprint is the column statistics (columnSizes,
>>> valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds).
>>> They were ~100k for each GenericDataFile, whereas a GenericDataFile
>>> without statistics is <1k. As a test we used reflection to null out the
>>> unneeded statistics (columnSizes, valueCounts, nullValueCounts,
>>> nanValueCounts, upperBounds) from the Tasks, and the JM memory usage
>>> dropped to 10 percent of what it was when we requested the full
>>> statistics for the files. If we could specify the exact columns that are
>>> needed, the memory usage would drop back to the same level as before we
>>> requested any statistics.
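>>>
>>> For reference, the reflection hack for that test looked roughly like this
>>> (it relies on the internal field names of the DataFile implementation, so
>>> it is only good for measuring, not something to keep):
>>>
>>>   // null out a field by name, walking up the class hierarchy until found
>>>   void clearField(Object target, String name) throws ReflectiveOperationException {
>>>     for (Class<?> cls = target.getClass(); cls != null; cls = cls.getSuperclass()) {
>>>       try {
>>>         java.lang.reflect.Field field = cls.getDeclaredField(name);
>>>         field.setAccessible(true);
>>>         field.set(target, null);
>>>         return;
>>>       } catch (NoSuchFieldException e) {
>>>         // not declared at this level, check the superclass
>>>       }
>>>     }
>>>   }
>>>
>>>   // e.g. clearField(task.file(), "valueCounts"); for each unneeded stat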
>>>
>>> I think this would be useful for every Iceberg user where statistics are
>>> required for the Tasks:
>>>
>>>    - Decreased memory usage
>>>    - Decreased serialization cost
>>>
>>> To achieve this we would need:
>>>
>>>    - Scan.java
>>>
>>>      public BatchScan includeColumnStats(Collection<String> columns) {
>>>        return new BatchScanAdapter(scan.includeColumnStats(columns));
>>>      }
>>>
>>>    - ContentFile.java
>>>
>>>      F copyWithSpecificStats(Collection<Integer> statsToKeep);
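>>>
>>> With that in place, the caller side could look something like this (a
>>> sketch of the proposed API, not something that exists today; the event_ts
>>> column name is just an example):
>>>
>>>   // only the stats for the event-time column would be loaded and kept
>>>   CloseableIterable<ScanTask> tasks = table.newBatchScan()
>>>       .includeColumnStats(Collections.singleton("event_ts"))
>>>       .planFiles();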
>>>
>>> I hope this helped to explain my point better.
>>> If you have better ideas, I would be happy to examine those.
>>>
>>> Thanks,
>>> Peter
>>>
>>> Péter Váry <peter.vary.apa...@gmail.com> wrote (on Mon, May 15, 2023 at
>>> 18:52):
>>>
>>>> Thanks Ryan, Russell, for the quick response!
>>>>
>>>> In our Flink job we have a TumblingEventTimeWindow to filter out old data.
>>>> There was a temporary issue with accessing the Catalog, and our Flink
>>>> job was not able to read data from the Iceberg table for a while.
>>>>
>>>> When the Flink job was able to access the Catalog again, it fetched all
>>>> the data that had arrived in the table during the downtime. Since
>>>> planning does not guarantee the order of the Tasks, we ended up with
>>>> out-of-order records, which is not desirable in our case.
>>>>
>>>> We were able to fetch all of the splits (Flink does this currently) and
>>>> sort them based on the stats. The Flink SplitAssigner interface allowed
>>>> us to serve the splits in the given order, and this way we no longer had
>>>> late events (we needed to do extra work to provide the required
>>>> watermarks, but that is not very relevant here).
>>>>
>>>> If I understand correctly, the ManifestGroups are good for filtering
>>>> the plan results. Our requirement is not really filtering, but ordering the
>>>> Tasks. Is there a way to do that?
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> Ryan Blue <b...@tabular.io> wrote (on Mon, May 15, 2023 at
>>>> 18:07):
>>>>
>>>>> Yes, I agree with Russell. You'd want to push the filter into planning
>>>>> rather than returning stats. That's why we strip out stats when the file
>>>>> metadata is copied. It would also be expensive to copy some, but not
>>>>> all, of the file stats. It's better not to store the stats you don't
>>>>> need.
>>>>>
>>>>> What about using the ManifestGroup interface to get finer-grained
>>>>> control of the planning?
>>>>>
>>>>> Ryan
>>>>>
>>>>> On Mon, May 15, 2023 at 8:05 AM Russell Spitzer <
>>>>> russell.spit...@gmail.com> wrote:
>>>>>
>>>>>> I think currently the recommendation would be to filter the iterator
>>>>>> rather than pulling whole objects with stats into memory. Is there a
>>>>>> requirement that all of the DataFiles be pulled into memory before
>>>>>> filtering?
>>>>>>
>>>>>> On Mon, May 15, 2023 at 9:49 AM Péter Váry <
>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> We have a Flink job where we would like to use the Iceberg File
>>>>>>> statistics (lowerBounds, upperBounds) during the planning phase.
>>>>>>>
>>>>>>> Currently it is possible to parameterize the Scan to include the
>>>>>>> statistics using includeColumnStats [1]. This is an on/off switch;
>>>>>>> there is currently no way to configure it at a finer granularity.
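>>>>>>>
>>>>>>> For reference, today the only option is the all-or-nothing form, which
>>>>>>> loads the stats for every column of every file:
>>>>>>>
>>>>>>>   CloseableIterable<FileScanTask> tasks = table.newScan()
>>>>>>>       .includeColumnStats()
>>>>>>>       .planFiles();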
>>>>>>>
>>>>>>> Sadly our table has plenty of columns, and requesting statistics for
>>>>>>> every column results in GenericDataFile objects whose retained heap is
>>>>>>> ~100k each. We have a few thousand data files, and requesting
>>>>>>> statistics for all of them would add serious extra memory load to our
>>>>>>> job.
>>>>>>>
>>>>>>> I was considering adding a new method to the Scan class like this:
>>>>>>> ---------
>>>>>>> ThisT includeColumnStats(Collection<String> columns);
>>>>>>> ---------
>>>>>>>
>>>>>>> Would the community consider this a valuable addition to the Scan
>>>>>>> API?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Peter
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/iceberg/blob/f536c840350bd5628d7c514d2a4719404c9b8ed1/api/src/main/java/org/apache/iceberg/Scan.java#L71-L78
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Tabular
>>>>>
>>>>
