Hi Team,

Qishang Zhong found a bug with the Flink Sink [1].
In a nutshell:
- A failure during checkpointing could cause duplicate data with the Flink sink when doing CDC writes.

In more detail:
- If there is a failure after `prepareSnapshotPreBarrier` but before `snapshotState` for CHK1, then the data/delete files are created, but they are not assigned to the snapshot. So when the Flink job restarts, the `IcebergFilesCommitter` will receive the data/delete files for CHK1 along with the new snapshot (CHK2), and it will commit them in a single Iceberg commit (S1). If there is an equality delete record in CHK2 which should delete a record created by CHK1, then the table becomes corrupted (the record is duplicated): both the data file for CHK1 and the equality delete file for CHK2 are committed in S1, so the delete will not be applied to the data file.

To fix this, we need to assign the checkpointId to the `WriteResult` immediately after creation on the `IcebergStreamWriter` side and send the checkpointId along with the `WriteResult`. Since the `WriteResult` is part of the Iceberg API, we cannot change it; we need to wrap it in a new object and use that for the communication between the writer and the committer (a rough sketch follows below the references). This would break upgrading Flink jobs which are using unaligned checkpoints [2].

I would like to understand how widely unaligned checkpointing is used in the community, and I propose to accept this fix if there are no objections.

Thanks,
Peter

[1] - https://github.com/apache/iceberg/pull/10526
[2] - https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
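
P.S. For illustration only, here is a minimal sketch of what such a wrapper could look like. The class name `CheckpointedWriteResult` and its shape are hypothetical, not the actual change (see [1] for that):

import java.io.Serializable;

import org.apache.iceberg.io.WriteResult;

/**
 * Illustrative wrapper that pairs a WriteResult with the checkpointId of the
 * checkpoint that produced it, so the committer can assign files restored
 * after a failure to the correct checkpoint.
 *
 * Hypothetical sketch; assumes WriteResult stays serializable for the
 * writer-to-committer wire format, as it is in the current sink.
 */
class CheckpointedWriteResult implements Serializable {

  private final long checkpointId;
  private final WriteResult writeResult;

  CheckpointedWriteResult(long checkpointId, WriteResult writeResult) {
    this.checkpointId = checkpointId;
    this.writeResult = writeResult;
  }

  long checkpointId() {
    return checkpointId;
  }

  WriteResult writeResult() {
    return writeResult;
  }
}

The idea would be that the `IcebergStreamWriter` emits this wrapper from `prepareSnapshotPreBarrier(checkpointId)` instead of a bare `WriteResult`, and the `IcebergFilesCommitter` uses the stored checkpointId to attach the files to the checkpoint that actually created them, rather than to whichever checkpoint it is processing after a restart. Since the element type between the two operators changes, state/wire compatibility is affected, which is the upgrade concern for unaligned checkpoints mentioned above.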