Hi, I think you need to specify the type parameters explicitly, like this:

    val kafkaStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)

You can give it a try; the Kafka unit tests also use this API and work fine.

Besides, the fixed issue you mentioned below only occurs when Java code calls the related API.

Thanks
Jerry

From: Antonio Jesus Navarro [mailto:ajnava...@stratio.com]
Sent: Wednesday, October 08, 2014 10:04 PM
To: user@spark.apache.org
Subject: Error reading from Kafka

Hi,

I'm trying to read from Kafka. I was able to do it correctly with this method:

    def createStream(
        ssc: StreamingContext,
        zkQuorum: String,
        groupId: String,
        topics: Map[String, Int],
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[(String, String)]

But now I have to pass some extra parameters to the Kafka consumer, so I switched to the other createStream method, and I'm getting an error:

    14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
    14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
        at java.lang.Class.getConstructor0(Class.java:2849)
        at java.lang.Class.getConstructor(Class.java:1718)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

This is my code. It seems that createStream returns ReceiverInputDStream[(Nothing, Nothing)] (which I forced to (String, String)), so I think it tries to call getConstructor(kafka.utils.VerifiableProperties) by reflection on the Nothing class and doesn't find the method.

    val topics = config.getString("nessus.kafka.topics")
    val numThreads = config.getInt("nessus.kafka.numThreads")
    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "my-grp")

    val kafkaStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)

I found this issue, https://issues.apache.org/jira/browse/SPARK-2103, but it has already been resolved, and I'm using Spark 1.1.0 and Scala 2.10, so I don't know what is happening.

Any thoughts?

--
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // @stratiobd
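
For anyone who wants to see why the explicit type parameters in the reply matter, here is a minimal sketch that needs neither Spark nor Kafka. It assumes Scala 2.x (as used in this thread). Props, Decoder, StringDecoder and newDecoder are hypothetical stand-ins made up for illustration; they are not the real kafka.utils.VerifiableProperties, kafka.serializer classes, or Spark's receiver code. The sketch only mimics the pattern the stack trace suggests: a type parameter that appears in no value argument is inferred as Nothing, and a reflective constructor lookup on that class then fails.

```scala
import scala.reflect.ClassTag
import scala.util.Try

// Hypothetical stand-ins for illustration; NOT the Kafka or Spark classes.
class Props
trait Decoder[A]
class StringDecoder(props: Props) extends Decoder[String]

object InferenceDemo {
  // U appears only as a type parameter, never in the value parameters,
  // so the compiler has nothing to infer it from at the call site.
  def newDecoder[U <: Decoder[_]: ClassTag](props: Props): Try[Any] =
    Try(
      implicitly[ClassTag[U]].runtimeClass
        .getConstructor(classOf[Props]) // reflective lookup of a (Props) constructor
        .newInstance(props)
    )

  def main(args: Array[String]): Unit = {
    val props = new Props
    // No type argument: U is inferred as Nothing, so the lookup fails.
    println(newDecoder(props))
    // Explicit type argument: the lookup finds StringDecoder(Props).
    println(newDecoder[StringDecoder](props).isSuccess)
  }
}
```

If the real createStream overload behaves like this sketch, then annotating only the result type as ReceiverInputDStream[(String, String)] cannot help the compiler pick the decoder types, which would explain why passing [String, String, StringDecoder, StringDecoder] explicitly avoids the error.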