Piotr Nowojski created FLINK-25275:
--------------------------------------
Summary: Weighted KeyGroup assignment
Key: FLINK-25275
URL: https://issues.apache.org/jira/browse/FLINK-25275
Project: Flink
Issue Type: New Feature
Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Piotr Nowojski
Currently, key groups are split into key group ranges in the simplest possible
way: into equally sized continuous ranges (number of ranges = parallelism =
number of key groups / size of a single key group range). Flink could reduce
the impact of data skew between key groups by assigning them to tasks based on
their "weight". The "weight" could be defined as the access frequency of the
given key group.
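As an illustration of the equally sized split described above, here is a minimal Python sketch. It is not taken from the Flink code base: the function name `equal_ranges` and the zero-based, inclusive `(start, end)` tuples are assumptions made for this example only.

```python
def equal_ranges(num_key_groups, parallelism):
    """Split key groups 0..num_key_groups-1 into `parallelism` continuous
    ranges whose sizes differ by at most one.

    Hypothetical helper for illustration; returns inclusive (start, end) pairs.
    """
    if not 1 <= parallelism <= num_key_groups:
        raise ValueError("parallelism must be between 1 and num_key_groups")
    ranges = []
    for task in range(parallelism):
        # Ceiling division for the start and floor division for the end
        # make neighbouring ranges meet without gaps or overlaps.
        start = (task * num_key_groups + parallelism - 1) // parallelism
        end = ((task + 1) * num_key_groups - 1) // parallelism
        ranges.append((start, end))
    return ranges


if __name__ == "__main__":
    print(equal_ranges(128, 4))
```

The split depends only on the number of key groups and the parallelism, so a range that happens to contain frequently accessed key groups gets no relief.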
An arbitrary, non-continuous key group assignment (for example, TM1 processing
kg1 and kg3 while TM2 processes only kg2) would require extensive changes, for
example to the state backends. However, the data skew could be mitigated to
some extent by creating key group ranges in a smarter way while keeping each
key group range continuous. For example, TM1 processes the range [kg1, kg9],
while TM2 processes just [kg10, kg11].
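One possible way to build such weighted but still continuous ranges is the classic linear partition approach: choose the cut points so that the heaviest range is as light as possible. The Python sketch below is only an illustration under assumptions and is not necessarily what the PoC does. The function name `weighted_ranges`, the per-key-group `weights` list (for example, access counts) and the zero-based inclusive ranges are all hypothetical.

```python
from itertools import accumulate


def weighted_ranges(weights, parallelism):
    """Split key groups into `parallelism` non-empty continuous ranges,
    minimising the total weight of the heaviest range.

    Hypothetical helper for illustration; weights[i] is the assumed weight of
    key group i. Returns inclusive (start, end) pairs of key group indices.
    """
    n = len(weights)
    if not 1 <= parallelism <= n:
        raise ValueError("parallelism must be between 1 and len(weights)")
    prefix = [0, *accumulate(weights)]  # prefix[i] = sum of weights[:i]
    inf = float("inf")
    # best[t][i]: smallest achievable maximum range weight when the first
    # i key groups are split into t ranges; cut[t][i]: start of the last range.
    best = [[inf] * (n + 1) for _ in range(parallelism + 1)]
    cut = [[0] * (n + 1) for _ in range(parallelism + 1)]
    best[0][0] = 0
    for t in range(1, parallelism + 1):
        for i in range(t, n + 1):
            for j in range(t - 1, i):
                cost = max(best[t - 1][j], prefix[i] - prefix[j])
                if cost < best[t][i]:
                    best[t][i] = cost
                    cut[t][i] = j
    # Walk the recorded cut points backwards to recover the ranges.
    ranges = []
    end = n
    for t in range(parallelism, 0, -1):
        start = cut[t][end]
        ranges.append((start, end - 1))
        end = start
    ranges.reverse()
    return ranges


if __name__ == "__main__":
    # Nine lightly used key groups followed by two heavily used ones.
    print(weighted_ranges([1] * 9 + [5, 4], 2))
```

With these made-up weights, the first task receives the nine light key groups and the second task the two heavy ones, mirroring the [kg1, kg9] / [kg10, kg11] example with zero-based indices. The dynamic program is cubic in the worst case, which is acceptable for a sketch but might need a faster method for large key group counts.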
[This branch shows a PoC of such an
approach.|https://github.com/pnowojski/flink/commits/antiskew]