[
https://issues.apache.org/jira/browse/KAFKA-799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bob updated KAFKA-799:
----------------------
Status: Patch Available (was: Open)
/**
 * Builds the replica assignment for every partition of the given topics.
 *
 * For each topic, the data stored at `getTopicPath(topic)` is read with
 * `readDataMaybeNull`, parsed as JSON, and its "partitions" entry is interpreted
 * as a map from partition id (a string) to the list of replica ids.
 *
 * A topic contributes nothing to the result when its path has no data, when the
 * data is a Java `null` string, when the JSON parser yields no value, or when the
 * parsed JSON has no "partitions" key.
 *
 * @param zkClient client handed to `readDataMaybeNull`
 * @param topics   topics whose assignment should be collected
 * @return a mutable map from (topic, partition) to the replicas assigned to it
 */
def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
  val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
  topics.foreach { topic =>
    val replicaMapOpt = for {
      maybeNullJson <- readDataMaybeNull(zkClient, getTopicPath(topic))._1
      // The read can yield Some(null); Option(...) turns that into None so the
      // JSON parser is never handed a null string.
      json <- Option(maybeNullJson)
      parsed <- Json.parseFull(json)
      repl <- parsed.asInstanceOf[Map[String, Any]].get("partitions")
    } yield repl.asInstanceOf[Map[String, Seq[Int]]]

    replicaMapOpt.foreach { replicaMap =>
      for ((partition, replicas) <- replicaMap) {
        ret.put(TopicAndPartition(topic, partition.toInt), replicas)
        debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
      }
    }
  }
  ret
}
/**
 * Collects, per topic, the mapping from partition id to assigned replicas.
 *
 * For each topic, the data stored at `getTopicPath(topic)` is read with
 * `readDataMaybeNull`, parsed as JSON, and its "partitions" entry is converted
 * from string partition ids to `Int` keys. Every requested topic gets an entry:
 * when the path has no data, the data is a Java `null` string, the parser yields
 * no value, or the "partitions" key is absent, the topic maps to an empty map.
 *
 * @param zkClient client handed to `readDataMaybeNull`
 * @param topics   topics whose partition assignment should be collected
 * @return a mutable map from topic name to its partition-to-replicas map
 */
def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
  val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
  for (topic <- topics) {
    val assignmentOpt = for {
      maybeNullJson <- readDataMaybeNull(zkClient, getTopicPath(topic))._1
      // Some(null) is treated exactly like "no data".
      json <- Option(maybeNullJson)
      parsed <- Json.parseFull(json)
      replicaMap <- parsed.asInstanceOf[Map[String, Any]].get("partitions")
    } yield {
      val byPartitionName = replicaMap.asInstanceOf[Map[String, Seq[Int]]]
      byPartitionName.map { case (partition, replicas) => (partition.toInt, replicas) }
    }
    val partitionMap = assignmentOpt.getOrElse(Map[Int, Seq[Int]]())
    debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
    ret(topic) = partitionMap
  }
  ret
}
> Infinite loop trying to start a broker
> ---------------------------------------
>
> Key: KAFKA-799
> URL: https://issues.apache.org/jira/browse/KAFKA-799
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8
> Environment: Mac OS X 10.7.5
> Reporter: Bob
> Assignee: Neha Narkhede
> Priority: Blocker
> Labels: patch
>
> I followed the quickstart instructions
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.8+Quick+Start
> It caused an infinite loop while trying to start a broker.
> [2013-03-08 16:55:19,287] ERROR Error while electing or becoming leader on
> broker 1 (kafka.server.ZookeeperLeaderElector)
> kafka.common.KafkaException: Can't parse json string: null
> at kafka.utils.Json$.liftedTree1$1(Json.scala:20)
> at kafka.utils.Json$.parseFull(Json.scala:16)
> at
> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:484)
> at
> kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:480)
> at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.utils.ZkUtils$.getReplicaAssignmentForTopics(ZkUtils.scala:480)
> at
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:451)
> at
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:225)
> at
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:87)
> at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
> at
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:106)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: java.lang.NullPointerException
> at
> scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:52)
> at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:71)
> at scala.util.parsing.json.JSON$.parseFull(JSON.scala:85)
> at kafka.utils.Json$.liftedTree1$1(Json.scala:17)
> ... 13 more
> I tracked the issue to an unhandled Java null string in these two methods in
> ZkUtils: getReplicaAssignmentForTopics and getPartitionAssignmentForTopics.
> I am submitting a patch with the fixes. Now quickstart works fine for me.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira