Hi, 

There can be various reasons for performance issues, such as:

1. Bottlenecks in the Kafka connector when reading data

2. Time-consuming filter conditions

3. Bottlenecks in the Kafka connector when writing data

To further diagnose the problem, you can try:

1. Set the Flink configuration `pipeline.operator-chaining.enabled` to false. 
This will let you see the individual vertices (source -> calc -> sink) in the 
Flink UI (see the sketch after this list). Check the entire topology to see 
whether any vertex is experiencing backpressure.

2. If there is no backpressure, it is likely that the bottleneck lies in data 
reading. Otherwise, the operator following the last one experiencing 
backpressure is likely the bottleneck (for example, if the source is under 
backpressure, the calc operator could be the bottleneck, indicating that the 
filtering logic is CPU-intensive).

3. Regarding "the process has been running for several days without failures, 
although with high CPU and memory usage": if CPU utilization is high, you can 
also further analyze the CPU flame graph to see what tasks are being processed 
by that node and share it here.
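A minimal sketch of how the chaining switch can be applied to a pure-SQL job, 
assuming it is submitted through the SQL client (the option keys are standard 
Flink configuration; adapt this to however you actually submit the job):

```sql
-- Disable operator chaining so that source -> calc -> sink show up as
-- separate vertices in the Flink web UI and backpressure can be read per vertex.
SET 'pipeline.operator-chaining.enabled' = 'false';

-- Flame graphs are enabled on the cluster side, in the Flink configuration file:
--   rest.flamegraph.enabled: true
-- With that set, the web UI can render a CPU flame graph per vertex.
```

Note that disabling chaining adds serialization overhead between operators, so 
it is best kept only for the diagnosing phase.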




Hope this is helpful!




--

    Best!
    Xuyang




At 2025-02-20 22:03:19, "Guillermo Ortiz Fernández" 
<guillermo.ortiz.f...@gmail.com> wrote:

I have been configuring the Flink cluster (application mode) to process the 
Kafka data volume. The current configuration consists of 16 pods, each with 
12GB of memory and 2 CPUs. Each TaskManager has 4 slots.

All processing is done using Flink SQL, and since Kafka topics may contain 
out-of-order data, a 5-minute watermark is applied. In previous configurations, 
we allocated between 4-8GB per pod, but this caused memory errors that led to 
process crashes.
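
For reference, the watermark is declared in the source DDL along these lines 
(a simplified sketch with placeholder table and column names, not the actual DDL):

```sql
CREATE TABLE input_events (
    event_id   STRING,
    payload    STRING,
    event_time TIMESTAMP(3),
    -- tolerate up to 5 minutes of out-of-order data
    WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'input-topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-normalizer',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json'
);
```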

Currently, this process is a migration from Kafka Streams, which is running in 
parallel, consuming exactly the same data on the same infrastructure, but using 
significantly fewer resources than Flink. Approximately, Flink is consuming 10x 
more memory and 8x more CPU than Kafka Streams.

I am unsure whether this is due to Flink inherently consuming more resources, 
whether using SQL results in poorly optimized auto-generated code, or if there 
is a configuration issue. The metrics do not show any backpressure or errors, 
and the process has been running for several days without failures, although 
with high CPU and memory usage.

Any ideas on how to diagnose or optimize this to reduce resource consumption?



On Thu, 30 Jan 2025 at 10:02, Pedro Mázala (<pedroh.maz...@gmail.com>) 
wrote:

> The average output rate needed to avoid lag after filtering messages should 
> be around 60K messages per second. I’ve been testing different configurations 
> of parallelism, slots and pods (everything runs on Kubernetes), but I’m far 
> from achieving those numbers.

How are you partitioning your query? Do you see any backpressure happening on 
the Flink UI?


> In the latest configuration, I used 20 pods, a parallelism of 120, with 4 
> slots per taskmanager.


Were all tasks working properly, or did you have idle tasks?


> Additionally, setting parallelism to 120 creates hundreds of subtasks for the 
> smaller topics, which don’t do much but still consume minimal resources even 
> if idle.


With the Table API, I'm not sure you can choose the parallelism per "task"; 
with the DataStream API, I'm sure you can.






Best regards,
Pedro Mázala
+31 (06) 3819 3814
Be awesome




On Wed, 29 Jan 2025 at 22:06, Guillermo Ortiz Fernández 
<guillermo.ortiz.f...@gmail.com> wrote:

After checking again, each pod uses about 200-400 millicores and 2.2 GB.


On Wed, 29 Jan 2025 at 21:41, Guillermo Ortiz Fernández 
(<guillermo.ortiz.f...@gmail.com>) wrote:


I have a job written entirely in Flink SQL. The first part of the program 
processes 10 input topics and generates one output topic with normalized 
messages and some filtering applied (really simple: a few WHERE conditions on 
fields and substring matching). Nine of the topics produce between hundreds and 
thousands of messages per second, with an average of 4-10 partitions each. The 
other topic produces 150K messages per second and has 500 partitions. They are 
unioned into the output topic.
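
The query has roughly this shape (a simplified sketch with placeholder names; 
the real filters are just a few field comparisons and substring checks):

```sql
INSERT INTO normalized_output
SELECT id, source_system, payload
FROM small_topic_1
WHERE source_system = 'A'
UNION ALL
SELECT id, source_system, payload
FROM big_topic
WHERE source_system = 'B'
  AND payload LIKE '%pattern%';
```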

The average output rate needed to avoid lag after filtering messages should be 
around 60K messages per second. I’ve been testing different configurations of 
parallelism, slots and pods (everything runs on Kubernetes), but I’m far from 
achieving those numbers.

In the latest configuration, I used 20 pods, a parallelism of 120, with 4 slots 
per taskmanager. With this setup, I achieve approximately 20K messages per 
second, but I’m unable to consume the largest topic at the rate messages are 
being produced. Additionally, setting parallelism to 120 creates hundreds of 
subtasks for the smaller topics, which don’t do much but still consume minimal 
resources even if idle.
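
For reference, the parallelism is set globally for the SQL job, roughly like 
this (a sketch; both option keys are standard Flink/Table configuration), which 
is why even the small topics end up with that many subtasks:

```sql
SET 'parallelism.default' = '120';
-- or the Table-specific equivalent:
-- SET 'table.exec.resource.default-parallelism' = '120';
```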

I started trying with parallelism 12 and got about 1000-3000 messages per 
second.

When I check the CPU and memory usage of the pods, I don't see any problem and 
they are far from the limits; each TaskManager has 4 GB and 2 CPUs and is never 
close to fully using the CPU.

It’s a requirement to do everything 100% in SQL. How can I improve the 
throughput rate? Should I be concerned about the hundreds of subtasks created 
for the smaller topics?
