[ 
https://issues.apache.org/jira/browse/KAFKA-797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13597874#comment-13597874
 ] 

Bob commented on KAFKA-797:
---------------------------

Ok, I found 2 bugs, in ZkUtils.getReplicaAssignmentForTopics and 
getPartitionAssignmentForTopics. Both have to do with an improperly handled Java 
null string. Here's my workaround; with it, the quickstart example now works end 
to end. BTW, is it possible for me to fork this repo and submit a pull request?

  /**
   * Collects the replica assignment of every partition of the given topics.
   *
   * For each topic the data stored at `getTopicPath(topic)` is read, parsed as
   * JSON, and its "partitions" entry is turned into one
   * `TopicAndPartition -> replicas` entry per partition.
   *
   * A topic contributes nothing to the result when any of these holds:
   * the read yields `None`, the read yields `Some(null)`, the JSON cannot be
   * parsed, or the parsed object has no "partitions" key.
   *
   * NOTE(review): the parsed JSON is cast with `asInstanceOf`; a document of an
   * unexpected shape would presumably surface as a ClassCastException (or a
   * NumberFormatException from `partition.toInt`) -- confirm whether callers
   * rely on that.
   *
   * @param zkClient client handed to `readDataMaybeNull`
   * @param topics   topics whose assignment should be looked up
   * @return a mutable map from topic/partition to the replica ids read for it
   */
  def getReplicaAssignmentForTopics(zkClient: ZkClient,
                                    topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
    val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
    topics.foreach { topic =>
      val replicaMapOpt = for {
        rawJson    <- readDataMaybeNull(zkClient, getTopicPath(topic))._1
        // The Option may wrap a Java null string; Option(...) maps null to None
        // so the parser is never handed a null.
        json       <- Option(rawJson)
        parsed     <- Json.parseFull(json)
        partitions <- parsed.asInstanceOf[Map[String, Any]].get("partitions")
      } yield partitions.asInstanceOf[Map[String, Seq[Int]]]

      replicaMapOpt.foreach { replicaMap =>
        for ((partition, replicas) <- replicaMap) {
          ret.put(TopicAndPartition(topic, partition.toInt), replicas)
          debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
        }
      }
    }
    ret
  }

  /**
   * Builds, for every given topic, a map from partition id to its replicas.
   *
   * The data stored at `getTopicPath(topic)` is read and parsed as JSON; the
   * "partitions" entry is converted so that its string keys become `Int`
   * partition ids. Every topic gets an entry in the result: when the read
   * yields `None` or `Some(null)`, when the JSON cannot be parsed, or when
   * there is no "partitions" key, the topic maps to an empty map.
   *
   * @param zkClient client handed to `readDataMaybeNull`
   * @param topics   topics whose partition assignment should be looked up
   * @return a mutable map from topic name to its partition -> replicas map
   */
  def getPartitionAssignmentForTopics(zkClient: ZkClient,
                                      topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
    val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
    topics.foreach { topic =>
      val assignmentOpt = for {
        rawJson    <- readDataMaybeNull(zkClient, getTopicPath(topic))._1
        // Guard against a Java null string wrapped in Some.
        json       <- Option(rawJson)
        parsed     <- Json.parseFull(json)
        replicaMap <- parsed.asInstanceOf[Map[String, Any]].get("partitions")
      } yield {
        val byName = replicaMap.asInstanceOf[Map[String, Seq[Int]]]
        byName.map(p => (p._1.toInt, p._2))
      }
      // Any missing step above falls back to an empty assignment.
      val partitionMap = assignmentOpt.getOrElse(Map[Int, Seq[Int]]())
      debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
      ret += (topic -> partitionMap)
    }
    ret
  }
                
> Cannot start Kafka 0.8 per Quick Start instructions
> ---------------------------------------------------
>
>                 Key: KAFKA-797
>                 URL: https://issues.apache.org/jira/browse/KAFKA-797
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: David Arthur
>
> Checked out latest 0.8 (b5edbb193b33ebf0b3056935a537967ff21478a6), and ran:
> ./sbt update
> ./sbt package
> Both complete successfully. Then I try to start the server:
> $ ./bin/kafka-server-start.sh config/server.properties 
> Exception in thread "main" java.lang.NoClassDefFoundError: scala/ScalaObject
>       at java.lang.ClassLoader.defineClass1(Native Method)
>       at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
>       at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
>       at 
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
>       at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
>       at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
>       at java.lang.ClassLoader.defineClass1(Native Method)
>       at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
>       at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
>       at 
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
>       at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
>       at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
>       at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
>       ... 25 more

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to