Yes, the watcher is still alive. The log in the controller indicates that
it observed the changes.
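
To make the failure mode under discussion concrete, here is a toy model. It is not Kafka code: `Broker`, `Controller`, `onAssignmentChanged`, and `simulate` are hypothetical names invented for this sketch. It shows how an assignment can be present in the store and observed by the controller, yet never reach a broker when the controller's send to that broker fails. Treating broker 62 as the unreachable one is an assumption made purely for illustration.

```scala
import java.io.EOFException
import scala.collection.mutable

// Toy model only: none of these types exist in Kafka.
object AsyncPropagationSketch {

  // Hypothetical broker: it learns about a partition only when the controller tells it.
  final class Broker(val id: Int, val reachable: Boolean) {
    val hosted: mutable.Set[Int] = mutable.Set.empty[Int]

    def receive(partition: Int): Unit = {
      if (!reachable) throw new EOFException(s"broker $id closed the connection")
      hosted += partition
    }
  }

  // Hypothetical controller: its watcher callback pushes each partition to its replicas.
  final class Controller(brokers: Map[Int, Broker]) {
    // Records (brokerId, partition) pairs whose send failed.
    val failedSends: mutable.ListBuffer[(Int, Int)] = mutable.ListBuffer.empty[(Int, Int)]

    def onAssignmentChanged(assignment: Map[Int, Seq[Int]]): Unit =
      assignment.foreach { case (partition, replicas) =>
        replicas.foreach { brokerId =>
          brokers.get(brokerId).foreach { broker =>
            try {
              broker.receive(partition)
            } catch {
              case _: EOFException =>
                failedSends += ((brokerId, partition))
            }
          }
        }
      }
  }

  // Assumption for illustration: broker 62 is unreachable, broker 74 is healthy.
  def simulate(): (Broker, Broker, List[(Int, Int)]) = {
    val broker62 = new Broker(62, reachable = false)
    val broker74 = new Broker(74, reachable = true)
    val controller = new Controller(Map(62 -> broker62, 74 -> broker74))

    // The "store" already holds the new assignment: partition 71 -> replicas 62 and 74.
    val assignmentInStore = Map(71 -> Seq(62, 74))

    // The watcher fires, so the controller has observed the change.
    controller.onAssignmentChanged(assignmentInStore)
    (broker62, broker74, controller.failedSends.toList)
  }

  def main(args: Array[String]): Unit = {
    val (b62, b74, failed) = simulate()
    println(s"broker 62 hosts partition 71: ${b62.hosted.contains(71)}")
    println(s"broker 74 hosts partition 71: ${b74.hosted.contains(71)}")
    println(s"failed sends (brokerId, partition): $failed")
  }
}
```

In this model the watcher firing is not sufficient: the store and the controller agree on the assignment, but the broker whose send failed never learns about it. Whether that is what happened in the real cluster is not established here.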


On Tue, Mar 17, 2015 at 2:05 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

> I think reassignment works asynchronously. Changes are made to
> ZooKeeper, but they take effect only when the controller's watcher
> fires for the respective ZooKeeper path. Is your watcher still alive?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang <aw...@netflix.com.invalid>
> wrote:
>
> > Looking a bit more into the controller log, it seems that when the
> > partition assignment is changed in ZooKeeper, the controller gets quite
> > a lot of exceptions communicating with the new brokers where the
> > partitions are assigned. One thing to note is that the new brokers run
> > Kafka version 0.8.2.1 and the controller runs Kafka version 0.8.1.1.
> >
> > 2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89
> > [Controller-2-to-broker-48-send-thread] [warn]
> > [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
> > request to broker id:48,host:xyz:7101
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> >         at kafka.utils.Utils$.read(Utils.scala:376)
> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > Does this explain why the brokers are not aware of the new assignments?
> > Is there any way to recover from this communication problem, such as
> > restarting the controller?
> >
> > Thanks,
> > Allen
> >
> >
> > On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang <aw...@netflix.com> wrote:
> >
> > > Hello,
> > >
> > > I developed a tool to add partitions and assign the new partitions to
> > > a set of brokers in one operation, using the API
> > > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> > >
> > > It worked well in most cases. However, in one case, I found that the
> > > brokers were not aware of the new partitions assigned to them, even
> > > though the ZooKeeper data clearly shows the assignment.
> > >
> > > Here is the ZooKeeper data for the partition:
> > >
> > > {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> > >
> > > On broker 62, the error message is:
> > >
> > > 2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83
> > > [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> > > correlation id 2048464 from client x on partition [m,71] failed due to
> > > Partition [m,71] doesn't exist on 62
> > >
> > > Here is the core function of the tool:
> > >
> > >   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> > >       brokersToAssignPartitions: Seq[Int], brokers: Seq[Int],
> > >       execute: Boolean): Unit = {
> > >     val existingPartitionsReplicaList =
> > >       ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> > >     val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> > >     printf("Topic config: %s\n\n", config)
> > >     if (existingPartitionsReplicaList.size == 0)
> > >       throw new AdminOperationException("The topic %s does not exist".format(topic))
> > >     val currentPartitions = existingPartitionsReplicaList.size
> > >     val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
> > >     val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
> > >     if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
> > >       printf("Topic %s already has partitions on brokers %s. Skipping.\n",
> > >         topic, brokersToAssignPartitions)
> > >       return
> > >     }
> > >     val totalBrokers = brokers.size
> > >     val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> > >     if (oldBrokers == 0) {
> > >       throw new IllegalArgumentException(
> > >         "Cannot add partitions to new brokers without existing partitions")
> > >     }
> > >     val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
> > >     val newPartitions = expectedPartitions - currentPartitions
> > >     if (newPartitions <= 0) {
> > >       throw new IllegalArgumentException(
> > >         "Invalid number of new partitions %d".format(newPartitions))
> > >     }
> > >     val newPartitionReplicaList =
> > >       AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
> > >         newPartitions, replicationFactor, startPartitionId = currentPartitions)
> > >     val partitionReplicaList =
> > >       existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
> > >     // add the new list
> > >     partitionReplicaList ++= newPartitionReplicaList
> > >     printf("Changing number of partitions from %d to %d to topic %s\n\n",
> > >       currentPartitions, expectedPartitions, topic)
> > >     printf("Replica reassignment for new partitions:\n\n%s\n\n",
> > >       getAssignmentJson(topic, newPartitionReplicaList))
> > >     printf("Complete replica assignment:\n\n%s\n\n",
> > >       getAssignmentJson(topic, partitionReplicaList))
> > >     if (execute) {
> > >       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
> > >         topic, partitionReplicaList, config, update = true)
> > >       println("New partitions are added")
> > >     } else {
> > >       println("No update is executed in dry run mode")
> > >     }
> > >   }
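
As a side note on the partition-count arithmetic in the function above, the sketch below isolates the `expectedPartitions` calculation. The helper name `expectedPartitions` as a standalone function and the input numbers are hypothetical; only the formula comes from the tool. It may be worth noticing that the division is integer division, so the result is truncated when the product is not a multiple of the old broker count.

```scala
object PartitionCountSketch {

  // Same formula as the tool: scale the partition count in proportion to the broker count.
  // newBrokers corresponds to brokersToAssignPartitions.size in the tool.
  def expectedPartitions(currentPartitions: Int, totalBrokers: Int, newBrokers: Int): Int = {
    val oldBrokers = totalBrokers - newBrokers
    require(oldBrokers > 0, "Cannot add partitions to new brokers without existing partitions")
    currentPartitions * totalBrokers / oldBrokers // Int division truncates toward zero
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical cluster: 60 partitions on 8 old brokers, 2 brokers added (10 total).
    val exact = expectedPartitions(currentPartitions = 60, totalBrokers = 10, newBrokers = 2)
    println(s"expected = $exact, new = ${exact - 60}") // 60 * 10 / 8 = 75, so 15 new

    // Hypothetical cluster where the ratio is not exact: 10 * 4 / 3 = 13 after truncation.
    val truncated = expectedPartitions(currentPartitions = 10, totalBrokers = 4, newBrokers = 1)
    println(s"expected = $truncated, new = ${truncated - 10}")
  }
}
```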
> > >
> > > It seems to me that the new assignment in the ZooKeeper data does not
> > > propagate to some of the new brokers. However, TopicCommand uses the
> > > same AdminUtils function to add new partitions.
> > >
> > > Am I missing anything, or is this a bug in the broker?
> > >
> > > Thanks,
> > > Allen
> > >
> > >
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
