Hi Gabor.

I will definitely try this out. I have one question on my general direction. Would I benefit from ditching PyFlink and using Java Flink?

I know that PyFlink has its share of quirks (its Apache Beam dependency is rather old, etc.), and your suspicion immediately fell on the Python operator. However, is Python really at the root of our problems?

The operator in question does one non-Flink thing: _it contacts the Redis DB directly_. We use Redis (Valkey) as our primary cache, which we keep updated with Flink and CDC in a separate job. That part works.

In the problematic case, our usage of Redis (via the regular Python Redis library) is as follows (sketched in code below):

1. Look up an entity in Redis.
2. If not found, put it in a "backlog" (which we also store in Redis).
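
In code, the operator does roughly this (the class name, Redis keys and record layout below are illustrative, not our real ones):

import json

import redis  # the regular redis-py client
from pyflink.datastream import ProcessFunction


class EntityLookup(ProcessFunction):
    """Illustrative stand-in for our real process function."""

    def open(self, runtime_context):
        # one blocking connection per parallel instance
        self._redis = redis.Redis(host="valkey", port=6379)

    def process_element(self, value, ctx):
        # 1. look the entity up in Redis
        entity = self._redis.get("entity:%s" % value["id"])
        if entity is None:
            # 2. not found -> park the request in the Redis-backed backlog
            self._redis.rpush("backlog", json.dumps(value))
        else:
            yield value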

Could this usage pattern be the reason the barriers sometimes fail to propagate correctly? Would it improve if I switch over to Java, or do I need to change how we use Redis?

Nix.

On 19/09/2025 15:27, Gabor Somogyi wrote:
Hi Nikola,

The first and most obvious suggestions are:

# Enable unaligned checkpoint
execution.checkpointing.unaligned: true
# Or allow fallback to unaligned when alignment drags:
execution.checkpointing.aligned-checkpoint-timeout: 10s
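
If you'd rather set these per job from PyFlink instead of the cluster config, something along these lines should do it (quick sketch, not tested):

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
# same keys as above, just passed programmatically
config.set_string("execution.checkpointing.unaligned", "true")
config.set_string("execution.checkpointing.aligned-checkpoint-timeout", "10s")

env = StreamExecutionEnvironment.get_execution_environment(config)
env.enable_checkpointing(60000)  # your current 60s interval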

“Checkpoint expired before completing” + “Not all required tasks are currently running” usually means that one operator (often the Python one) intermittently stops making progress or restarts. When that happens, the checkpoint barriers can’t cross the graph, the checkpoint times out, and the Kafka sinks can’t commit their transactions -> processing stalls and Kafka lag grows.

Your graph shows everything chained into a single task with parallelism 1,
where any hiccup anywhere blocks everything, including the barriers.
For this you can use disable_chaining() to separate the operators and increase the parallelism for better throughput. After de-chaining, backpressure can be checked in the Flink UI, which shows which operator is slow or not progressing.
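
For example (a toy pipeline just to show where the calls go; swap in your real source, Python operator and sink):

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# stand-in for the Kafka source
stream = env.from_collection(["a", "b", "c"], type_info=Types.STRING())

processed = (
    stream
    .filter(lambda r: r is not None)                        # "Filter incoming"
    .map(lambda r: r.upper(), output_type=Types.STRING())   # stand-in for the Python operator
    .disable_chaining()                                     # isolate it in its own task
    .set_parallelism(2)                                     # and give it more than one slot
)
processed.print()

env.execute("de-chaining sketch")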

Hope this helps!

BR,
G


On Fri, Sep 19, 2025 at 2:11 PM Nikola Milutinovic <[email protected]> wrote:

    Hi group.

    I’d like some advice on how to debug a problem we have
    encountered. We have a session cluster deployed on AKS, using
    Kubernetes Operator. Our jobs have checkpointing enabled and most
    of them work fine. However, one job is having problems checkpointing.

    The job is relatively simple: Kafka Source -> Filter ->
    ProcessFunction -> Filter (with some side-channel output) -> Kafka
    Sink. This is a PyFlink job, if it is relevant.
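
    In PyFlink terms the wiring is roughly as below (heavily
    simplified, with toy stand-ins for the real source, sinks and
    checks):

    from pyflink.common import Types
    from pyflink.datastream import OutputTag, ProcessFunction, StreamExecutionEnvironment

    error_tag = OutputTag("errors", Types.STRING())      # the side-channel output

    class RouteErrors(ProcessFunction):                  # illustrative name
        def process_element(self, value, ctx):
            if value.startswith("ERR"):                  # stand-in for the real check
                yield error_tag, value                   # -> error topic
            else:
                yield value                              # -> main Kafka sink

    env = StreamExecutionEnvironment.get_execution_environment()
    # from_collection stands in for the Kafka source, print() for the Kafka sinks
    records = env.from_collection(["ok-1", "ERR-2", "ok-3"], type_info=Types.STRING())
    main = records.filter(lambda r: r != "").process(RouteErrors(), output_type=Types.STRING())
    main.print()
    main.get_side_output(error_tag).print()
    env.execute("topology sketch")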

    The job starts and runs, but after several successful checkpoints
    it fails to create one. Then it succeeds again, then fails, and
    after some oscillating every checkpoint fails. The job behaves
    erratically and after a while stops processing messages
    altogether; the Kafka source topic shows a big lag (150k
    messages). The exceptions we get are uninformative:

    2025-09-19 11:49:17,084 WARN
    org.apache.flink.runtime.checkpoint.CheckpointFailureManager []
    - Failed to trigger or complete checkpoint 18 for job
    6ef604e79abcdd65eb65d6cb3fa20d6a. (0 consecutive failed attempts
    so far)
    org.apache.flink.runtime.checkpoint.CheckpointException:
    Checkpoint expired before completing.

    org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
    tolerable failure threshold. The latest checkpoint failed due to
    Checkpoint expired before completing., view the
    CheckpointHistory tab or the JobManager log to find out why
    continuous checkpoints failed.

    2025-09-19 11:49:17,088 INFO
    org.apache.flink.runtime.checkpoint.CheckpointFailureManager []
    - Failed to trigger checkpoint for job
    6ef604e79abcdd65eb65d6cb3fa20d6a since Checkpoint triggering task
    Source: Kafka source -> Input mapper, Filter incoming,
    Process entity request, PROCESS, PROCESS -> (KafkaSink: Writer ->
    KafkaSink: Committer, Get side error output -> Error topic:
    Writer -> Error topic: Committer, Get side IMS config output,
    ProcessIMS config creation, PROCESS -> IMS config topic: Writer ->
    IMS config topic: Committer) (1/1) of job
    6ef604e79abcdd65eb65d6cb3fa20d6a is not being executed at the
    moment. Aborting checkpoint. Failure reason: Not all required
    tasks are currently running..

    Our checkpoints are minuscule, less than 5 kB, and when they do
    succeed they take at most 3 seconds. Basically, we have hardly
    any state. The storage used is Azure Blob, since we are running
    on Azure AKS (any recommendations there?). What we do observe on
    Azure Blob is that the old checkpoints have been deleted (as
    desired), but new ones were not created. Our checkpoint-related
    settings are:

    jobmanager.memory.heap.size            6522981568b
    jobmanager.memory.jvm-metaspace.size   1073741824b
    jobmanager.memory.jvm-overhead.max     858993472b
    jobmanager.memory.jvm-overhead.min     858993472b
    jobmanager.memory.off-heap.size        134217728b
    jobmanager.memory.process.size         8 gb

    execution.checkpointing.dir                                 wasbs://[email protected]/flink-cluster-ims-flink/flink-checkpoints
    execution.checkpointing.externalized-checkpoint-retention   DELETE_ON_CANCELLATION
    execution.checkpointing.incremental                         true
    execution.checkpointing.interval                            60000
    execution.checkpointing.min-pause                           5000
    execution.checkpointing.mode                                EXACTLY_ONCE
    execution.checkpointing.num-retained                        3
    execution.checkpointing.savepoint-dir                       wasbs://[email protected]/flink-cluster-ims-flink/flink-savepoints
    execution.checkpointing.storage                             jobmanager
    execution.checkpointing.timeout                             300000


    Nix
