Hey, I'm trying to run Kafka Spark Streaming on Mesos with the following example:
    sc.stop

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.storage.StorageLevel

    // run against the Mesos master registered in ZooKeeper
    val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("zk://127.0.0.1:2181/mesos")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // receiver-based Kafka stream: one receiver reading topic "test" via ZooKeeper
    val kafkaParams = Map[String, String]("zookeeper.connect" -> "127.0.0.1:2181", "group.id" -> "test")
    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)

    messages.foreachRDD { pairRDD =>
      println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
      println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
      pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
      val msgData = pairRDD.collect()
    }

    ssc.start()  // start the streaming computation (omitted from my original paste, but required for any output)

Unfortunately, println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]") always prints 0.

I tested the same example with "local[2]" instead of "zk://127.0.0.1:2181/mesos" and everything works perfectly: count returns the correct number of produced messages, and pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]")) prints the Kafka messages.

Could you help me understand this issue? What am I doing wrong?

Attaching:
spark-shell output: http://pastebin.com/zdYFBj4T
executor output: http://pastebin.com/LDMtCjq0

thanks!
bartek
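
PS: For reference, the only line I change between the failing and the working run is the master URL (a minimal sketch; host and port as in the original, everything else stays the same):

    // works: local mode with 2 threads (the Kafka receiver occupies one, leaving one for processing)
    val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local[2]")

    // does not work: Mesos master discovered via ZooKeeper
    // val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("zk://127.0.0.1:2181/mesos")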