[ 
https://issues.apache.org/jira/browse/FLINK-21548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Iaroslav Zeigerman updated FLINK-21548:
---------------------------------------
    Description: 
When the cardinality of the keys matches the parallelism, not all tasks of the 
downstream operator are utilized, and even the tasks that are utilized do not 
receive records evenly.

For example, with 500 unique keys in the range [0, 500), only 313 of the 500 
downstream tasks receive any records at all.
*NOTE*: 1M record instances were used for all examples below.
This behavior can easily be reproduced with the following test case:
{code:scala}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object Test {

  val parallelism = 500
  val recordsNum  = 1000000

  def run(): Unit = {
    val recordIds = (0 to recordsNum).map(_ % parallelism)
    val tasks     = recordIds.map(selectTask)

    println(s"Total unique keys: ${recordIds.toSet.size}")
    println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
    println("=======================")
    println(s"Tasks involved: ${tasks.toSet.size}")
    println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
  }

  def selectTask(key: Int): Int =
    KeyGroupRangeAssignment.assignToKeyGroup(
      key,
      parallelism
    )
}
{code}
Running {{Test.run()}} produces the following results (key 0 appears 2001 times 
because the range `0 to recordsNum` is inclusive):
{noformat}
Total unique keys: 500
Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
=======================
Tasks involved: 313
Record distribution by task: Vector((147,10000), (248,10000), ..., (232,2000), (100,2000))
{noformat}
Record distribution visualized:
 !Screen Shot 2021-03-01 at 10.52.31 AM.png!

I have determined that utilizing all tasks requires the number of unique keys 
to be at least 5 times the parallelism value. The relation between the number 
of unique keys and the fraction of utilized tasks appears to be exponential:
 !Screen Shot 2021-03-01 at 10.54.42 AM.png!  
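For context, if the hash spread keys uniformly at random across tasks, the expected fraction of utilized tasks for k keys and n tasks would be 1 - (1 - 1/n)^k ≈ 1 - e^(-k/n), which is an exponential saturation curve of exactly this shape (≈63% at 1x keys, ≈99.3% at 5x; the observed 313/500 = 62.6% is close to the ≈63.2% this uniform model predicts). A small sketch of that estimate (the class and method names below are mine, not Flink's):

```java
public class UtilizationEstimate {
    // Expected fraction of n buckets hit by k uniformly hashed keys:
    // 1 - (1 - 1/n)^k, which tends to 1 - e^(-k/n) for large n.
    static double expectedUtilization(int keys, int buckets) {
        return 1.0 - Math.pow(1.0 - 1.0 / buckets, keys);
    }

    public static void main(String[] args) {
        int parallelism = 500;
        for (int m = 1; m <= 5; m++) {
            System.out.printf("%dx keys -> %.1f%% of tasks expected%n",
                    m, 100 * expectedUtilization(m * parallelism, parallelism));
        }
    }
}
```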

But even with 5x the number of keys the skew is still quite significant:
!Screen Shot 2021-03-01 at 10.57.33 AM.png!

Given that the keys used in my test are integer values, for which `hashCode` 
returns the value itself, I tend to believe that the skew is caused by Flink's 
murmur hash implementation, which is applied 
[here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].
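For reference, the assignment in the linked code is a two-step mapping: key group = murmurHash(key.hashCode()) % maxParallelism, then operator index = keyGroup * parallelism / maxParallelism. A simplified sketch follows; the hash here is a generic bit-mixer stand-in rather than Flink's exact {{MathUtils.murmurHash}}, maxParallelism = 1024 is my assumption (the default rounds 1.5 × parallelism up to a power of two), and the class and method names are mine:

```java
import java.util.stream.IntStream;

public class KeyGroupSketch {
    // Stand-in for Flink's MathUtils.murmurHash (the real constants live in
    // flink-runtime); any avalanche-style int hash illustrates the routing.
    static int hash(int code) {
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        return code & 0x7fffffff; // keep it non-negative
    }

    // Step 1: key hash -> key group (mirrors assignToKeyGroup).
    static int keyGroup(int keyHash, int maxParallelism) {
        return hash(keyHash) % maxParallelism;
    }

    // Step 2: key group -> operator index
    // (mirrors computeOperatorIndexForKeyGroup).
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 500, maxParallelism = 1024;
        long used = IntStream.range(0, 500)
                .map(k -> operatorIndex(keyGroup(k, maxParallelism),
                        maxParallelism, parallelism))
                .distinct().count();
        System.out.println("Tasks hit: " + used + " of " + parallelism);
    }
}
```

Note that in the reproduction above, {{assignToKeyGroup(key, parallelism)}} is called with the parallelism as the maxParallelism argument, so it exercises only step 1 of this mapping.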


> keyBy operation produces skewed record distribution for low-cardinality keys
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-21548
>                 URL: https://issues.apache.org/jira/browse/FLINK-21548
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Coordination, Runtime / Task
>    Affects Versions: 1.11.0, 1.12.1
>            Reporter: Iaroslav Zeigerman
>            Priority: Major
>         Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot 
> 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
