Hi,

I have the following driver. It works when I run it in local[*] mode,
but if I execute it on a standalone cluster I don't get any data
from Kafka.

Does anybody know why that might be?

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext._

// Reuse the existing SparkContext; passing sparkConf here would try to
// create a second SparkContext in the same JVM.
val ssc = new StreamingContext(sc, Seconds(3))
ssc.checkpoint("checkpoint")

val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)

// Parse each CSV message into a Data record (case class defined elsewhere).
val eventData = dStream
  .map(_._2)
  .window(Seconds(12))
  .map(_.split(","))
  .map(d => Data(d(0), d(1), d(2), d(3), d(4)))

val result = eventData.transform { (rdd, time) =>
  sqlContext.registerRDDAsTable(rdd, "data")
  sql("SELECT count(state) FROM data WHERE state = 'Active'")
}
result.print()

//eventData.foreachRDD(rdd => registerRDDAsTable(rdd, "data"))
ssc.start()
ssc.awaitTermination()
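
One thing I'm wondering about: the Spark Streaming programming guide says
each receiver-based input DStream (which KafkaUtils.createStream creates)
permanently occupies one executor core, so the application needs more cores
than receivers or no data ever gets processed. local[*] gives the driver
several cores automatically, but maybe my standalone app is only being
allocated one. If that's the problem, I assume something like this would
help (the value "2" is just my guess for one receiver plus one core for
batch processing):

val sparkConf = new SparkConf()
  .setAppName("KafkaMessageReceiver")
  // Ask the standalone master for enough cores: one for the Kafka
  // receiver plus at least one for processing (assumed value).
  .set("spark.cores.max", "2")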




