Hi Saisai, thank you for your help, everything is working OK now. Cheers!

2014-10-09 2:49 GMT+02:00 Shao, Saisai <saisai.s...@intel.com>:

> Hi, I think you have to change the code to specify the type
> parameters explicitly, like this:
>
>     val kafkaStream: ReceiverInputDStream[(String, String)] =
>       KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>         ssc,
>         kafkaParams,
>         topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
>
> You can give it a try; the Kafka unit tests also use this API and
> work fine.
>
> Besides, the fixed issue you mentioned below only occurs when Java
> code calls the related API.
>
> Thanks
> Jerry
>
> From: Antonio Jesus Navarro [mailto:ajnava...@stratio.com]
> Sent: Wednesday, October 08, 2014 10:04 PM
> To: user@spark.apache.org
> Subject: Error reading from Kafka
>
> Hi, I'm trying to read from Kafka. I was able to do it correctly with
> this method:
>
>     def createStream(
>         ssc: StreamingContext,
>         zkQuorum: String,
>         groupId: String,
>         topics: Map[String, Int],
>         storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
>       ): ReceiverInputDStream[(String, String)]
>
> But now I have to pass some extra parameters to the Kafka consumer, so
> I've switched to the other createStream method, and I'm getting an error:
>
>     14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
>     14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
>         at java.lang.Class.getConstructor0(Class.java:2849)
>         at java.lang.Class.getConstructor(Class.java:1718)
>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> This is my code. It seems that createStream returns
> ReceiverInputDStream[(Nothing, Nothing)] (which I forced to
> (String, String)), so I think it tries to call
> getConstructor(kafka.utils.VerifiableProperties) by reflection on
> Nothing and doesn't find the method.
>
>     val topics = config.getString("nessus.kafka.topics")
>     val numThreads = config.getInt("nessus.kafka.numThreads")
>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>     val kafkaParams = Map(
>       "zookeeper.connect" -> "localhost:2181",
>       "group.id" -> "my-grp")
>
>     val kafkaStream: ReceiverInputDStream[(String, String)] =
>       KafkaUtils.createStream(ssc,
>         kafkaParams,
>         topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
>
> I found this issue, https://issues.apache.org/jira/browse/SPARK-2103,
> but it has been resolved and I'm using Spark 1.1.0 and Scala 2.10, so I
> don't know what is happening.
>
> Any thoughts?
>
> --
> <http://www.stratio.com/>
> Avenida de Europa, 26. Ática 5. 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 352 59 42 // @stratiobd <https://twitter.com/StratioBD>
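
P.S. For anyone who finds this thread later, here is a minimal sketch of why the explicit type parameters seem to matter. It does not use Spark or Kafka at all: `runtimeClassName` is a hypothetical stand-in for a method whose type parameter appears only in a ClassTag context bound, which is my understanding of how the decoder types are declared on that createStream overload. Assuming Scala 2, when no type argument is given and nothing else constrains it, the compiler falls back to Nothing, which would match the scala.runtime.Nothing$ in the stack trace.

```scala
import scala.reflect.ClassTag

object NothingInferenceSketch {
  // Hypothetical helper, not a Spark API: D appears only in the
  // ClassTag context bound, so call sites give the compiler nothing
  // to infer it from unless a type argument is written out.
  def runtimeClassName[D: ClassTag]: String =
    implicitly[ClassTag[D]].runtimeClass.getName

  def main(args: Array[String]): Unit = {
    // No type argument: D is expected to be inferred as Nothing, so the
    // printed name should be the runtime class behind Nothing.
    println(runtimeClassName)

    // Explicit type argument: the ClassTag carries the real class.
    println(runtimeClassName[String]) // prints "java.lang.String"
  }
}
```

Writing createStream[String, String, StringDecoder, StringDecoder](...) plays the same role as runtimeClassName[String] here: the receiver then looks up the constructor on a real decoder class instead of on Nothing.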