Hi Team,

There is ongoing work to bring Flink Table Maintenance to Iceberg [1]. We
already merged the main infrastructure and are currently working on
implementing the data file rewrite [2]. During the implementation we found
that part of the compaction planning implemented for the Spark compaction
could, and should, be reused in Flink as well. I created a PR [3] to bring
those changes to core Iceberg.

The main changes in the API:

   - We need to separate the compaction planning from the rewrite execution
   (a rough sketch follows this list)
      - The planning would collect the files to be compacted and organize
      them into compaction tasks/groups. This could be reused (in the same
      way as the query planning)
      - The rewrite would actually execute the rewrite. This needs to
      contain engine specific code, so we need separate implementations for
      the different engines
   - We need to decide on the new compaction planning API
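
To make the split more concrete, here is a rough sketch of how the
separation could look. The interface and method names (RewritePlanner,
RewritePlan, RewriteGroup, GroupRewriter) are made up for illustration and
are not the ones used in the PR:

import java.util.List;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;

// Hypothetical planning output: groups of file scan tasks to compact together.
interface RewriteGroup {
  List<FileScanTask> tasks();
}

interface RewritePlan {
  List<RewriteGroup> groups();
}

// Engine-agnostic planning: lives in core Iceberg, reusable by Spark and Flink.
interface RewritePlanner {
  // Collects the files to be compacted and organizes them into groups.
  RewritePlan plan(Table table);
}

// Engine-specific execution: each engine implements the rewrite of a single group.
interface GroupRewriter {
  // Rewrites the files of one group and returns the newly written data files.
  Set<DataFile> rewrite(RewriteGroup group);
}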

The planning currently generates data at multiple levels (a rough data
structure sketch follows the list):

   1. Plan level
      - Statistics about the plan:
         - Total group count
         - Group count in a partition
      - Target file size
      - Output specification ID - only relevant for the data rewrite plan
   2. Group level
      - General group info
         - Global index
         - Partition index
         - Partition value
      - List of tasks to read the data
      - Split size - reader input split size when rewriting (Spark
      specific)
      - Number of expected output files - used to calculate shuffling
      partition numbers (Spark specific)
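
As a rough illustration of the two levels above, the data could be grouped
something like this (class and field names are hypothetical, just for this
sketch):

import java.util.List;
import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;

// Hypothetical plan-level data - applies to the whole compaction run.
class RewritePlanSketch {
  int totalGroupCount;                          // statistics: total group count
  Map<StructLike, Integer> groupsPerPartition;  // statistics: group count per partition
  long targetFileSizeBytes;                     // target file size
  int outputSpecId;                             // only relevant for the data rewrite plan
  List<RewriteGroupSketch> groups;
}

// Hypothetical group-level data - one instance per compaction job.
class RewriteGroupSketch {
  int globalIndex;                              // general group info
  int partitionIndex;
  StructLike partitionValue;
  List<FileScanTask> tasks;                     // tasks to read the data
  long splitSize;                               // reader input split size (Spark specific)
  int expectedOutputFileCount;                  // used for shuffle partition count (Spark specific)
}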

I see the following decision points:

   - Data organization:
      1. Plan is the 'result' - everything below that is only organized
      based on the multiplicity of the data. So if some value applies to
      every group, then that value belongs to the 'global' plan variables.
      If a value is different for every group, then that value belongs to
      the group (current code)
      2. The group should contain all the information required for a
      single job, so the job (executor) only receives a single group and
      everything else stays global. The drawback is that some information
      is duplicated, but it is cleaner on the executor side.
   - Parameter handling (see the sketch after this list):
      1. Use string maps, like we do with the FileRewriter.options - this
      allows for a more generic API which will be more stable
      2. Use typed, named parameters - when the API changes, users might
      have breaking code, but they can easily spot the changes
   - Engine specific parameter handling:
      1. We generate a common set of parameters
      2. Engines get the whole compaction configuration, and can have
      their own parameter generators
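
For the parameter handling question, a minimal illustration of the two
options (the interface names are made up for this sketch):

import java.util.Map;

// Option 1: string map - generic and stable, but not compile-time checked.
interface MapBasedGroupInfo {
  Map<String, String> properties();  // e.g. "split-size" -> "134217728"
}

// Option 2: typed, named parameters - breaking API changes show up at compile time.
interface TypedGroupInfo {
  long splitSize();
  int expectedOutputFileCount();
}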

Currently I am leaning towards:

   - Data organization - 2 - the group should contain all the information
   - Parameter handling - 2 - specific types and named parameters
   - Engine specific parameters - 1 - create a common set of parameters

Your thoughts?
Thanks,
Peter

[1] - https://github.com/apache/iceberg/issues/10264
[2] - https://github.com/apache/iceberg/pull/11497
[3] - https://github.com/apache/iceberg/pull/11513
