Hi Flink users,
Greetings. I have a question about how Flink triggers checkpoints in a slow
pipeline.
I have a Beam streaming pipeline with one Map() call. It is a Python
program running on Flink with PortableRunner. I’ve experimented with
varying amounts of sleep inside this call to simulate slowness. The
pipeline reads from Kafka, windows into 1-minute fixed windows, and writes
to a file. The pipeline parallelism is 1, and bundle size is 2. Checkpoint
interval is 30s and timeout is 1min.
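For reference, a minimal sketch of a pipeline matching this setup might look
like the following. It is not my exact code: the Kafka address, topic, output
path, and the helper names (`slow_identity`, `flink_args`, `run`) are
hypothetical, the job endpoint `localhost:8099` is assumed, and the runner
flags `--max_bundle_size`, `--checkpointing_interval`, and
`--checkpoint_timeout_millis` are assumed to be forwarded by PortableRunner to
the Flink job server's FlinkPipelineOptions.

```python
import time

# Hypothetical: seconds slept per element to simulate a slow Map() step.
SLEEP_SECONDS = 0.6


def slow_identity(element, sleep_seconds=SLEEP_SECONDS):
    """Sleep to simulate an expensive step, then pass the element through unchanged."""
    time.sleep(sleep_seconds)
    return element


def flink_args(job_endpoint="localhost:8099"):
    """Runner flags for the setup above; the Flink-specific names are assumptions."""
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        "--streaming",
        "--parallelism=1",
        "--max_bundle_size=2",                # bundle size of 2 elements
        "--checkpointing_interval=30000",     # 30 s, in milliseconds
        "--checkpoint_timeout_millis=60000",  # 1 min, in milliseconds
    ]


def run(bootstrap_servers, topic, output_dir):
    # Beam is imported here so the helpers above can be exercised without it.
    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.window import FixedWindows

    options = PipelineOptions(flink_args())
    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadMessages" >> ReadFromKafka(
                consumer_config={"bootstrap.servers": bootstrap_servers},
                topics=[topic],
            )
            # Kafka records arrive as (key, value) byte pairs; keep the value.
            | "Process1" >> beam.Map(lambda kv: slow_identity(kv[1]).decode("utf-8"))
            | "Window" >> beam.WindowInto(FixedWindows(60))  # 1-minute windows
            | "Write to file" >> fileio.WriteToFiles(path=output_dir)
        )
```

It would be started with something like
`run("localhost:9092", "my-topic", "/tmp/beam-out")` (hypothetical values),
with the Flink job server listening on the assumed endpoint.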
I post messages to Kafka with kcat utility. The messages are all 32 bytes,
but I can vary the number of messages posted.
With sleep() < 0.6 seconds, i.e. a fast pipeline, I see checkpoints getting
started even when the Kafka backlog is > 0, i.e. before all the Kafka
messages have been drained.
However, with a longer sleep(), i.e. a slower pipeline, I don’t see a
checkpoint getting started until the backlog goes all the way down to 0. I
also don’t see a “Received barrier” message until the backlog reaches zero.
Annotated example logs are below. I’m happy to provide additional details
and logs, or to run experiments on my setup.
My question is: why can a fast pipeline start checkpoints while there are
still outstanding Kafka messages, when a slow pipeline cannot?
Thanks,
Deepak
# Posting to Kafka
2022-05-03 09:53:37,184 DEBUG org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: backlog 17157
# Checkpoint was triggered
2022-05-03 09:53:38,924 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Trigger checkpoint 2@1651596818922 for fb40570da5f4dd41e458af269c1a2eaf.
# Messages were slowly getting drained
# ...
2022-05-03 09:53:53,184 DEBUG org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: backlog 11696
# Even with a non-zero backlog, I saw the checkpoint getting started (this is what I want!)
2022-05-03 09:53:53,369 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Starting checkpoint (2) CHECKPOINT on task Source: ReadMessages/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> Map -> [1]ReadMessages/Remove Kafka Metadata -> [7]{CreateKafkaRecord, Process1, Window, Write to file} -> ToKeyedWorkItem (1/1)#0
# Q: With slower pipelines (sleep() > 0.6s), I cannot get Beam/Flink to behave
# this way, i.e. to trigger a checkpoint with a non-zero backlog. Why?