The exception name "$line49...." refers to a line of code typed into the spark shell: the REPL compiles each input into generated wrapper classes, and your executors evidently can't find one of those generated classes on their classpath.

Have you tried it from an actual assembled job with spark-submit?
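
Something like this, as a minimal sketch of the same job as a standalone application (the object name KCon, the jar name, and the build layout are illustrative assumptions, not taken from your setup):

------------------------------------------
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical standalone version of the shell script. Build it into an
// assembly jar and launch with something like:
//   spark-submit --class KCon --master spark://localhost:7077 kcon-assembly.jar
object KCon {
  def main(args: Array[String]): Unit = {
    // Master, app name and jars can also be supplied on the spark-submit
    // command line instead of being hard-coded here.
    val sparkConf = new SparkConf().setAppName("KCon")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "KCon")
    val topics = Set("test")

    val messages = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Same word count as in the shell. The difference is that these closures
    // now live in a named class inside the assembled jar, so executors can
    // deserialize them instead of looking for $lineXX REPL wrapper classes.
    val wordCounts = messages.map(_._2)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
------------------------------------------

If that runs cleanly against the same broker and topic, the issue is with how the shell ships its generated classes, not with createDirectStream itself.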

On Tue, Jun 23, 2015 at 3:48 PM, syepes <sye...@gmail.com> wrote:

> Hello,
>
> I am trying to use the new Kafka consumer
> "KafkaUtils.createDirectStream",
> but I am having some issues making it work.
> I have tried different versions of Spark (v1.4.0 and branch-1.4 #8d6e363) and
> I am still getting the same strange exception "ClassNotFoundException:
> $line49.$read$$iwC$$i...."
>
> Has anyone else been facing this kind of problem?
>
> The following is the code and logs that I have been using to reproduce the
> issue:
>
> spark-shell: script
> ------------------------------------------
> sc.stop()
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val sparkConf = new SparkConf()
>   .setMaster("spark://localhost:7077")
>   .setAppName("KCon")
>   .set("spark.ui.port", "4041")
>   .set("spark.driver.allowMultipleContexts", "true")
>   .setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
> val ssc = new StreamingContext(sparkConf, Seconds(5))
>
> val kafkaParams = Map[String, String](
>   "bootstrap.servers" -> "localhost:9092",
>   "schema.registry.url" -> "http://localhost:8081",
>   "zookeeper.connect" -> "localhost:2181",
>   "group.id" -> "KCon")
> val topic = Set("test")
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
>   StringDecoder](ssc, kafkaParams, topic)
>
> val raw = messages.map(_._2)
> val words = raw.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
> wordCounts.print()
>
> ssc.start()
> ssc.awaitTermination()
> ------------------------------------------
>
>
> spark-shell: output
> ------------------------------------------
> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
> ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@28ec9c23
> kafkaParams: scala.collection.immutable.Map[String,String] = Map(bootstrap.servers -> localhost:9092, schema.registry.url -> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id -> OPC)
> topic: scala.collection.immutable.Set[String] = Set(test)
> WARN  [main] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid
> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
> raw: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@578ce232
> words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] = org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property schema.registry.url is not valid
> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87): java.lang.ClassNotFoundException:
> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> ..
> ..
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> ------------------------------------------
>
>
> Best regards and thanks in advance for any help.
