Thanks Ryan, Russell for the quick response!

In our Flink job we have a TumblingEventTimeWindow to filter out old data. There was a temporary issue with accessing the Catalog, and our Flink job was not able to read the data from the Iceberg table for a while.
When the Flink job could access the Catalog again, it fetched all the data that had arrived in the table during the downtime. Since planning does not guarantee the order of the Tasks, we ended up with out-of-order records, which is not desirable in our case.

We were able to fetch all of the splits (Flink does this currently) and sort them based on the stats. The Flink SplitAssigner interface allowed us to serve the splits in the given order, and this way we no longer had late events (we needed to do extra work to provide the required watermarks, but that is not that relevant here).

If I understand correctly, the ManifestGroups are good for filtering the plan results. Our requirement is not really filtering, but ordering the Tasks. Is there a way to do that?

Thanks,
Peter

Ryan Blue <b...@tabular.io> wrote (on Mon, May 15, 2023, 18:07):

> Yes, I agree with Russell. You'd want to push the filter into planning
> rather than returning stats. That's why we strip out stats when the file
> metadata is copied. It also would be expensive to copy some, but not all of
> the file stats. It's better not to store the stats you don't need.
>
> What about using the ManifestGroup interface to get finer-grained control
> of the planning?
>
> Ryan
>
> On Mon, May 15, 2023 at 8:05 AM Russell Spitzer <russell.spit...@gmail.com>
> wrote:
>
>> I think currently the recommendation would be to filter the iterator
>> rather than pulling the whole object with stats into memory. Is there a
>> requirement that all of the DataFiles be pulled into memory before
>> filtering?
>>
>> On Mon, May 15, 2023 at 9:49 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> We have a Flink job where we would like to use the Iceberg File
>>> statistics (lowerBounds, upperBounds) during the planning phase.
>>>
>>> Currently it is possible to parameterize the Scan to include the
>>> statistics using the includeColumnStats [1]. This is an on/off switch, but
>>> currently there is no way to configure this on a finer granularity.
>>>
>>> Sadly our table has plenty of columns and requesting statistics for
>>> every column will result in GenericDataFile objects where the
>>> retained heap is ~100k each. We have a few thousand data files and
>>> requesting statistics for them would add serious extra memory load to our
>>> job.
>>>
>>> I was considering adding a new method to the Scan class like this:
>>> ---------
>>> ThisT includeColumnStats(Collection<String> columns);
>>> ---------
>>>
>>> Would the community consider this as a valuable addition to the Scan API?
>>>
>>> Thanks,
>>> Peter
>>>
>>> [1]
>>> https://github.com/apache/iceberg/blob/f536c840350bd5628d7c514d2a4719404c9b8ed1/api/src/main/java/org/apache/iceberg/Scan.java#L71-L78
>>>
>>
>
> --
> Ryan Blue
> Tabular
>
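
To illustrate the split ordering described at the top of this message, here is a minimal Java sketch. It does not use the real Iceberg or Flink classes and is not the code from our job: `Split`, `minEventTime`, and `orderByEventTime` are hypothetical names, and `minEventTime` stands in for the lower bound of the event-time column as it would be read from the file statistics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SplitOrderingSketch {

    // Hypothetical stand-in for a planned split; only the fields needed for ordering.
    static final class Split {
        final String file;
        final long minEventTime; // assumed: lower bound of the event-time column from the stats

        Split(String file, long minEventTime) {
            this.file = file;
            this.minEventTime = minEventTime;
        }
    }

    // Returns a new list ordered by ascending minimum event time.
    // Ties are broken by file name so the order is deterministic.
    static List<Split> orderByEventTime(List<Split> planned) {
        List<Split> ordered = new ArrayList<>(planned);
        ordered.sort(
            Comparator.comparingLong((Split s) -> s.minEventTime)
                .thenComparing((Split s) -> s.file));
        return ordered;
    }

    public static void main(String[] args) {
        // Planning may return the splits in any order.
        List<Split> planned = Arrays.asList(
            new Split("f3", 300L),
            new Split("f1", 100L),
            new Split("f2", 200L));

        // Prints f1, f2, f3 on separate lines.
        for (Split s : orderByEventTime(planned)) {
            System.out.println(s.file);
        }
    }
}
```

A split assigner can then hand out the splits from the head of the ordered list. Only the statistics of the single event-time column are needed for this, which is why per-column control over the returned stats would help.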