Hi, I'm trying to read from Kafka.  I was able to do it correctly with this
method.

def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)]


But now I have to add some params to kafka consumer so I've changed to
other createStream method but I'm getting an error:

14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering
receiver 0
*14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver
for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException:
scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)*
at java.lang.Class.getConstructor0(Class.java:2849)
at java.lang.Class.getConstructor(Class.java:1718)
at
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

*This is my code.   It seems that createStream returns
ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (string, string))
 so, I think that try togetConstructor(kafka.utils.VerifiableProperties) by
reflection from Nothing object and don't find the method.  *

      *val topics = config.getString("nessus.kafka.topics")*
*      val numThreads = config.getInt("nessus.kafka.numThreads")*
*      val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap*
*      val kafkaParams = Map(*
*        "zookeeper.connect" -> "localhost:2181",*
*        "group.id <http://group.id/>" -> "my-grp")*

*      val kafkaStream: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc,*
*        kafkaParams,*
*        topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)*


I found that issue https://issues.apache.org/jira/browse/SPARK-2103
<https://issues.apache.org/jira/browse/SPARK-2103>  but it was solved and
I'm using spark 1.1.0  and scala 2.10 so I don't know what happens.

Any thoughts?

-- 
<http://www.stratio.com/>
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd <https://twitter.com/StratioBD>*

Reply via email to