Yes, the watcher is still alive. The log in the controller indicates that it observed the changes.
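Mayuresh's description (quoted below) and this answer fit together if the watcher firing is only the first step. After that, the controller still has to deliver the new state to each broker, and a broker whose request fails keeps its old view even though ZooKeeper is correct. The Scala sketch below models that two-step path. It is a hypothetical toy, not Kafka's controller code: `ToyBroker`, `ToyController` and `WatcherPropagationDemo` are invented names, and the `reachable` flag is an assumption standing in for a failed send such as the EOFException in the controller log quoted below.

```scala
import scala.collection.mutable

// Hypothetical toy model, not Kafka's controller code: a ZooKeeper write only
// reaches a broker after (1) the controller's watcher fires and (2) the
// controller successfully delivers a request to that broker.

// Stand-in for a broker's local view of the partitions it hosts.
final class ToyBroker(val id: Int, reachable: Boolean) {
  val hostedPartitions: mutable.Set[Int] = mutable.Set.empty[Int]

  // Returns false when delivery fails (assumed to mimic the EOFException in
  // the controller log); in that case the broker's view is left unchanged.
  def receiveAssignment(partition: Int): Boolean =
    if (reachable) { hostedPartitions += partition; true } else false
}

// Stand-in for the controller's reaction when the watcher fires.
final class ToyController(brokers: Map[Int, ToyBroker]) {
  // Pushes every (partition, replica) pair to its broker and returns the
  // pairs whose delivery failed.
  def onAssignmentChanged(assignment: Map[Int, Seq[Int]]): List[(Int, Int)] = {
    val failed = mutable.ListBuffer.empty[(Int, Int)]
    for ((partition, replicas) <- assignment; brokerId <- replicas) {
      if (!brokers(brokerId).receiveAssignment(partition)) failed += ((partition, brokerId))
    }
    failed.toList
  }
}

object WatcherPropagationDemo {
  // Hypothetical scenario: partition 0 is assigned to brokers 1 and 2,
  // and broker 1 cannot be reached by the controller.
  def run(): (Set[Int], Set[Int], List[(Int, Int)]) = {
    val zkAssignment = Map(0 -> Seq(1, 2)) // what ZooKeeper holds after the write
    val broker1 = new ToyBroker(1, reachable = false)
    val broker2 = new ToyBroker(2, reachable = true)
    val controller = new ToyController(Map(1 -> broker1, 2 -> broker2))
    val failed = controller.onAssignmentChanged(zkAssignment) // the watcher fired
    (broker1.hostedPartitions.toSet, broker2.hostedPartitions.toSet, failed)
  }

  def main(args: Array[String]): Unit = {
    val (onBroker1, onBroker2, failed) = run()
    println(s"broker 1 hosts $onBroker1, broker 2 hosts $onBroker2, failed sends $failed")
  }
}
```

In this toy, broker 2 ends up hosting partition 0 while broker 1 does not, even though the watcher fired and the "ZooKeeper" assignment lists both. That has the same shape as a broker reporting that a partition doesn't exist while ZooKeeper shows the assignment, but the thread itself does not establish that this is the actual cause.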
On Tue, Mar 17, 2015 at 2:05 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

> I think reassignment works asynchronously. Changes are made to
> ZooKeeper, but those changes are reflected only when the controller watcher
> fires for the respective ZooKeeper path. Is your watcher still alive?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang <aw...@netflix.com.invalid> wrote:
>
> > Looking a bit more into the controller log, it seems that when the partition
> > assignment is changed in ZooKeeper, the controller hits quite a lot of
> > exceptions communicating with the new brokers where the partitions are
> > assigned. One thing to note is that the new brokers run Kafka version
> > 0.8.2.1 and the controller runs Kafka version 0.8.1.1.
> >
> > 2015-03-16 22:36:58,178 WARN kafka.utils.Logging$class:89
> > [Controller-2-to-broker-48-send-thread] [warn]
> > [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
> > request to broker id:48,host:xyz:7101
> > java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
> >         at kafka.utils.Utils$.read(Utils.scala:376)
> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > Does this explain why the brokers are not aware of the new assignments? Is
> > there any way to recover from this communication problem, like restarting
> > the controller?
> >
> > Thanks,
> > Allen
> >
> > On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang <aw...@netflix.com> wrote:
> >
> > > Hello,
> > >
> > > I developed a tool to add partitions and assign the new partitions to a set of
> > > brokers in one operation by utilizing the API
> > > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> > >
> > > It worked well in most cases. However, in one case, I found that the
> > > brokers are not aware of new partitions assigned to them, even though the
> > > ZooKeeper data clearly shows the assignment.
> > >
> > > Here is the ZooKeeper data for the partition:
> > >
> > > {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> > >
> > > On broker 62, the error message is:
> > >
> > > 2015-03-17 17:11:57,157 WARN kafka.utils.Logging$class:83
> > > [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> > > correlation id 2048464 from client x on partition [m,71] failed due to
> > > Partition [m,71] doesn't exist on 62
> > >
> > > Here is the core function of the tool:
> > >
> > > def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> > >     brokersToAssignPartitions: Seq[Int], brokers: Seq[Int], execute: Boolean): Unit = {
> > >   val existingPartitionsReplicaList =
> > >     ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> > >   val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> > >   printf("Topic config: %s\n\n", config)
> > >   if (existingPartitionsReplicaList.size == 0)
> > >     throw new AdminOperationException("The topic %s does not exist".format(topic))
> > >   val currentPartitions = existingPartitionsReplicaList.size
> > >   val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
> > >   val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
> > >   if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
> > >     printf("Topic %s already has partitions on brokers %s. Skipping.\n",
> > >       topic, brokersToAssignPartitions)
> > >     return
> > >   }
> > >   val totalBrokers = brokers.size
> > >   val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> > >   if (oldBrokers == 0) {
> > >     throw new IllegalArgumentException("Cannot add partitions to new brokers without existing partitions")
> > >   }
> > >   val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
> > >   val newPartitions = expectedPartitions - currentPartitions
> > >   if (newPartitions <= 0) {
> > >     throw new IllegalArgumentException("Invalid number of new partitions %d".format(newPartitions))
> > >   }
> > >   val newPartitionReplicaList =
> > >     AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
> > >       newPartitions, replicationFactor, startPartitionId = currentPartitions)
> > >   val partitionReplicaList = existingPartitionsReplicaList.map(p =>
> > >     p._1.partition -> p._2)
> > >   // add the new list
> > >   partitionReplicaList ++= newPartitionReplicaList
> > >   printf("Changing number of partitions from %d to %d to topic %s\n\n",
> > >     currentPartitions, expectedPartitions, topic)
> > >   printf("Replica reassignment for new partitions:\n\n%s\n\n",
> > >     getAssignmentJson(topic, newPartitionReplicaList))
> > >   printf("Complete replica assignment:\n\n%s\n\n",
> > >     getAssignmentJson(topic, partitionReplicaList))
> > >   if (execute) {
> > >     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
> > >       topic, partitionReplicaList, config, update = true)
> > >     println("New partitions are added")
> > >   } else {
> > >     println("No update is executed in dry run mode")
> > >   }
> > > }
> > >
> > > It seems to me that the new assignment in the ZooKeeper data does not
> > > propagate to some of the new brokers. However, looking at TopicCommand, it
> > > uses the same AdminUtils function to add new partitions.
> > >
> > > Am I missing anything, or is this a bug in the broker?
> > >
> > > Thanks,
> > > Allen
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
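For anyone checking the sizing logic in the quoted addPartitionsToTopic, the arithmetic can be isolated in a dependency-free Scala sketch. `PartitionSizingSketch`, `newPartitionCount` and `simpleAssign` are hypothetical names. `newPartitionCount` mirrors the tool's `expectedPartitions = currentPartitions * totalBrokers / oldBrokers` step, while `simpleAssign` is a plain round-robin placement used only for illustration; it is not the placement that AdminUtils.assignReplicasToBrokers actually performs.

```scala
// Hypothetical, dependency-free sketch of the sizing arithmetic in
// addPartitionsToTopic. simpleAssign is a plain round-robin placement and is
// NOT Kafka's AdminUtils.assignReplicasToBrokers.
object PartitionSizingSketch {
  // Mirrors: expectedPartitions = currentPartitions * totalBrokers / oldBrokers
  def newPartitionCount(currentPartitions: Int, totalBrokers: Int, newBrokers: Int): Int = {
    val oldBrokers = totalBrokers - newBrokers
    require(oldBrokers > 0, "Cannot add partitions to new brokers without existing partitions")
    val expectedPartitions = currentPartitions * totalBrokers / oldBrokers // integer division
    expectedPartitions - currentPartitions
  }

  // Round-robin: the i-th new partition gets replicationFactor consecutive
  // brokers from newBrokers, starting at offset i; ids start at startPartitionId.
  def simpleAssign(newBrokers: Seq[Int], nPartitions: Int, replicationFactor: Int,
                   startPartitionId: Int): Map[Int, Seq[Int]] = {
    require(replicationFactor <= newBrokers.size, "replication factor exceeds broker count")
    (0 until nPartitions).map { i =>
      val replicas = (0 until replicationFactor).map(r => newBrokers((i + r) % newBrokers.size))
      (startPartitionId + i) -> replicas
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val current = 24
    val added = newPartitionCount(current, totalBrokers = 8, newBrokers = 2)
    val assignment = simpleAssign(Seq(7, 8), added, replicationFactor = 2, startPartitionId = current)
    assignment.toSeq.sortBy(_._1).foreach { case (p, rs) => println(s"$p -> ${rs.mkString(",")}") }
  }
}
```

With 24 existing partitions and 2 brokers added to 6, the tool's formula gives 24 * 8 / 6 = 32 partitions in total, so 8 new ones numbered 24 to 31. Because the division is integer division, uneven cases round down: 10 partitions with 1 broker added to 3 gives 10 * 4 / 3 = 13 in total, so only 3 new partitions.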