Cody,

Are you referring to the line val lines = messages.map(_._2)?
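
If the concern is instead the extra pass that sqlContext.read.json(rdd) makes over each RDD to infer the schema, one option might be to supply the schema up front. A minimal sketch, assuming the five columns used in the query quoted below are the only ones needed; the field types (id as a string, the rest as doubles) and the names DroneSchema, droneSchema and toDataFrame are made up for illustration and are not from the original code:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object DroneSchema {
  // Assumed field types: the thread never states them, so adjust to the real data.
  val droneSchema: StructType = StructType(Seq(
    StructField("id", StringType, nullable = true),
    StructField("temp", DoubleType, nullable = true),
    StructField("rotor_rpm", DoubleType, nullable = true),
    StructField("winddirection", DoubleType, nullable = true),
    StructField("windspeed", DoubleType, nullable = true)
  ))

  // With an explicit schema, the reader has no need to scan the RDD to infer one.
  def toDataFrame(sqlContext: SQLContext, rdd: RDD[String]): DataFrame =
    sqlContext.read.schema(droneSchema).json(rdd)
}
```

Inside foreachRDD this would replace the read.json call with val df = DroneSchema.toDataFrame(sqlContext, rdd). I have not measured whether it makes a difference at a 30 second batch interval.
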
Regards,
Siva

> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
> Doesn't that result in consuming each RDD twice, in order to infer the
> json schema?
>
> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
>> Of course :)
>>
>> object sparkStreaming {
>>   def main(args: Array[String]) {
>>     // Set reasonable logging levels for streaming if the user has not configured log4j.
>>     StreamingExamples.setStreamingLogLevels()
>>     val topics = "test"
>>     val brokers = "localhost:9092"
>>     val topicsSet = topics.split(",").toSet
>>     val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local") //spark://localhost:7077
>>     val sc = new SparkContext(sparkConf)
>>     val ssc = new StreamingContext(sc, Seconds(30))
>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>     val lines = messages.map(_._2)
>>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>     lines.foreachRDD( rdd => {
>>       val df = sqlContext.read.json(rdd)
>>       df.registerTempTable("drone")
>>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>     })
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> I haven't checked long-running performance though.
>>
>> Regards,
>>
>> Siva
>>
>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> Good to hear so! Mind sharing a few snippets of your solution?
>>
>> Regards,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>
>> Thanks Jacek,
>>
>> Job completed!! :) Just used data frames and sql query. Very clean and
>> functional code.
>>
>> Siva
>>
>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> mapWithState