I am not sure the Kafka version difference is the issue, since adding partitions to other topics later works. Are there any protocol-level compatibility issues between 0.8.2.1 and 0.8.1.1?
Also restarting the controller seems to fix the problem.

On Tue, Mar 17, 2015 at 4:08 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

> Probably you can try restarting the controller and have same version for
> the controller and the brokers.
> BTW, was there any specific reason you are running 2 different versions for
> the controller and other brokers?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Mar 17, 2015 at 4:02 PM, Allen Wang <aw...@netflix.com.invalid>
> wrote:
>
> > Yes, the watcher is still alive. The log in the controller indicates that
> > it observed the changes.
> >
> > On Tue, Mar 17, 2015 at 2:05 PM, Mayuresh Gharat <gharatmayures...@gmail.com>
> > wrote:
> >
> > > I think the way reassignment works is asynchronous. Changes are made to
> > > zookeeper but those changes get reflected only when controller watcher
> > > fires for the respective zookeeper path. Is your watcher still alive?
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang <aw...@netflix.com.invalid>
> > > wrote:
> > >
> > > > Looking a bit more into controller log, it seems that when the partition
> > > > assignment is changed in ZooKeeper, the controller has quite a lot
> > > > exceptions communicating with new brokers where the partitions are
> > > > assigned. One thing to note is that the new brokers have Kafka version
> > > > 0.8.2.1 and the controller has Kafka version 0.8.1.1.
> > > >
> > > > 2015-03-16 22:36:58,178 WARN kafka.utils.Logging$class:89
> > > > [Controller-2-to-broker-48-send-thread] [warn]
> > > > [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
> > > > request to broker id:48,host:xyz:7101
> > > > java.io.EOFException: Received -1 when reading from channel, socket has
> > > > likely been closed.
> > > >     at kafka.utils.Utils$.read(Utils.scala:376)
> > > >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > >     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > >     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> > > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > >
> > > > Does it explain why the brokers are not aware of the new assignments? Is
> > > > there anyway to recover from this communication problem, like restarting
> > > > the controller?
> > > >
> > > > Thanks,
> > > > Allen
> > > >
> > > > On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang <aw...@netflix.com> wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I developed a tool to add partitions and assign new partitions to a set of
> > > > > brokers in one operation by utilizing the API
> > > > > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> > > > >
> > > > > It worked well in most cases. However, in one case, I found that the
> > > > > brokers are not aware of new partitions assigned to them, even though the
> > > > > zookeeper data clearly shows the assignment.
> > > > >
> > > > > Here is the zookeeper data for the partition:
> > > > >
> > > > > {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> > > > >
> > > > > On broker 62, the error message is:
> > > > >
> > > > > 2015-03-17 17:11:57,157 WARN kafka.utils.Logging$class:83
> > > > > [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> > > > > correlation id 2048464 from client x on partition [m,71] failed due to
> > > > > Partition [m,71] doesn't exist on 62
> > > > >
> > > > > Here is the core function of the tool:
> > > > >
> > > > >   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> > > > >       brokersToAssignPartitions: Seq[Int], brokers: Seq[Int],
> > > > >       execute: Boolean): Unit = {
> > > > >     val existingPartitionsReplicaList =
> > > > >       ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> > > > >     val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> > > > >     printf("Topic config: %s\n\n", config)
> > > > >     if (existingPartitionsReplicaList.size == 0)
> > > > >       throw new AdminOperationException("The topic %s does not exist".format(topic))
> > > > >     val currentPartitions = existingPartitionsReplicaList.size
> > > > >     val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
> > > > >     val brokersWithPartitions =
> > > > >       existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
> > > > >     if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
> > > > >       printf("Topic %s already has partitions on brokers %s. Skipping.\n",
> > > > >         topic, brokersToAssignPartitions)
> > > > >       return
> > > > >     }
> > > > >     val totalBrokers = brokers.size
> > > > >     val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> > > > >     if (oldBrokers == 0) {
> > > > >       throw new IllegalArgumentException(
> > > > >         "Cannot add partitions to new brokers without existing partitions")
> > > > >     }
> > > > >     val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
> > > > >     val newPartitions = expectedPartitions - currentPartitions
> > > > >     if (newPartitions <= 0) {
> > > > >       throw new IllegalArgumentException(
> > > > >         "Invalid number of new partitions %d".format(newPartitions))
> > > > >     }
> > > > >     val newPartitionReplicaList =
> > > > >       AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
> > > > >         newPartitions, replicationFactor, startPartitionId = currentPartitions)
> > > > >     val partitionReplicaList =
> > > > >       existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
> > > > >     // add the new list
> > > > >     partitionReplicaList ++= newPartitionReplicaList
> > > > >     printf("Changing number of partitions from %d to %d to topic %s\n\n",
> > > > >       currentPartitions, expectedPartitions, topic)
> > > > >     printf("Replica reassignment for new partitions:\n\n%s\n\n",
> > > > >       getAssignmentJson(topic, newPartitionReplicaList))
> > > > >     printf("Complete replica assignment:\n\n%s\n\n",
> > > > >       getAssignmentJson(topic, partitionReplicaList))
> > > > >     if (execute) {
> > > > >       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
> > > > >         topic, partitionReplicaList, config, update = true)
> > > > >       println("New partitions are added")
> > > > >     } else {
> > > > >       println("No update is executed in dry run mode")
> > > > >     }
> > > > >   }
> > > > >
> > > > > It seems to me that the new assignment in ZooKeeper data does not
> > > > > propagate to some of the new brokers.
> > > > > However, looking at TopicCommand, it uses the same AdminUtils function
> > > > > to add new partitions.
> > > > >
> > > > > Am I missing anything or this is a bug in the broker?
> > > > >
> > > > > Thanks,
> > > > > Allen
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
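
For anyone reusing the tool quoted in this thread, the partition-count arithmetic in addPartitionsToTopic can be checked on its own, without ZooKeeper or a broker. The following is only a sketch under stated assumptions: PartitionMath and newPartitionCount are hypothetical names that are not part of Kafka or of the original tool, and the function takes the number of new brokers as a plain Int instead of the broker lists.

```scala
// Hypothetical helper, not part of Kafka or the original tool: it mirrors the
// partition-count arithmetic of addPartitionsToTopic using plain Ints.
object PartitionMath {
  def newPartitionCount(currentPartitions: Int, totalBrokers: Int, newBrokers: Int): Int = {
    // Brokers that already host partitions of the topic.
    val oldBrokers = totalBrokers - newBrokers
    // require throws IllegalArgumentException, like the checks in the tool.
    require(oldBrokers > 0,
      "Cannot add partitions to new brokers without existing partitions")
    // Integer division, as in the quoted code: any remainder is truncated.
    val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
    val added = expectedPartitions - currentPartitions
    require(added > 0, s"Invalid number of new partitions $added")
    added
  }
}
```

As an illustration with made-up numbers (not taken from the thread): a topic with 60 partitions on 10 brokers, extended by 2 new brokers, gives PartitionMath.newPartitionCount(60, 12, 2) == 12, that is 72 partitions in total. Because of the integer division, a topic whose partition count does not divide evenly gets the truncated result.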