[ https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363625#comment-14363625 ]
Joseph Holsten edited comment on KAFKA-2019 at 3/16/15 6:09 PM:
----------------------------------------------------------------

[~becket_qin] could you provide an example of the worst-case imbalance for this code? I'm having trouble seeing how the hash-ordered round robin can have a worst case of more than number-of-consumer-threads.

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over the sequence:
- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:
- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a worst-case ordering every time, with {{c0}} always ending up with {{2m}} partitions and {{c1..cn}} ending up with {{m}} partitions. Hash-ordered will only produce this sort of worst case occasionally.

Likelihood that {{c0}} will have all m of its threads selected first:
{code}
C(n*m,m) / P(n*m,n*m) = ((n*m)! / (m! * ((n*m)-m)!)) / ((n*m)!)
{code}

So the likelihood that any consumer {{c0..cn}} will have all its threads selected first:
{code}
n * C(n*m,m) / P(n*m,n*m) = n * ((n*m)! / (m! * ((n*m)-m)!)) / ((n*m)!)
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered with {{n=2}}, {{m=2}}:
{code}
2 * C(2*2,2) / P(2*2,2*2) = 2 * ((2*2)! / (2! * ((2*2)-2)!)) / ((2*2)!)
                          = 2 * 6 / 24
                          = 0.5
{code}

And with {{n=2}}, {{m=4}}:
{code}
2 * C(2*4,4) / P(2*4,2*4) = 2 * ((2*4)! / (4! * ((2*4)-4)!)) / ((2*4)!)
                          = 2 * 70 / 40320
                          ~ 0.00347
{code}

I totally agree with the point about backward incompatibility; it would not be pleasant to have different servers using different thread sequences. So this code can't go in as a modification to the existing RoundRobinAssignor; perhaps it could go in as a new assignor?
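To make the comparison concrete, here is a small, self-contained Scala sketch. The names {{AssignmentOrderSketch}}, {{ThreadId}} and {{worstCaseOdds}} are mine, not from the attached patch, and the case-class {{hashCode}} is only a stand-in for whatever {{ConsumerThreadId.hashCode}} would be defined as. It mimics the {{"%s-%d"}} string form of a consumer thread id, prints the consumer-ordered and hash-ordered sequences for the c0/c1 example above, and evaluates the {{n * C(n*m,m) / P(n*m,n*m)}} estimate for the two worked cases:
{code}
object AssignmentOrderSketch {

  // Hypothetical stand-in for kafka.consumer.ConsumerThreadId: rendered as
  // "%s-%d".format(consumer, threadId), which is what the current sort key uses.
  case class ThreadId(consumer: String, thread: Int) {
    override def toString: String = "%s-%d".format(consumer, thread)
  }

  def factorial(x: Int): BigInt = (1 to x).map(BigInt(_)).product

  // C(a,b) = a! / (b! * (a-b)!)
  def choose(a: Int, b: Int): BigInt =
    factorial(a) / (factorial(b) * factorial(a - b))

  // The estimate from the comment above: n * C(n*m,m) / (n*m)!
  def worstCaseOdds(n: Int, m: Int): Double =
    (BigInt(n) * choose(n * m, m)).toDouble / factorial(n * m).toDouble

  def main(args: Array[String]): Unit = {
    // n=2 consumers, m=4 threads each, matching the sequences above.
    val ids = for (c <- Seq("c0", "c1"); t <- 0 to 3) yield ThreadId(c, t)

    // Consumer-ordered (current behaviour): every c0 thread precedes every c1 thread.
    println(ids.sortBy(_.toString).mkString(", "))

    // Hash-ordered (proposed): the two consumers' threads usually interleave.
    println(ids.sortBy(_.hashCode).mkString(", "))

    println(worstCaseOdds(2, 2)) // 0.5
    println(worstCaseOdds(2, 4)) // ~0.00347
  }
}
{code}
Running it prints the consumer-clustered sequence first, then whatever interleaving the stand-in hash codes happen to produce, followed by the two odds worked out above (0.5 and ~0.00347).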
> RoundRobinAssignor clusters by consumer
> ---------------------------------------
>
>                 Key: KAFKA-2019
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2019
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Joseph Holsten
>            Assignee: Neha Narkhede
>            Priority: Minor
>         Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, KAFKA-2019.patch
>
> When rolling out a change today, I noticed that some of my consumers are "greedy", taking far more partitions than others.
> The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds sorted by toString, which is {{ "%s-%d".format(consumer, threadId)}}. This causes each consumer's threads to be adjacent to each other.
> One possible fix would be to define ConsumerThreadId.hashCode, and sort by that.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)