Hi all.

Just to close this topic: the problem was in how we used Valkey as our cache. 
In a couple of places we were using the SCAN operation (SCAN 
cache-wo-operation:${wo_id}-*), which iterates over the entire database. That 
slowed processing to a halt, which in turn prevented checkpoints from finishing.
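
For anyone hitting the same thing, here is a minimal sketch of the difference, assuming the redis-py method names `scan_iter`, `sadd`, `smembers` and `set`. The index-set name `cache-wo-index:<wo_id>`, the helper functions and the in-memory `FakeClient` stand-in are hypothetical and only there so the snippet runs without a server. A SCAN with a MATCH pattern still visits every key and merely filters the results, so its cost grows with the total number of keys. Reading a per-work-order set of keys avoids the keyspace walk.

```python
import fnmatch


class FakeClient:
    """In-memory stand-in exposing the few redis-py methods used below."""

    def __init__(self):
        self.strings = {}
        self.sets = {}
        self.keys_visited = 0  # counts keys examined by scan_iter

    def set(self, name, value):
        self.strings[name] = value

    def sadd(self, name, *values):
        self.sets.setdefault(name, set()).update(values)

    def smembers(self, name):
        return set(self.sets.get(name, set()))

    def scan_iter(self, match=None):
        # Like SCAN ... MATCH: every key is visited, the pattern only filters.
        for key in list(self.strings):
            self.keys_visited += 1
            if match is None or fnmatch.fnmatchcase(key, match):
                yield key


def keys_by_scan(client, wo_id):
    """The slow pattern: cost is proportional to the size of the database."""
    return sorted(client.scan_iter(match=f"cache-wo-operation:{wo_id}-*"))


def put_operation(client, wo_id, op_id, value):
    """Write the entry and record its key in a per-work-order index set."""
    key = f"cache-wo-operation:{wo_id}-{op_id}"
    client.set(key, value)
    client.sadd(f"cache-wo-index:{wo_id}", key)


def keys_by_index(client, wo_id):
    """The cheaper pattern: read one set, no keyspace walk."""
    return sorted(client.smembers(f"cache-wo-index:{wo_id}"))


client = FakeClient()
for wo in range(1000):
    put_operation(client, wo, "a", "x")
    put_operation(client, wo, "b", "y")

by_scan = keys_by_scan(client, 42)    # visits all 2000 cache keys
by_index = keys_by_index(client, 42)  # reads one set with two members
```

Both calls return the same two keys; only the scan has to touch every entry in the cache to find them. Against a real client without `decode_responses=True`, the keys would come back as bytes rather than strings.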

So, Gabor’s hints were spot-on. Python is OK, Valkey was the problem.

Nikola.

From: Nikola Milutinovic <[email protected]>
Date: Friday, September 26, 2025 at 3:21 PM
To: Gabor Somogyi <[email protected]>
Subject: Re: Checkpoint timeouting

Hello Gabor.

Just a quick update on our issues. Recap: We have several jobs, one of which 
was exhibiting these checkpoint timeouts. That one job was under “heavy” load 
(around 100k messages on the Kafka topic). The others were far less loaded.

Then another job started timing out, as well (with modest load, compared to the 
first one). Today, the first job that started the “show” started working again 
and pulled through. Now it is happily idle and checkpoints are doing fine. The 
second job is still failing constantly.

What we have tried so far:

  1.  Unaligned checkpoints: no effect.
  2.  Increase checkpoint timeout to 50 minutes: that helped somewhat.
  3.  Increase resources (CPU was set to 0.3 cores per task slot): still 
      waiting to see.

When we increased the checkpoint timeout to 50 minutes, a checkpoint would 
occasionally come through, after 30 minutes or more! And its size would be 
2-3 kB. So it is not a size issue, and it is not Azure Blob having problems. 
I suspect it is the alignment of barriers.

One thing to note, our jobs have 1 input (Kafka) and 2 or 3 outputs (also 
Kafka). Basically, IN -> processing ->(OUT or ERR). We used to have 2 Kafka 
Sinks, but that was not working well, so we decided to use side outputs + 
regular pipeline. Regular pipeline goes to the output topic and side-outputs go 
to the error topic. The reason why I am mentioning this is that there was a 
period of time when the side output would not be pushed out to Kafka Topic 
until the checkpoint has finished. I'm not sure if that is the case still.

Are we messing things up with side outputs? Could they be the cause of these 
timeouts? Or side outputs in conjunction with Redis?

Nikola.

From: Gabor Somogyi <[email protected]>
Date: Monday, September 22, 2025 at 11:19 AM
To: Nikola Milutinovic <[email protected]>
Cc: Flink Users <[email protected]>
Subject: Re: Checkpoint timeouting

Hi Nikola,

First of all, Python is not bad enough to just throw away, especially 
considering that there are active efforts in the community to make it even 
better.

In a bit more detail, on PyFlink vs Java Flink: the Python runtime itself 
isn't necessarily the root cause of your checkpoint issues.
Whether one uses Python or Java, one must find the root cause of the 
issue, which means looking at the unchained graph and searching for 
backpressure.

Potential Redis-related issues, apart from Python itself:
Your Redis usage pattern (lookup + conditional backlog insertion) could indeed 
contribute to checkpoint problems if:
- Redis operations are blocking and taking too long during checkpointing
- Network timeouts or Redis latency spikes occur during barrier alignment
- The Redis client isn't properly handling connection issues
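
One way to check the first point could be to time each Redis call inside the operator and log the slow ones. This is a minimal sketch using only the standard library. The decorator name `timed`, the thresholds, and the functions `lookup_entity` and `slow_lookup` are assumptions for illustration, not part of PyFlink or of any Redis client.

```python
import functools
import logging
import time

log = logging.getLogger("redis-latency")


def timed(threshold_s=0.05):
    """Decorator: log a warning when the wrapped call exceeds threshold_s."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                wrapper.calls += 1
                if elapsed > threshold_s:
                    wrapper.slow_calls += 1
                    log.warning("%s took %.3f s", fn.__name__, elapsed)
        wrapper.calls = 0
        wrapper.slow_calls = 0
        return wrapper
    return decorator


@timed(threshold_s=0.05)
def lookup_entity(cache, key):
    # Hypothetical lookup; in the real operator this would be a Redis GET.
    return cache.get(key)


@timed(threshold_s=0.01)
def slow_lookup(cache, key):
    time.sleep(0.05)  # simulate a blocking call
    return cache.get(key)


cache = {"entity:1": "found"}  # a plain dict stands in for the cache
fast_result = lookup_entity(cache, "entity:1")
slow_result = slow_lookup(cache, "entity:1")
```

If the slow-call counter climbs while checkpoints are timing out, the external call is a likely suspect regardless of the language the operator is written in.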

All in all, I would use disable_chaining() to separate the operators and 
increase parallelism for better throughput.
To be clear, this would be my suggestion in Java as well. If the Redis 
operator is slow, then it will be slow in Java too...

BR,
G


On Sat, Sep 20, 2025 at 12:04 PM Nikola Milutinovic <[email protected]> wrote:

Hi Gabor.

I will definitely try this out. I have one question on my general direction. 
Would I benefit from ditching PyFlink and using Java Flink?

I know that PyFlink has its set of quirks (Apache Beam is rather old, etc.) 
and your suspicion fell on the Python operator immediately. However, is Python 
at the root of our problems?

The operator in question has one non-Flink thing in it: it contacts Redis DB 
directly. We use Redis (Valkey) as our primary cache, which we update using 
Flink and CDC, in a separate job. That part works.

Our usage of Redis, in the problematic case, is (via regular Python Redis 
library):

  1.  Look up an entity in Redis.
  2.  If not found, put it in the "backlog" (which we also store in Redis).

Is this usage pattern perhaps the cause of barriers sometimes not propagating 
correctly? And would it improve if I switched over to Java? Or do I need to 
change how we use Redis?

Nix.

On 19/09/2025 15:27, Gabor Somogyi wrote:
Hi Nikola,

The first and most obvious suggestions are:

# Enable unaligned checkpoint
execution.checkpointing.unaligned: true
# Or allow fallback to unaligned when alignment drags:
execution.checkpointing.aligned-checkpoint-timeout: 10s

“Checkpoint expired before completing” + “Not all required tasks are currently 
running”
usually means one operator (often the Python one) intermittently stops making 
progress or restarts.
When that happens, checkpoint barriers can’t cross the graph, the checkpoint 
times out
and Kafka sinks can’t commit transactions -> processing stalls and Kafka lag 
grows.

Your graph shows everything chained into a single task with parallelism 1
where any hiccup anywhere blocks everything, including barriers.
For this issue, one can use disable_chaining() to separate the operators and 
increase parallelism for better throughput.
After de-chaining, backpressure can be checked in the Flink UI, which shows 
which operator is slow or not progressing.
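
A minimal sketch of that de-chaining step, assuming the PyFlink `DataStream` methods `name()`, `set_parallelism()` and `disable_chaining()`. The helper `isolate` and the `RecordingStream` stand-in are hypothetical and exist only so the snippet runs without a cluster. In a real job the stream passed in would be the result of something like `.process(...)` or `.filter(...)`.

```python
def isolate(stream, name, parallelism):
    """Give an operator its own name, parallelism and task (no chaining),
    so the Flink UI can report backpressure for it separately."""
    return stream.name(name).set_parallelism(parallelism).disable_chaining()


class RecordingStream:
    """Stand-in that records the calls a PyFlink DataStream would receive."""

    def __init__(self):
        self.calls = []

    def name(self, value):
        self.calls.append(("name", value))
        return self

    def set_parallelism(self, value):
        self.calls.append(("set_parallelism", value))
        return self

    def disable_chaining(self):
        self.calls.append(("disable_chaining",))
        return self


# The operator name is taken from the job graph; the parallelism is arbitrary.
stream = isolate(RecordingStream(), "Process entity request", 4)
```

Applying this to the operator that talks to Redis should make it show up as a separate box in the job graph, with its own backpressure indicator.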

Hope this helps!

BR,
G


On Fri, Sep 19, 2025 at 2:11 PM Nikola Milutinovic <[email protected]> wrote:
Hi group.

I’d like some advice on how to debug a problem we have encountered. We have a 
session cluster deployed on AKS, using Kubernetes Operator. Our jobs have 
checkpointing enabled and most of them work fine. However, one job is having 
problems checkpointing.

The job is relatively simple: Kafka Source -> Filter -> ProcessFunction -> 
Filter (with some side-channel output) -> Kafka Sink. This is a PyFlink job, if 
it is relevant.

The job starts and runs, but after several successful checkpoints it fails to 
create one. Then it succeeds, then fails again, and after some oscillating 
every checkpoint fails. The job behaves erratically and after some time stops 
processing messages altogether; the Kafka source topic shows a big lag (150k 
messages). The exceptions we get are uninformative:

2025-09-19 11:49:17,084 WARN 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to 
trigger or complete checkpoint 18 for job 6ef604e79abcdd65eb65d6cb3fa20d6a. (0 
consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired 
before completing.

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold. The latest checkpoint failed due to Checkpoint expired 
before completing., view the Checkpoint History tab or the Job Manager log to 
find out why continuous checkpoints failed.

2025-09-19 11:49:17,088 INFO 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to 
trigger checkpoint for job 6ef604e79abcdd65eb65d6cb3fa20d6a since Checkpoint 
triggering task Source: Kafka source -> Input mapper, Filter incoming, Process 
entity request, PROCESS, PROCESS -> (Kafka Sink: Writer -> Kafka Sink: 
Committer, Get side error output -> Error topic: Writer -> Error topic: 
Committer, Get side IMS config output, Process IMS config creation, PROCESS -> 
IMS config topic: Writer -> IMS config topic: Committer) (1/1) of job 
6ef604e79abcdd65eb65d6cb3fa20d6a is not being executed at the moment. Aborting 
checkpoint. Failure reason: Not all required tasks are currently running..

Our checkpoints are minuscule, less than 5 kB, and when they do succeed they 
take 3 seconds at most. Basically, we do not have any state. The storage used 
is Azure Blob, since we are running in Azure AKS (any recommendations there?). 
What we do observe on Azure Blob is that the old checkpoints have been deleted 
(as desired), but new ones were not created. Our checkpoint-related settings 
are:

jobmanager.memory.heap.size                                  6522981568b
jobmanager.memory.jvm-metaspace.size                         1073741824b
jobmanager.memory.jvm-overhead.max                           858993472b
jobmanager.memory.jvm-overhead.min                           858993472b
jobmanager.memory.off-heap.size                              134217728b
jobmanager.memory.process.size                               8 gb

execution.checkpointing.dir                                  wasbs://[email protected]/flink-cluster-ims-flink/flink-checkpoints
execution.checkpointing.externalized-checkpoint-retention    DELETE_ON_CANCELLATION
execution.checkpointing.incremental                          true
execution.checkpointing.interval                             60000
execution.checkpointing.min-pause                            5000
execution.checkpointing.mode                                 EXACTLY_ONCE
execution.checkpointing.num-retained                         3
execution.checkpointing.savepoint-dir                        wasbs://[email protected]/flink-cluster-ims-flink/flink-savepoints
execution.checkpointing.storage                              jobmanager
execution.checkpointing.timeout                              300000

Nix
