Hello,

I developed a tool that adds partitions to a topic and assigns the new
partitions to a set of brokers in a single operation, using the API
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().

It worked well in most cases. However, in one case I found that the
brokers are not aware of the new partitions assigned to them, even though
the ZooKeeper data clearly shows the assignment.

Here is the ZooKeeper data for the partition:

{"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
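To spell out what this state data claims, here is a minimal Scala sketch that pulls out its fields and checks which brokers it names. PartitionState and PartitionStateJson are hypothetical names, not Kafka's own types. The regex-based parsing assumes the flat JSON layout shown above and is only for illustration. A real tool should use a JSON library.

```scala
// Hypothetical model of the partition state JSON quoted above. This is not
// Kafka's own class; it only names the fields for illustration.
final case class PartitionState(
    controllerEpoch: Int,
    leader: Int,
    version: Int,
    leaderEpoch: Int,
    isr: Seq[Int]) {
  // True if the state names this broker as leader or as an in-sync replica.
  def namesBroker(brokerId: Int): Boolean =
    leader == brokerId || isr.contains(brokerId)
}

object PartitionStateJson {
  // Regex-based extraction that assumes the flat layout shown above;
  // a real tool should use a JSON library instead.
  private def intField(json: String, name: String): Int = {
    val pattern = ("\"" + name + "\"\\s*:\\s*(-?\\d+)").r
    pattern.findFirstMatchIn(json)
      .map(m => m.group(1).toInt)
      .getOrElse(throw new IllegalArgumentException(s"missing field $name"))
  }

  def parse(json: String): PartitionState = {
    val isrPattern = "\"isr\"\\s*:\\s*\\[([^\\]]*)\\]".r
    val isr = isrPattern.findFirstMatchIn(json)
      .map(m => m.group(1).split(",").map(_.trim).filter(_.nonEmpty).map(_.toInt).toList)
      .getOrElse(Nil)
    PartitionState(
      controllerEpoch = intField(json, "controller_epoch"),
      leader = intField(json, "leader"),
      version = intField(json, "version"),
      leaderEpoch = intField(json, "leader_epoch"),
      isr = isr)
  }
}

// The state data from ZooKeeper quoted above
val state = PartitionStateJson.parse(
  """{"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}""")
println(s"leader=${state.leader}, isr=${state.isr.mkString(",")}")
println(s"names broker 62: ${state.namesBroker(62)}")
```

By this data, broker 62 is both the leader and in the ISR for the partition. That is exactly what broker 62's warning contradicts.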

On broker 62, the error message is:

2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83 [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with correlation id 2048464 from client x on partition [m,71] failed due to Partition [m,71] doesn't exist on 62

Here is the core function of the tool:

  import kafka.admin.{AdminOperationException, AdminUtils}
  import kafka.utils.ZkUtils
  import org.I0Itec.zkclient.ZkClient

  // Adds partitions to `topic` and places all of the new partitions on
  // brokersToAssignPartitions, scaling the partition count so that every
  // broker in `brokers` ends up with roughly the same share.
  // getAssignmentJson is a helper defined elsewhere in the tool (not shown).
  def addPartitionsToTopic(zkClient: ZkClient, topic: String,
      brokersToAssignPartitions: Seq[Int], brokers: Seq[Int],
      execute: Boolean): Unit = {
    // Current assignment: TopicAndPartition -> replica broker ids
    val existingPartitionsReplicaList =
      ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
    val config = AdminUtils.fetchTopicConfig(zkClient, topic)
    printf("Topic config: %s\n\n", config)
    if (existingPartitionsReplicaList.isEmpty)
      throw new AdminOperationException(
        "The topic %s does not exist".format(topic))
    val currentPartitions = existingPartitionsReplicaList.size
    val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
    val brokersWithPartitions =
      existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
    // Skip if any target broker already hosts a replica of this topic
    if (brokersToAssignPartitions.intersect(brokersWithPartitions).nonEmpty) {
      printf("Topic %s already has partitions on brokers %s. Skipping.\n",
        topic, brokersToAssignPartitions)
      return
    }
    val totalBrokers = brokers.size
    val oldBrokers = totalBrokers - brokersToAssignPartitions.size
    if (oldBrokers == 0) {
      throw new IllegalArgumentException(
        "Cannot add partitions to new brokers without existing partitions")
    }
    // Integer division: the new brokers get (roughly) the old per-broker share
    val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
    val newPartitions = expectedPartitions - currentPartitions
    if (newPartitions <= 0) {
      throw new IllegalArgumentException(
        "Invalid number of new partitions %d".format(newPartitions))
    }
    // New partition ids start right after the existing ones
    val newPartitionReplicaList =
      AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
        newPartitions, replicationFactor, startPartitionId = currentPartitions)
    // Re-key the existing assignment by partition id
    val partitionReplicaList =
      existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
    // add the new list
    partitionReplicaList ++= newPartitionReplicaList
    printf("Changing number of partitions from %d to %d to topic %s\n\n",
      currentPartitions, expectedPartitions, topic)
    printf("Replica reassignment for new partitions:\n\n%s\n\n",
      getAssignmentJson(topic, newPartitionReplicaList))
    printf("Complete replica assignment:\n\n%s\n\n",
      getAssignmentJson(topic, partitionReplicaList))
    if (execute) {
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
        topic, partitionReplicaList, config, update = true)
      println("New partitions are added")
    } else {
      println("No update is executed in dry run mode")
    }
  }
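The partition-count arithmetic in addPartitionsToTopic can be checked offline, without ZooKeeper. The sketch below makes it easy to see, for example, how integer division truncates the target count. It assumes nothing beyond plain Scala. PartitionPlan, planPartitions and roundRobin are hypothetical names. The round-robin placement is a deliberate simplification and is not what AdminUtils.assignReplicasToBrokers does.

```scala
// Hypothetical helpers that mirror the arithmetic in addPartitionsToTopic,
// with no Kafka or ZooKeeper dependency, so the numbers can be checked offline.
final case class PartitionPlan(currentPartitions: Int, expectedPartitions: Int) {
  def newPartitions: Int = expectedPartitions - currentPartitions
}

def planPartitions(currentPartitions: Int, totalBrokers: Int, newBrokers: Int): PartitionPlan = {
  val oldBrokers = totalBrokers - newBrokers
  require(oldBrokers > 0, "Cannot add partitions to new brokers without existing partitions")
  // Same formula as the tool; Int division truncates toward zero.
  val expected = currentPartitions * totalBrokers / oldBrokers
  val plan = PartitionPlan(currentPartitions, expected)
  require(plan.newPartitions > 0, s"Invalid number of new partitions ${plan.newPartitions}")
  plan
}

// Simplified round-robin placement of the new partition ids on the new brokers.
// This is NOT AdminUtils.assignReplicasToBrokers (which by default picks a
// random start index and shifts follower placement); it only shows that the
// new ids run from currentPartitions up to expectedPartitions - 1.
def roundRobin(brokers: Seq[Int], plan: PartitionPlan, replicationFactor: Int): Map[Int, Seq[Int]] = {
  require(replicationFactor <= brokers.size, "not enough brokers for the replication factor")
  (plan.currentPartitions until plan.expectedPartitions).map { partition =>
    val i = partition - plan.currentPartitions
    partition -> (0 until replicationFactor).map(r => brokers((i + r) % brokers.size))
  }.toMap
}

// Hypothetical numbers: 48 partitions spread over 6 old brokers, 3 new brokers.
val plan = planPartitions(currentPartitions = 48, totalBrokers = 9, newBrokers = 3)
val newAssignment = roundRobin(Seq(101, 102, 103), plan, replicationFactor = 2)
println(s"${plan.currentPartitions} -> ${plan.expectedPartitions} (${plan.newPartitions} new)")
```

With these made-up numbers the topic grows from 48 to 72 partitions, and ids 48 through 71 go to the new brokers. With 50 partitions, 7 brokers and 1 new broker, the target is 350 / 6, which truncates to 58.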

It seems to me that the new assignment in ZooKeeper does not propagate to
some of the new brokers. However, TopicCommand uses the same AdminUtils
function to add new partitions, so I don't see what my tool does
differently.
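One way to narrow down which brokers missed the update is to compare, per broker, the partitions assigned to it in ZooKeeper with the partitions it reports hosting. Below is a minimal Scala sketch, assuming both inputs have already been collected as plain maps. How to gather the broker-side set is left open here. The function name and all input data are hypothetical.

```scala
// Hypothetical diagnostic helper: compares the replica assignment read from
// ZooKeeper (partition id -> replica broker ids) with the partition ids each
// broker reports hosting, and returns, per broker, the assigned partitions
// that the broker does not know about. Brokers with nothing missing are omitted.
def missingPartitions(
    assignment: Map[Int, Seq[Int]],
    hostedByBroker: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
  // Invert the assignment: broker id -> partition ids assigned to it
  val assignedByBroker: Map[Int, Set[Int]] =
    assignment.toSeq
      .flatMap { case (partition, replicas) => replicas.map(broker => (broker, partition)) }
      .groupBy { case (broker, _) => broker }
      .map { case (broker, pairs) => broker -> pairs.map { case (_, partition) => partition }.toSet }
  assignedByBroker
    .map { case (broker, assigned) =>
      broker -> (assigned -- hostedByBroker.getOrElse(broker, Set.empty[Int]))
    }
    .filter { case (_, missing) => missing.nonEmpty }
}

// Hypothetical inputs: partition 71 mirrors the ISR quoted above, while
// partition 70 and the hosted sets are made up for illustration.
val zkAssignment = Map(70 -> Seq(62, 74), 71 -> Seq(62, 74))
val hosted = Map(62 -> Set(70), 74 -> Set(70, 71))
val missing = missingPartitions(zkAssignment, hosted)
println(missing)
```

With these made-up inputs, only broker 62 is reported, and partition 71 is its missing partition. That matches the symptom in the warning.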

Am I missing something, or is this a bug in the broker?

Thanks,
Allen
