[ 
https://issues.apache.org/jira/browse/FLINK-21695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-21695:
-----------------------------------
    Labels: stale-major  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have therefore added the "stale-major" label to the issue. If 
this ticket is indeed Major, please either assign yourself or give an update. 
Afterwards, please remove the label; otherwise the issue will be deprioritized 
in 7 days.


> Increase default value for number of KeyGroups
> ----------------------------------------------
>
>                 Key: FLINK-21695
>                 URL: https://issues.apache.org/jira/browse/FLINK-21695
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>            Reporter: Stephan Ewen
>            Priority: Major
>              Labels: stale-major
>
> The current calculation for the number of Key Groups (max parallelism) leads 
> in many cases to data skew and to confusion among users.
> Specifically, the fact that for maxParallelisms above 128, the default value 
> is set to {{roundToPowerOfTwo(1.5 x parallelism)}} means that frequently, 
> half of the tasks get one keygroup and the other half gets two keygroups, 
> which is very skewed.
> See section (1) in this "lessons learned" blog post. 
> https://engineering.contentsquare.com/2021/ten-flink-gotchas/
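> As a rough illustration of the skew (a minimal sketch, assuming the default is 
> derived approximately as {{roundUpToPowerOfTwo(1.5 x parallelism)}} clamped to 
> the range [128, 32768], and that key group {{g}} is assigned to subtask 
> {{g * parallelism / maxParallelism}}; not the exact {{KeyGroupRangeAssignment}} 
> code):
> {code:java}
> import java.util.Arrays;
> 
> public class KeyGroupSkewSketch {
> 
>     // Assumed approximation of the default max-parallelism derivation.
>     static int defaultMaxParallelism(int parallelism) {
>         int target = parallelism + parallelism / 2; // 1.5 x parallelism
>         return Math.min(Math.max(roundUpToPowerOfTwo(target), 128), 32768);
>     }
> 
>     static int roundUpToPowerOfTwo(int x) {
>         return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
>     }
> 
>     // Count how many key groups land on each subtask under the assumed assignment.
>     static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
>         int[] counts = new int[parallelism];
>         for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
>             counts[keyGroup * parallelism / maxParallelism]++;
>         }
>         return counts;
>     }
> 
>     public static void main(String[] args) {
>         int parallelism = 170;
>         int maxParallelism = defaultMaxParallelism(parallelism); // 256 here
>         int[] counts = keyGroupsPerSubtask(maxParallelism, parallelism);
>         long oneGroup = Arrays.stream(counts).filter(c -> c == 1).count();
>         long twoGroups = Arrays.stream(counts).filter(c -> c == 2).count();
>         // prints: parallelism=170, maxParallelism=256 -> 84 subtasks with 1 key group, 86 with 2
>         System.out.printf(
>                 "parallelism=%d, maxParallelism=%d -> %d subtasks with 1 key group, %d with 2%n",
>                 parallelism, maxParallelism, oneGroup, twoGroups);
>     }
> }
> {code}
> Roughly half of the subtasks end up with twice the keyed state and traffic of 
> the other half.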
> We can fix this by
>   - either setting the default maxParallelism to something pretty high (2048 
> for example). The cost is that we increase the default key group overhead 
> per state entry from one byte to two bytes.
>   - or keeping similar logic, but instead of {{1.5 x operatorParallelism}} 
> using a higher multiplier, like {{4 x operatorParallelism}}. The price is, 
> again, that we reach the point of two bytes of keygroup encoding overhead, 
> instead of one, more quickly (see the byte-overhead sketch after this list).
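> For reference, a minimal sketch of where the one-byte vs. two-byte overhead 
> comes from, assuming the keyed state backend prefixes each serialized state 
> key with the key group id encoded in as few whole bytes as can hold 
> {{maxParallelism - 1}} (the exact threshold lives in the state backend key 
> serialization, so treat it as an assumption):
> {code:java}
> public class KeyGroupPrefixSketch {
> 
>     // Assumed: key group ids 0..maxParallelism-1 are written as unsigned whole bytes.
>     static int keyGroupPrefixBytes(int maxParallelism) {
>         return maxParallelism <= 256 ? 1 : 2;
>     }
> 
>     public static void main(String[] args) {
>         System.out.println(keyGroupPrefixBytes(128));  // 1 byte  (current lower bound)
>         System.out.println(keyGroupPrefixBytes(256));  // 1 byte
>         System.out.println(keyGroupPrefixBytes(2048)); // 2 bytes (proposed higher default)
>     }
> }
> {code}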
> Implementation-wise, there is an unfortunate complication: the 
> maxParallelism, if not configured, is not stored anywhere in the job graph, 
> but re-derived on the JobManager each time it loads a JobGraph vertex 
> ({{ExecutionJobVertex}}) that does not have a maxParallelism configured. This 
> relies on the implicit contract that the derivation logic never changes.
> Changing this logic will instantly break all jobs that have not explicitly 
> configured the Max Parallelism. That seems like a pretty heavy design 
> shortcoming, unfortunately :-(
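> (For context, "explicitly configured" means the application pins the max 
> parallelism itself, e.g. via the {{StreamExecutionEnvironment}}; such jobs are 
> unaffected by a change of the default derivation. A short usage example:)
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class ExplicitMaxParallelismJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         // Pin the number of key groups for the whole job, so savepoints remain
>         // restorable no matter how the default derivation evolves.
>         env.setMaxParallelism(2048);
> 
>         env.fromElements(1, 2, 3)
>                 .keyBy(x -> x)
>                 .reduce(Integer::sum)
>                 .setMaxParallelism(2048) // can also be pinned per operator
>                 .print();
> 
>         env.execute("explicit-max-parallelism");
>     }
> }
> {code}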
> A way to partially work around that is by moving the logic that derives the 
> maximum parallelism to the {{StreamGraphGenerator}}, so we never create 
> JobGraphs where vertices have no configured Max Parallelism (and we keep the 
> re-derivation logic for backwards compatibility for persisted JobGraphs).
> The {{StreamExecutionEnvironment}} will need a flag to use the "old mode" to 
> give existing un-configured applications a way to keep restoring from old 
> savepoints. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
