Regarding unaligned checkpoints: Flink savepoints are always aligned and are the recommended mechanism for Flink version upgrades. We could recommend that users stop the job with a savepoint and restore from it to pick up this fix.
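For example, the standard stop-with-savepoint / restore flow with the Flink CLI would look roughly like this (savepoint directory, job id, and jar path are placeholders):

    # Stop the running job, taking a final savepoint (savepoints are always aligned).
    ./bin/flink stop --savepointPath /tmp/flink-savepoints <jobId>

    # Redeploy the job with the fixed Iceberg sink, restoring from that savepoint.
    ./bin/flink run --fromSavepoint /tmp/flink-savepoints/savepoint-<id> /path/to/job.jar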
I will take a closer look at the PR.

On Thu, Jul 18, 2024 at 6:22 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> Hi Team,
>
> Qishang Zhong found a bug with the Flink Sink [1].
>
> In a nutshell:
> - A failure during checkpointing can cause duplicate data with the Flink
> sink for CDC writes.
>
> In more detail:
> - If there is a failure after `prepareSnapshotPreBarrier` but before
> `snapshotState` for CHK1, then the data/delete files are created, but they
> are not assigned to the snapshot. So when the Flink job restarts, the
> `IcebergFilesCommitter` receives the data/delete files for CHK1 along with
> the new snapshot (CHK2), and it commits them in a single Iceberg commit
> (S1).
>
> If there is an equality delete record in CHK2 which should delete a record
> created by CHK1, then the table becomes corrupted (the record is
> duplicated), since both the data file from CHK1 and the equality delete
> file from CHK2 are committed in S1, so the delete is not applied to the
> data file.
>
> To fix this, we need to assign the checkpointId to the `WriteResult`
> immediately after its creation on the `IcebergStreamWriter` side and send
> it along with the `WriteResult`. Since `WriteResult` is part of the
> Iceberg API, we cannot change it; instead, we need to wrap it in a new
> object and use that for the communication between the writer and the
> committer. This would break upgrading Flink jobs which are using unaligned
> checkpoints [2].
>
> I would like to understand how widely unaligned checkpointing is used in
> the community, and I propose to accept this fix if there are no
> objections.
>
> Thanks,
> Peter
>
> [1] - https://github.com/apache/iceberg/pull/10526
> [2] - https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
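To make the wrapper idea concrete, here is a minimal sketch of what such an object could look like. The class name and accessors are hypothetical, not the actual shape of the change in [1]; only `org.apache.iceberg.io.WriteResult` is real:

    import java.io.Serializable;
    import org.apache.iceberg.io.WriteResult;

    /**
     * Hypothetical wrapper pairing a WriteResult with the checkpoint that
     * produced it. The writer would stamp the id when it flushes its files in
     * prepareSnapshotPreBarrier(checkpointId), so that after a restart the
     * committer can attribute restored data/delete files to CHK1 instead of
     * folding them into the CHK2 commit.
     */
    final class CheckpointedWriteResult implements Serializable {
      private final long checkpointId;
      private final WriteResult writeResult;

      CheckpointedWriteResult(long checkpointId, WriteResult writeResult) {
        this.checkpointId = checkpointId;
        this.writeResult = writeResult;
      }

      long checkpointId() {
        return checkpointId;
      }

      WriteResult writeResult() {
        return writeResult;
      }
    }

Changing the type of the records flowing between the writer and the committer is exactly what breaks restores from unaligned checkpoints: unaligned checkpoints persist in-flight records in serialized form, so buffers written with the old record type cannot be read back by the upgraded job. Aligned checkpoints and savepoints contain no in-flight data, which is why restoring from a savepoint avoids the problem.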