This sounds like a reasonable solution to me. Thanks, Peter!

On Thu, Jul 18, 2024 at 8:28 AM Steven Wu <stevenz...@gmail.com> wrote:
> Regarding unaligned checkpoints: Flink savepoints are always aligned and
> are recommended for Flink version upgrades. We could recommend that users
> take a Flink savepoint to pick up this fix.
>
> I will take a closer look at the PR.
>
> On Thu, Jul 18, 2024 at 6:22 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> Qishang Zhong found a bug in the Flink sink [1].
>>
>> In a nutshell:
>> - A failure during a checkpoint could cause duplicate data when the Flink
>>   sink is used with CDC writes.
>>
>> In more detail:
>> - If there is a failure after `prepareSnapshotPreBarrier` but before
>>   `snapshotState` for CHK1, then the data/delete files are created, but
>>   they are not assigned to the snapshot. So when the Flink job restarts,
>>   the `IcebergFilesCommitter` receives the data/delete files for CHK1
>>   along with those for the new snapshot (CHK2), and commits all of them
>>   in a single Iceberg commit (S1).
>>
>> If there is an equality delete record in CHK2 that should delete a record
>> created by CHK1, then the table becomes corrupted (the record is
>> duplicated): both the data file for CHK1 and the equality delete file for
>> CHK2 are committed in S1, so the delete is not applied to the data file.
>>
>> To fix this, we need to assign the checkpointId to the `WriteResult`
>> immediately after it is created on the `IcebergStreamWriter` side, and
>> send the checkpointId along with the `WriteResult`. Since `WriteResult` is
>> part of the Iceberg API and cannot be changed, we need to wrap it in a new
>> object and use that for the communication between the writer and the
>> committer. This would break upgrades of Flink jobs that use unaligned
>> checkpoints [2].
>>
>> I would like to understand how widely unaligned checkpointing is used in
>> the community, and I propose to accept this fix if there are no objections.
>>
>> Thanks,
>> Peter
>>
>> [1] - https://github.com/apache/iceberg/pull/10526
>> [2] - https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
>>
>
--
Ryan Blue
Databricks
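For illustration, here is a minimal Java sketch of the fix Péter describes: tag each write result with its checkpoint on the writer side, then let the committer commit each checkpoint separately instead of merging everything it received after a restart. `SimpleWriteResult`, `CheckpointedWriteResult`, `CheckpointGroupingCommitter` and `CommitterDemo` are hypothetical names invented for this sketch, not necessarily the classes used in the PR, and plain file-name strings stand in for Iceberg's `DataFile`/`DeleteFile` objects. Per the Iceberg table spec, an equality delete applies only to data files with a strictly lower data sequence number. That is why committing CHK1's data file and CHK2's equality delete in the same snapshot (S1) leaves the row visible, while committing them in two ordered snapshots does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for Iceberg's WriteResult: file names instead of DataFile/DeleteFile objects.
final class SimpleWriteResult {
  final List<String> dataFiles;
  final List<String> deleteFiles;

  SimpleWriteResult(List<String> dataFiles, List<String> deleteFiles) {
    this.dataFiles = List.copyOf(dataFiles);
    this.deleteFiles = List.copyOf(deleteFiles);
  }
}

// Hypothetical wrapper: pairs a write result with the checkpoint that produced it,
// assigned on the writer side right after the files are created.
final class CheckpointedWriteResult {
  final long checkpointId;
  final SimpleWriteResult writeResult;

  CheckpointedWriteResult(long checkpointId, SimpleWriteResult writeResult) {
    this.checkpointId = checkpointId;
    this.writeResult = writeResult;
  }
}

// Hypothetical committer logic: buffers results per checkpoint and emits one
// commit per checkpoint, oldest first, instead of merging them into one commit.
final class CheckpointGroupingCommitter {
  private final NavigableMap<Long, List<SimpleWriteResult>> pending = new TreeMap<>();

  void receive(CheckpointedWriteResult result) {
    pending.computeIfAbsent(result.checkpointId, id -> new ArrayList<>()).add(result.writeResult);
  }

  // Returns one "commit" (a list of file names) per checkpoint <= checkpointId,
  // in ascending checkpoint order, and removes those checkpoints from the buffer.
  List<List<String>> commitUpTo(long checkpointId) {
    NavigableMap<Long, List<SimpleWriteResult>> ready = pending.headMap(checkpointId, true);
    List<List<String>> commits = new ArrayList<>();
    for (Map.Entry<Long, List<SimpleWriteResult>> entry : ready.entrySet()) {
      List<String> files = new ArrayList<>();
      for (SimpleWriteResult wr : entry.getValue()) {
        files.addAll(wr.dataFiles);
        files.addAll(wr.deleteFiles);
      }
      commits.add(files);
    }
    ready.clear(); // clearing the head-map view also removes these entries from 'pending'
    return commits;
  }
}

final class CommitterDemo {
  public static void main(String[] args) {
    CheckpointGroupingCommitter committer = new CheckpointGroupingCommitter();
    // After the restart, CHK1's files reach the committer together with CHK2's.
    committer.receive(new CheckpointedWriteResult(1L,
        new SimpleWriteResult(List.of("data-chk1.parquet"), List.of())));
    committer.receive(new CheckpointedWriteResult(2L,
        new SimpleWriteResult(List.of("data-chk2.parquet"), List.of("eq-delete-chk2.parquet"))));
    // Prints [[data-chk1.parquet], [data-chk2.parquet, eq-delete-chk2.parquet]]
    System.out.println(committer.commitUpTo(2L));
  }
}
```

The design choice is the ordered map keyed by checkpoint id: it keeps the committer independent of the order in which results arrive and guarantees that CHK2's equality delete lands in a later snapshot than the CHK1 row it is meant to remove.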