[ 
https://issues.apache.org/jira/browse/KAFKA-799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob updated KAFKA-799:
----------------------

    Status: Patch Available  (was: Open)

  /**
   * Collects the replica assignment of every partition of the given topics.
   *
   * For each topic, the data read from `getTopicPath(topic)` is parsed as JSON and its
   * "partitions" entry is interpreted as a map from partition id (as a string) to the
   * list of replicas. A topic contributes nothing to the result when the read yields
   * no data or a null payload, when `Json.parseFull` returns `None`, or when the parsed
   * JSON has no "partitions" entry.
   *
   * @param zkClient client passed through to `readDataMaybeNull`
   * @param topics   topics whose replica assignments should be collected
   * @return a mutable map from (topic, partition) to the replicas assigned to it
   */
  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
    val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
    topics.foreach { topic =>
      // The read may produce Some(null); drop it here so Json.parseFull is never
      // handed a null string (that is the failure shown in the KAFKA-799 stack trace).
      val partitionsOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
        .filter(_ != null)
        .flatMap(json => Json.parseFull(json))
        .flatMap(parsed => parsed.asInstanceOf[Map[String, Any]].get("partitions"))
      partitionsOpt.foreach { repl =>
        val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
        for ((partition, replicas) <- replicaMap) {
          ret.put(TopicAndPartition(topic, partition.toInt), replicas)
          // Keep the format string on one physical line: a plain string literal
          // cannot span lines in Scala.
          debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
        }
      }
    }
    ret
  }
  
  /**
   * Builds, for each requested topic, a map from partition id to its replica list.
   *
   * The data read from `getTopicPath(topic)` is parsed as JSON and its "partitions"
   * entry is converted from string-keyed to int-keyed form. A topic maps to an empty
   * partition map when the read yields no data or a null payload, when
   * `Json.parseFull` returns `None`, or when the JSON has no "partitions" entry.
   *
   * @param zkClient client passed through to `readDataMaybeNull`
   * @param topics   topics whose partition assignments should be collected
   * @return a mutable map from topic name to (partition id -> replicas)
   */
  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
    val assignments = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
    for (topic <- topics) {
      // A Some(null) payload is treated the same as an absent one.
      val rawPartitionsOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
        .filter(_ != null)
        .flatMap(json => Json.parseFull(json))
        .flatMap(parsed => parsed.asInstanceOf[Map[String, Any]].get("partitions"))
      val partitionMap = rawPartitionsOpt match {
        case Some(rawPartitions) =>
          val byStringId = rawPartitions.asInstanceOf[Map[String, Seq[Int]]]
          byStringId.map(entry => (entry._1.toInt, entry._2))
        case None => Map[Int, Seq[Int]]()
      }
      debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
      assignments += (topic -> partitionMap)
    }
    assignments
  }
                
> Infinite loop trying to start a broker 
> ---------------------------------------
>
>                 Key: KAFKA-799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-799
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.8
>         Environment: Mac OS X 10.7.5
>            Reporter: Bob
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: patch
>
> I followed the quickstart instructions
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.8+Quick+Start
> It caused an infinite loop while trying to start a broker. 
> [2013-03-08 16:55:19,287] ERROR Error while electing or becoming leader on 
> broker 1 (kafka.server.ZookeeperLeaderElector)
> kafka.common.KafkaException: Can't parse json string: null
> at kafka.utils.Json$.liftedTree1$1(Json.scala:20)
> at kafka.utils.Json$.parseFull(Json.scala:16)
> at 
> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:484)
> at 
> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:480)
> at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.utils.ZkUtils$.getReplicaAssignmentForTopics(ZkUtils.scala:480)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:451)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:225)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:87)
> at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:106)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: java.lang.NullPointerException
> at 
> scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:52)
> at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:71)
> at scala.util.parsing.json.JSON$.parseFull(JSON.scala:85)
> at kafka.utils.Json$.liftedTree1$1(Json.scala:17)
> ... 13 more
> I tracked the issue to an unhandled Java null string in these two methods in 
> ZkUtils: getReplicaAssignmentForTopics and getPartitionAssignmentForTopics.
> I am submitting a patch with the fixes. Now quickstart works fine for me.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to