Hi Cham,

Thank you for your response. One thing I didn't mention earlier:
all of this is with Beam's Flink runner.

On Sat, Jun 4, 2022 at 9:55 AM Chamikara Jayalath <chamik...@google.com> wrote:
>
>>
>>   * Kafka consumer config: enable.auto.commit set to true (default),
>> auto.offset.reset set to latest (default)
>
> To clarify, these are defaults for Kafka, not for Beam, right ?
>

You are right, these are the defaults for the Kafka consumer. They can
be overridden in Beam's ReadFromKafka() via the consumer_config
parameter.
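
As a rough sketch of what such an override looks like in the Python
SDK (the broker address, group id, and topic name below are made up
for illustration, and the pipeline wiring is only shown in a comment):

```python
# Kafka consumer settings passed through to the consumer by ReadFromKafka().
# Values are strings, as in a Kafka properties file.
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # hypothetical broker address
    "group.id": "my-beam-pipeline",         # hypothetical consumer group
    "enable.auto.commit": "false",          # override the Kafka default (true)
}

# Keyword arguments for ReadFromKafka(); commit_offset_in_finalize
# defaults to False, so it is set explicitly here.
read_kwargs = {
    "consumer_config": consumer_config,
    "topics": ["my-topic"],                 # hypothetical topic name
    "commit_offset_in_finalize": True,
}

# In a pipeline this would be used roughly as:
#   from apache_beam.io.kafka import ReadFromKafka
#   messages = pipeline | ReadFromKafka(**read_kwargs)
```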

>>   * ReadFromKafka(): commit_offset_in_finalize set to false (default)
>>   * No successful checkpoints (i.e. every checkpoint times out)
>>
>>
>> Under this condition, if we post to Kafka, the pipeline starts up and
>> reads Kafka messages, and the offsets are auto-committed after 5
>> seconds (Kafka consumer default).
>
>
> I think this is again the default behavior of Kafka, right ? So once messages 
> are read (by Beam or otherwise) and "enable.auto.commit" is set. Messages are 
> auto-committed ? I don't think Beam would have control over this so possibly 
> the pipeline user should set the Kafka broker settings to not auto-commit if 
> the runner does not preserve messages that are read.
>

This is a bit of a grey area. ReadFromKafka() is provided by Beam, and
commit_offset_in_finalize is a parameter of this function.

Also, enabling checkpoints is part of Beam's Flink runner.

>> But if the checkpointing never succeeds (e.g. we hit this with a slow
>> processor pipeline), or if there are no previous checkpoints, then
>> upon restart Beam starts with the latest Kafka offset. This is a data
>> loss situation.
>
> Could you clarify what you mean by pipeline restart here ?  For example, with 
> Dataflow would ensure a successful checkpoint if a pipeline is manually 
> drained.
>

Sure -- upon a checkpoint failure, the default behavior is for the
pipeline job to be marked failed and restarted (in Flink). When the
job restarts, because there have not been any successful checkpoints,
Beam starts reading from the latest offset.  This causes data loss.

It seems to me that the system should at least warn the user to set
the relevant config bits correctly when checkpointing is enabled (I
know for sure about Flink, not sure about Dataflow):

If checkpointing is enabled, when ReadFromKafka() is invoked, warn
about possible data loss unless both of these are true:

a) commit_offset_in_finalize is true in ReadFromKafka(), and
b) enable.auto.commit is false in the Kafka consumer config.
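
To make the proposal concrete, here is a minimal sketch of that check
in Python. The function name and its checkpointing_enabled argument
are hypothetical; nothing like this exists in Beam today as far as I
know, and where it would hook into ReadFromKafka() is left open.

```python
def kafka_checkpoint_warnings(consumer_config,
                              commit_offset_in_finalize=False,
                              checkpointing_enabled=True):
    """Return a list of reasons the setup may lose data (empty if none).

    Hypothetical helper: mirrors conditions (a) and (b) above.
    """
    if not checkpointing_enabled:
        # The proposed warning only applies when checkpointing is enabled.
        return []

    problems = []

    # (a) offsets should be committed when a checkpoint is finalized.
    if not commit_offset_in_finalize:
        problems.append("commit_offset_in_finalize is not true")

    # (b) Kafka's own auto-commit should be off. If the key is absent,
    # the Kafka consumer default (true) applies.
    auto_commit = str(consumer_config.get("enable.auto.commit", "true"))
    if auto_commit.strip().lower() != "false":
        problems.append("enable.auto.commit is not false")

    return problems


# With all defaults, both conditions are violated.
default_problems = kafka_checkpoint_warnings({})

# With both settings changed, nothing is reported.
safe_problems = kafka_checkpoint_warnings(
    {"enable.auto.commit": "false"}, commit_offset_in_finalize=True)
```

A real implementation would presumably log these as warnings rather
than return them.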

We could also choose not to make any code changes and instead document
this under the Flink runner's checkpointing_interval parameter. However,
that puts the note some distance from where it matters, since the
problem arises in ReadFromKafka().

Let me know if I can clarify anything else.

Thanks again,
Deepak
