Could you first run it locally to make sure it works and that you see output? Also, I recommend going through the step-by-step approach I suggested earlier to narrow down where the problem is.

TD

On Mon, Jul 14, 2014 at 9:15 PM, [email protected] <[email protected]> wrote:

> Actually, I deployed this on a YARN cluster (spark-submit) and I couldn't
> find any output in the YARN stdout logs.
>
>
> On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das <[email protected]> wrote:
>
>> Can you make sure you are running locally on more than one local core? You
>> could set the master in the SparkConf as conf.setMaster("local[4]"). Then
>> see if there are jobs running on every batch of data in the Spark web UI
>> (running on localhost:4040). If you still don't get any output, first try
>> simply printing recRDD.count() in the foreachRDD (that is, test Spark
>> Streaming on its own first). If you can get that to work, then I would test
>> the Spark SQL part.
>>
>> TD
>>
>>
>> On Mon, Jul 14, 2014 at 5:25 PM, [email protected] <[email protected]> wrote:
>>
>>> No errors but no output either... Thanks!
>>>
>>>
>>> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <[email protected]> wrote:
>>>
>>>> Could you elaborate on the problem you are facing? Compiler error?
>>>> Runtime error? Class-not-found error? Not receiving any data from Kafka?
>>>> Receiving data but the SQL command throwing an error? No errors but no
>>>> output either?
>>>>
>>>> TD
>>>>
>>>>
>>>> On Mon, Jul 14, 2014 at 4:06 PM, [email protected] <[email protected]> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> A couple of days ago, I tried to integrate SQL and streaming. My
>>>>> understanding is that I can transform each RDD from a DStream into a
>>>>> SchemaRDD and execute SQL on it, but I had no luck.
>>>>> Would you guys take a look at my code? Thank you very much!
>>>>>
>>>>> object KafkaSpark {
>>>>>
>>>>>   def main(args: Array[String]): Unit = {
>>>>>     if (args.length < 4) {
>>>>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>>>       System.exit(1)
>>>>>     }
>>>>>
>>>>>     val Array(zkQuorum, group, topics, numThreads) = args
>>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>>>>>     val sc = new SparkContext(sparkConf)
>>>>>     val sqlContext = new SQLContext(sc);
>>>>>     // ssc.checkpoint("checkpoint")
>>>>>
>>>>>     // Importing the SQL context gives access to all the SQL functions and implicit conversions.
>>>>>     import sqlContext._
>>>>>
>>>>>     val tt = Time(10000)
>>>>>     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>       topicpMap).map(t => getRecord(t._2.split("#")))
>>>>>
>>>>>     val result = recordsStream.foreachRDD((recRDD, tt) => {
>>>>>       recRDD.registerAsTable("records")
>>>>>       val result = sql("select * from records")
>>>>>       println(result)
>>>>>       result.foreach(println)
>>>>>     })
>>>>>
>>>>>     ssc.start()
>>>>>     ssc.awaitTermination()
>>>>>   }
>>>>>
>>>>>   def getRecord(l: Array[String]): Record = {
>>>>>     println("Getting the record")
>>>>>     Record(l(0), l(1))
>>>>>   }
>>>>> }
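For reference, here is a minimal sketch of the program with the suggestions from this thread applied (more than one local core, and printing `recRDD.count()` per batch before running SQL). It assumes Spark 1.0.x with the spark-streaming-kafka artifact on the classpath; `registerAsTable` and the `createSchemaRDD` implicit belong to that era's Spark SQL API. The `Record` field names (`key`, `value`) are hypothetical, since the original post does not show the case class. Two further changes are assumptions about likely causes rather than confirmed fixes. First, the `SQLContext` is built on the StreamingContext's own `SparkContext`, because only one active `SparkContext` per JVM is supported. Second, query results are `collect()`ed before printing: `result.foreach(println)` runs on the executors, so on YARN its output lands in the executor containers' logs, not the driver's stdout.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical field names: the original post does not show how Record is defined.
// Defined at top level so Spark SQL can derive its schema via a TypeTag.
case class Record(key: String, value: String)

object KafkaSpark {

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    // For a local test, use more than one core: the Kafka receiver occupies one of them.
    // Remove setMaster when submitting to YARN, since it overrides spark-submit's --master.
    val sparkConf = new SparkConf().setAppName("KafkaSpark").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Reuse the StreamingContext's SparkContext instead of creating a second one.
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext._ // brings sql(...) and the createSchemaRDD implicit into scope

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map { case (_, message) => getRecord(message.split("#")) }

    recordsStream.foreachRDD { recRDD =>
      // Step 1: confirm that each batch actually contains data.
      println(s"Records in this batch: ${recRDD.count()}")

      // Step 2: run SQL over the batch (implicitly converted to a SchemaRDD).
      recRDD.registerAsTable("records")
      val result = sql("select * from records")
      // collect() brings the rows to the driver, so println writes to the driver's stdout.
      result.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Assumes each Kafka message has at least two '#'-separated fields.
  def getRecord(fields: Array[String]): Record = Record(fields(0), fields(1))
}
```

If step 1 prints non-zero counts but step 2 prints nothing, the problem is narrowed to the Spark SQL part, which matches the step-by-step approach suggested in the thread.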
