Hi, I'd like to discuss resolving equality deletes in the Flink write path, which will get us one step closer to removing equality deletes from the spec.
## tl;dr

We're planning to add an equality delete to deletion vector (DV) conversion to Flink. Equality deletes may remain as an internal intermediary format.

## Background

For deletes, Flink currently produces equality delete files. Equality deletes support deletes in the write path, which is a requirement for many use cases like CDC [3]. They are cheap for the writer: it only records the identifier-field values of the rows to be deleted in so-called delete files and leaves it up to the readers to match those values against the corresponding rows. The heavy lifting falls on the readers, which potentially need to scan the entire table to resolve equality deletes. Equality deletes have therefore been criticized, and there are discussions around deprecating or removing them [1].

## Resolving Equality Deletes

Steven, Peter, and a few other contributors came up with a proposal to convert equality deletes into DVs [2]. The original solution was quite complex, mainly due to the conflict handling between streaming writes, table maintenance, and equality delete resolution. The proposal is also blocked on index support in the Iceberg spec [5]. We may need to simplify further to make progress: the old table specs are going to be around for some time, even after we have a new spec with index support, and users have been asking for a solution to this issue for quite some time [3].

The following is a modification of the original design document which adapts the ideas described under "use lock to avoid conflicts" [2].

## Proposed Solution

The idea is to add the equality delete to deletion vector (DV) conversion as a Flink table maintenance task. After recent changes, we can run the writer and the maintenance tasks in the same Flink job and use a Flink-maintained lock to avoid conflicts between the maintenance tasks.

1. Instead of writing directly to the target branch, the writer commits data files + equality deletes to a staging branch.
2. The new "EqualityDeleteResolver" maintenance task reads from the staging branch, converts the equality deletes to DVs using a Flink-maintained primary key index, and commits data files + DVs to the target branch.
3. The lock mechanism of the existing Flink maintenance framework ensures mutual exclusion between the conversion task and table compaction to avoid conflicts.

After the conversion, the target branch contains only data files and DVs, no equality deletes.

## Limitations

- Readers see new data only once the conversion has completed, so the target branch lags behind the staging branch. This is partially mitigated by the fact that snapshots with equality deletes cannot be read properly with Flink today [4].
- The Flink-maintained index needs to be built initially, which requires reading the entire table. We will use Flink's state backend, which, apart from heap-based storage, also supports spilling to disk via the RocksDB state backend.

## Wrapping up

This solution may not be perfect because of the above limitations, but it provides a viable path to free users of the burden of equality deletes, which cannot be read efficiently by most engines today. Eventually, the Flink-maintained index can be replaced by an Iceberg index, which will allow the index to be shared across engines.

What does the community think?

Thanks,
Max

[1] Deprecate equality deletes: https://lists.apache.org/thread/z0gvco6hn2bpgngvk4h6xqrnw8b32sw6
[2] Design doc: https://docs.google.com/document/d/1Jz4Fjt-6jRmwqbgHX_u0ohuyTB9ytDzfslS7lYraIjk/edit
[3] Upserts use case: https://lists.apache.org/thread/rt7dmg7l78xpzc9w3lwn090yzqq4fyyw
[4] Handling upserts downstream: https://lists.apache.org/thread/bx1ntfr45c0g9rh643yw7w8znv6wtrno
[5] V4 Index: https://lists.apache.org/thread/xdkzllzt4p3tvcd3ft4t7jsvyvztr41j
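P.S. For intuition, here is a minimal sketch of what the conversion step does conceptually: look up each equality-deleted key in a primary key index and record the matching row position in a per-file deletion vector. All names below are hypothetical and this is plain Python, not the actual Iceberg/Flink APIs; in the proposal the index would live in Flink state (heap or RocksDB) and a DV would be a bitmap per data file, not a Python set.

```python
# Hypothetical sketch of equality delete -> DV conversion (step 2 above).
from collections import defaultdict

def build_pk_index(data_files):
    """Index identifier-field values -> (data file, row position).

    Stand-in for the Flink-maintained primary key index; building it
    requires one full scan of the table's data files.
    """
    index = {}
    for file_name, rows in data_files.items():
        for pos, row in enumerate(rows):
            index[row["id"]] = (file_name, pos)  # "id" = identifier field
    return index

def resolve_equality_deletes(equality_deletes, pk_index):
    """Convert equality deletes into per-file deletion vectors.

    A DV is modeled here as a set of deleted row positions per file;
    the real format is a bitmap stored alongside the data files.
    """
    dvs = defaultdict(set)
    for deleted_id in equality_deletes:
        hit = pk_index.get(deleted_id)
        if hit is not None:
            file_name, pos = hit
            dvs[file_name].add(pos)
    return dvs

# Example: two data files in the staging branch, plus equality deletes
# for the identifier values 2 and 3.
data_files = {
    "data-1.parquet": [{"id": 1}, {"id": 2}],
    "data-2.parquet": [{"id": 3}],
}
equality_deletes = [2, 3]

index = build_pk_index(data_files)
print(dict(resolve_equality_deletes(equality_deletes, index)))
# -> {'data-1.parquet': {1}, 'data-2.parquet': {0}}
```

The resulting per-file position sets are what would be committed to the target branch as DVs, while the equality delete files never leave the staging branch.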
