I ran a few more experiments. The gist is that I can get Flink to start checkpoints even when the Kafka backlog is > 0, as long as my pipeline is fast enough.
That is, if the pipeline sleeps for < 0.6 seconds per element (an empirical
threshold, so take it with a grain of salt), I see checkpoints start even
when the Kafka topic is not fully consumed. But when the pipeline sleeps
for longer, I have only ever seen checkpointing start once the backlog
drops to zero.

My question now is: why does checkpointing behave differently in the two
cases? In other words, what allows Flink to start a checkpoint on a
fast-enough pipeline?

Also posted on the Flink mailing list. [1]

For reference, I have appended a trimmed sketch of the repro pipeline at
the bottom of this message, below the quoted thread.

Thanks,
Deepak

[1] https://lists.apache.org/thread/5pwn41q84l4jd5pt3lpym67nw9k715qd

On Wed, Apr 27, 2022 at 9:22 PM Deepak Nagaraj <deepak.naga...@primer.ai> wrote:
> Hi Beam team,
>
> We're seeing checkpoint timeouts when running Apache Beam on Flink. They
> happen when the pipeline has a slow step and we send a batch of messages
> to Kafka.
>
> I have set up a similar pipeline on my laptop that reproduces the problem.
>
> Pipeline details:
> -----------------
>
> * Python, running a Beam pipeline on Flink via PortableRunner
> * Streaming
> * Read from Kafka
> * A slow beam.Map() call
> * WindowInto, write to files
>
> The pipeline calls beam.Map() on a function that sleeps for 20 seconds.
>
> Our pipeline configuration:
> * Checkpoint interval = 30s
> * Checkpoint timeout = 60s
> * Fail on checkpointing errors = false
> * Max bundle size = 2
> * Parallelism = 1
>
> Steps to reproduce:
> -------------------
>
> After the pipeline is running, we send 10 Kafka messages from a file as
> follows:
>
>   seq 1 10 > msgs.txt
>   kcat -b localhost:9092 -P -k "testMessage" -t echo-input -D '\n' \
>     -l < msgs.txt
>
> What we expected to see:
> ------------------------
>
> Because we set max bundle size = 2, we expected Beam to pick up 2 Kafka
> records at a time.
>
> Because a bundle of 2 records would then take only 40 seconds, we would
> stay within the 60-second checkpoint timeout, so we expected all
> checkpoints to succeed.
>
> What we see instead:
> --------------------
>
> When we send the messages, every checkpoint that starts in the next
> 200 seconds (20 s/message x 10 messages) fails with a timeout. After
> that, checkpoints start succeeding again.
>
> Based on the Flink TaskManager logs, the Kafka consumer appears to have
> read up to the latest offset, and the reader backlog, which used to be
> 0, has become 96 bytes:
>
>   2022-04-27 13:12:32,359 DEBUG
>   org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0:
>   backlog 96
>
> This backlog ultimately goes down to 0 after 200 seconds. The task then
> attempts to send an acknowledgment, but by then it is too late:
>
>   2022-04-27 13:15:53,300 WARN
>   org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl
>   [] - Time from receiving all checkpoint barriers/RPC to executing it
>   exceeded threshold: 198275ms
>
> By this time, the Flink JobManager has already marked that checkpoint as
> failed and started a new one.
>
> Questions on Beam:
> ------------------
>
> We don't fully understand this Beam/Flink behavior, but with regard to
> Beam we wanted to ask:
>
> * Why does Beam seem to process all the Kafka records, even though we
>   have set max bundle size = 2?
> * Alternatively, is there any way to limit how many records are read by
>   beam.io.kafka.ReadFromKafka()?
>
> I'm happy to share additional logs or any other details I missed here.
>
> Many thanks,
> Deepak
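
P.S. In case it helps anyone reproduce this, below is the trimmed sketch of
the pipeline I mentioned above. It follows the structure described in the
quoted message, but treat the specifics as placeholders: the topic name,
bootstrap server, output path, and the exact spellings of the Flink runner
option flags are from memory and may need adjusting for your Beam version.

import time

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# Varying this is how I got the fast vs. slow behavior described at the
# top of this thread (< ~0.6 s vs. 20 s per element).
SLEEP_SECONDS = 20


def slow_step(element):
    time.sleep(SLEEP_SECONDS)  # simulate the slow step
    return element


# The Flink-specific flag names below are my best recollection of the
# portable-runner options; please double-check them for your Beam version.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',        # Flink job service endpoint
    '--environment_type=LOOPBACK',
    '--streaming',
    '--parallelism=1',
    '--max_bundle_size=2',
    '--checkpointing_interval=30000',       # 30 s
    '--checkpoint_timeout_millis=60000',    # 60 s
    '--fail_on_checkpointing_errors=false',
])

with beam.Pipeline(options=options) as p:
    (p
     # ReadFromKafka is a cross-language transform; it needs Java available
     # to start its default expansion service.
     | 'ReadKafka' >> ReadFromKafka(
         consumer_config={'bootstrap.servers': 'localhost:9092',
                          'auto.offset.reset': 'latest'},
         topics=['echo-input'])
     # Elements arrive as (key bytes, value bytes); keep the decoded value.
     | 'Decode' >> beam.Map(lambda kv: kv[1].decode('utf-8'))
     | 'SlowMap' >> beam.Map(slow_step)
     | 'Window' >> beam.WindowInto(window.FixedWindows(60))
     | 'Write' >> fileio.WriteToFiles(
         path='/tmp/echo-output',
         sink=lambda dest: fileio.TextSink()))

With this running, the kcat command from the quoted message is what I use
to push the 10 test messages.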