Flink Jira Bot updated FLINK-21695:
-----------------------------------
    Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added the "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized.

> Increase default value for number of KeyGroups
> ----------------------------------------------
>
>                 Key: FLINK-21695
>                 URL: https://issues.apache.org/jira/browse/FLINK-21695
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>            Reporter: Stephan Ewen
>            Priority: Major
>              Labels: stale-major
>
> The current calculation for the number of Key Groups (max parallelism) leads
> in many cases to data skew and to confusion among users.
> Specifically, the fact that for maxParallelisms above 128 the default value
> is set to {{roundToPowerOfTwo(1.5 x parallelism)}} means that frequently
> half of the tasks get one key group and the other half gets two key groups,
> which is very skewed.
> See section (1) in this "lessons learned" blog post:
> https://engineering.contentsquare.com/2021/ten-flink-gotchas/
> We can fix this by
> - either setting the default maxParallelism to something pretty high (2048,
> for example). The cost is that we increase the default key group overhead
> per state entry from one byte to two bytes.
> - or staying with similar logic, but instead of {{1.5 x operatorParallelism}}
> going with a higher multiplier, like {{4 x operatorParallelism}}. The price
> is again that we more quickly reach the point where we have two bytes of
> key group encoding overhead instead of one.
> Implementation-wise, there is an unfortunate situation: the maxParallelism,
> if not configured, is not stored anywhere in the JobGraph, but is re-derived
> on the JobManager each time it loads a JobGraph vertex (ExecutionJobVertex)
> that does not have a maxParallelism configured. This relies on the implicit
> contract that this logic never changes.
> Changing this logic will instantly break all jobs that have not explicitly
> configured the max parallelism. That seems like a pretty heavy design
> shortcoming, unfortunately :-(
> A way to partially work around that is to move the logic that derives the
> maximum parallelism into the {{StreamGraphGenerator}}, so we never create
> JobGraphs whose vertices have no configured max parallelism (and we keep the
> re-derivation logic for backwards compatibility for persisted JobGraphs).
> The {{StreamExecutionEnvironment}} will need a flag to use the "old mode" to
> give existing un-configured applications a way to keep restoring from old
> savepoints.
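To make the skew concrete, here is a minimal, self-contained sketch of the arithmetic described above: it derives the default max parallelism as the power of two above {{1.5 x parallelism}} and counts how many key groups each subtask ends up with when the key-group range is split into contiguous per-subtask ranges. The rounding helper, the range formula and the parallelism of 340 are illustrative assumptions, not a copy of Flink's actual {{KeyGroupRangeAssignment}} internals.

{code:java}
// Sketch of why roundToPowerOfTwo(1.5 x parallelism) skews key-group counts.
public class KeyGroupSkewSketch {

    // Smallest power of two >= x (assumes 1 < x <= 2^30).
    static int roundUpToPowerOfTwo(int x) {
        return Integer.highestOneBit(x - 1) << 1;
    }

    // Default max parallelism as described in the ticket: 1.5 x parallelism,
    // rounded up to a power of two (the real derivation also clamps the result).
    static int defaultMaxParallelism(int parallelism) {
        return roundUpToPowerOfTwo(parallelism + parallelism / 2);
    }

    // Key groups owned by one subtask when [0, maxParallelism) is split into
    // contiguous, as-even-as-possible ranges over `parallelism` subtasks.
    static int keyGroupsOnSubtask(int maxParallelism, int parallelism, int subtask) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism + parallelism - 1) / parallelism;
        return end - start;
    }

    public static void main(String[] args) {
        int parallelism = 340;                                    // hypothetical job parallelism
        int maxParallelism = defaultMaxParallelism(parallelism);  // 340 + 170 = 510 -> 512

        int oneKeyGroup = 0;
        int twoKeyGroups = 0;
        for (int i = 0; i < parallelism; i++) {
            if (keyGroupsOnSubtask(maxParallelism, parallelism, i) == 1) {
                oneKeyGroup++;
            } else {
                twoKeyGroups++;
            }
        }
        // Roughly half of the subtasks own one key group and the other half own
        // two, i.e. half the subtasks carry twice the keyed load of the rest.
        System.out.printf("maxParallelism=%d: %d subtasks with 1 key group, %d with 2%n",
                maxParallelism, oneKeyGroup, twoKeyGroups);
    }
}
{code}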
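The "one byte to two bytes" cost mentioned for both options can also be put into rough numbers. The helper below is a hypothetical back-of-the-envelope, assuming the key-group id is written as an unsigned prefix in front of every keyed-state entry and therefore needs a second byte once the number of key groups no longer fits into one byte; the exact threshold in Flink's state backends is not verified here.

{code:java}
// Back-of-the-envelope for the per-entry key-group prefix overhead.
public class KeyGroupPrefixOverhead {

    // Hypothetical helper: up to 256 key groups fit into one unsigned byte,
    // anything larger needs a two-byte prefix. The real threshold may differ.
    static int prefixBytes(int maxParallelism) {
        return maxParallelism <= 256 ? 1 : 2;
    }

    public static void main(String[] args) {
        System.out.println("maxParallelism  128 -> " + prefixBytes(128) + " byte(s) per keyed-state entry");
        System.out.println("maxParallelism  512 -> " + prefixBytes(512) + " byte(s) per keyed-state entry");
        System.out.println("maxParallelism 2048 -> " + prefixBytes(2048) + " byte(s) per keyed-state entry");
    }
}
{code}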
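Independently of which default is chosen, applications can already sidestep the derivation entirely by pinning the max parallelism themselves, which also avoids the savepoint-compatibility trap described above. A minimal DataStream sketch, assuming the usual {{setMaxParallelism}} setters on {{StreamExecutionEnvironment}} and on individual operators; the concrete values 2048 and 4096 are just examples.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitMaxParallelismJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide ceiling: every keyed operator gets 2048 key groups, so the
        // default derivation (and any future change to it) never applies.
        env.setMaxParallelism(2048);

        env.fromElements("a", "b", "c")
           .keyBy(value -> value)
           .reduce((a, b) -> a + b)
           // A single operator can also get its own ceiling if it needs more
           // headroom for future rescaling than the job-wide default.
           .setMaxParallelism(4096)
           .print();

        env.execute("explicit-max-parallelism");
    }
}
{code}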