Hi,

This is a known issue in Flink. For example, key groups can differ in size by 
+/- 1 key, and they are currently distributed randomly across the cluster, so 
some machines will get more keys to handle than others. If the number of keys 
is relatively small, like 3 keys per key group, the load difference can be 
quite large (some machines may get almost only key groups of size 2 while 
others get mostly key groups of size 3, a 50% load difference).
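
To make the effect concrete, here is a minimal, self-contained sketch that counts how many keys land on each parallel subtask. It does not call Flink: the hash in keyGroupFor is a stand-in I made up for illustration (Flink applies its own murmur hash to key.hashCode()), the class and method names are hypothetical, and the sample keys and the maxParallelism of 128 are assumptions. Only the key-group-to-subtask step (keyGroup * parallelism / maxParallelism) is meant to mirror what Flink does, so the exact counts will differ from a real job.

```java
import java.util.Arrays;
import java.util.List;

class KeySkewSketch {

    // Stand-in for Flink's hashing step. This mixing function is an
    // assumption for illustration, not Flink's actual implementation.
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, maxParallelism);
    }

    // Key groups are mapped to subtasks in contiguous ranges.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts the number of distinct keys that end up on each subtask.
    static int[] keysPerSubtask(List<?> keys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            counts[subtaskFor(keyGroup, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical keys; with so few of them an uneven split is likely.
        List<String> keys = Arrays.asList("k1", "k2", "k3", "k4", "k5", "k6");
        System.out.println(Arrays.toString(keysPerSubtask(keys, 128, 2)));
    }
}
```

With only a handful of keys nothing forces an even split, so one subtask can easily end up with twice as many keys as the other.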

Unfortunately I don’t know about any concrete plans to address it. Maybe Till 
will know something more (I CC’ed him).

Also, I don’t think the number of keys per task manager is exposed via a 
metric anywhere.

Piotrek

> On 22 Oct 2019, at 10:00, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> 
> Hi to all,
> I was looking into the ClickEventCount example [1] from the Flink training, 
> trying to understand why one task manager was reading at twice the speed of 
> the other.
> 
> I had to debug a lot of Flink's internal code to understand that it depends 
> on the hash function that Flink uses to assign keys to task managers, which 
> was assigning 4 keys to one TM and 2 to the other. Is there a smarter way 
> to monitor this (e.g. a metric like taskManager_numKeys)?
> 
> I also discovered that one cannot control how keys are partitioned across 
> task managers (i.e. use keyBy after a customPartition). Is there any 
> development effort in this direction?
> 
> Best,
> Flavio
> 
> [1] https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
