No errors but no output either... Thanks!
On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Could you elaborate on what is the problem you are facing? Compiler error?
> Runtime error? Class-not-found error? Not receiving any data from Kafka?
> Receiving data but SQL command throwing error? No errors but no output
> either?
>
> TD
>
>
> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> Couple days ago, I tried to integrate SQL and streaming together. My
>> understanding is I can transform RDD from Dstream to schemaRDD and execute
>> SQL on each RDD. But I got no luck
>> Would you guys help me take a look at my code? Thank you very much!
>>
>> object KafkaSpark {
>>
>>   def main(args: Array[String]): Unit = {
>>     if (args.length < 4) {
>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>       System.exit(1)
>>     }
>>
>>
>>     val Array(zkQuorum, group, topics, numThreads) = args
>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>>     val sc = new SparkContext(sparkConf)
>>     val sqlContext = new SQLContext(sc);
>>     // ssc.checkpoint("checkpoint")
>>
>>     // Importing the SQL context gives access to all the SQL functions
>>     // and implicit conversions.
>>     import sqlContext._
>>
>>
>>     val tt = Time(10000)
>>     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>       topicpMap).map(t => getRecord(t._2.split("#")))
>>
>>     val result = recordsStream.foreachRDD((recRDD, tt)=>{
>>       recRDD.registerAsTable("records")
>>       val result = sql("select * from records")
>>       println(result)
>>       result.foreach(println)
>>     })
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>
>>   }
>>
>>   def getRecord(l:Array[String]):Record = {
>>     println("Getting the record")
>>     Record(l(0), l(1))}
>> }
>>
>>
>