Hi Beam team,

We're seeing checkpoint timeouts when running Apache Beam pipelines on Flink. They happen when the pipeline has a slow step and we send a burst of messages on Kafka.
I have set up a similar pipeline on my laptop that reproduces the problem.

Pipeline details:
-----------------
* Python, running a Beam pipeline on Flink via PortableRunner
* Streaming
* Read from Kafka
* A slow beam.Map() call
* WindowInto, write to files

The pipeline calls beam.Map() on a function that sleeps for 20 seconds per record.

Our pipeline configuration:
* Checkpoint interval = 30s
* Checkpoint timeout = 60s
* Fail on checkpointing errors = false
* Max bundle size = 2
* Parallelism = 1

Steps to reproduce:
-------------------
After the pipeline is running, we send 10 Kafka messages from a file as follows:

seq 1 10 > msgs.txt
kcat -b localhost:9092 -P -k "testMessage" -t echo-input -D '\n' -l < msgs.txt

What we expected to see:
------------------------
Because we set max bundle size = 2, we expected Beam to pick up 2 Kafka records at a time. Processing such a bundle would take only 40 seconds (2 x 20s), which is within the 60-second checkpoint timeout, so we expected all checkpoints to succeed.

What we see instead:
--------------------
When we send the messages, every checkpoint that starts within the next ~200 seconds (20s x 10 messages) fails with a timeout. After that, checkpoints start succeeding again.

Based on the Flink TaskManager logs, the Kafka consumer appears to read up to the latest offset right away (the backlog used to be 0, but became 96 bytes):

2022-04-27 13:12:32,359 DEBUG org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: backlog 96

The backlog eventually drops back to 0 after about 200 seconds. Only then does the task attempt to send an acknowledgment, but by that point it is too late:

2022-04-27 13:15:53,300 WARN org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: 198275ms

By this time, the Flink JobManager has already marked that checkpoint as failed and started a new one.

Questions on Beam:
------------------
We don't fully understand this Beam/Flink behavior, but with regards to Beam we wanted to ask:

* Why does Beam seem to process all the Kafka records at once, even though we have set max bundle size = 2?
* Alternatively, is there any way to limit how many records are read by beam.io.kafka.ReadFromKafka()?

I'm happy to share additional logs or any other details I may have missed.

Many thanks,
Deepak
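
P.S. In case it is useful, here is a minimal sketch of the reproduction pipeline. It is not our exact code: the job endpoint, environment type, consumer config, window size, and the exact spelling of the Flink-specific option names are assumptions on my part (please check them against FlinkPipelineOptions for your Beam version), and the final step is just a placeholder for our windowed write-to-files transform.

import time

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window


def slow_step(record):
    # Simulate the slow step: each record takes ~20 seconds to process.
    time.sleep(20)
    return record


def run():
    # NOTE: the Flink-related option names below are from memory; please
    # double-check them against FlinkPipelineOptions for your Beam version.
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',   # assumes a local Flink job server
        '--environment_type=LOOPBACK',
        '--streaming',
        '--parallelism=1',
        '--max_bundle_size=2',
        '--checkpointing_interval=30000',  # 30s
        '--checkpointing_timeout=60000',   # 60s
        '--fail_on_checkpointing_errors=false',
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadKafka' >> ReadFromKafka(
             consumer_config={'bootstrap.servers': 'localhost:9092',
                              'auto.offset.reset': 'latest'},
             topics=['echo-input'])
         | 'SlowStep' >> beam.Map(slow_step)
         | 'Window' >> beam.WindowInto(window.FixedWindows(60))
         # Placeholder for our windowed write-to-files step.
         | 'Sink' >> beam.Map(print))


if __name__ == '__main__':
    run()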