Piotr Nowojski created FLINK-25275:
--------------------------------------

             Summary: Weighted KeyGroup assignment
                 Key: FLINK-25275
                 URL: https://issues.apache.org/jira/browse/FLINK-25275
             Project: Flink
          Issue Type: New Feature
          Components: Runtime / Network
    Affects Versions: 1.14.0
            Reporter: Piotr Nowojski


Currently key groups are split into key group ranges in the simplest, naive
way: into equally sized, contiguous ranges (number of ranges = parallelism =
number of key groups / size of a single key group range). Flink could reduce
the data skew caused by unevenly loaded key groups by assigning key groups to
tasks based on their "weight". "Weight" could be defined as the access
frequency of the given key group.
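For reference, the current equal split can be sketched as a standalone Java program. As far as I understand, it mirrors the integer arithmetic in Flink's `KeyGroupRangeAssignment#computeKeyGroupRangeForOperatorIndex`. The class and method names (`EqualKeyGroupRanges`, `rangeForSubtask`) are hypothetical, and key groups are numbered from 0 rather than from kg1.

```java
public class EqualKeyGroupRanges {

    /**
     * Hypothetical helper: returns the inclusive key group range {start, end}
     * for one subtask when maxParallelism key groups are split into
     * parallelism equally sized, contiguous ranges. Weights are ignored.
     */
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("need 1 <= parallelism <= maxParallelism");
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex out of range");
        }
        // Integer division spreads the remainder so range sizes differ by at most one.
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        // 11 key groups (0..10) split across 2 subtasks.
        for (int i = 0; i < 2; i++) {
            int[] r = rangeForSubtask(11, 2, i);
            System.out.println("subtask " + i + ": [" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

With 11 key groups and parallelism 2, subtask 0 gets [0, 5] and subtask 1 gets [6, 10], regardless of how much traffic each key group receives.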

Arbitrary, non-contiguous key group assignment (for example, TM1 processing
kg1 and kg3 while TM2 processes only kg2) would require extensive changes, for
example to the state backends. However, data skew could be mitigated to some
extent by creating key group ranges more cleverly while keeping each range
contiguous. For example, TM1 could process range [kg1, kg9] while TM2
processes only [kg10, kg11].
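The following is a minimal sketch of the contiguous weighted-range idea. It assumes a non-negative weight per key group (for example, an access count) is already available. A greedy pass over the running sum of weights places each range boundary where the cumulative weight is closest to its ideal share, total * (i + 1) / parallelism, while keeping every range contiguous and non-empty. This is a hypothetical heuristic (`WeightedKeyGroupRanges`, `assign` and `Range` are illustrative names). It is not necessarily what the linked PoC branch does, and it does not guarantee the optimal (min-max) split.

```java
import java.util.ArrayList;
import java.util.List;

public class WeightedKeyGroupRanges {

    /** Hypothetical inclusive range of key group indices [start, end]. */
    record Range(int start, int end) {
        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /**
     * Hypothetical greedy split of key groups 0..weights.length-1 into
     * parallelism contiguous, non-empty ranges balanced by (non-negative) weight.
     */
    static List<Range> assign(long[] weights, int parallelism) {
        int n = weights.length;
        if (parallelism < 1 || parallelism > n) {
            throw new IllegalArgumentException("need 1 <= parallelism <= number of key groups");
        }
        long total = 0;
        for (long w : weights) {
            total += w;
        }

        List<Range> ranges = new ArrayList<>();
        int start = 0;
        long cumulative = 0; // total weight of all key groups assigned so far
        for (int i = 0; i < parallelism; i++) {
            int remaining = parallelism - 1 - i; // ranges still to fill after this one
            if (remaining == 0) {
                ranges.add(new Range(start, n - 1)); // last range takes the rest
                break;
            }
            int end = start;
            cumulative += weights[start]; // every range gets at least one key group
            double target = (double) total * (i + 1) / parallelism;
            // Extend while leaving one key group for each remaining range and while
            // adding the next key group moves the running weight closer to the target.
            while (end + 1 < n - remaining
                    && Math.abs(cumulative + weights[end + 1] - target)
                        <= Math.abs(cumulative - target)) {
                end++;
                cumulative += weights[end];
            }
            ranges.add(new Range(start, end));
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Key groups 0..8 are light, key groups 9 and 10 are hot.
        long[] weights = {1, 1, 1, 1, 1, 1, 1, 1, 1, 5, 5};
        System.out.println(assign(weights, 2));
    }
}
```

With key groups 0 to 8 weighing 1 each and key groups 9 and 10 weighing 5 each, the program prints `[[0, 8], [9, 10]]`. In 1-based numbering this is the [kg1, kg9] / [kg10, kg11] split from the example above, with per-task weights 9 and 10 instead of the 11 and 8 that equal-sized ranges ([0, 5] and [6, 10]) would produce.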

[This branch shows a PoC of such 
approach.|https://github.com/pnowojski/flink/commits/antiskew]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
