Could you share your KafkaDeserializationSchema? We might be able to spot some optimization potential. You could also try out enableObjectReuse [1], which avoids copying data between chained tasks (not sure if you have any non-chained tasks).
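For reference, a minimal sketch of what enabling object reuse looks like in a Flink job (the environment setup here is generic boilerplate, not code from this thread):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // With object reuse enabled, Flink hands the same record instance to
        // chained operators instead of deep-copying it between them. This is
        // only safe if no operator caches or mutates records after emitting
        // them, so enable it with care.
        env.getConfig().enableObjectReuse();

        // ... build sources, transformations, and sinks as usual ...
    }
}
```

Note that object reuse only helps within an operator chain; records crossing a network shuffle are serialized either way.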
If you are on 1.13, you could check out the flame graph to see where the bottleneck occurs. [2]

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/

On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> That's a throughput of 700 records/second, which should be well below the
> theoretical limits of any deserializer (from hundreds of thousands up to
> tens of millions of records/second per single operator), unless your
> records are huge or very complex.
>
> Long story short, I don't know of a magic bullet to solve your problem. As
> always, you have two options: either optimise/speed up your code/job, or
> scale up.
>
> If you choose the former, think about Flink as just another Java
> application. Check metrics and resource usage, and understand which
> resource is the problem (CPU? memory? is the machine swapping? IO?). You
> might be able to guess your bottleneck (reading from Kafka?
> deserialisation? something else? Flink itself?) by looking at some of the
> metrics (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with
> that), or you can simplify your job to the bare minimum and test the
> performance of independent components. You can also always attach a code
> profiler and simply look at what is happening. First identify the source
> of the bottleneck, then try to understand what is causing it.
>
> Best,
> Piotrek
>
> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
> in the job graph based on busy/back-pressured status, and flame graph
> support).
>
> On Tue, 25 May 2021 at 15:44, B.B. <bijela.vr...@gmail.com> wrote:
>
>> Hi,
>>
>> I am in the process of optimizing my job, which at the moment, by our
>> thinking, is too slow.
>>
>> We are deploying the job in Kubernetes with one job manager (1 GB RAM,
>> 1 CPU) and one task manager (4 GB RAM, 2 CPUs), i.e. 2 task slots and a
>> parallelism of two.
>>
>> The main problem is one Kafka source with 3.8 million events that we
>> have to process.
>> As a test we made a simple job that connects to Kafka using a custom
>> implementation of KafkaDeserializationSchema. There we are using an
>> ObjectMapper that maps the input values, e.g.
>>
>> *var event = objectMapper.readValue(consumerRecord.value(),
>> MyClass.class);*
>>
>> This is then validated with Hibernate Validator, and the output of this
>> source is printed to the console.
>>
>> The time needed for the job to consume all the events was one and a
>> half hours, which seems a bit long.
>> Is there a way we can speed up this process?
>>
>> Are more CPU cores or more memory the solution?
>> Should we switch to an Avro deserialization schema?
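A hypothetical sketch of the kind of schema described in the question (class name MyClass and the overall shape are assumptions, not the poster's actual code). The points worth checking in the real implementation: build the ObjectMapper/ObjectReader and the Validator once, not per record, and deserialize directly from the byte[]:

```java
import java.io.IOException;
import java.util.Set;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.Validator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyClassDeserializationSchema implements KafkaDeserializationSchema<MyClass> {

    // transient: built once per task in open(), not shipped with the schema.
    private transient ObjectReader reader;
    private transient Validator validator;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        // A pre-configured ObjectReader is slightly cheaper than calling
        // readValue on the ObjectMapper for every record.
        reader = new ObjectMapper().readerFor(MyClass.class);
        validator = Validation.buildDefaultValidatorFactory().getValidator();
    }

    @Override
    public MyClass deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Deserialize straight from the raw bytes, no intermediate String.
        MyClass event = reader.readValue(record.value());

        // Bean validation on every record is itself a cost worth profiling;
        // here failures are surfaced as an exception.
        Set<ConstraintViolation<MyClass>> violations = validator.validate(event);
        if (!violations.isEmpty()) {
            throw new IOException("Validation failed: " + violations);
        }
        return event;
    }

    @Override
    public boolean isEndOfStream(MyClass nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyClass> getProducedType() {
        return TypeInformation.of(MyClass.class);
    }
}
```

If profiling shows the JSON parsing itself dominating, that is where switching to a binary format such as Avro, as the poster asks, would pay off.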