The feature could be useful for Spark as well. See: https://github.com/apache/iceberg/pull/7636#pullrequestreview-1434981224
Maybe we should add this as a topic for the next Iceberg Community Sync. Also, while trying out possible solutions, I found that some of the statistics are modifiable. I think that is a mistake, so I created a PR: https://github.com/apache/iceberg/pull/7643. Could someone please help me with the review there?

Steven Wu <stevenz...@gmail.com> wrote (on Fri, May 19, 2023, 19:59):

> The proposal here is essentially column stats projection pushdown. For
> some Flink jobs with watermark alignment, the Flink source is only
> interested in the column stats (min-max) for one timestamp column. Hence
> column stats projection can really help reduce the memory footprint for
> wide tables (with hundreds or thousands of columns).
>
> Russell and Ryan suggested a work-around of applying a transformation on
> the CloseableIterable of FileScanTask and stripping away unneeded column
> stats. This has a couple of downsides. (1) Large temporary objects with
> all column stats are still created and add JVM GC overhead. Peter gave
> some data points like 100 KB objects per file. (2) The transformation
> implementation will probably look a little hacky.
>
> The main question for adding this feature is probably whether column
> stats projection pushdown is more widely useful. Cost-based optimizers
> (e.g. in Trino) probably use column-level stats (and partition stats).
> Query engines know which column stats are needed and can push down the
> stats projection to Iceberg core during scan planning.
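A rough sketch of what this transformation work-around could look like, assuming the job only needs the lower bound of one timestamp column (the field id constant and holder class are made up for the example):

    import java.nio.ByteBuffer;
    import java.util.Map;

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.types.Conversions;
    import org.apache.iceberg.types.Types;

    public class StripStatsExample {
      // Made-up field id of the event-time column; a real job would look
      // this up in the table schema.
      private static final int TS_FIELD_ID = 1;

      // Holder for a task plus the single stat we actually need from it.
      public static class TaskMinTs {
        public final FileScanTask task;
        public final Long minTsMicros; // null if no lower bound was recorded

        TaskMinTs(FileScanTask task, Long minTsMicros) {
          this.task = task;
          this.minTsMicros = minTsMicros;
        }
      }

      // Transform the planned tasks, extracting only the timestamp lower
      // bound instead of passing all column stats downstream.
      public static CloseableIterable<TaskMinTs> extractMinTs(
          CloseableIterable<FileScanTask> tasks) {
        return CloseableIterable.transform(tasks, task -> {
          Map<Integer, ByteBuffer> lowerBounds = task.file().lowerBounds();
          ByteBuffer bound = lowerBounds == null ? null : lowerBounds.get(TS_FIELD_ID);
          Long minTs = bound == null
              ? null
              : Conversions.fromByteBuffer(Types.TimestampType.withZone(), bound);
          return new TaskMinTs(task, minTs);
        });
      }
    }

Note that the FileScanTask kept in the holder still references the full GenericDataFile, so the large stats maps remain reachable until the task itself is released; that is downside (1) above.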
> On Tue, May 16, 2023 at 2:54 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Thanks Ryan, Russell,
>>
>> Let me explain the situation a bit further.
>>
>> We have time series data written to an Iceberg table, and a Flink job
>> which uses this Iceberg table as a source to read the incoming data
>> continuously.
>>
>> Downstream job -> Iceberg table -> Flink job
>>
>> The Flink job (using the Iceberg Flink Source) uses windowing to
>> aggregate data based on a timestamp field and separates out late data
>> for specific handling.
>> When the stream is running continuously, the Flink job has enough
>> resources to handle the incoming snapshots in order and run planning for
>> every new snapshot - the only source of late data is old data arriving
>> with the new snapshot.
>>
>> Snapshot1 -> Plan1 -> Snapshot2 -> Plan2 -> Snapshot3 -> Plan3 ->
>> Snapshot4 -> Plan4
>>
>> When something blocks the stream (either the downstream job is down, or
>> the Flink job temporarily cannot access the Iceberg table), then
>> multiple snapshots' worth of data arrive at the Flink job and planning
>> happens over multiple snapshots.
>>
>> Snapshot1 -> Snapshot2 -> Snapshot3 -> Plan1-3 -> Snapshot4 -> Plan4
>>
>> Snapshot1/Snapshot2/Snapshot3 might arrive in a single Iceberg snapshot
>> in case of a commit failure. I refer to them as different Snapshots here
>> only to describe the data inside the files.
>>
>> If - for whatever reason - during the execution the file order becomes
>> Snapshot3 -> Snapshot1, then the event time extractor will extract
>> decreasing timestamps for the events. Based on the allowed lateness
>> specification, after reading data from Snapshot3, every event from
>> Snapshot1/Snapshot2 becomes late data and is thrown away. The expected
>> behaviour would be to only call something 'late data' if it was late at
>> the time of its arrival.
>> To achieve this I see the following possibilities:
>>
>> 1. Define the allowed lateness big enough to encompass Snapshot1-3 -
>> the issue here is that we have to keep all the data in memory in these
>> cases.
>> 2. Do planning on every snapshot separately - only possible if the
>> downstream job is able to commit the data files in separate snapshots;
>> hard to achieve in case of a downstream job failure.
>> 3. Order the files based on the statistics - here we need to keep
>> every split in memory during the planning phase.
>> 4. ??? - Do we have any other option to define the order of the files
>> in an Iceberg plan?
>>
>> For a big enough dataset, the 1st is not an option.
>> The 2nd is also not an option in our case, as we have to be prepared
>> for downstream job failure.
>>
>> We were able to implement the 3rd option. We created a Scan where we
>> requested statistics for the data files, and used these statistics to
>> execute the read tasks in order. Since the timestamp ranges in the files
>> were correct, we were able to use the original allowed lateness settings
>> to read the data.
>>
>> The issue with the 3rd option is that we accept higher memory usage
>> during planning.
>>
>> It turns out that for a big enough table (a few hundred columns) the
>> biggest part of this footprint is the statistics for the columns
>> (columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds,
>> upperBounds). They were ~100 KB for each GenericDataFile, while a
>> GenericDataFile without statistics is <1 KB. As a test, we used
>> reflection to null out the unneeded statistics (columnSizes,
>> valueCounts, nullValueCounts, nanValueCounts, upperBounds) from the
>> Tasks, and the JM memory usage decreased to 10 percent of what it was
>> when we requested the full statistics for the files. If we could specify
>> exactly which columns are needed, the memory usage would decrease back
>> to the same level as before we requested any statistics.
>>
>> I think this would be useful for every Iceberg user where statistics
>> are required for the Tasks:
>>
>> - Decreased memory usage
>> - Decreased serialization cost
>>
>> To achieve this we would need:
>>
>> - Scan.java
>>
>>     public BatchScan includeColumnStats(Collection<String> columns) {
>>       return new BatchScanAdapter(scan.includeColumnStats(columns));
>>     }
>>
>> - ContentFile.java
>>
>>     F copyWithSpecificStats(Collection<Integer> statsToKeep);
>>
>> I hope this helped to explain my point better.
>> If you have better ideas, I would be happy to examine those.
>>
>> Thanks,
>> Peter
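For illustration, scan planning with the proposed overload could look like the following sketch. The Collection-based includeColumnStats and the "event_ts" column name are assumptions here; the overload does not exist in the current API:

    import java.util.Collections;

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.io.CloseableIterable;

    public class ProposedUsageExample {
      public static void planWithSingleColumnStats(Table table) throws Exception {
        // Proposed overload (not in the current API): request stats for
        // the "event_ts" column only, instead of all columns.
        TableScan scan = table.newScan()
            .includeColumnStats(Collections.singleton("event_ts"));

        try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
          for (FileScanTask task : tasks) {
            // With the proposal, lowerBounds()/upperBounds() would only
            // contain the "event_ts" entry, keeping each DataFile object
            // close to its <1 KB no-stats size.
            System.out.println(task.file().path() + " -> " + task.file().lowerBounds());
          }
        }
      }
    }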
>> Péter Váry <peter.vary.apa...@gmail.com> wrote (on Mon, May 15, 2023,
>> 18:52):
>>
>>> Thanks Ryan, Russell, for the quick response!
>>>
>>> In our Flink job we have a TumblingEventTimeWindow to filter out old
>>> data. There was a temporary issue with accessing the Catalog, and our
>>> Flink job was not able to read the data from the Iceberg table for a
>>> while.
>>>
>>> When the Flink job was able to access the Catalog again, it fetched
>>> all the data from the table that had arrived during the downtime. Since
>>> planning does not guarantee the order of the Tasks, we ended up with
>>> out-of-order records, which is not desirable in our case.
>>>
>>> We were able to fetch all of the splits (Flink does this currently)
>>> and sort them based on the stats. The Flink SplitAssigner interface
>>> allowed us to serve the splits in the given order, and this way we did
>>> not have late events any longer (we needed to do extra work to provide
>>> the required watermarks, but that is not that relevant here).
>>>
>>> If I understand correctly, the ManifestGroups are good for filtering
>>> the plan results. Our requirement is not really filtering, but ordering
>>> the Tasks. Is there a way to do that?
>>>
>>> Thanks,
>>> Peter
>>>
>>> Ryan Blue <b...@tabular.io> wrote (on Mon, May 15, 2023, 18:07):
>>>
>>>> Yes, I agree with Russell. You'd want to push the filter into
>>>> planning rather than returning stats. That's why we strip out stats
>>>> when the file metadata is copied. It also would be expensive to copy
>>>> some, but not all, of the file stats. It's better not to store the
>>>> stats you don't need.
>>>>
>>>> What about using the ManifestGroup interface to get finer-grained
>>>> control of the planning?
>>>>
>>>> Ryan
>>>>
>>>> On Mon, May 15, 2023 at 8:05 AM Russell Spitzer
>>>> <russell.spit...@gmail.com> wrote:
>>>>
>>>>> I think currently the recommendation would be to filter the
>>>>> iterator rather than pulling the whole object with stats into memory.
>>>>> Is there a requirement that all of the DataFiles be pulled into
>>>>> memory before filtering?
>>>>>
>>>>> On Mon, May 15, 2023 at 9:49 AM Péter Váry
>>>>> <peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> We have a Flink job where we would like to use the Iceberg File
>>>>>> statistics (lowerBounds, upperBounds) during the planning phase.
>>>>>>
>>>>>> Currently it is possible to parameterize the Scan to include the
>>>>>> statistics using includeColumnStats [1]. This is an on/off switch,
>>>>>> but there is currently no way to configure this at a finer
>>>>>> granularity.
>>>>>>
>>>>>> Sadly, our table has plenty of columns, and requesting statistics
>>>>>> for every column results in GenericDataFile objects whose retained
>>>>>> heap is ~100 KB each. We have a few thousand data files, and
>>>>>> requesting statistics for them would add serious extra memory load
>>>>>> to our job.
>>>>>>
>>>>>> I was considering adding a new method to the Scan class like this:
>>>>>> ---------
>>>>>> ThisT includeColumnStats(Collection<String> columns);
>>>>>> ---------
>>>>>>
>>>>>> Would the community consider this a valuable addition to the Scan
>>>>>> API?
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/iceberg/blob/f536c840350bd5628d7c514d2a4719404c9b8ed1/api/src/main/java/org/apache/iceberg/Scan.java#L71-L78
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Tabular
>>>
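For completeness, a minimal sketch of the ordering approach discussed above (option 3), using the current on/off includeColumnStats switch: plan with stats, then sort the tasks by the timestamp column's lower bound before handing the splits to the assigner. The class name and the tsFieldId parameter are illustrative:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.types.Conversions;
    import org.apache.iceberg.types.Types;

    public class OrderedPlanningExample {
      // Plan with full column stats (the current on/off switch), then sort
      // the tasks by the lower bound of the event-time column so the splits
      // can be served to Flink's SplitAssigner in (roughly) event-time order.
      public static List<FileScanTask> planInEventTimeOrder(Table table, int tsFieldId)
          throws Exception {
        List<FileScanTask> tasks = new ArrayList<>();
        try (CloseableIterable<FileScanTask> planned =
            table.newScan().includeColumnStats().planFiles()) {
          planned.forEach(tasks::add);
        }

        // Files without a recorded lower bound sort first, so they are not
        // accidentally treated as late data.
        tasks.sort(
            Comparator.comparing(
                (FileScanTask task) -> minTsMicros(task, tsFieldId),
                Comparator.nullsFirst(Comparator.naturalOrder())));
        return tasks;
      }

      private static Long minTsMicros(FileScanTask task, int tsFieldId) {
        Map<Integer, ByteBuffer> lowerBounds = task.file().lowerBounds();
        ByteBuffer bound = lowerBounds == null ? null : lowerBounds.get(tsFieldId);
        return bound == null
            ? null
            : Conversions.fromByteBuffer(Types.TimestampType.withZone(), bound);
      }
    }

This is exactly where the memory cost shows up: every task is held in memory with its full stats maps until sorting finishes, which is what the column-level includeColumnStats proposal would mitigate.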