[ https://issues.apache.org/jira/browse/KAFKA-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Bradstreet updated KAFKA-7410: ------------------------------------ Description: AdminUtils creates a bad partition assignment when the number of brokers on each rack is unbalanced, e.g. 80 brokers rack A, 20 brokers rack B, 15 brokers rack C. Under such a scenario, a single broker from rack C may allocated more frequently than expected. kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list of brokers alternating by rack, however once it runs out of brokers on the racks with fewer brokers, it ends up generating a run of brokers from the same rack together as rackIterator.hasNext will return false for the other racks. {code:java} while (result.size < brokerRackMap.size) { val rackIterator = brokersIteratorByRack(racks(rackIndex)) if (rackIterator.hasNext) result += rackIterator.next() rackIndex = (rackIndex + 1) % racks.length }{code} Once assignReplicasToBrokersRackAware encounters the run of brokers from the same rack, and it attempts to maintain the rack invariant, it will skip all of the C brokers until it wraps around to the first broker in the alternated list and choose that broker if it is from a different rack. Note the code below that skips over the run when choosing the replicas. {code:java} if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks) && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) { replicaBuffer += broker racksWithReplicas += rack brokersWithReplicas += broker done = true } k += 1 {code} It repeats this behavior for each of the remaining brokers for C, each time choosing the first broker in the alternated list until it has allocated all of the partitions. See the attached sample code demonstrating this behavior. was: AdminUtils creates a bad partition assignment when the number of brokers on each rack is unbalanced, e.g. 80 brokers rack A, 20 brokers rack B, 15 brokers rack C. Under such a scenario, a single broker from rack C may allocated more frequently than expected. kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list of brokers alternating by rack, however once it runs out of brokers on the racks with fewer brokers, it ends up generating a run of brokers from the same rack together as rackIterator.hasNext will return false for the other racks. {code:java} while (result.size < brokerRackMap.size) { val rackIterator = brokersIteratorByRack(racks(rackIndex)) if (rackIterator.hasNext) result += rackIterator.next() rackIndex = (rackIndex + 1) % racks.length }{code} Once assignReplicasToBrokersRackAware encounters the run of brokers from the same rack, and it attempts to maintain the rack invariant, it will skip all of the C brokers until it wraps around to the first broker in the alternated list and choose that broker if it is from a different rack. Note the code below that skips over the run when choosing the replicas. {code:java} if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks) && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) { replicaBuffer += broker racksWithReplicas += rack brokersWithReplicas += broker done = true } k += 1 {code} It repeats this behavior for each of the remaining brokers for C, each time choosing the first broker in the alternated list until it has allocated all of the partitions. See the attached sample code for more details. > Rack aware partition assignment creates highly unbalanced broker assignments > on unbalanced racks > ------------------------------------------------------------------------------------------------ > > Key: KAFKA-7410 > URL: https://issues.apache.org/jira/browse/KAFKA-7410 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 1.1.1 > Reporter: Lucas Bradstreet > Priority: Major > Attachments: AdminUtilsTest.scala > > > AdminUtils creates a bad partition assignment when the number of brokers on > each rack is unbalanced, e.g. 80 brokers rack A, 20 brokers rack B, 15 > brokers rack C. Under such a scenario, a single broker from rack C may > allocated more frequently than expected. > kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a > list of brokers alternating by rack, however once it runs out of brokers on > the racks with fewer brokers, it ends up generating a run of brokers from the > same rack together as rackIterator.hasNext will return false for the other > racks. > {code:java} > while (result.size < brokerRackMap.size) { > val rackIterator = brokersIteratorByRack(racks(rackIndex)) > if (rackIterator.hasNext) > result += rackIterator.next() > rackIndex = (rackIndex + 1) % racks.length > }{code} > Once assignReplicasToBrokersRackAware encounters the run of brokers from the > same rack, and it attempts to maintain the rack invariant, it will skip all > of the C brokers until it wraps around to the first broker in the alternated > list and choose that broker if it is from a different rack. Note the code > below that skips over the run when choosing the replicas. > {code:java} > if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks) > && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == > numBrokers)) { > replicaBuffer += broker > racksWithReplicas += rack > brokersWithReplicas += broker > done = true > } > k += 1 > {code} > It repeats this behavior for each of the remaining brokers for C, each time > choosing the first broker in the alternated list until it has allocated all > of the partitions. > See the attached sample code demonstrating this behavior. -- This message was sent by Atlassian JIRA (v7.6.3#76005)