Piotr Nowojski created FLINK-25275:
--------------------------------------

Summary: Weighted KeyGroup assignment
Key: FLINK-25275
URL: https://issues.apache.org/jira/browse/FLINK-25275
Project: Flink
Issue Type: New Feature
Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Piotr Nowojski
Currently, key groups are split into key group ranges in the simplest, naive way: equally sized, contiguous ranges (number of ranges = parallelism = number of key groups / size of a single key group range). Flink could reduce the impact of data skew between key groups by assigning them to tasks based on their "weight". "Weight" could be defined as the access frequency of the given key group.

Arbitrary, non-contiguous key group assignment (for example, TM1 processing kg1 and kg3 while TM2 processes only kg2) would require extensive changes, for example to the state backends. However, the data skew could be mitigated to some extent by creating key group ranges in a smarter way while keeping each range contiguous. For example, TM1 processes the range [kg1, kg9], while TM2 processes only [kg10, kg11].

[This branch shows a PoC of such an approach.|https://github.com/pnowojski/flink/commits/antiskew]
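A minimal sketch of the contiguous weighted-range idea. It assumes that each key group's weight is a non-negative access count that has already been collected somehow (how Flink would collect it is not specified here), and that key groups are 0-indexed. The class and method names (`WeightedKeyGroupRanges`, `evenRanges`, `weightedRanges`) are hypothetical and are not Flink API. The splitting technique is a swapped-in, standard linear-partition approach: a binary search over the maximum allowed range weight combined with a greedy feasibility check. It is not necessarily what the PoC branch does.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink API): assigns key groups 0..n-1 to `parallelism`
 * subtasks as contiguous, non-empty ranges, minimising the weight of the heaviest range.
 * Weights are assumed to be non-negative per-key-group access counts.
 */
public class WeightedKeyGroupRanges {

    /** Inclusive range [start, end] of key group indices. */
    public record Range(int start, int end) {}

    /** Naive split into (almost) equally sized contiguous ranges, ignoring weights. */
    public static List<Range> evenRanges(int numKeyGroups, int parallelism) {
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            int start = (int) ((long) i * numKeyGroups / parallelism);
            int end = (int) ((long) (i + 1) * numKeyGroups / parallelism) - 1;
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    /** Weighted split: binary search for the smallest feasible per-range weight cap. */
    public static List<Range> weightedRanges(long[] weights, int parallelism) {
        if (parallelism < 1 || parallelism > weights.length) {
            throw new IllegalArgumentException("need 1 <= parallelism <= number of key groups");
        }
        long lo = 0;
        long hi = 0;
        for (long w : weights) {
            lo = Math.max(lo, w); // no cap can be below the heaviest single key group
            hi += w;              // a single range holding everything always fits
        }
        while (lo < hi) {
            long mid = lo + (hi - lo) / 2;
            if (rangesNeeded(weights, mid) <= parallelism) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return buildRanges(weights, lo, parallelism);
    }

    /** Greedy count of contiguous ranges needed so that no range exceeds `cap`. */
    private static int rangesNeeded(long[] weights, long cap) {
        int count = 1;
        long current = 0;
        for (long w : weights) {
            if (current + w > cap) {
                count++;
                current = 0;
            }
            current += w;
        }
        return count;
    }

    /** Builds exactly `parallelism` non-empty ranges whose weights stay within `cap`. */
    private static List<Range> buildRanges(long[] weights, long cap, int parallelism) {
        int n = weights.length;
        List<Range> ranges = new ArrayList<>();
        int start = 0;
        long current = 0;
        for (int i = 0; i < n; i++) {
            // Ranges that still have to be opened after closing the current one.
            int rangesStillNeeded = parallelism - ranges.size() - 1;
            boolean overCap = current + weights[i] > cap;
            // Split early if every remaining key group is needed to fill the remaining ranges.
            boolean mustSplitForCount = n - i <= rangesStillNeeded;
            if (i > start && (overCap || mustSplitForCount)) {
                ranges.add(new Range(start, i - 1));
                start = i;
                current = 0;
            }
            current += weights[i];
        }
        ranges.add(new Range(start, n - 1));
        return ranges;
    }

    public static void main(String[] args) {
        // Hypothetical access counts for 11 key groups: the last two are "hot".
        long[] weights = {1, 1, 1, 1, 1, 1, 1, 1, 1, 5, 5};
        System.out.println("even:     " + evenRanges(weights.length, 2));
        System.out.println("weighted: " + weightedRanges(weights, 2));
    }
}
```

With the hypothetical weights in `main` (nine key groups of weight 1 followed by two of weight 5) and parallelism 2, the even split yields [0, 4] and [5, 10], with total weights 5 and 14. The weighted split yields [0, 8] and [9, 10], with weights 9 and 10. In the issue's 1-based naming this is the [kg1, kg9] / [kg10, kg11] shape. Keeping every range contiguous means each subtask still owns a single key group range, which is the property the state backends rely on.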