I'm going to follow the guidelines you mentioned. On a different note, do you know if there is any real performance comparison between Flink SQL and the DataStream API? By using 100% Flink SQL I think we gain a lot of ease of use, but we might be losing performance. Additionally, there are features available in the DataStream API that are not supported in Flink SQL, which prevents us from using certain functionality.
I want to share part of the code to show you what kind of code I'm talking about. As I mentioned, we have about 5 Kafka topics, most producing thousands of records per second and one producing 200k-300k events per second. I normalize them, union them, join with a small table to enrich, and write the result to a single topic:

topic1:

SELECT
    CAST(UNIX_TIMESTAMP(end_time, 'yyyyMMdd HH:mm:ss.SSS') AS BIGINT) * 1000 AS eventTimestamp,
    CAST(SUBSTRING(phone_number, 3) AS INT) AS phone_number,
    imsi_phone AS imsi_phone,
    'probe1' AS origin,
    CASE
        WHEN Cell LIKE 'example_text%' THEN
            CONCAT('22222',
                   LPAD(CAST(REGEXP_EXTRACT(Cell, '(\d+)-(\d+)', 1) AS STRING), 7, '0'),
                   LPAD(CAST(REGEXP_EXTRACT(Cell, '(\d+)-(\d+)', 2) AS STRING), 3, '0'))
        ELSE
            CONCAT('33333',
                   LPAD(CAST(REGEXP_EXTRACT(Cell, '(\d+)-(\d+)', 1) AS STRING), 5, '0'),
                   LPAD(CAST(REGEXP_EXTRACT(Cell, '(\d+)-(\d+)', 2) AS STRING), 5, '0'))
    END AS unique_key
FROM probe_example
WHERE phone_number IS NOT NULL
  AND CHAR_LENGTH(phone_number) >= 10
  AND LEFT(phone_number, 2) = '11'
  AND IS_DIGIT(phone_number)
  AND (LEFT(imsi_phone, 5) = '123123' OR LEFT(imsi_phone, 5) = '543535')

topicN: ... (very similar)

Union topic1 ... topicN
Join with upsert-kafka
Write to topic

So after the per-topic normalization I union the topics and join with an upsert-kafka topic to enrich. It's a pity that I can't broadcast it or do a lookup join with this topic (that isn't supported with upsert-kafka), because I guess the regular join forces a shuffle.
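To make the overall shape concrete, this is roughly what the final stage looks like. It is only a minimal sketch: `normalized_topic1` ... `normalized_topicN`, `enrichment`, `output_topic` and the enrichment column are made-up names, not the real ones. Because `enrichment` is an upsert-kafka table, the only option is a regular join, so both inputs get hash-partitioned (shuffled) on the join key and the join keeps state for both sides:

```
-- Hypothetical table/column names for illustration only.
INSERT INTO output_topic
SELECT
    u.eventTimestamp,
    u.phone_number,
    u.imsi_phone,
    u.origin,
    u.unique_key,
    e.enrichment_attr               -- assumed enrichment column
FROM (
    SELECT * FROM normalized_topic1
    UNION ALL
    SELECT * FROM normalized_topic2
    UNION ALL
    -- ... remaining normalized topics
    SELECT * FROM normalized_topicN
) AS u
JOIN enrichment AS e                -- regular (not lookup) join against the
    ON u.unique_key = e.unique_key; -- upsert-kafka table => both sides shuffled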
On Fri, Feb 21, 2025 at 6:31, Xuyang (<xyzhong...@163.com>) wrote:

> Hi,
>
> There can be various reasons for performance issues, such as:
>
> 1. Bottlenecks in the Kafka connector when reading data
>
> 2. Time-consuming filter conditions
>
> 3. Bottlenecks in the Kafka connector when writing data
>
> To further diagnose the problem, you can try:
>
> 1. Set the Flink configuration `pipeline.operator-chaining.enabled` to
> false. This will allow you to see the individual vertices (source -> calc
> -> sink) on the Flink UI. Check the entire topology to see whether any
> vertex is experiencing backpressure.
>
> 2. If there is no backpressure, it is likely that the bottleneck lies in
> data reading. Otherwise, the operator following the last one experiencing
> backpressure is likely the bottleneck (for example, if the source is under
> backpressure, the calc operator could be the bottleneck, indicating that
> the filtering logic is CPU-intensive).
>
> 3.
>
> ```
> and the process has been running for several days without failures,
> although with *high CPU and memory usage*.
> ```
>
> If CPU utilization is high, you can also further analyze the CPU flame
> graph to see what tasks are being processed by that node and share it here.
>
> Hope this is helpful!
>
> --
> Best!
> Xuyang
>
>
> At 2025-02-20 22:03:19, "Guillermo Ortiz Fernández" <
> guillermo.ortiz.f...@gmail.com> wrote:
>
> I have been configuring the Flink cluster (application mode) to process
> the Kafka data volume. The current configuration consists of 16 pods, each
> with *12GB of memory and 2 CPUs*. Each TaskManager has *4 slots*.
>
> All processing is done using *Flink SQL*, and since Kafka topics may
> contain out-of-order data, a *5-minute watermark* is applied. In previous
> configurations, we allocated between *4-8GB per pod*, but this caused
> memory errors that led to process crashes.
>
> Currently, this process is a *migration from Kafka Streams*, which is
> running in parallel, consuming exactly the same data on the same
> infrastructure, but using *significantly fewer resources than Flink*.
> Approximately, Flink is consuming *10x more memory* and *8x more CPU*
> than Kafka Streams.
>
> I am unsure whether this is due to Flink inherently consuming more
> resources, whether using SQL results in poorly optimized auto-generated
> code, or if there is a configuration issue. The *metrics do not show any
> backpressure or errors*, and the process has been running for several
> days without failures, although with *high CPU and memory usage*.
>
> Any ideas on how to diagnose or optimize this to reduce resource
> consumption?
>
> On Thu, Jan 30, 2025 at 10:02, Pedro Mázala (<pedroh.maz...@gmail.com>)
> wrote:
>
>> > The average output rate needed to avoid lag after filtering messages
>> should be around 60K messages per second. I've been testing different
>> configurations of parallelism, slots and pods (everything runs on
>> Kubernetes), but I'm far from achieving those numbers.
>>
>> How are you partitioning your query? Do you see any backpressure
>> happening on the Flink UI?
>>
>> > In the latest configuration, I used 20 pods, a parallelism of 120,
>> with 4 slots per taskmanager.
>>
>> Were all tasks working properly, or did you have idleness?
>>
>> > Additionally, setting parallelism to 120 creates hundreds of subtasks
>> for the smaller topics, which don't do much but still consume minimal
>> resources even if idle.
>>
>> On the Table API I'm not sure if you can choose parallelism per "task";
>> with the DataStream API I'm sure you can do it.
>>
>>
>> Att,
>> Pedro Mázala
>> +31 (06) 3819 3814
>> Be awesome
>>
>>
>> On Wed, 29 Jan 2025 at 22:06, Guillermo Ortiz Fernández <
>> guillermo.ortiz.f...@gmail.com> wrote:
>>
>>> After the last check, each pod uses about 200-400 millicores and 2.2 GB.
>>>
>>> On Wed, Jan 29, 2025 at 21:41, Guillermo Ortiz Fernández (<
>>> guillermo.ortiz.f...@gmail.com>) wrote:
>>>
>>>> I have a job entirely written in Flink SQL. The first part of the
>>>> program processes 10 input topics and generates one output topic with
>>>> normalized messages and some filtering applied (really simple: some WHERE
>>>> clauses on fields and substrings). Nine of the topics produce between
>>>> hundreds and thousands of messages per second, with an average of 4-10
>>>> partitions each. The other topic produces 150K messages per second and
>>>> has 500 partitions. They are unioned into the output topic.
>>>>
>>>> The average output rate needed to avoid lag after filtering messages
>>>> should be around 60K messages per second. I've been testing different
>>>> configurations of parallelism, slots and pods (everything runs on
>>>> Kubernetes), but I'm far from achieving those numbers.
>>>>
>>>> In the latest configuration, I used 20 pods, a parallelism of 120, with
>>>> 4 slots per taskmanager. With this setup, I achieve approximately 20K
>>>> messages per second, but I'm unable to consume the largest topic at the
>>>> rate messages are being produced. Additionally, setting parallelism to 120
>>>> creates hundreds of subtasks for the smaller topics, which don't do much
>>>> but still consume minimal resources even if idle.
>>>>
>>>> I started trying with parallelism 12 and got about 1000-3000 messages
>>>> per second.
>>>>
>>>> When I check the CPU and memory usage of the pods I don't see any
>>>> problem and they are far from the limit; each taskmanager has 4 GB and
>>>> 2 CPUs and they never come close to using all the CPU.
>>>>
>>>> It's a requirement to do everything 100% in SQL. How can I improve the
>>>> throughput rate?
>>>> Should I be concerned about the hundreds of subtasks created for the
>>>> smaller topics?
>>>