[ https://issues.apache.org/jira/browse/FLINK-21548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Iaroslav Zeigerman updated FLINK-21548:
---------------------------------------
    Summary: keyBy operation produces skewed record distribution for low-cardinality keys  (was: keyBy operation produces skewed record distribution with low-cardinality keys)

> keyBy operation produces skewed record distribution for low-cardinality keys
> -----------------------------------------------------------------------------
>
>                  Key: FLINK-21548
>                  URL: https://issues.apache.org/jira/browse/FLINK-21548
>              Project: Flink
>           Issue Type: Bug
>           Components: API / DataStream, Runtime / Coordination, Runtime / Task
>     Affects Versions: 1.11.0, 1.12.1
>             Reporter: Iaroslav Zeigerman
>             Priority: Major
>          Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png
>
>
> When the cardinality of keys matches the existing parallelism, not all tasks of the downstream operator are utilized, and even those that are utilized are not utilized evenly.
> For example, with 500 unique keys [0, 500), only 313 downstream tasks (out of 500) receive any records at all.
> This behavior can easily be reproduced with the following test case:
> {code:scala}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment
>
> object Test {
>   val parallelism = 500
>   val recordsNum = 1000000
>
>   def main(args: Array[String]): Unit = run()
>
>   def run(): Unit = {
>     // Cycle ~1M record keys through [0, parallelism).
>     val recordIds = (0 to recordsNum).map(_ % parallelism)
>     val tasks = recordIds.map(selectTask)
>     println(s"Total unique keys: ${recordIds.toSet.size}")
>     println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>     println("=======================")
>     println(s"Tasks involved: ${tasks.toSet.size}")
>     println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>   }
>
>   // Uses parallelism as the max parallelism, so the key group index
>   // equals the subtask index.
>   def selectTask(key: Int): Int =
>     KeyGroupRangeAssignment.assignToKeyGroup(key, parallelism)
> }
> {code}
> This produces the following results:
> {noformat}
> Total unique keys: 500
> Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
> =======================
> Tasks involved: 313
> Record distribution by task: Vector((147,10000), (248,10000), ..., (232,2000), (100,2000))
> {noformat}
> Record distribution visualized:
> !Screen Shot 2021-03-01 at 10.52.31 AM.png!
> I have determined that, in order to achieve utilization of all tasks, the number of unique keys should be at least 5 times the parallelism value. The relation between the number of unique keys and the fraction of utilized tasks appears to be exponential:
> !Screen Shot 2021-03-01 at 10.54.42 AM.png!
> But even with 5x the number of keys, the skew is still quite significant:
> !Screen Shot 2021-03-01 at 10.57.33 AM.png!
> Given that the keys used in my test are integer values, for which {{hashCode}} returns the value itself, I tend to believe that the skew is caused by Flink's murmur hash implementation, which is used [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].
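> The hashing step can be checked in isolation. Below is a minimal sketch, not part of the test case above; it assumes that {{assignToKeyGroup}} reduces to {{MathUtils.murmurHash(key.hashCode()) % maxParallelism}}, with {{MathUtils}} from {{org.apache.flink.util}}, and the {{MurmurSpread}} name is just for illustration. Since {{hashCode}} of an integer is the value itself, feeding the 500 keys straight through the murmur hash should reproduce the ~313 occupied key groups. Note that 313/500 ≈ 0.63 is close to 1 - 1/e ≈ 0.632, the expected fraction of occupied bins when n keys are hashed uniformly at random into n bins:
> {code:scala}
> import org.apache.flink.util.MathUtils
>
> object MurmurSpread {
>   def main(args: Array[String]): Unit = {
>     val parallelism = 500
>     // murmurHash returns a non-negative int, so the modulo is safe.
>     val keyGroups = (0 until parallelism).map(k => MathUtils.murmurHash(k) % parallelism)
>     println(s"Distinct key groups hit: ${keyGroups.toSet.size} out of $parallelism")
>     // Uniformly random hashing would occupy about n * (1 - 1/e) ≈ 316 of the 500 key groups.
>   }
> }
> {code}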
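> Similarly, the "5x parallelism" observation can be reproduced by sweeping the key count and measuring what fraction of key groups receive at least one key. This is again only a sketch, reusing the same {{assignToKeyGroup}} call as the test case above; the {{UtilizationSweep}} name is illustrative:
> {code:scala}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment
>
> object UtilizationSweep {
>   def main(args: Array[String]): Unit = {
>     val parallelism = 500
>     for (multiple <- 1 to 10) {
>       // Distinct key groups occupied by (multiple * parallelism) unique integer keys.
>       val hit = (0 until parallelism * multiple)
>         .map(KeyGroupRangeAssignment.assignToKeyGroup(_, parallelism))
>         .toSet
>         .size
>       println(f"${multiple}x keys: ${hit * 100.0 / parallelism}%.1f%% of key groups occupied")
>     }
>   }
> }
> {code}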