Hi Beam team,

We have seen data loss with Beam pipelines under the following
conditions, i.e., Beam believes it has processed data that it never
actually processed:

  * Kafka consumer config: enable.auto.commit set to true (default),
auto.offset.reset set to latest (default)
  * ReadFromKafka(): commit_offset_in_finalize set to false (default)
  * No successful checkpoints (i.e. every checkpoint times out)
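To make the scenario concrete, here is a sketch of those default settings as they would appear in a Python pipeline. The broker address, group id, and topic name are placeholders, and the `ReadFromKafka` call is shown commented out since it needs a running expansion service and broker:

```python
# The risky defaults spelled out explicitly (values are the Kafka
# consumer defaults; broker/group are placeholders).
risky_consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder
    "group.id": "my-group",                 # placeholder
    "enable.auto.commit": "true",           # offsets auto-committed every 5s
    "auto.offset.reset": "latest",          # no stored offset -> start at latest
}

# from apache_beam.io.kafka import ReadFromKafka
# ReadFromKafka(
#     consumer_config=risky_consumer_config,
#     topics=["events"],                    # placeholder
#     commit_offset_in_finalize=False,      # the ReadFromKafka default
# )
```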

Under these conditions, if we publish messages to Kafka, the pipeline
starts up and reads them, and the offsets are auto-committed after 5
seconds (the Kafka consumer default).

This is usually not a problem, because Beam saves offsets to
checkpoints and uses the offsets in checkpoints upon pipeline restart.

But if the checkpointing never succeeds (e.g. we hit this with a slow
processor pipeline), or if there are no previous checkpoints, then
upon restart Beam starts with the latest Kafka offset. This is a data
loss situation.

We can prevent this problem by changing the defaults to:
* ReadFromKafka(): commit_offset_in_finalize set to true
* Kafka consumer config: enable.auto.commit set to false
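A sketch of the proposed safer settings, with the same caveats: broker, group, and topic names are placeholders, and the `ReadFromKafka` call is commented out because it cannot run without a broker and expansion service:

```python
# Safer settings: disable the consumer's 5-second auto-commit and let
# Beam commit offsets only when a checkpoint is finalized.
safe_consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder
    "group.id": "my-group",                 # placeholder
    "enable.auto.commit": "false",          # no commits before processing
}

# from apache_beam.io.kafka import ReadFromKafka
# ReadFromKafka(
#     consumer_config=safe_consumer_config,
#     topics=["events"],                    # placeholder
#     commit_offset_in_finalize=True,       # commit on checkpoint finalize
# )
```

With this configuration, an offset is only committed after the checkpoint that covers it succeeds, so a restart can reprocess messages but cannot skip them.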

If checkpointing is problematic, this can cause continuous
reprocessing, but that is still better than data loss.

What do you think of this?

Regards,
Deepak
