Hi Nicola,

First of all Python is not that bad just to throw it away, especially
considering that there are actual efforts in the community to make it even
better.

In a little bit more detailed PyFlink vs Java Flink: The Python runtime
itself isn't necessarily the root cause of your checkpoint issues.
No matter whether one uses python or java one must find the root cause of
the issue which is to take a look at the unchained graph and search for
backpressure.

Potential Redis-related issues apart from python itself:
Your Redis usage pattern (lookup + conditional backlog insertion) could
indeed contribute to checkpoint problems if:
- Redis operations are blocking and taking too long during checkpointing
- Network timeouts or Redis latency spikes occur during barrier alignment
- The Redis client isn't properly handling connection issues

All in all I would use disable_chaining() to separate operators and
increasing parallelism for better throughput.
To be clear, this would be my suggestion even in java as well. If the Redis
operator is slow then it will be slow in java too...

BR,
G


On Sat, Sep 20, 2025 at 12:04 PM Nikola Milutinovic <[email protected]>
wrote:

> Hi Gabor.
>
> I will definitely try this out. I have one question on my general
> direction. Would I benefit from ditching PyFlink and using Java Flink?
>
> I know that PyFlink has its set of quirks (Apache Beam is rather old,
> etc...) and your suspicion fell on Python Operator, immediately. However,
> is Python at the root of our problems?
>
> The operator in question has one non-Flink thing in it: *it contacts
> Redis DB directly*. We use Redis (Valkey) as our primary cache, which we
> update using Flink and CDC, in a separate job. That part works.
>
> Our usage of Redis, in the problematic case, is (via regular Python Redis
> library):
>
>    1. Lookup an entity in Redis
>    2. If not found, put it in "backlog" (which we store in Redis, also).
>
> Is this usage pattern perhaps the cause of barriers, sometimes, not
> propagating correctly? And would it improve if I switch over to Java? Or do
> I need to do something with how we use Redis?
>
> Nix.
> On 19/09/2025 15:27, Gabor Somogyi wrote:
>
> Hi Nikola,
>
> The first and most obvious suggestions are:
>
> # Enable unaligned checkpoint
> execution.checkpointing.unaligned: true
> # Or allow fallback to unaligned when alignment drags:
> execution.checkpointing.aligned-checkpoint-timeout: 10s
>
> “Checkpoint expired before completing” + “Not all required tasks are
> currently running”
> usually means one operator (often the Python one) intermittently stops
> making progress or restarts.
> When that happens, checkpoint barriers can’t cross the graph, the
> checkpoint times out
> and Kafka sinks can’t commit transactions -> processing stalls and Kafka
> lag grows.
>
> Your graph shows everything chained into a single task with parallelism 1
> where any hiccup anywhere blocks everything, including barriers.
> For this issue one can use disable_chaining() to separate operators and
> increasing parallelism for better throughput.
> After de-chaining backpressure can be checked on Flink UI which tells
> which operator is slow or not progressing.
>
> Hope this helps!
>
> BR,
> G
>
>
> On Fri, Sep 19, 2025 at 2:11 PM Nikola Milutinovic <
> [email protected]> wrote:
>
>> Hi group.
>>
>> I’d like some advice on how to debug a problem we have encountered. We
>> have a session cluster deployed on AKS, using Kubernetes Operator. Our jobs
>> have checkpointing enabled and most of them work fine. However, one job is
>> having problems checkpointing.
>>
>> The job is relatively simple: Kafka Source -> Filter -> ProcessFunction
>> -> Filter (with some side-channel output) -> Kafka Sink. This is a PyFlink
>> job, if it is relevant.
>>
>> The job starts and runs, but after several successful checkpoints it
>> fails to create a checkpoint. Then it succeeds, then fails again and after
>> some oscillating, it fails all of them. The job is behaving erratically,
>> after some time it stops processing the messages. The Kafka source topic
>> is showing a big lag (150k messages), so it stops processing after a while.
>> The exceptions we get are uninformative:
>>
>> 2025-09-19 11:49:17,084 WARN org.apache.flink.runtime.checkpoint.
>> CheckpointFailureManager [] - Failed to trigger or complete checkpoint 18
>> for job 6ef604e79abcdd65eb65d6cb3fa20d6a. (0 consecutive failed attempts
>> so far)
>> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
>> before completing.
>>
>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>> tolerable failure threshold. The latest checkpoint failed due to
>> Checkpoint expired before completing., view the Checkpoint History tab or the
>> Job Manager log to find out why continuous checkpoints failed.
>>
>> 2025-09-19 11:49:17,088 INFO org.apache.flink.runtime.checkpoint.
>> CheckpointFailureManager [] - Failed to trigger checkpoint for job
>> 6ef604e79abcdd65eb65d6cb3fa20d6a since Checkpoint triggering task Source:
>> Kafka source -> Input mapper, Filter incoming, Process entity request,
>> PROCESS, PROCESS -> (Kafka Sink: Writer -> Kafka Sink: Committer, Get side
>> error output -> Error topic: Writer -> Error topic: Committer, Get side
>> IMS config output, Process IMS config creation, PROCESS -> IMS config
>> topic: Writer -> IMS config topic: Committer) (1/1) of job 
>> 6ef604e79abcdd65eb65d6cb3fa20d6a
>> is not being executed at the moment. Aborting checkpoint. Failure reason:
>> Not all required tasks are currently running..
>>
>> Our checkpoints are minuscule, less than 5kB, and when they do succeed
>> time is 3 seconds, max. Basically, we do not have any state. The storage
>> used is Azure Blob, since we are running in Azure AKS (any recommendations
>> there?). What we do observe on Azure Blob is that the old checkpoints have
>> been deleted (as desired), but new ones were not created. Our
>> checkpoint-related settings are:
>>
>> jobmanager.memory.heap.size             6522981568b
>>
>> jobmanager.memory.jvm-metaspace.size    1073741824b
>>
>> jobmanager.memory.jvm-overhead.max      858993472b
>>
>> jobmanager.memory.jvm-overhead.min      858993472b
>>
>> jobmanager.memory.off-heap.size         134217728b
>>
>> jobmanager.memory.process.size          8 gb
>>
>> execution.checkpointing.dir             wasbs://
>> [email protected]/flink-cluster-ims-flink/flink-checkpoints
>>
>> execution.checkpointing.externalized-checkpoint-retention
>> DELETE_ON_CANCELLATION
>>
>> execution.checkpointing.incremental     true
>>
>> execution.checkpointing.interval        60000
>>
>> execution.checkpointing.min-pause       5000
>>
>> execution.checkpointing.mode            EXACTLY_ONCE
>>
>> execution.checkpointing.num-retained    3
>>
>> execution.checkpointing.savepoint-dir   wasbs://
>> [email protected]/flink-cluster-ims-flink/flink-savepoints
>>
>> execution.checkpointing.storage         jobmanager
>>
>> execution.checkpointing.timeout         300000
>>
>> Nix
>>
>

Reply via email to