Hello,

I ​am trying ​use the new Kafka ​consumer ​​"KafkaUtils.createDirectStream"​
but I am having some issues making it work.
I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and
I am still getting the same strange exception "ClassNotFoundException:
$line49.$read$$iwC$$i...."

Has anyone else been facing this kind of problem?

The following is the code and logs that I have been using to reproduce the
issue:

spark-shell: script
------------------------------------------
sc.stop()
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new
SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
"4041" ).set("spark.driver.allowMultipleContexts",
"true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
val ssc = new StreamingContext(sparkConf, Seconds(5))

val kafkaParams = Map[String, String]("bootstrap.servers" ->
"localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
"zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
val topic = Set("test")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topic)

val raw = messages.map(_._2)
val words = raw.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
------------------------------------------


spark-shell: output
------------------------------------------
sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
ssc: org.apache.spark.streaming.StreamingContext =
org.apache.spark.streaming.StreamingContext@28ec9c23
kafkaParams: scala.collection.immutable.Map[String,String] =
Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
OPC)topic: scala.collection.immutable.Set[String] = Set(test)
WARN  [main] kafka.utils.VerifiableProperties - Property schema.registry.url
is not valid
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
raw: org.apache.spark.streaming.dstream.DStream[String] =
org.apache.spark.streaming.dstream.MappedDStream@578ce232
words: org.apache.spark.streaming.dstream.DStream[String] =
org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
schema.registry.url is not valid
WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
java.lang.ClassNotFoundException:
$line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
        at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
        at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
..
..
Driver stacktrace:
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
------------------------------------------


Best regards and thanks in advance for any help.
  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to