Yeah, it does seem like we may have more use cases for this. The more Peter and I discuss this, the more I think it makes sense to add it in.
On Mon, May 22, 2023 at 8:24 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> The feature could be useful for Spark as well. See:
> https://github.com/apache/iceberg/pull/7636#pullrequestreview-1434981224

> Maybe we should add this as a topic for the next Iceberg Community Sync.

> Also, when trying out possible solutions, I found that some of the statistics are modifiable. I think that is a mistake, so I created a PR: https://github.com/apache/iceberg/pull/7643. Could someone please help me with the review there?

> On Fri, May 19, 2023 at 19:59, Steven Wu <stevenz...@gmail.com> wrote:

>> The proposal here is essentially column stats projection pushdown. For some Flink jobs with watermark alignment, the Flink source is only interested in the column stats (min-max) for one timestamp column. Column stats projection can therefore really help reduce the memory footprint for wide tables (with hundreds or thousands of columns).

>> Russell and Ryan suggested a workaround: applying a transformation to the CloseableIterable of FileScanTask and stripping away the unneeded column stats. This has a couple of downsides. (1) Large temporary objects with all column stats are still created and add JVM GC overhead; Peter gave data points of roughly 100 KB objects per file. (2) The transformation implementation will probably look a little hacky.

>> The main question for adding this feature is probably whether column stats projection pushdown is more widely useful. A cost-based optimizer (e.g. in Trino) probably uses column-level stats (and partition stats). Query engines know which column stats are needed and could push the stats projection down to Iceberg core during scan planning.
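For reference, a minimal sketch of the transformation-style workaround described above, using the existing all-or-nothing includeColumnStats(); the class names, the lightweight holder, and the assumption that the ordering column is a timestamp (so its bound decodes to a Long) are illustrative only, and rebuilding FileScanTasks around the stripped files - the part that gets hacky - is not shown:
---------
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

// Hypothetical holder: keeps only the stripped file metadata and the single bound the job needs.
class StrippedTask {
  final DataFile fileWithoutStats; // copy of the file metadata with all column stats dropped
  final Long minTs;                // lower bound of the event-time column, may be null

  StrippedTask(DataFile fileWithoutStats, Long minTs) {
    this.fileWithoutStats = fileWithoutStats;
    this.minTs = minTs;
  }
}

class StatsStrippingScan {
  static CloseableIterable<StrippedTask> planStripped(Table table, String tsColumn) {
    int tsFieldId = table.schema().findField(tsColumn).fieldId();
    Type tsType = table.schema().findField(tsColumn).type();

    // Full stats are requested here, so the heavy GenericDataFile objects are still
    // materialized temporarily (downside (1) above); they become garbage as soon as
    // the transform has extracted the single bound it needs.
    CloseableIterable<FileScanTask> tasks =
        table.newScan().includeColumnStats().planFiles();

    return CloseableIterable.transform(tasks, task -> {
      Map<Integer, ByteBuffer> lowerBounds = task.file().lowerBounds();
      Long minTs = lowerBounds == null || !lowerBounds.containsKey(tsFieldId)
          ? null
          : (Long) Conversions.fromByteBuffer(tsType, lowerBounds.get(tsFieldId));
      // Keep a stats-free copy of the file metadata plus the one bound we care about.
      return new StrippedTask(task.file().copyWithoutStats(), minTs);
    });
  }
}
---------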
>> On Tue, May 16, 2023 at 2:54 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

>>> Thanks Ryan, Russell,

>>> Let me explain the situation a bit further.

>>> We have time series data written to an Iceberg table, and a Flink job which uses this Iceberg table as a source to read the incoming data continuously.

>>> *Downstream job -> Iceberg table -> Flink job*

>>> The Flink job (using the Iceberg Flink Source) uses windowing to aggregate data based on a timestamp field and to separate out late data for specific handling. When the stream is running continuously, the Flink job has enough resources to handle the incoming snapshots in order and run planning for every new snapshot - the only source of late data is old data arriving with a new snapshot.

>>> *Snapshot1 -> Plan1 -> Snapshot2 -> Plan2 -> Snapshot3 -> Plan3 -> Snapshot4 -> Plan4*

>>> When something blocks the stream (either the downstream job is down, or the Flink job temporarily cannot access the Iceberg table), multiple snapshots' worth of data arrive at the Flink job and planning happens for multiple snapshots at once.

>>> *Snapshot1 -> Snapshot2 -> Snapshot3 -> Plan1-3 -> Snapshot4 -> Plan4*

>>> Snapshot1/Snapshot2/Snapshot3 might arrive in a single Iceberg snapshot in case of a commit failure. I refer to them as different Snapshots here only to describe the data inside the files.

>>> If, for whatever reason, the file order during execution becomes something like Snapshot3 -> Snapshot1, then the event time extractor will extract decreasing timestamps for the events. Based on the allowed lateness specification, after reading data from Snapshot3, every event from Snapshot1/Snapshot2 becomes late data and is thrown away. The expected behaviour would be to only call something 'late data' if it was late at the time of arrival.

>>> To achieve this I see the following possibilities:

>>> 1. Define the allowed lateness to be big enough to encompass Snapshot1-3 - the issue here is that we have to keep all of the data in memory in these cases.
>>> 2. Do planning on every snapshot separately - only possible if the downstream job is able to commit the data files in separate snapshots; hard to achieve in case of a downstream job failure.
>>> 3. Order the files based on the statistics - here we need to keep every split in memory during the planning phase.
>>> 4. ??? - Do we have any other option to define the order of the files in an Iceberg plan?

>>> For a big enough dataset, the 1st is not an option. The 2nd is also not an option in our case, as we have to be prepared for downstream job failure.

>>> We were able to implement the 3rd option. We created a Scan where we requested statistics for the data files and used these statistics to execute the read tasks in order. Since the timestamp ranges in the files were correct, we were able to use the original allowed lateness settings to read the data.

>>> The issue with the 3rd option is that we are accepting higher memory usage during planning.

>>> It turns out that for a big enough table (a few hundred columns) the biggest part of this footprint is the statistics for the columns (columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds). They were ~100k for each GenericDataFile, where a GenericDataFile without statistics is <1k. As a test we used reflection to null out the statistics we do not need (columnSizes, valueCounts, nullValueCounts, nanValueCounts, upperBounds) from the Tasks, and the JobManager memory usage decreased to 10 percent of what it was when we requested the full statistics for the files. If we could specify the one column that is needed, the memory usage would drop back to the same level as before we requested any statistics.

>>> I think this would be useful for every Iceberg user who needs statistics on the Tasks:

>>> - Decreased memory usage
>>> - Decreased serialization cost

>>> To achieve this we would need:

>>> - Scan.java
>>> *public BatchScan includeColumnStats(Collection<String> columns) { return new BatchScanAdapter(scan.includeColumnStats(columns)); }*
>>> - ContentFile.java
>>> *F copyWithSpecificStats(Collection<Integer> statsToKeep);*

>>> I hope this helped to explain my point better. If you have better ideas, I would be happy to examine those.

>>> Thanks,
>>> Peter
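A minimal sketch of how the 3rd option can be done with today's all-or-nothing includeColumnStats(), assuming the ordering column is a timestamp (long-valued) column; the class and method names are illustrative, not the exact code used above:
---------
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

class OrderedPlanning {

  // Plans the scan with full column stats and returns the tasks sorted by the lower bound
  // of the given column, so splits can be served in event-time order. Requesting stats is
  // all-or-nothing today, which is exactly the memory problem described above.
  static List<FileScanTask> planOrderedBy(Table table, String tsColumn) throws java.io.IOException {
    int fieldId = table.schema().findField(tsColumn).fieldId();
    Type type = table.schema().findField(tsColumn).type();

    List<FileScanTask> tasks = new ArrayList<>();
    try (CloseableIterable<FileScanTask> planned =
             table.newScan().includeColumnStats().planFiles()) {
      planned.forEach(tasks::add);
    }

    tasks.sort((a, b) -> {
      Long la = lowerBound(a, fieldId, type);
      Long lb = lowerBound(b, fieldId, type);
      if (la == null || lb == null) {
        // Files without a lower bound for the column sort first so they are not treated as late.
        return la == null ? (lb == null ? 0 : -1) : 1;
      }
      return la.compareTo(lb);
    });

    return tasks;
  }

  private static Long lowerBound(FileScanTask task, int fieldId, Type type) {
    Map<Integer, ByteBuffer> bounds = task.file().lowerBounds();
    if (bounds == null || !bounds.containsKey(fieldId)) {
      return null;
    }
    return (Long) Conversions.fromByteBuffer(type, bounds.get(fieldId));
  }
}
---------
The ordered list is what a custom SplitAssigner, as mentioned later in the thread, could then serve to the readers.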
>>> On Mon, May 15, 2023 at 18:52, Péter Váry <peter.vary.apa...@gmail.com> wrote:

>>>> Thanks Ryan, Russell for the quick response!

>>>> In our Flink job we have a TumblingEventTimeWindow to filter out old data. There was a temporary issue with accessing the Catalog, and our Flink job was not able to read the data from the Iceberg table for a while.

>>>> When the Flink job was able to access the Catalog again, it fetched all the data which had arrived in the table during the downtime. Since planning does not guarantee the order of the Tasks, we ended up with out-of-order records, which is not desirable in our case.

>>>> We were able to fetch all of the splits (Flink does this currently) and sort them based on the stats. The Flink SplitAssigner interface allowed us to serve the splits in the given order, and this way we no longer had late events (we needed to do extra work to provide the required watermarks, but that is not really relevant here).

>>>> If I understand correctly, the ManifestGroups are good for filtering the plan results. Our requirement is not really filtering, but ordering the Tasks. Is there a way to do that?

>>>> Thanks,
>>>> Peter

>>>> On Mon, May 15, 2023 at 18:07, Ryan Blue <b...@tabular.io> wrote:

>>>>> Yes, I agree with Russell. You'd want to push the filter into planning rather than returning stats. That's why we strip out stats when the file metadata is copied. It would also be expensive to copy some, but not all, of the file stats. It's better not to store the stats you don't need.

>>>>> What about using the ManifestGroup interface to get finer-grained control of the planning?

>>>>> Ryan

>>>>> On Mon, May 15, 2023 at 8:05 AM Russell Spitzer <russell.spit...@gmail.com> wrote:

>>>>>> I think currently the recommendation would be to filter the iterator rather than pulling the whole object with stats into memory. Is there a requirement that all of the DataFiles be pulled into memory before filtering?

>>>>>> On Mon, May 15, 2023 at 9:49 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

>>>>>>> Hi Team,

>>>>>>> We have a Flink job where we would like to use the Iceberg file statistics (lowerBounds, upperBounds) during the planning phase.

>>>>>>> Currently it is possible to parameterize the Scan to include the statistics using includeColumnStats [1]. This is an on/off switch; there is currently no way to configure it at a finer granularity.

>>>>>>> Sadly, our table has plenty of columns, and requesting statistics for every column results in GenericDataFile objects with a retained heap of ~100k each. We have a few thousand data files, so requesting statistics for them would add serious extra memory load to our job.

>>>>>>> I was considering adding a new method to the Scan class like this:
>>>>>>> ---------
>>>>>>> ThisT includeColumnStats(Collection<String> columns);
>>>>>>> ---------

>>>>>>> Would the community consider this a valuable addition to the Scan API?

>>>>>>> Thanks,
>>>>>>> Peter

>>>>>>> [1] https://github.com/apache/iceberg/blob/f536c840350bd5628d7c514d2a4719404c9b8ed1/api/src/main/java/org/apache/iceberg/Scan.java#L71-L78

>>>>> --
>>>>> Ryan Blue
>>>>> Tabular
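For illustration, this is roughly how the proposed per-column variant would look from a caller's point of view; includeColumnStats(Collection<String>) does not exist in Iceberg today, and the "event_ts" column name is made up:
---------
import java.util.Collections;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class ProposedUsage {
  static CloseableIterable<FileScanTask> planWithEventTimeStatsOnly(Table table) {
    // Hypothetical API from this thread: only the stats for "event_ts" would be kept on the
    // returned DataFiles; stats for all other columns would be dropped during planning, so
    // wide tables no longer pay the ~100k-per-file metadata cost.
    return table.newScan()
        .includeColumnStats(Collections.singleton("event_ts"))
        .planFiles();
  }
}
---------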