To bump this back up: I think this is a pretty important change to the core library, so it's necessary that we get more folks involved in this discussion.
I agree that RewriteDataFiles needs to be broken up and realigned if we want to be able to reuse the code in Flink. I think I prefer that we essentially have three classes:

1) RewriteGroup: a container that holds all the files that are meant to be compacted, along with information about them
2) Rewriter: an engine-specific class which knows how to take a RewriteGroup and generate new files; I think this should be independent of the planner below
3) Planner: an engine-agnostic class which knows how to generate RewriteGroups given a set of parameters

On Tue, Jan 14, 2025 at 7:08 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> Hi Team,
>
> There is ongoing work to bring Flink Table Maintenance to Iceberg [1]. We
> already merged the main infrastructure and are currently working on
> implementing the data file rewrite [2]. During the implementation we found
> that part of the compaction planning implemented for Spark compaction
> could, and should, be reused in Flink as well. I created a PR [3] to bring
> those changes into core Iceberg.
>
> The main changes in the API:
>
> - We need to separate the compaction planning from the rewrite execution
>   - The planning would collect the files to be compacted and organize
>     them into compaction tasks/groups. This could be reused (in the same
>     way as the query planning)
>   - The rewrite would actually execute the rewrite. This needs to contain
>     engine-specific code, so we need to have separate implementations for
>     the separate engines
> - We need to decide on the new compaction planning API
>
> The planning currently generates data at multiple levels:
>
> 1. Plan level
>    - Statistics about the plan:
>      - Total group count
>      - Group count in a partition
>    - Target file size
>    - Output specification id - only relevant in case of the data rewrite
>      plan
> 2. Group level
>    - General group info:
>      - Global index
>      - Partition index
>      - Partition value
>    - List of tasks to read the data
>    - Split size - reader input split size when rewriting (Spark specific)
>    - Number of expected output files - used to calculate shuffling
>      partition numbers (Spark specific)
>
> I see the following decision points:
>
> - Data organization:
>   1. The plan is the 'result' - everything below that is organized only
>      based on the multiplicity of the data. So if some value applies to
>      every group, then that value belongs to the 'global' plan variables.
>      If a value is different for every group, then that value belongs to
>      the group (current code)
>   2. The group should contain all the information required for a single
>      job. So the job (executor) only receives a single group, and every
>      other bit of information is global. The drawback is that some
>      information is duplicated, but it is cleaner on the executor side.
> - Parameter handling:
>   1. Use string maps, like we do with FileRewriter.options - this allows
>      for a more generic API which will be more stable
>   2. Use typed, named parameters - when the API changes, users might have
>      breaking code, but could easily spot the changes
> - Engine-specific parameter handling:
>   1. We generate a common set of parameters
>   2. Engines get the whole compaction configuration and can have their
>      own parameter generators
>
> Currently I am leaning towards:
>
> - Data organization - 2 - the group should contain all the information
> - Parameter handling - 2 - specific types and named parameters
> - Engine-specific parameters - 1 - create a common set of parameters
>
> Your thoughts?
> Thanks,
> Peter
>
> [1] - https://github.com/apache/iceberg/issues/10264
> [2] - https://github.com/apache/iceberg/pull/11497
> [3] - https://github.com/apache/iceberg/pull/11513
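For concreteness, the three-class split proposed above could be sketched roughly as below. All names, fields, and the grouping logic are illustrative guesses for discussion, not the actual Iceberg API; the planner here simply chunks files by count as a stand-in for real bin-packing, and it follows "data organization" option 2 by carrying the target file size inside each group so an executor needs nothing global.

```java
import java.util.ArrayList;
import java.util.List;

// 1) RewriteGroup: container for the files to be compacted together,
//    plus the per-group info an executor needs (self-contained, option 2).
class RewriteGroup {
  final List<String> files;        // data files to rewrite in one job
  final int globalIndex;           // group index across the whole plan
  final long targetFileSizeBytes;  // duplicated per group so the executor is standalone

  RewriteGroup(List<String> files, int globalIndex, long targetFileSizeBytes) {
    this.files = files;
    this.globalIndex = globalIndex;
    this.targetFileSizeBytes = targetFileSizeBytes;
  }
}

// 2) Rewriter: engine-specific (Spark, Flink, ...); takes one group,
//    produces the new files. Implementations live in the engine modules.
interface Rewriter {
  List<String> rewrite(RewriteGroup group);
}

// 3) Planner: engine-agnostic; turns candidate files into RewriteGroups
//    from typed, named parameters (parameter handling option 2).
class Planner {
  private final long targetFileSizeBytes;
  private final int maxFilesPerGroup;

  Planner(long targetFileSizeBytes, int maxFilesPerGroup) {
    this.targetFileSizeBytes = targetFileSizeBytes;
    this.maxFilesPerGroup = maxFilesPerGroup;
  }

  List<RewriteGroup> plan(List<String> candidateFiles) {
    List<RewriteGroup> groups = new ArrayList<>();
    for (int i = 0; i < candidateFiles.size(); i += maxFilesPerGroup) {
      int end = Math.min(i + maxFilesPerGroup, candidateFiles.size());
      // Copy the slice so the group does not alias the caller's list.
      groups.add(new RewriteGroup(
          new ArrayList<>(candidateFiles.subList(i, end)),
          groups.size(),
          targetFileSizeBytes));
    }
    return groups;
  }
}
```

With this shape, core Iceberg would own RewriteGroup and Planner, while each engine module supplies only its Rewriter implementation, which is the reuse boundary the thread is after.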