Hi Team,

Qishang Zhong found a bug in the Flink sink [1].

In a nutshell:
- A failure during checkpointing can cause duplicate data when the Flink
sink is used for CDC writes

In more detail:
- If there is a failure after `prepareSnapshotPreBarrier` but before
`snapshotState` for CHK1, then the data/delete files are created, but they
are not assigned to the checkpoint. So when the Flink job restarts, the
`IcebergFilesCommitter` will receive the data/delete files for CHK1 together
with the ones for the next checkpoint (CHK2), and it will commit them in a
single Iceberg commit (S1).

If there is an equality delete record in CHK2 that should delete a record
created in CHK1, then the table becomes corrupted (the record is duplicated):
both the data file for CHK1 and the equality delete file for CHK2 are
committed in S1, and an equality delete only applies to data files with a
strictly lower sequence number than its own, so the delete will not be
applied to the data file.
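
For illustration, this is roughly what the single combined commit (S1) looks
like in Iceberg API terms. This is a simplified sketch, not the committer's
actual code path, and it assumes the table and the two file handles are
already available:

  import org.apache.iceberg.DataFile;
  import org.apache.iceberg.DeleteFile;
  import org.apache.iceberg.RowDelta;
  import org.apache.iceberg.Table;

  class CombinedCommitSketch {
    // Commits the CHK1 data file and the CHK2 equality delete file together.
    // Both files get the same data sequence number in the new snapshot, and
    // an equality delete only applies to data files with a strictly lower
    // sequence number, so the CHK1 record is never removed.
    static void commitAsSingleSnapshot(
        Table table, DataFile chk1DataFile, DeleteFile chk2EqualityDelete) {
      RowDelta rowDelta = table.newRowDelta();
      rowDelta.addRows(chk1DataFile);
      rowDelta.addDeletes(chk2EqualityDelete);
      rowDelta.commit();
    }
  }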

To fix this, we need to assign the checkpointId to the `WriteResult`
immediately after it is created on the `IcebergStreamWriter` side and send it
downstream together with the `WriteResult`. Since `WriteResult` is part of
the Iceberg API, we cannot change it; instead we need to wrap it in a new
object and use that for the communication between the writer and the
committer. This would break upgrades of Flink jobs that use unaligned
checkpoints [2], because unaligned checkpoints persist in-flight records, and
changing the type of the records exchanged between the writer and the
committer makes those persisted records unreadable on restore.
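
A minimal sketch of what such a wrapper could look like (the class and
method names here are only illustrative, not necessarily the ones used in
[1]):

  import java.io.Serializable;
  import org.apache.iceberg.io.WriteResult;

  // Illustrative wrapper sent from the writer to the committer: it pairs the
  // files produced for a checkpoint with the id of that checkpoint, so the
  // committer can attribute them correctly even after a restore.
  class CheckpointedWriteResult implements Serializable {
    private final long checkpointId;
    private final WriteResult writeResult;

    CheckpointedWriteResult(long checkpointId, WriteResult writeResult) {
      this.checkpointId = checkpointId;
      this.writeResult = writeResult;
    }

    long checkpointId() {
      return checkpointId;
    }

    WriteResult writeResult() {
      return writeResult;
    }
  }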

I would like to understand how widely unaligned checkpointing is used in
the community, and I propose accepting this fix if there are no objections.

Thanks,
Peter

[1] - https://github.com/apache/iceberg/pull/10526
[2] - https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
