[ 
https://issues.apache.org/jira/browse/KAFKA-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet updated KAFKA-7410:
------------------------------------
    Description: 
AdminUtils creates a bad partition assignment when the number of brokers on 
each rack is unbalanced, e.g. 80 brokers on rack A, 20 brokers on rack B, and 
15 brokers on rack C. Under such a scenario, a single broker from rack C may 
be allocated more frequently than expected.

kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list 
of brokers alternating by rack. However, once it runs out of brokers on the 
racks with fewer brokers, it generates a run of brokers from the same rack, 
because rackIterator.hasNext returns false for the other racks.
{code:java}
while (result.size < brokerRackMap.size) {
  val rackIterator = brokersIteratorByRack(racks(rackIndex))
  if (rackIterator.hasNext)
    result += rackIterator.next()
  rackIndex = (rackIndex + 1) % racks.length
}{code}
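As a rough illustration (a simplified stand-in, not the actual Kafka code), the sketch below replays the alternation loop for a hypothetical two-rack cluster; once the smaller rack's iterator is exhausted, the tail of the list becomes a run of brokers from the larger rack:

```scala
object AlternationSketch {
  // Simplified model of getRackAlternatedBrokerList: round-robin over racks,
  // appending the next broker from each rack's iterator until every broker
  // has been placed.
  def alternatedList(brokerRackMap: Map[Int, String]): Seq[Int] = {
    val racks = brokerRackMap.values.toSeq.distinct.sorted
    val brokersIteratorByRack = racks.map { r =>
      r -> brokerRackMap.collect { case (b, rack) if rack == r => b }.toSeq.sorted.iterator
    }.toMap
    val result = scala.collection.mutable.ArrayBuffer[Int]()
    var rackIndex = 0
    while (result.size < brokerRackMap.size) {
      val rackIterator = brokersIteratorByRack(racks(rackIndex))
      // a rack whose iterator is exhausted contributes nothing this pass
      if (rackIterator.hasNext)
        result += rackIterator.next()
      rackIndex = (rackIndex + 1) % racks.length
    }
    result.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical cluster: brokers 0-3 on rack A, brokers 4-5 on rack B.
    val assignment = Map(0 -> "A", 1 -> "A", 2 -> "A", 3 -> "A", 4 -> "B", 5 -> "B")
    // Yields 0, 4, 1, 5, 2, 3 -- the tail (2, 3) is a same-rack run.
    println(AlternationSketch.alternatedList(assignment))
  }
}
```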
Once assignReplicasToBrokersRackAware encounters the run of brokers from the 
same rack, it attempts to maintain the rack invariant: it skips all of the C 
brokers until it wraps around to the first broker in the alternated list, and 
chooses that broker if it is from a different rack. Note how the code below 
skips over the run when choosing the replicas.
{code:java}
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
    && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
  replicaBuffer += broker
  racksWithReplicas += rack
  brokersWithReplicas += broker
  done = true
}
k += 1
{code}
It repeats this behavior for each of the remaining C brokers, each time 
choosing the first broker in the alternated list, until all of the partitions 
have been allocated.
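To make the wrap-around concrete, here is a simplified model of the follower scan (an assumption for illustration: it keeps only the rack check and ignores the brokersWithReplicas bookkeeping). Every leader inside the same-rack run resolves to the same broker after wrapping around, which is how one broker ends up over-allocated:

```scala
object SkipSketch {
  // Simplified stand-in for the follower-selection scan in
  // assignReplicasToBrokersRackAware: starting after the leader, walk the
  // alternated broker list with wrap-around and pick the first broker whose
  // rack differs from the leader's rack.
  def secondReplica(brokers: Seq[Int], rackOf: Int => String, leaderIdx: Int): Int = {
    val leaderRack = rackOf(brokers(leaderIdx))
    var k = 1
    // skip over every broker that shares the leader's rack (the run)
    while (rackOf(brokers((leaderIdx + k) % brokers.size)) == leaderRack)
      k += 1
    brokers((leaderIdx + k) % brokers.size)
  }

  def main(args: Array[String]): Unit = {
    // Alternated list for 4 rack-A brokers (0-3) and 2 rack-B brokers (4-5):
    // racks A B A B A A -- the last two entries form a same-rack run.
    val brokers = Seq(0, 4, 1, 5, 2, 3)
    val rackOf = Map(0 -> "A", 1 -> "A", 2 -> "A", 3 -> "A", 4 -> "B", 5 -> "B")
    // Both leaders inside the run wrap around and land on the same broker, 4.
    println(secondReplica(brokers, rackOf, 4)) // leader = broker 2
    println(secondReplica(brokers, rackOf, 5)) // leader = broker 3
  }
}
```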

See the attached sample code demonstrating this behavior.

  was:
AdminUtils creates a bad partition assignment when the number of brokers on 
each rack is unbalanced, e.g. 80 brokers on rack A, 20 brokers on rack B, and 
15 brokers on rack C. Under such a scenario, a single broker from rack C may 
be allocated more frequently than expected.

kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list 
of brokers alternating by rack. However, once it runs out of brokers on the 
racks with fewer brokers, it generates a run of brokers from the same rack, 
because rackIterator.hasNext returns false for the other racks.
{code:java}
while (result.size < brokerRackMap.size) {
  val rackIterator = brokersIteratorByRack(racks(rackIndex))
  if (rackIterator.hasNext)
    result += rackIterator.next()
  rackIndex = (rackIndex + 1) % racks.length
}{code}
Once assignReplicasToBrokersRackAware encounters the run of brokers from the 
same rack, it attempts to maintain the rack invariant: it skips all of the C 
brokers until it wraps around to the first broker in the alternated list, and 
chooses that broker if it is from a different rack. Note how the code below 
skips over the run when choosing the replicas.
{code:java}
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
    && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
  replicaBuffer += broker
  racksWithReplicas += rack
  brokersWithReplicas += broker
  done = true
}
k += 1
{code}
It repeats this behavior for each of the remaining C brokers, each time 
choosing the first broker in the alternated list, until all of the partitions 
have been allocated.

See the attached sample code for more details.


> Rack aware partition assignment creates highly unbalanced broker assignments 
> on unbalanced racks
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7410
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7410
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 1.1.1
>            Reporter: Lucas Bradstreet
>            Priority: Major
>         Attachments: AdminUtilsTest.scala
>
>
> AdminUtils creates a bad partition assignment when the number of brokers on 
> each rack is unbalanced, e.g. 80 brokers on rack A, 20 brokers on rack B, 
> and 15 brokers on rack C. Under such a scenario, a single broker from rack C 
> may be allocated more frequently than expected.
> kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a 
> list of brokers alternating by rack. However, once it runs out of brokers on 
> the racks with fewer brokers, it generates a run of brokers from the same 
> rack, because rackIterator.hasNext returns false for the other racks.
> {code:java}
> while (result.size < brokerRackMap.size) {
>   val rackIterator = brokersIteratorByRack(racks(rackIndex))
>   if (rackIterator.hasNext)
>     result += rackIterator.next()
>   rackIndex = (rackIndex + 1) % racks.length
> }{code}
> Once assignReplicasToBrokersRackAware encounters the run of brokers from the 
> same rack, it attempts to maintain the rack invariant: it skips all of the C 
> brokers until it wraps around to the first broker in the alternated list, 
> and chooses that broker if it is from a different rack. Note how the code 
> below skips over the run when choosing the replicas.
> {code:java}
> if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
>     && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
>   replicaBuffer += broker
>   racksWithReplicas += rack
>   brokersWithReplicas += broker
>   done = true
> }
> k += 1
> {code}
> It repeats this behavior for each of the remaining C brokers, each time 
> choosing the first broker in the alternated list, until all of the 
> partitions have been allocated.
> See the attached sample code demonstrating this behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
