Hey,

I’m trying to run Kafka Spark Streaming on Mesos with the following example:

sc.stop  // stop the shell's default context so we can pass our own SparkConf

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel

val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("zk://127.0.0.1:2181/mesos")
val ssc = new StreamingContext(sparkConf, Seconds(10))

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "127.0.0.1:2181",
  "group.id" -> "test")

// receiver-based stream: topic "test", 1 receiver thread, keeping only the message value
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)

messages.foreachRDD { pairRDD =>
  println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
  println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
  pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
  val msgData = pairRDD.collect()
}

ssc.start()  // start the streaming context (nothing runs without this)

Unfortunately println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]") always prints 0.

I tested the same example using "local[2]" instead of "zk://127.0.0.1:2181/mesos" and everything works perfectly: count returns the correct number of produced messages, and pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]")) prints the Kafka messages.
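For reference, the only line that changes in the working local run is the master (a minimal sketch, everything else identical; "local[2]" matters because the receiver occupies one core and at least one more is needed for processing):

val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local[2]")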

Could you help me understand this issue? What am I doing wrong?

Attaching:
spark shell output: http://pastebin.com/zdYFBj4T
executor output: http://pastebin.com/LDMtCjq0

thanks!
bartek