Hi Mohammed,

Without diving too much into your business logic, one thing that catches my eye
is the partitioning you are using. In general, all calls to `keyBy` or
`rebalance` are very expensive because all the data is shuffled across the
downstream tasks. Flink tries to fuse (chain) operators with the same key
groups together so that there is no communication overhead between them, but
this is not possible if there is a shuffle between them.
One example is your cgmStream, which is first distributed by a specified key
and then rebalanced right after it, so the data is shuffled twice in a row;
see the sketch below.
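To illustrate (with made-up names and a dummy source, since I don't know your
actual job), the pattern and where it breaks operator chaining would look
roughly like this:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DoubleShuffleSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Dummy records standing in for CGM readings: "<patientId>,<value>"
            env.fromElements("p1,80", "p2,95", "p1,101", "p2,87")
                .map(s -> s.trim())
                .filter(s -> s.contains(","))   // chained with the map above: same task, no network hop
                .keyBy(s -> s.split(",")[0])    // shuffle #1: repartition by patient id, breaks the chain
                .map(s -> s + ",validated")     // keyed operation
                .rebalance()                    // shuffle #2: round-robin, the key locality is lost again
                .map(s -> s + ",enriched")      // runs in a new chain downstream of the shuffle
                .print();

            env.execute("double shuffle sketch");
        }
    }

If the rebalance is only there to spread load, it is usually cheaper to drop
it and keep the keyed partitioning all the way through.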
When applying a `keyBy` operation, it is important to understand what the key
distribution in your input data looks like. It may happen that specific keys
occur very often while others appear with a much lower likelihood. This causes
skew in your pipeline that cannot be resolved with a higher parallelism (some
tasks are overloaded while others are idle).
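
If you are unsure what your distribution looks like, a quick way to eyeball it
is a small job that emits a running count per key (again only a sketch;
`fromElements` stands in for your real source):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KeySkewCheck {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The strings stand in for your record keys, e.g. patient ids.
            env.fromElements("p1", "p1", "p2", "p1", "p3", "p1")
                .map(key -> Tuple2.of(key, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG)) // explicit type info, erasure hides it from lambdas
                .keyBy(t -> t.f0)
                .sum(1)     // rolling count per key
                .print();   // keys whose counts grow much faster than the rest are your skewed keys

            env.execute("key skew check");
        }
    }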

I also have a couple of follow-up questions to better understand your setup:

- What do you mean by 20k concurrent stream data? Is that 20k records per second?
- How many TaskManagers are you using, and how are the slots distributed?
- Can you check in the Flink WebUI whether some operators are idle, and maybe
share an image of the job graph?
- How did you notice the lag of 2k between the operators?

Best,
Fabian

