On Sat, Jun 4, 2022 at 3:35 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> On Sat, Jun 4, 2022 at 1:55 PM Reuven Lax <re...@google.com> wrote:
>
>> Cham, do you know if the Flink runner uses the SDF version or the old
>> version?
>>
>
> I think that depends on whether the experiment "use_deprecated_read" was
> specified or not. If it was specified, Flink will use the old version
> (which IIRC is also overridden by a native Flink Kafka source
> implementation).
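>
> For reference, the experiment is passed through pipeline options. A minimal
> sketch (only the --experiments flag comes from this thread; everything else
> is illustrative):
>
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     # With this experiment set, the runner uses the legacy
>     # UnboundedSource-based Kafka read instead of the SDF-based one.
>     options = PipelineOptions(["--experiments=use_deprecated_read"])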
>

Yes, we have this experiment specified. Thanks,
Deepak


>
>>
>> On Sat, Jun 4, 2022, 6:55 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>> Hi Deepak,
>>>
>>> On Fri, Jun 3, 2022 at 5:46 PM Deepak Nagaraj <deepak.naga...@primer.ai>
>>> wrote:
>>>
>>>> Hi Beam team,
>>>>
>>>> We have seen data loss with Beam pipelines under the following
>>>> conditions, i.e., Beam thinks it has processed data when in reality it
>>>> has not:
>>>>
>>>>   * Kafka consumer config: enable.auto.commit set to true (default),
>>>> auto.offset.reset set to latest (default)
>>>>
>>>
>>> To clarify, these are defaults for Kafka, not for Beam, right?
>>>
>>>
>>>>   * ReadFromKafka(): commit_offset_in_finalize set to false (default)
>>>>   * No successful checkpoints (i.e. every checkpoint times out)
>>>
>>>
>>>> Under these conditions, if we post to Kafka, the pipeline starts up and
>>>> reads Kafka messages, and the offsets are auto-committed after 5 seconds
>>>> (the Kafka consumer's default auto.commit.interval.ms).
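>>>>
>>>> To illustrate, that configuration corresponds to roughly the following
>>>> (a sketch; the broker, topic, and group names are placeholders):
>>>>
>>>>     import apache_beam as beam
>>>>     from apache_beam.io.kafka import ReadFromKafka
>>>>
>>>>     with beam.Pipeline() as p:
>>>>         messages = p | ReadFromKafka(
>>>>             consumer_config={
>>>>                 "bootstrap.servers": "broker:9092",  # placeholder
>>>>                 "group.id": "my-group",              # placeholder
>>>>                 # Kafka consumer defaults, spelled out explicitly:
>>>>                 "enable.auto.commit": "true",
>>>>                 "auto.offset.reset": "latest",
>>>>             },
>>>>             topics=["my-topic"],                     # placeholder
>>>>             # Beam default: offsets are not committed when a
>>>>             # checkpoint is finalized.
>>>>             commit_offset_in_finalize=False,
>>>>         )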
>>>>
>>>
>>> I think this is again the default behavior of Kafka, right? So once
>>> messages are read (by Beam or otherwise) and "enable.auto.commit" is set,
>>> messages are auto-committed? I don't think Beam has control over this, so
>>> possibly the pipeline user should set the Kafka consumer settings to not
>>> auto-commit if the runner does not preserve messages that are read.
>>>
>>>
>>>>
>>>> This is usually not a problem, because Beam saves offsets to
>>>> checkpoints and restores them from the checkpoint upon pipeline restart.
>>>>
>>>> But if checkpointing never succeeds (e.g., we hit this with a slow
>>>> processing pipeline), or if there are no previous checkpoints, then upon
>>>> restart Beam starts from the latest Kafka offset. This is a data loss
>>>> situation.
>>>>
>>>
>>> Could you clarify what you mean by pipeline restart here? For example,
>>> Dataflow would ensure a successful checkpoint if a pipeline is manually
>>> drained.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>> We can prevent this problem by changing these defaults:
>>>> * ReadFromKafka(): commit_offset_in_finalize to true
>>>> * Kafka consumer config: enable.auto.commit to false
>>>
>>>
>>>> If the checkpointing is problematic, this can cause continuous
>>>> reprocessing, but it's still better than data loss.
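>>>>
>>>> Concretely, with today's defaults a pipeline has to opt in to the safer
>>>> behavior explicitly, roughly like this (a sketch; the broker, topic, and
>>>> group names are again placeholders):
>>>>
>>>>     import apache_beam as beam
>>>>     from apache_beam.io.kafka import ReadFromKafka
>>>>
>>>>     with beam.Pipeline() as p:
>>>>         messages = p | ReadFromKafka(
>>>>             consumer_config={
>>>>                 "bootstrap.servers": "broker:9092",  # placeholder
>>>>                 "group.id": "my-group",              # placeholder
>>>>                 # Let Beam, not the Kafka client, decide when to commit.
>>>>                 "enable.auto.commit": "false",
>>>>             },
>>>>             topics=["my-topic"],                     # placeholder
>>>>             # Commit offsets only when a checkpoint is finalized, so a
>>>>             # failed checkpoint never advances the committed offsets.
>>>>             commit_offset_in_finalize=True,
>>>>         )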
>>>>
>>>> What do you think of this?
>>>>
>>>> Regards,
>>>> Deepak
>>>>
>>> --
Deepak Nagaraj  |  Machine Learning Engineer

