Hello,

I developed a tool that adds partitions to a topic and assigns the new partitions to a given set of brokers in one operation, using AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
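The partition arithmetic can be checked without a cluster. Below is a minimal, dependency-free Scala sketch of the same idea. It grows the partition count to current * totalBrokers / oldBrokers (the formula the tool's code uses), then places only the new partitions on the new brokers. The object and method names are hypothetical. The round-robin placement is a simplified stand-in, not what AdminUtils.assignReplicasToBrokers actually does. The sketch does not touch ZooKeeper at all.

```scala
// Hypothetical, dependency-free sketch of "add partitions onto new brokers".
// The round-robin placement is a simplification and is NOT Kafka's
// AdminUtils.assignReplicasToBrokers algorithm.
object AddPartitionsSketch {

  /** Partition count that keeps partitions-per-broker roughly constant:
    * current * totalBrokers / oldBrokers (integer division, as in the tool). */
  def expectedPartitionCount(current: Int, totalBrokers: Int, newBrokers: Int): Int = {
    val oldBrokers = totalBrokers - newBrokers
    require(oldBrokers > 0, "need at least one existing broker")
    current * totalBrokers / oldBrokers
  }

  /** Places `count` new partitions, with ids starting at `startId`, on `targets` only.
    * Replica r of the i-th new partition goes to targets((i + r) % targets.size). */
  def roundRobin(targets: Seq[Int], count: Int, replicationFactor: Int, startId: Int): Map[Int, Seq[Int]] = {
    require(replicationFactor <= targets.size, "replication factor exceeds target broker count")
    (0 until count).map { i =>
      (startId + i) -> (0 until replicationFactor).map(r => targets((i + r) % targets.size))
    }.toMap
  }

  /** Returns the existing assignment plus new partitions placed on the new brokers only. */
  def extend(existing: Map[Int, Seq[Int]], newBrokers: Seq[Int], totalBrokers: Int): Map[Int, Seq[Int]] = {
    require(existing.nonEmpty, "topic has no partitions")
    val current = existing.size
    val replicationFactor = existing.values.map(_.size).max
    val added = expectedPartitionCount(current, totalBrokers, newBrokers.size) - current
    require(added > 0, s"invalid number of new partitions $added")
    existing ++ roundRobin(newBrokers, added, replicationFactor, startId = current)
  }
}
```

Consider a topic with 4 partitions on brokers 1 and 2 that is extended onto brokers 3 and 4, giving 4 brokers in total. It grows to 4 * 4 / 2 = 8 partitions, and partitions 4 to 7 are placed only on brokers 3 and 4.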
The tool has worked well in most cases. In one case, however, the brokers are not aware of the new partitions assigned to them, even though the ZooKeeper data clearly shows the assignment. Here is the ZooKeeper data for the partition:

    {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}

On broker 62, the error message is:

    2015-03-17 17:11:57,157 WARN kafka.utils.Logging$class:83 [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with correlation id 2048464 from client x on partition [m,71] failed due to Partition [m,71] doesn't exist on 62

Here is the core function of the tool:

    def addPartitionsToTopic(zkClient: ZkClient, topic: String, brokersToAssignPartitions: Seq[Int],
                             brokers: Seq[Int], execute: Boolean): Unit = {
      // Current assignment: TopicAndPartition -> replica list
      val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
      val config = AdminUtils.fetchTopicConfig(zkClient, topic)
      printf("Topic config: %s\n\n", config)
      if (existingPartitionsReplicaList.size == 0)
        throw new AdminOperationException("The topic %s does not exist".format(topic))

      val currentPartitions = existingPartitionsReplicaList.size
      val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
      val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
      // Only target brokers that do not already host partitions of this topic
      if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
        printf("Topic %s already has partitions on brokers %s. Skipping.\n", topic, brokersToAssignPartitions)
        return
      }

      // Grow the partition count so that partitions per broker stay roughly constant
      val totalBrokers = brokers.size
      val oldBrokers = totalBrokers - brokersToAssignPartitions.size
      if (oldBrokers == 0) {
        throw new IllegalArgumentException("Cannot add partitions to new brokers without existing partitions")
      }
      val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
      val newPartitions = expectedPartitions - currentPartitions
      if (newPartitions <= 0) {
        throw new IllegalArgumentException("Invalid number of new partitions %d".format(newPartitions))
      }

      // Place the new partitions only on the new brokers
      val newPartitionReplicaList = AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions, newPartitions,
        replicationFactor, startPartitionId = currentPartitions)
      val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
      // add the new list
      partitionReplicaList ++= newPartitionReplicaList

      printf("Changing number of partitions from %d to %d to topic %s\n\n", currentPartitions, expectedPartitions, topic)
      printf("Replica reassignment for new partitions:\n\n%s\n\n", getAssignmentJson(topic, newPartitionReplicaList))
      printf("Complete replica assignment:\n\n%s\n\n", getAssignmentJson(topic, partitionReplicaList))
      if (execute) {
        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, update = true)
        println("New partitions are added")
      } else {
        println("No update is executed in dry run mode")
      }
    }

It seems to me that the new assignment in the ZooKeeper data does not propagate to some of the new brokers. However, TopicCommand uses the same AdminUtils function to add new partitions. Am I missing something, or is this a bug in the broker?

Thanks,
Allen