Hi Beam team,

We're seeing checkpoint timeouts when running Apache Beam on Flink. They
happen when the pipeline has a slow step and we send a burst of messages to
Kafka.

I have set up a similar pipeline on my laptop that reproduces the problem.

Pipeline details:
-----------------

* Python, running a Beam pipeline on Flink via PortableRunner
* Streaming
* Read from Kafka
* A slow beam.Map() call
* WindowInto, Write to files

The slow step is a beam.Map() over a function that sleeps for 20 seconds per
element.
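
For reference, here is a minimal sketch of the repro pipeline. The topic,
broker, and 20-second sleep match the setup described above; the window
size, output path, and transform labels are placeholders I picked for the
local repro, so please read this as a sketch rather than our exact code:

        import time

        import apache_beam as beam
        from apache_beam.io import fileio
        from apache_beam.io.kafka import ReadFromKafka
        from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.transforms import window


        def slow_step(record):
            # Simulate the slow step: each element takes 20 seconds.
            time.sleep(20)
            return record


        def run():
            # Runner, parallelism and checkpoint settings come in via
            # command-line flags (see the configuration below).
            options = PipelineOptions(streaming=True)
            with beam.Pipeline(options=options) as p:
                _ = (
                    p
                    | 'ReadKafka' >> ReadFromKafka(
                        consumer_config={'bootstrap.servers': 'localhost:9092'},
                        topics=['echo-input'])
                    | 'SlowMap' >> beam.Map(slow_step)
                    | 'Window' >> beam.WindowInto(window.FixedWindows(60))
                    | 'Format' >> beam.Map(str)
                    | 'WriteFiles' >> fileio.WriteToFiles(path='/tmp/echo-output'))


        if __name__ == '__main__':
            run()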

Our pipeline configuration (the flags we pass are sketched below the list):
* Checkpoint interval = 30s
* Checkpoint timeout = 60s
* Fail on checkpointing errors = false
* Max bundle size = 2
* Parallelism = 1
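
This is roughly how we pass that configuration to the portable Flink runner
from the Python side. The flag names are the snake_case forms of the Flink
runner options as we understand them, and the job endpoint and environment
type are just our local setup, so treat this as a sketch of our invocation
rather than a reference:

        python pipeline.py \
            --runner=PortableRunner \
            --job_endpoint=localhost:8099 \
            --environment_type=LOOPBACK \
            --streaming \
            --parallelism=1 \
            --max_bundle_size=2 \
            --checkpointing_interval=30000 \
            --checkpoint_timeout_millis=60000 \
            --fail_on_checkpointing_errors=false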

Steps to reproduce:
-------------------

After the pipeline is running, we send 10 Kafka messages from a file as
follows:
        seq 1 10 > msgs.txt
        kcat -b localhost:9092 -P -k "testMessage" -t echo-input -D '\n' -l < msgs.txt

What we expected to see:
------------------------

Because we set bundle size = 2, we were expecting Beam to pick up 2 Kafka
records at a time.

Because processing each bundle would then take only about 40 seconds, we
would stay within the checkpoint timeout, so we expected all checkpoints to
succeed.
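
Spelled out, the arithmetic we had in mind was:

        2 records/bundle x 20 s/record = 40 s per bundle  (< 60 s checkpoint timeout)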

What we see instead:
--------------------

When we send the messages, every checkpoint that starts in the next 200
seconds (10 messages x 20 s each) fails with a timeout. After that,
checkpoints start succeeding again.

Based on the Flink TaskManager logs, it looks like the Kafka consumer reads
up to the latest offset right away (the backlog was 0 and jumps to 96
bytes):

        2022-04-27 13:12:32,359 DEBUG org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: backlog 96

The backlog eventually drops back to 0 after about 200 seconds. Only then
does the task try to acknowledge the checkpoint, but by that point it is too
late:

        2022-04-27 13:15:53,300 WARN org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: 198275ms

By then, the Flink JobManager has already marked that checkpoint as failed
and started a new one.

Questions on Beam:
------------------

We don't fully understand this Beam/Flink interaction, but with regard to
Beam we wanted to ask:

* Why does Beam appear to process all of the buffered Kafka records before
the checkpoint can complete, even though we set max bundle size = 2?
* Alternatively, is there any way to limit how many records are read by
beam.io.kafka.ReadFromKafka()?

I'm happy to share additional logs or any other details I missed here.

Many thanks,
Deepak
