Hi Mohammed,

Without diving too much into your business logic, one thing that catches my eye is the partitioning you are using. In general, all calls to `keyBy` or `rebalance` are expensive because all the data is shuffled across downstream tasks. Flink tries to chain operators with the same key groups together so that there is no communication overhead between them, but this is not possible if a shuffle sits between them. One example is your cgmStream, which is first distributed by a specified key and then rebalanced right after.

When applying a `keyBy` operation, it is also important to understand what the key distribution of your input data looks like. It may happen that some keys occur very often while others appear far less frequently. This can cause skew in your pipeline that cannot be resolved with higher parallelism (some tasks are overloaded while others are idle).
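To make the skew point concrete, here is a small standalone simulation (plain Python, not Flink code): `keyBy` routes each record to a downstream task by hashing its key, so if one key dominates the input, the task owning that key's key group stays overloaded no matter how high you set the parallelism. The key names and the 90% skew below are made-up illustration values.

```python
import random
import zlib
from collections import Counter

def simulate_task_load(parallelism: int, n_records: int, seed: int = 42) -> Counter:
    """Assign records to tasks by key hash, the way a keyBy-style shuffle does."""
    rng = random.Random(seed)
    # Hypothetical skewed input: 90% of records carry one hot key,
    # the remaining 10% are spread over ten cold keys.
    keys = ["hot_key"] * 90 + [f"cold_key_{i}" for i in range(10)]
    load = Counter()
    for _ in range(n_records):
        key = rng.choice(keys)
        # Stable hash (crc32) so results are reproducible across runs.
        load[zlib.crc32(key.encode()) % parallelism] += 1
    return load

load = simulate_task_load(parallelism=8, n_records=100_000)
for task, count in sorted(load.items()):
    print(f"task {task}: {count} records")
```

Running this, one task ends up with roughly 90% of all records while the other seven share the rest; raising `parallelism` only spreads the cold keys further, because the hot key always hashes to a single task.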
I also have a couple of follow-up questions to better understand your setup:

- What do you mean by 20k concurrent stream data: 20k records per second?
- How many taskmanagers are you using, and how are the slots distributed?
- Can you check in the Flink WebUI whether some operators are idle, and maybe share an image of the job graph?
- How did you notice the lag of 2k between the operators?

Best,
Fabian