Hi, I think you have to change the code like this to specify the type info, 
like this:

      val kafkaStream: ReceiverInputDStream[(String, String)] = 
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
        kafkaParams,
        topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)

You can take a try, actually Kafka unit test also use this API and worked fine.

Besides, the fixed issue you mentioned below will only be occurred in Java code 
calling related API.

Thanks
Jerry


From: Antonio Jesus Navarro [mailto:ajnava...@stratio.com]
Sent: Wednesday, October 08, 2014 10:04 PM
To: user@spark.apache.org
Subject: Error reading from Kafka

Hi, I'm trying to read from Kafka.  I was able to do it correctly with this 
method.

def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)]


But now I have to add some params to kafka consumer so I've changed to other 
createStream method but I'm getting an error:

14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for 
stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: 
scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:2849)
at java.lang.Class.getConstructor(Class.java:1718)
at 
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

This is my code.   It seems that createStream returns 
ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (string, string))  
so, I think that try togetConstructor(kafka.utils.VerifiableProperties) by 
reflection from Nothing object and don't find the method.

      val topics = config.getString("nessus.kafka.topics")
      val numThreads = config.getInt("nessus.kafka.numThreads")
      val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
      val kafkaParams = Map(
        "zookeeper.connect" -> "localhost:2181",
        "group.id<http://group.id/>" -> "my-grp")

      val kafkaStream: ReceiverInputDStream[(String, String)] = 
KafkaUtils.createStream(ssc,
        kafkaParams,
        topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)


I found that issue https://issues.apache.org/jira/browse/SPARK-2103 
<https://issues.apache.org/jira/browse/SPARK-2103>   but it was solved and I'm 
using spark 1.1.0  and scala 2.10 so I don't know what happens.

Any thoughts?

--
[http://www.stratio.com/wp-content/uploads/2014/05/stratio_logo_2014.png]<http://www.stratio.com/>
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // @stratiobd<https://twitter.com/StratioBD>

Reply via email to