[ 
https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363625#comment-14363625
 ] 

Joseph Holsten edited comment on KAFKA-2019 at 3/16/15 6:09 PM:
----------------------------------------------------------------

[~becket_qin] could you provide an example of the worst-case imbalance for this 
code? I'm having trouble seeing how the hash-ordered round robin can have a 
worst case more unbalanced than number-of-consumer-threads. 

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.
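
To make that concrete, here's a toy sketch (my own throwaway code, not Kafka's; plain strings stand in for ConsumerThreadIds) of round robin over each ordering with 2 consumers, 4 threads each, and 12 partitions:

{code}
import java.util.*;

public class RoundRobinDemo {
    // Assign `partitions` partitions round-robin over the given thread
    // ordering and count how many each consumer ends up with.
    static Map<String, Integer> assign(List<String> order, int partitions) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            String consumer = order.get(p % order.size()).split(":")[0];
            counts.merge(consumer, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> consumerOrdered = Arrays.asList(
            "c0:t0", "c0:t1", "c0:t2", "c0:t3",
            "c1:t0", "c1:t1", "c1:t2", "c1:t3");
        List<String> interleaved = Arrays.asList(
            "c0:t0", "c1:t0", "c0:t1", "c1:t1",
            "c0:t2", "c1:t2", "c0:t3", "c1:t3");

        // 12 partitions = (n+1)*m with n=2 consumers, m=4 threads each
        System.out.println(assign(consumerOrdered, 12)); // {c0=8, c1=4}
        System.out.println(assign(interleaved, 12));     // {c0=6, c1=6}
    }
}
{code}

With the clustered ordering, all of the wrap-around partitions land on {{c0}}; the interleaved ordering splits them evenly.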

For a partition count of {{(n+1)*m}} across {{n}} consumers of {{m}} threads 
each, consumer-ordered will produce a worst-case ordering every time, with 
{{c0}} always ending up with {{2m}} partitions and the other consumers ending 
up with {{m}} partitions each. Hash-ordered will only produce this sort of 
worst case occasionally:

Likelihood that {{c0}} will have all {{m}} of its threads selected:

{code}
C(n*m, m) / P(n*m, n*m) =
((n*m)! / (m! * ((n*m) - m)!)) / (n*m)!
{code}

So the likelihood that any of the {{n}} consumers {{c0..c(n-1)}} will have all its threads selected:

{code}
n * C(n*m, m) / P(n*m, n*m) =
n * ((n*m)! / (m! * ((n*m) - m)!)) / (n*m)!
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered 
with {{n=2}}, {{m=2}}:

{code}
2 * C(2*2, 2) / P(2*2, 2*2) =
2 * ((2*2)! / (2! * ((2*2) - 2)!)) / (2*2)! =
2 * 6 / 24 =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4, 4) / P(2*4, 2*4) =
2 * ((2*4)! / (4! * ((2*4) - 4)!)) / (2*4)! =
2 * 70 / 40320 =
~0.00347
{code}
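
The arithmetic above can be double-checked with a quick sketch ({{fact}} and {{worstCaseChance}} are just throwaway helpers of mine, not Kafka code):

{code}
import java.math.BigInteger;

public class WorstCaseLikelihood {
    static BigInteger fact(int k) {
        BigInteger f = BigInteger.ONE;
        for (int i = 2; i <= k; i++) f = f.multiply(BigInteger.valueOf(i));
        return f;
    }

    // n * C(n*m, m) / (n*m)!  -- the likelihood derived above
    static double worstCaseChance(int n, int m) {
        BigInteger choose = fact(n * m)
            .divide(fact(m).multiply(fact(n * m - m)));
        return BigInteger.valueOf(n).multiply(choose).doubleValue()
            / fact(n * m).doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(worstCaseChance(2, 2)); // 0.5
        System.out.println(worstCaseChance(2, 4)); // ~0.00347
    }
}
{code}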

I totally agree with the point about backward compatibility; it would not be 
pleasant to have different servers using different thread sequences. So this 
code can't go in as a modification to the existing RoundRobinAssignor, but 
perhaps it could go in as a new assignor?



> RoundRobinAssignor clusters by consumer
> ---------------------------------------
>
>                 Key: KAFKA-2019
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2019
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Joseph Holsten
>            Assignee: Neha Narkhede
>            Priority: Minor
>         Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, 
> KAFKA-2019.patch
>
>
> When rolling out a change today, I noticed that some of my consumers are 
> "greedy", taking far more partitions than others.
> The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds 
> sorted by toString, which is {{"%s-%d".format(consumer, threadId)}}. This 
> causes each consumer's threads to be adjacent to each other.
> One possible fix would be to define ConsumerThreadId.hashCode, and sort by 
> that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
