Thanks Ryan and Russell. Let me explain the situation a bit further.
We have time series data written to an Iceberg table, and a Flink job which uses this Iceberg table as a source to read the incoming data continuously:

*Downstream job -> Iceberg table -> Flink job*

The Flink job (using the Iceberg Flink Source) uses windowing to aggregate data based on a timestamp field, and separates out late data for specific handling. While the stream is running continuously, the Flink job has enough resources to handle the incoming snapshots in order and to run planning for every new snapshot, so the only source of late data is old data arriving within a new snapshot:

*Snapshot1 -> Plan1 -> Snapshot2 -> Plan2 -> Snapshot3 -> Plan3 -> Snapshot4 -> Plan4*

When something blocks the stream (either the downstream job is down, or the Flink job temporarily cannot access the Iceberg table), multiple snapshots' worth of data arrive at the Flink job, and a single planning step covers multiple snapshots:

*Snapshot1 -> Snapshot2 -> Snapshot3 -> Plan1-3 -> Snapshot4 -> Plan4*

Snapshot1/Snapshot2/Snapshot3 might even arrive in a single Iceberg snapshot in case of a commit failure; I refer to them as different snapshots here only to describe the data inside the files.

If - for whatever reason - the file order during execution becomes Snapshot3 -> Snapshot1, then the event time extractor will extract decreasing timestamps for the events. Based on the allowed lateness specification, after reading data from Snapshot3, every event from Snapshot1/Snapshot2 becomes late data and is thrown away. The expected behaviour would be to only call something 'late data' if it was already late at the time of its arrival.

To achieve this I see the following possibilities:
1. Define the allowed lateness big enough to encompass Snapshot1-3 - the issue is that we would have to keep all of that data in memory.
2. Do planning for every snapshot separately - only possible if the downstream job is able to commit the data files in separate snapshots, which is hard to guarantee in case of a downstream job failure.
3. Order the files based on the statistics - here we need to keep every split in memory during the planning phase (see the sketch below).
4. ??? - do we have any other option to define the order of the files in an Iceberg plan?

For a big enough dataset the 1st is not an option. The 2nd is not an option for us either, as we have to be prepared for downstream job failures.

We were able to implement the 3rd option: we created a Scan where we requested statistics for the data files, and used these statistics to execute the read tasks in order. Since the timestamp ranges in the files were correct, we could keep the original allowed lateness settings while reading the data.

The problem with the 3rd option is the higher memory usage during planning. It turns out that for a wide enough table (a few hundred columns) the biggest part of this footprint is the per-column statistics (columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds): they were ~100k retained heap for each GenericDataFile, while a GenericDataFile without statistics is <1k. As a test we used reflection to null out the statistics we do not need (columnSizes, valueCounts, nullValueCounts, nanValueCounts, upperBounds) from the Tasks, and the JobManager memory usage dropped to 10 percent of what we measured with full statistics for the files. If we could request statistics for only the specific columns we need, the memory usage would drop back to roughly the same level as before we requested any statistics.
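To make the 3rd option concrete, here is a minimal sketch of the ordering described above. The table handle and the "ts" column name are placeholders, the column is assumed to be a timestamptz, and the wiring into the Flink SplitAssigner is omitted:

---------
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

class OrderedPlanning {
  // Plans the scan with full column stats (the current all-or-nothing switch)
  // and returns the tasks ordered by the files' minimum event timestamp.
  static List<FileScanTask> planOrderedByEventTime(Table table, String tsColumn)
      throws java.io.IOException {
    int tsId = table.schema().findField(tsColumn).fieldId();

    List<FileScanTask> tasks = new ArrayList<>();
    try (CloseableIterable<FileScanTask> planned =
        table.newScan().includeColumnStats().planFiles()) {
      planned.forEach(tasks::add); // every split is held in memory here
    }

    // Sort by the lower bound of the timestamp column (stored as micros), so
    // event times are non-decreasing across files. NOTE: files without stats
    // (null lower bound) would need special handling in a real job.
    tasks.sort(Comparator.comparing((FileScanTask task) ->
        (Long) Conversions.fromByteBuffer(
            Types.TimestampType.withZone(),
            task.file().lowerBounds().get(tsId))));
    return tasks;
  }
}
---------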
I think this would be useful for every Iceberg user where statistics are required for the Tasks:
- Decreased memory usage
- Decreased serialization cost

To achieve this we would need:

- Scan.java:
---------
public BatchScan includeColumnStats(Collection<String> columns) {
  return new BatchScanAdapter(scan.includeColumnStats(columns));
}
---------

- ContentFile.java:
---------
F copyWithSpecificStats(Collection<Integer> statsToKeep);
---------

I hope this helped to explain my point better. If you have better ideas, I would be happy to examine them. A short sketch of the intended usage is at the bottom of this mail, below the quoted thread.

Thanks,
Peter

On Mon, May 15, 2023 at 6:52 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Thanks Ryan, Russell for the quick response!
>
> In our Flink job we have a TumblingEventTimeWindow to filter out old
> data. There was a temporary issue with accessing the Catalog, and our
> Flink job was not able to read the data from the Iceberg table for a
> while.
>
> When the Flink job was able to access the Catalog again, it fetched all
> the data from the table which had arrived during the downtime. Since the
> planning does not guarantee the order of the Tasks, we ended up with
> out-of-order records, which is not desirable in our case.
>
> We were able to fetch all of the splits (Flink does this currently) and
> sort them based on the stats. The Flink SplitAssigner interface allowed
> us to serve the splits in the given order, and this way we did not have
> late events any longer (we needed to do extra work to provide the
> required watermarks, but that is not that relevant here).
>
> If I understand correctly, the ManifestGroups are good for filtering the
> plan results. Our requirement is not really filtering, but ordering the
> Tasks. Is there a way to do that?
>
> Thanks,
> Peter
>
> On Mon, May 15, 2023 at 6:07 PM Ryan Blue <b...@tabular.io> wrote:
>
>> Yes, I agree with Russell. You'd want to push the filter into planning
>> rather than returning stats. That's why we strip out stats when the
>> file metadata is copied. It would also be expensive to copy some, but
>> not all, of the file stats. It's better not to store the stats you
>> don't need.
>>
>> What about using the ManifestGroup interface to get finer-grained
>> control of the planning?
>>
>> Ryan
>>
>> On Mon, May 15, 2023 at 8:05 AM Russell Spitzer <russell.spit...@gmail.com> wrote:
>>
>>> I think currently the recommendation would be to filter the iterator
>>> rather than pulling the whole object with stats into memory. Is there
>>> a requirement that all of the DataFiles be pulled into memory before
>>> filtering?
>>>
>>> On Mon, May 15, 2023 at 9:49 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> We have a Flink job where we would like to use the Iceberg file
>>>> statistics (lowerBounds, upperBounds) during the planning phase.
>>>>
>>>> Currently it is possible to parameterize the Scan to include the
>>>> statistics using includeColumnStats [1]. This is an on/off switch,
>>>> and there is currently no way to configure it at a finer granularity.
>>>>
>>>> Sadly our table has plenty of columns, and requesting statistics for
>>>> every column results in GenericDataFile objects with a retained heap
>>>> of ~100k each. We have a few thousand data files, and requesting
>>>> statistics for all of them would add serious extra memory load to
>>>> our job.
>>>>
>>>> I was considering adding a new method to the Scan class like this:
>>>> ---------
>>>> ThisT includeColumnStats(Collection<String> columns);
>>>> ---------
>>>>
>>>> Would the community consider this a valuable addition to the Scan API?
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> [1]
>>>> https://github.com/apache/iceberg/blob/f536c840350bd5628d7c514d2a4719404c9b8ed1/api/src/main/java/org/apache/iceberg/Scan.java#L71-L78
>>>
>>
>> --
>> Ryan Blue
>> Tabular
>
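P.S.: To make the intended usage of the proposal concrete, here is a hypothetical sketch. The includeColumnStats(Collection<String>) overload does not exist in the Scan API today, and the "ts" column name is a placeholder:

---------
import java.util.Collections;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;

class ProposedUsage {
  static CloseableIterable<FileScanTask> planWithTsStatsOnly(Table table) {
    // Proposed API (not yet in Iceberg): keep only the "ts" column statistics
    // on the planned DataFiles, so the bounds and counts for the other few
    // hundred columns are never retained in memory or serialized.
    TableScan scan = table.newScan()
        .includeColumnStats(Collections.singleton("ts"));
    return scan.planFiles();
  }
}
---------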