Add these jars while creating the Context:

val sc = new SparkContext(conf)

sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")

val ssc = new StreamingContext(sc, Seconds(10))
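If you'd rather ship the dependencies at submit time instead of from inside the program, spark-submit's --jars flag takes the same jars as a comma-separated list. A sketch, assuming the same ivy cache paths as above and the submit command from the quoted mail below:

# --jars distributes each listed jar to the executors (comma-separated, no spaces)
$SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main \
  --jars /home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar,/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar,/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar,/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar \
  target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181 test-consumer-group test1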
sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/ *spark-streaming-kafka_2.10-1.1.0.jar*") sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/ *zkclient-0.3.jar*") sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/ *metrics-core-2.2.0.jar*") sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/ *kafka_2.10-0.8.0.jar*") val ssc = new StreamingContext(sc, Seconds(10)) Thanks Best Regards On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli < mario.pastore...@teralytics.ch> wrote: > Hi, > > I'm trying to use spark-streaming with kafka but I get a strange error on > class that are missing. I would like to ask if my way to build the fat jar > is correct or no. My program is > > val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum, > kafkaGroupId, kafkaTopicsWithThreads) > .map(_._2) > > kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition { > iter:Iterator[CellWithLAC] => > println("time: " ++ t.toString ++ " #received: " ++ iter.size.toString) > }) > > I use sbt to manage my project and my build.sbt (with assembly 0.12.0 > plugin) is > > name := "spark_example" > > version := "0.0.1" > > scalaVersion := "2.10.4" > > scalacOptions ++= Seq("-deprecation","-feature") > > libraryDependencies ++= Seq( > "org.apache.spark" % "spark-streaming_2.10" % "1.1.1", > "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1", > "joda-time" % "joda-time" % "2.6" > ) > > assemblyMergeStrategy in assembly := { > case p if p startsWith "com/esotericsoftware/minlog" => > MergeStrategy.first > case p if p startsWith "org/apache/commons/beanutils" => > MergeStrategy.first > case p if p startsWith "org/apache/" => MergeStrategy.last > case "plugin.properties" => MergeStrategy.discard > case p if p startsWith "META-INF" => MergeStrategy.discard > case x => > val oldStrategy = (assemblyMergeStrategy in assembly).value > oldStrategy(x) > } > > I create the jar with sbt assembly and the run with > $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main > target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181 > test-consumer-group test1 > > where master:7077 is the spark master, localhost:2181 is zookeeper, > test-consumer-group is kafka groupid and test1 is the kafka topic. The > program starts and keep running but I get an error and nothing is printed. 
> In the log I found the following stack trace:
>
> 14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection from [10.0.3.1/10.0.3.1:54325]
> 14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection to [jpl-devvax/127.0.1.1:38767]
> 14/12/11 13:02:08 INFO network.SendingConnection: Connected to [jpl-devvax/127.0.1.1:38767], 1 messages pending
> 14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B, free: 265.4 MB)
> 14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
> 14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown Source)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown Source)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
>   at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
>   at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
>   at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
>   at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
>   at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>   at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>   at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>   at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>   at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>   at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>   at org.apache.spark.scheduler.Task.run(Task.scala:54)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> I searched inside the fat jar and found that the class is not in it:
>
> jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector"
>
> The problem is the double dollar before anonfun: if you put only one, then the class is there:
>
> jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$anonfun$kafka$consumer$ZookeeperConsumerConnector"
> [...]
> kafka/consumer/ZookeeperConsumerConnector.class
>
> I'm submitting my job to spark-1.1.1 compiled for hadoop2.4, downloaded
> from the Spark website.
>
> My question is: how can I solve this problem? I guess the problem is in
> my sbt script, but I don't understand why.
>
> Thanks,
> Mario Pastorelli
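One note on the jar -tf checks above: inside double quotes the shell expands $-sequences before grep ever runs ($$ becomes the shell's PID, and $-names like $ZKRebalancerListener expand to empty strings since no such variables are set). So neither command tested the literal class name; the single-$ variant effectively grepped for kafka/consumer/ZookeeperConsumerConnector, which is why it appears to match. A sketch of a check that avoids the expansion, assuming bash and the assembly jar name from the spark-submit command above:

# Single quotes keep the $ characters literal (no shell expansion),
# and grep -F matches a fixed string instead of a regex.
jar -tf target/scala-2.10/spark_example-assembly-0.0.1.jar \
  | grep -F 'ZKRebalancerListener$$anonfun'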