Hi Saisai, thanks you for your help, all is working ok now.

Cheers!

2014-10-09 2:49 GMT+02:00 Shao, Saisai <saisai.s...@intel.com>:

>  Hi, I think you have to change the code like this to specify the type
> info, like this:
>
>
>
> *      val kafkaStream: ReceiverInputDStream[(String, String)] =
> KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](*
>
> *ssc,*
>
> *        kafkaParams,*
>
> *        topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)*
>
>
>
> You can take a try, actually Kafka unit test also use this API and worked
> fine.
>
>
>
> Besides, the fixed issue you mentioned below will only be occurred in Java
> code calling related API.
>
>
>
> Thanks
>
> Jerry
>
>
>
>
>
> *From:* Antonio Jesus Navarro [mailto:ajnava...@stratio.com]
> *Sent:* Wednesday, October 08, 2014 10:04 PM
> *To:* user@spark.apache.org
> *Subject:* Error reading from Kafka
>
>
>
> Hi, I'm trying to read from Kafka.  I was able to do it correctly with
> this method.
>
>
>
> def createStream(
>
>       ssc: StreamingContext,
>
>       zkQuorum: String,
>
>       groupId: String,
>
>       topics: Map[String, Int],
>
>       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
>
>     ): ReceiverInputDStream[(String, String)]
>
>
>
>
>
> But now I have to add some params to kafka consumer so I've changed to
> other createStream method but I'm getting an error:
>
>
>
> 14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering
> receiver 0
>
> *14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver
> for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException:
> scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)*
>
> at java.lang.Class.getConstructor0(Class.java:2849)
>
> at java.lang.Class.getConstructor(Class.java:1718)
>
> at
> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
>
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> *This is my code.   It seems that createStream returns
> ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (string, string))
>  so, I think that try togetConstructor(kafka.utils.VerifiableProperties) by
> reflection from Nothing object and don't find the method.  *
>
>
>
>       *val topics = config.getString("nessus.kafka.topics")*
>
> *      val numThreads = config.getInt("nessus.kafka.numThreads")*
>
> *      val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap*
>
> *      val kafkaParams = Map(*
>
> *        "zookeeper.connect" -> "localhost:2181",*
>
> *        "group.id <http://group.id/>" -> "my-grp")*
>
>
>
> *      val kafkaStream: ReceiverInputDStream[(String, String)] =
> KafkaUtils.createStream(ssc,*
>
> *        kafkaParams,*
>
> *        topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)*
>
>
>
>
>
> I found that issue https://issues.apache.org/jira/browse/SPARK-2103
> <https://issues.apache.org/jira/browse/SPARK-2103>  but it was solved and
> I'm using spark 1.1.0  and scala 2.10 so I don't know what happens.
>
>
>
> Any thoughts?
>
>
>
> --
>
> <http://www.stratio.com/>
> Avenida de Europa, 26. Ática 5. 3ª Planta
>
> 28224 Pozuelo de Alarcón, Madrid
>
> Tel: +34 91 352 59 42 // *@stratiobd <https://twitter.com/StratioBD>*
>



-- 
<http://www.stratio.com/>
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd <https://twitter.com/StratioBD>*

Reply via email to