Aah, glad you found it out. TD
On Tue, Jul 15, 2014 at 7:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Thanks Tathagata, we actually found the problem. I had created the SQLContext and the StreamingContext from different SparkContexts. But thanks for your help.

Best,
Siyuan

On Tue, Jul 15, 2014 at 6:53 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Oh yes, we have run SQL, streaming and MLlib all together. You can take a look at the demo <https://databricks.com/cloud> that Databricks gave at the Spark Summit.

I think I see what the problem is. sql("....") returns an RDD, and println(rdd) prints only the RDD's name, while rdd.foreach(println) prints the records on the executors, so you won't find anything in the driver logs. So try doing a collect or a take on the RDD returned by the SQL query and print that.

TD

On Tue, Jul 15, 2014 at 4:28 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

By the way, have you ever run SQL and streaming together? Do you know any example that works? Thanks!

On Tue, Jul 15, 2014 at 4:28 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Hi Tathagata,

I could see the output of count, but no SQL results. Running in standalone mode is meaningless for me; I just run it on my local single-node YARN cluster. Thanks

On Tue, Jul 15, 2014 at 12:48 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Could you run it locally first to make sure it works and you see output? Also, I recommend going through the previous step-by-step approach to narrow down where the problem is.

TD

On Mon, Jul 14, 2014 at 9:15 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Actually, I deployed this on a YARN cluster (spark-submit) and I couldn't find any output in the YARN stdout logs.

On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Can you make sure you are running locally on more than one local core? You could set the master in the SparkConf as conf.setMaster("local[4]"). Then see whether there are jobs running on every batch of data in the Spark web UI (running on localhost:4040). If you still don't get any output, try simply printing recRDD.count() in the foreachRDD first (that is, first test Spark Streaming). If you can get that to work, then I would test the Spark SQL part.

TD

On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

No errors but no output either... Thanks!

On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Could you elaborate on what problem you are facing? Compiler error? Runtime error? Class-not-found error? Not receiving any data from Kafka? Receiving data but the SQL command throwing an error? No errors but no output either?

TD

On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Hi All,

A couple of days ago, I tried to integrate SQL and streaming together. My understanding is that I can transform each RDD from the DStream into a SchemaRDD and execute SQL on it.
But I have had no luck so far. Would you help me take a look at my code? Thank you very much!

object KafkaSpark {

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaSpark")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    // ssc.checkpoint("checkpoint")

    // Importing the SQL context gives access to all the SQL functions and implicit conversions.
    import sqlContext._

    val tt = Time(10000)
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(t => getRecord(t._2.split("#")))

    val result = recordsStream.foreachRDD((recRDD, tt) => {
      recRDD.registerAsTable("records")
      val result = sql("select * from records")
      println(result)
      result.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def getRecord(l: Array[String]): Record = {
    println("Getting the record")
    Record(l(0), l(1))
  }
}
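For reference, Tathagata's intermediate suggestion (run on a multi-core local master and print a plain per-batch count before bringing Spark SQL into the picture) could look roughly like the sketch below. It is a hypothetical, stripped-down variant of the posted program, assuming the same Kafka arguments and the Spark 1.0-era APIs used in the thread; it is not code taken from the thread itself.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaCountDebug {
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args

    // More than one local core: one for the Kafka receiver, the rest for the batch jobs.
    val conf = new SparkConf().setAppName("KafkaCountDebug").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    lines.foreachRDD { rdd =>
      // Plain streaming sanity check, no SQL involved yet: if this stays at 0 (or never prints),
      // the problem is on the Kafka/receiver side. Per-batch jobs are also visible at localhost:4040.
      println("records in this batch: " + rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()
  }
}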
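And a minimal sketch of the corrected setup implied by the resolution: a single SparkContext shared by the StreamingContext and the SQLContext, with take() (or collect()) used so the query results are printed on the driver rather than on the executors. The Record field names and the "#"-delimited input format are assumptions carried over from the original post; this is illustrative, not the exact code Siyuan ended up with.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed shape of the Record case class from the original post (two string fields).
case class Record(key: String, value: String)

object KafkaSparkFixed {
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaSparkFixed")

    // Build everything from one SparkContext; the StreamingContext and SQLContext
    // must not be created from two separate contexts.
    val sc  = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)
    import sqlContext._ // brings in sql(...) and the implicit RDD-to-SchemaRDD conversion

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val records = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map { case (_, line) => val f = line.split("#"); Record(f(0), f(1)) }

    records.foreachRDD { rdd =>
      rdd.registerAsTable("records")
      // take() pulls rows back to the driver, so the println output
      // appears in the driver log instead of on the executors.
      sql("select * from records").take(20).foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}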