I'd suggest:
 a) use unaligned checkpoints, if possible

 b) verify the number of buckets (numShards) you use for the EOS sink; it limits the sink's parallelism [1].
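To illustrate point b), here is a toy model in plain Java. It is hypothetical and not KafkaIO's actual sink code. It assumes each of the `numShards` buckets passed to `withEOS(numShards, sinkGroupId)` is handled by a single stateful writer at a time, so the number of busy writers is capped by `numShards` however many workers are available. The class name `EosShardModel` and the round-robin assignment are illustrative assumptions.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model (hypothetical, not Beam's implementation): every record is
// assigned to one of numShards shard keys, and each shard key is handled
// by a single stateful writer at a time. Workers beyond numShards sit idle.
class EosShardModel {

    // Number of writers that can be busy at once: one per distinct shard
    // key that actually received records, capped by the worker count.
    static int busyWriters(int records, int numShards, int workers) {
        Set<Integer> usedShards = new HashSet<>();
        for (int i = 0; i < records; i++) {
            usedShards.add(i % numShards); // round-robin shard assignment (model assumption)
        }
        return Math.min(usedShards.size(), workers);
    }

    public static void main(String[] args) {
        System.out.println(busyWriters(10_000, 4, 2));   // 2: limited by workers
        System.out.println(busyWriters(10_000, 4, 16));  // 4: limited by numShards
        System.out.println(busyWriters(10_000, 32, 16)); // 16: limited by workers
    }
}
```

Under this assumption, adding cluster resources stops helping once the number of workers exceeds `numShards`; only raising `numShards` adds sink parallelism.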

Best,

 Jan

[1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-

On 6/18/24 09:32, Ruben Vargas wrote:
Hello Lukavsky

Thanks for your reply!

I thought it was due to backpressure, but I increased the cluster's resources and the problem still persists. Moreover, data stops flowing and the checkpoints still fail.
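Jan's point further down, that with EOS the data is buffered in state and committed only after a checkpoint, explains why data stops flowing when checkpoints keep failing. Below is a minimal plain-Java sketch of that behavior. It assumes a simplified sink where a failed checkpoint just leaves records buffered; the class `EosBufferModel` is hypothetical, and real Flink failure handling may restart the job instead.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an exactly-once sink (hypothetical, not KafkaIO's code):
// records are buffered (standing in for operator state) and only become
// visible downstream when a checkpoint succeeds.
class EosBufferModel {
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    void write(String record) {
        pending.add(record); // buffered, not yet visible to consumers
    }

    // Called when a checkpoint finishes; only a successful one publishes data.
    void onCheckpoint(boolean succeeded) {
        if (succeeded) {
            committed.addAll(pending);
            pending.clear();
        }
        // In this model a failed checkpoint leaves the records buffered,
        // so the buffer keeps growing until a checkpoint succeeds.
    }

    List<String> visibleOutput() {
        return List.copyOf(committed);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        EosBufferModel sink = new EosBufferModel();
        sink.write("a");
        sink.write("b");
        sink.onCheckpoint(false);
        System.out.println(sink.visibleOutput()); // []
        sink.onCheckpoint(true);
        System.out.println(sink.visibleOutput()); // [a, b]
    }
}
```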

I have configured checkpointing to run every minute, with a timeout of 1h. The checkpoints are aligned.
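A back-of-the-envelope model of how an aligned checkpoint can hit the 1h timeout under backpressure. It is a simplification with hypothetical numbers and ignores per-channel buffering: with aligned checkpoints the barrier waits behind records already queued in front of the slow sink, so its delay is roughly the backlog divided by the sink's throughput. Unaligned checkpoints are meant to avoid this by letting the barrier overtake the queued records and storing them in the checkpoint instead. The class and method names are illustrative.

```java
// Simplified model (hypothetical numbers): with aligned checkpoints the
// barrier reaches the sink only after every record queued ahead of it
// has been processed.
class BarrierDelayModel {

    // Seconds until the barrier reaches the sink with aligned checkpoints.
    // Returns +Infinity when the sink makes no progress at all.
    static double alignedDelaySeconds(long queuedRecords, double sinkRecordsPerSecond) {
        if (sinkRecordsPerSecond <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        return queuedRecords / sinkRecordsPerSecond;
    }

    // An aligned checkpoint completes only if the barrier arrives before the timeout.
    static boolean alignedCompletesInTime(long queuedRecords, double sinkRecordsPerSecond,
                                          double timeoutSeconds) {
        return alignedDelaySeconds(queuedRecords, sinkRecordsPerSecond) <= timeoutSeconds;
    }

    public static void main(String[] args) {
        double oneHour = 3600;
        // 1,000,000 queued records at 500 records/s: 2000 s, within 1h.
        System.out.println(alignedCompletesInTime(1_000_000, 500, oneHour)); // true
        // Same backlog at 100 records/s: 10000 s, beyond 1h.
        System.out.println(alignedCompletesInTime(1_000_000, 100, oneHour)); // false
        // Stalled sink: the barrier never arrives.
        System.out.println(alignedCompletesInTime(1_000_000, 0, oneHour));   // false
    }
}
```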

On Tue, Jun 18, 2024 at 1:14 a.m., Jan Lukavský <[email protected]> wrote:

    Hi Ruben,

    From the provided screenshot it seems to me that the pipeline is
    backpressured by the sink. Can you please share your checkpoint
    configuration? Are you using unaligned checkpoints? What are the
    checkpointing interval and the volume of data coming in from the
    source?
    With EOS, data is committed only after a checkpoint completes; until
    then, it is buffered in state, which makes the sink more resource
    intensive.

      Jan

    On 6/18/24 05:30, Ruben Vargas wrote:
    > Attached is a better image of the console.
    >
    > Thanks!
    >
    On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <[email protected]> wrote:
    >> Hello guys,
    >>
    >> Do any of you have experience enabling exactly-once semantics in
    >> KafkaIO with the Flink runner? I enabled it, and now all of my
    >> checkpoints are failing. I cannot see any exception in the logs.
    >>
    >> The Flink console only shows "Asynchronous task checkpoint
    >> failed." I also noticed that some operators don't acknowledge the
    >> checkpoint (screenshot attached).
    >>
    >> I did this:
    >>
    >> 1) KafkaIO.Read:
    >>
    >> update consumer properties with enable.auto.commit = false
    >> .withReadCommitted()
    >> .commitOffsetsInFinalize()
    >>
    >> 2) KafkaIO#write:
    >>
    >> .withEOS(numShards, sinkGroupId)
    >>
    >> But my application is not able to deliver messages to the output
    >> topic because the checkpoints fail.
    >> I also reviewed the timeout and other time-sensitive parameters;
    >> those are set high right now.
    >>
    >> I really appreciate your guidance on this. Thank you.
