On Sat, Jun 4, 2022 at 3:35 PM Chamikara Jayalath <chamik...@google.com> wrote:
> On Sat, Jun 4, 2022 at 1:55 PM Reuven Lax <re...@google.com> wrote:
>
>> Cham, do you know if the Flink runner uses the SDF version or the old
>> version?
>
> I think that depends on whether the experiment "use_deprecated_read" was
> specified or not. If it was specified, Flink will use the old version
> (which will also be overridden by a native Flink Kafka source
> implementation, IIRC).

Yes, we have this experiment specified.

Thanks,
Deepak

>> On Sat, Jun 4, 2022, 6:55 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Hi Deepak,
>>>
>>> On Fri, Jun 3, 2022 at 5:46 PM Deepak Nagaraj <deepak.naga...@primer.ai>
>>> wrote:
>>>
>>>> Hi Beam team,
>>>>
>>>> We have seen data loss with Beam pipelines under the following
>>>> condition, i.e., Beam thinks it has processed data when in reality it
>>>> has not:
>>>>
>>>> * Kafka consumer config: enable.auto.commit set to true (default),
>>>>   auto.offset.reset set to latest (default)
>>>
>>> To clarify, these are defaults for Kafka, not for Beam, right?
>>>
>>>> * ReadFromKafka(): commit_offset_in_finalize set to false (default)
>>>> * No successful checkpoints (i.e. every checkpoint times out)
>>>>
>>>> Under this condition, if we post to Kafka, the pipeline starts up and
>>>> reads Kafka messages, and the offsets are auto-committed after 5
>>>> seconds (the Kafka consumer default).
>>>
>>> I think this is again the default behavior of Kafka, right? So once
>>> messages are read (by Beam or otherwise) and "enable.auto.commit" is
>>> set, messages are auto-committed. I don't think Beam has control over
>>> this, so possibly the pipeline user should set the Kafka consumer
>>> settings to not auto-commit if the runner does not preserve messages
>>> that are read.
>>>
>>>> This is usually not a problem, because Beam saves offsets to
>>>> checkpoints and uses the offsets in checkpoints upon pipeline restart.
>>>>
>>>> But if the checkpointing never succeeds (e.g. we hit this with a slow
>>>> processor pipeline), or if there are no previous checkpoints, then
>>>> upon restart Beam starts at the latest Kafka offset. This is a data
>>>> loss situation.
>>>
>>> Could you clarify what you mean by pipeline restart here? For example,
>>> Dataflow would ensure a successful checkpoint if a pipeline is
>>> manually drained.
>>>
>>> Thanks,
>>> Cham
>>>
>>>> We can prevent this problem by changing the defaults to:
>>>> * ReadFromKafka(): commit_offset_in_finalize set to true
>>>> * Kafka consumer config: enable.auto.commit set to false
>>>>
>>>> If checkpointing is problematic, this can cause continuous
>>>> reprocessing, but it's still better than data loss.
>>>>
>>>> What do you think of this?
>>>>
>>>> Regards,
>>>> Deepak

--
Deepak Nagaraj | Machine Learning Engineer
primer.ai
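[Editor's note] The settings Deepak proposes can be sketched as arguments to Beam's Python ReadFromKafka transform. This is a minimal sketch, not a confirmed fix: the broker address "kafka:9092" and topic "events" are hypothetical placeholders, and actually running the read requires apache_beam with the Kafka IO expansion service.

```python
# Sketch of the safer settings proposed in the thread, for Beam's Python
# ReadFromKafka transform. "kafka:9092" and "events" are hypothetical.

consumer_config = {
    "bootstrap.servers": "kafka:9092",  # hypothetical broker address
    # Stop the Kafka client from auto-committing offsets on its own
    # (the Kafka consumer default is "true", with a 5-second interval):
    "enable.auto.commit": "false",
}

read_from_kafka_kwargs = dict(
    consumer_config=consumer_config,
    topics=["events"],  # hypothetical topic
    # Commit offsets only when Beam finalizes a checkpoint, so messages
    # that were read but never checkpointed are re-read after a restart
    # instead of being silently skipped:
    commit_offset_in_finalize=True,
)

# Usage (requires apache_beam and a Kafka IO expansion service):
#   from apache_beam.io.kafka import ReadFromKafka
#   p | ReadFromKafka(**read_from_kafka_kwargs)
```

As the thread notes, the trade-off is duplicate processing rather than data loss: if no checkpoint ever finalizes, offsets are never committed and the same messages are re-read on every restart.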