[ https://issues.apache.org/jira/browse/FLINK-21548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Iaroslav Zeigerman updated FLINK-21548:
---------------------------------------
Description:

When the cardinality of the keys matches the parallelism, not all tasks of the downstream operator are utilized, and even those that are utilized do not receive records evenly. For example, with 500 unique keys in the range [0, 500), only 313 downstream tasks (out of 500) receive any records at all.

*NOTE*: all examples below use 1M record instances.

This behavior can easily be reproduced with the following test case:
{code:scala}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object Test {
  val parallelism = 500
  val recordsNum  = 1000000

  def run(): Unit = {
    val recordIds = (0 to recordsNum).map(_ % parallelism)
    val tasks     = recordIds.map(selectTask)

    println(s"Total unique keys: ${recordIds.toSet.size}")
    println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
    println("=======================")
    println(s"Tasks involved: ${tasks.toSet.size}")
    println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
  }

  def selectTask(key: Int): Int =
    KeyGroupRangeAssignment.assignToKeyGroup(key, parallelism)
}
{code}
This produces the following results:
{noformat}
Total unique keys: 500
Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
=======================
Tasks involved: 313
Record distribution by task: Vector((147,10000), (248,10000), ..., (232,2000), (100,2000))
{noformat}
Record distribution visualized:
!Screen Shot 2021-03-01 at 10.52.31 AM.png!

I have determined that, in order for all tasks to be utilized, the number of unique keys needs to be at least 5 times the parallelism value. The relation between the number of unique keys and the fraction of utilized tasks appears to be exponential:
!Screen Shot 2021-03-01 at 10.54.42 AM.png!
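For reference, the linked code shows that {{assignToKeyGroup(key, maxParallelism)}} boils down to {{murmurHash(key.hashCode()) % maxParallelism}}. The occupancy pattern can be reproduced without a Flink dependency. The sketch below is an illustration only: the object name {{KeyGroupSketch}} is mine, and it substitutes Scala's standard-library {{MurmurHash3}} for Flink's {{MathUtils.murmurHash}}, so individual assignments differ from the reproduction above, but the number of occupied groups comes out in the same range.

```scala
import scala.util.hashing.MurmurHash3

object KeyGroupSketch {
  val parallelism = 500

  // Stand-in for Flink's MathUtils.murmurHash (assumption: any 32-bit
  // hash with good avalanche shows the same balls-into-bins occupancy).
  def murmur(key: Int): Int =
    MurmurHash3.finalizeHash(MurmurHash3.mix(MurmurHash3.arraySeed, key), 1)

  // Mirrors the assignment path: hash the key, then take it modulo the
  // number of key groups (floorMod keeps the result non-negative).
  def keyGroup(key: Int): Int =
    java.lang.Math.floorMod(murmur(key), parallelism)

  // 500 distinct keys hashed into 500 key groups.
  val occupied: Int = (0 until parallelism).map(keyGroup).toSet.size

  def main(args: Array[String]): Unit =
    println(s"Occupied key groups: $occupied of $parallelism")
}
```

Running this prints an occupancy well below 500, in the same region as the 313 observed with Flink's own hash.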
But even with 5x the number of keys, the skew is still quite significant:
!Screen Shot 2021-03-01 at 10.57.33 AM.png!
Given that the keys used in my test are integer values, for which {{hashCode}} returns the value itself, I tend to believe that the skew is caused by Flink's murmur hash implementation, which is used [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].

> keyBy operation produces skewed record distribution for low-cardinality keys
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-21548
>                 URL: https://issues.apache.org/jira/browse/FLINK-21548
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Coordination, Runtime / Task
>    Affects Versions: 1.11.0, 1.12.1
>            Reporter: Iaroslav Zeigerman
>            Priority: Major
>         Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png
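A baseline worth keeping in mind for the murmur-hash suspicion above: even an ideally uniform assignment of n distinct keys into n key groups is expected to leave roughly a 1/e fraction of the groups empty (standard balls-into-bins occupancy), i.e. about 316 occupied groups for n = 500, which is close to the observed 313. A quick check of that expectation (object name {{OccupancyBaseline}} is mine, for illustration):

```scala
object OccupancyBaseline {
  // Expected number of non-empty bins when n balls are placed uniformly
  // at random into n bins: n * (1 - (1 - 1/n)^n) ≈ n * (1 - 1/e).
  def expectedOccupied(n: Int): Double =
    n * (1 - math.pow(1 - 1.0 / n, n))

  def main(args: Array[String]): Unit = {
    // For n = 500 this evaluates to roughly 316, close to the reported 313.
    println(f"Expected occupied groups for n=500: ${expectedOccupied(500)}%.1f")
  }
}
```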
-- This message was sent by Atlassian Jira (v8.3.4#803005)