Hello,

I tried to understand the algorithm used to choose the leader of a topic 
partition at creation time, in rack-unaware mode.

I went through TopicCommand.scala and AdminUtils.scala, in particular the 
assignReplicasToBrokersRackUnaware() function (pasted below for convenience), 
and concluded that the leader for a topic is chosen randomly, the first 
follower is chosen by applying an initial random shift from the leader, and 
the remaining followers are chosen relative to the first follower by 
incrementing its broker index.

Can someone please confirm that my understanding is correct?

I note that when creating a single topic with N partitions, the assignment 
algorithm arrives at a balanced distribution of partitions across the brokers 
hosting the replicas. When creating N topics with 1 partition each, however, 
the resulting replica assignment may NOT be balanced across those brokers, 
because each topic draws its own random starting broker.
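
To make the comparison concrete, here is a rough simulation sketch, not the 
Kafka code itself. It condenses the rack-unaware assignment for a fresh topic 
(no fixedStartIndex, partitions numbered from 0) and counts how many partitions 
each broker leads. The object name, assign(), leaderCounts() and the body of 
replicaIndex() are my own assumptions for illustration; replicaIndex() is 
modeled on my reading of AdminUtils.scala.

```scala
import scala.util.Random

object AssignmentBalanceSketch {
  // Assumed helper, modeled on AdminUtils.replicaIndex (not part of the paste below).
  def replicaIndex(first: Int, shift: Int, j: Int, n: Int): Int =
    (first + 1 + (shift + j) % (n - 1)) % n

  // Condensed restatement of the rack-unaware assignment for one new topic.
  def assign(nPartitions: Int, rf: Int, brokers: IndexedSeq[Int], rand: Random): Map[Int, Seq[Int]] = {
    val n = brokers.length
    val startIndex = rand.nextInt(n) // random broker for partition 0's first replica
    var shift = rand.nextInt(n)      // random offset used for the followers
    (0 until nPartitions).map { p =>
      if (p > 0 && p % n == 0) shift += 1
      val first = (p + startIndex) % n // first replicas proceed round-robin
      val followers = (0 until rf - 1).map(j => brokers(replicaIndex(first, shift, j, n)))
      p -> (brokers(first) +: followers)
    }.toMap
  }

  // Count how many partitions each broker leads (first replica in each list).
  def leaderCounts(assignments: Seq[Map[Int, Seq[Int]]], brokers: Seq[Int]): Map[Int, Int] = {
    val leaders = assignments.flatMap(_.values.map(_.head))
    brokers.map(b => b -> leaders.count(_ == b)).toMap
  }

  def main(args: Array[String]): Unit = {
    val brokers = Vector(0, 1, 2, 3)
    val rand = new Random()
    val oneTopic = Seq(assign(8, 2, brokers, rand))           // 1 topic x 8 partitions
    val manyTopics = Seq.fill(8)(assign(1, 2, brokers, rand)) // 8 topics x 1 partition
    println(s"1 topic x 8 partitions: ${leaderCounts(oneTopic, brokers)}")
    println(s"8 topics x 1 partition: ${leaderCounts(manyTopics, brokers)}")
  }
}
```

With 4 brokers and 8 partitions in one topic, every broker leads exactly 2 
partitions regardless of the random start. In the 8-topic case the counts 
depend on 8 independent random draws, so they vary from run to run.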

The next part of my question is - are there plans to include an algorithm that 
does the leader & follower assignment based on the prevailing load of the 
brokers? Suppose I add a new broker to the cluster and start creating topics; I 
would expect the new broker to become the leader for the newly created topics. 
However, in my experiments this does not turn out to be the case - there is no 
bias towards the new broker as the leader.

Is there any way to introduce a bias towards choosing the new broker as the 
leader for the next several topics that are created?

I understand that one can (a) specify the replicas list for each topic during 
topic creation (b) use kafka-reassign-partitions.sh to do a reassignment of 
partitions and (c) set auto.leader.rebalance.enable=true.
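
For option (a), the kind of bias I have in mind could be sketched roughly as 
below. This is only an illustration under my own assumptions: the object name, 
assign() and toCliArg() are hypothetical helpers, and the per-broker leader 
counts are an input the operator would have to gather separately. It puts the 
broker with the fewest led partitions first in each replica list (the first 
replica being the preferred leader) and formats the result the way the 
--replica-assignment option of kafka-topics.sh expects. It tracks leader counts 
only, so followers are not balanced.

```scala
object LeastLoadedAssignmentSketch {
  // leaderCounts: broker id -> number of partitions it currently leads
  // (hypothetical input, collected by the operator).
  def assign(nPartitions: Int, rf: Int, leaderCounts: Map[Int, Int]): Seq[Seq[Int]] = {
    require(rf >= 1 && rf <= leaderCounts.size, "replication factor must fit the broker count")
    var counts = leaderCounts
    (0 until nPartitions).map { _ =>
      // Order brokers by current leader count, breaking ties by broker id.
      val ordered = counts.toSeq.sortBy { case (id, c) => (c, id) }.map(_._1)
      val replicas = ordered.take(rf)
      // Only the first replica (the preferred leader) is charged with the new partition.
      counts = counts.updated(replicas.head, counts(replicas.head) + 1)
      replicas
    }
  }

  // Format as "r1:r2,r1:r2,...": one colon-separated replica list per partition.
  def toCliArg(assignment: Seq[Seq[Int]]): String =
    assignment.map(_.mkString(":")).mkString(",")

  def main(args: Array[String]): Unit = {
    // Brokers 1-3 each lead 10 partitions; broker 4 was just added.
    val assignment = assign(3, 2, Map(1 -> 10, 2 -> 10, 3 -> 10, 4 -> 0))
    println(toCliArg(assignment))
  }
}
```

In this example broker 4 becomes the first replica for all three partitions, 
since it still leads fewer partitions than the others after each step.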

Thanks,
Buvana

private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                 replicationFactor: Int,
                                                 brokerList: Seq[Int],
                                                 fixedStartIndex: Int,
                                                 startPartitionId: Int): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokerList.toArray
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    var currentPartitionId = math.max(0, startPartitionId)
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    for (_ <- 0 until nPartitions) {
      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret
  }
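
The function above calls replicaIndex(), which is not part of the paste. From 
my reading of AdminUtils.scala it looks roughly like the sketch below; please 
treat the body as an assumption rather than the definitive source. The wrapper 
object and the followers() helper are hypothetical names added only to show a 
worked example.

```scala
object ReplicaIndexSketch {
  // Assumed shape of AdminUtils.replicaIndex: the shift lies in 1..nBrokers-1,
  // so a follower never lands on the same broker as the first replica.
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, j: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + j) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  // Broker-array indices of the followers for one partition.
  def followers(first: Int, shift: Int, replicationFactor: Int, nBrokers: Int): Seq[Int] =
    (0 until replicationFactor - 1).map(j => replicaIndex(first, shift, j, nBrokers))

  def main(args: Array[String]): Unit = {
    // 5 brokers, replication factor 3, first replica at index 0.
    println(followers(0, 0, 3, 5)) // shift 0: followers at indices 1 and 2
    println(followers(0, 3, 3, 5)) // shift 3: followers at indices 4 and 1
  }
}
```

If this reading is right, the followers are consecutive only until the shift 
wraps: the offset is taken modulo nBrokers - 1, so with shift 3 the second 
follower wraps around to index 1, skipping the first replica at index 0.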

