Hi,

I'd like to discuss resolving equality deletes in the Flink write
path, which will get us one step closer to removing equality deletes
from the spec.

## tl;dr

We're planning to add an equality delete to deletion vector (DV)
conversion to Flink. Equality deletes may remain as an internal
intermediary format.

## Background

For deletes, Flink currently produces equality delete files.

Equality deletes are used to support deletes in the write path, which
is a requirement for many use cases like CDC [3]. They are cheap for
the writer, which only records the to-be-deleted values of the
identifier fields in so-called delete files and leaves it to the
readers to match those values to the corresponding rows. The heavy
lifting falls on the readers, which may need to scan the entire table
to resolve equality deletes.
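To make the cost asymmetry concrete, here is a minimal sketch of
equality-delete semantics (illustrative Python, not Iceberg's actual
implementation; the function and field names are made up):

```python
# The writer records only the identifier-field values of deleted rows;
# the reader must scan the data rows and drop every row whose
# identifier matches a delete entry.

data_rows = [
    {"id": 1, "name": "a"},
    {"id": 2, "name": "b"},
    {"id": 3, "name": "c"},
]

# An "equality delete file": just the identifier values to delete.
equality_deletes = [{"id": 2}]

def apply_equality_deletes(rows, deletes, key_fields=("id",)):
    """Filter out every row whose identifier matches a delete entry."""
    deleted_keys = {tuple(d[f] for f in key_fields) for d in deletes}
    return [r for r in rows
            if tuple(r[f] for f in key_fields) not in deleted_keys]

live_rows = apply_equality_deletes(data_rows, equality_deletes)
# live_rows → the rows with id 1 and 3
```

Writing the delete is O(1) for the writer, but every reader has to
re-run the matching over all data it scans.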

Consequently, equality deletes have drawn criticism, and there are
ongoing discussions about deprecating or removing them [1].

## Resolving Equality Deletes

Steven, Peter, and a few other contributors came up with a proposal to
convert equality deletes into DVs [2]. The original solution was quite
complex, mainly due to the conflict handling between streaming writes,
table maintenance, and equality delete resolution. The proposal is
also blocked on index support in the Iceberg spec [5].

We may need to simplify further to make some progress. The old table
specs are going to be around for some time, even after we have a new
spec with index support. Users have been asking for a solution to this
issue for quite some time [3].

The following is a modification of the original design document which
adapts the ideas described under "use lock to avoid conflicts" [2].

## Proposed Solution

The idea is to add the equality delete to deletion vector (DV)
conversion as a Flink table maintenance task. After recent changes, we
can now run the writer and the maintenance in the same Flink job and
use a Flink-maintained lock to avoid conflicts between the maintenance
tasks.

1. Instead of writing directly to the target branch, the writer
commits data files + equality deletes to a staging branch.
2. The new "EqualityDeleteResolver" maintenance task reads from the
staging branch and converts the equality deletes to DVs using a
Flink-maintained primary key index, then commits data files + DVs to
the target branch.
3. The existing Flink maintenance framework's lock mechanism ensures
mutual exclusion between the convert task and table compaction to
avoid conflicts.

After conversion, the target branch contains only data files and DVs,
no equality deletes.
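The core of step 2 can be sketched as follows (illustrative Python;
the index layout and the name resolve_to_dvs are assumptions for this
sketch, not the proposed API). A maintained index maps each primary
key to the (data file, row position) where the latest copy of the row
lives, so an equality delete can be resolved to a position without
scanning the table:

```python
from collections import defaultdict

# Flink-maintained index built from the data files:
# primary key -> (data file, row position).
pk_index = {
    1: ("data-00.parquet", 0),
    2: ("data-00.parquet", 1),
    3: ("data-01.parquet", 0),
}

def resolve_to_dvs(equality_delete_keys, index):
    """Turn equality deletes (key values) into per-file deletion
    vectors, represented here as sets of row positions."""
    dvs = defaultdict(set)
    for key in equality_delete_keys:
        if key in index:
            data_file, pos = index[key]
            dvs[data_file].add(pos)
    return dict(dvs)

dvs = resolve_to_dvs([2, 3], pk_index)
# dvs → {"data-00.parquet": {1}, "data-01.parquet": {0}}
```

In the real task the positions would be committed as DVs to the target
branch; only the lookup idea is shown here.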

## Limitations

- Readers will see new data only after the conversion completes. This
is partially mitigated by the fact that snapshots with equality
deletes cannot be read properly with Flink today [4].
- The Flink-maintained index must be built initially, which requires
reading the entire table. We will use Flink's state backend, which, in
addition to heap-based storage, supports spilling to disk via the
RocksDB state backend.
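For intuition, the initial bootstrap amounts to one full scan of the
table, recording for each key the position of its latest row (a
hypothetical sketch; the file layout and build_index name are
illustrative):

```python
def build_index(table_files, key_field="id"):
    """One full scan over all data files; later rows for the same key
    overwrite earlier ones, so the index points at the latest copy."""
    index = {}
    for file_name, rows in table_files:
        for pos, row in enumerate(rows):
            index[row[key_field]] = (file_name, pos)
    return index

# Two toy "data files" standing in for the table's contents.
table_files = [
    ("data-00.parquet", [{"id": 1}, {"id": 2}]),
    ("data-01.parquet", [{"id": 2}]),  # newer copy of id=2
]

index = build_index(table_files)
# index → {1: ("data-00.parquet", 0), 2: ("data-01.parquet", 0)}
```

In practice this state would live in the configured Flink state
backend rather than an in-memory dict.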

## Wrapping up

This solution may not be perfect because of the above limitations, but
it provides a viable path to free users of the burden of equality
deletes, which cannot be read efficiently by most engines today.
Eventually, the Flink-maintained index can be replaced by an Iceberg
index, which will allow for the index to be shared across engines.

What does the community think?

Thanks,
Max

[1] Deprecate equality deletes:
https://lists.apache.org/thread/z0gvco6hn2bpgngvk4h6xqrnw8b32sw6
[2] Design doc:
https://docs.google.com/document/d/1Jz4Fjt-6jRmwqbgHX_u0ohuyTB9ytDzfslS7lYraIjk/edit
[3] Upserts use case:
https://lists.apache.org/thread/rt7dmg7l78xpzc9w3lwn090yzqq4fyyw
[4] Handling upserts downstream:
https://lists.apache.org/thread/bx1ntfr45c0g9rh643yw7w8znv6wtrno
[5] V4 Index: https://lists.apache.org/thread/xdkzllzt4p3tvcd3ft4t7jsvyvztr41j
