Hi, I'm trying to read from Kafka. I was able to do it correctly with this method.
def createStream( ssc: StreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[(String, String)] But now I have to add some params to kafka consumer so I've changed to other createStream method but I'm getting an error: 14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0 *14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)* at java.lang.Class.getConstructor0(Class.java:2849) at java.lang.Class.getConstructor(Class.java:1718) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) *This is my code. It seems that createStream returns ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (string, string)) so, I think that try togetConstructor(kafka.utils.VerifiableProperties) by reflection from Nothing object and don't find the method. * *val topics = config.getString("nessus.kafka.topics")* * val numThreads = config.getInt("nessus.kafka.numThreads")* * val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap* * val kafkaParams = Map(* * "zookeeper.connect" -> "localhost:2181",* * "group.id <http://group.id/>" -> "my-grp")* * val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,* * kafkaParams,* * topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)* I found that issue https://issues.apache.org/jira/browse/SPARK-2103 <https://issues.apache.org/jira/browse/SPARK-2103> but it was solved and I'm using spark 1.1.0 and scala 2.10 so I don't know what happens. Any thoughts? -- <http://www.stratio.com/> Avenida de Europa, 26. Ática 5. 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 352 59 42 // *@stratiobd <https://twitter.com/StratioBD>*