From what I know, you would have to iterate over each RDD. When you read from the stream, Spark Streaming actually collects the incoming data into a mini-batch RDD for each time interval, so your processing logic runs once per batch.
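In case it is useful, this is roughly how such a DStream could be set up with the spark-streaming-kafka-0-10 integration. Just a sketch, not tested: ssc, bootstrapServers, topicName, the group id and the 5-second batch interval are my assumptions, not from your code.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // One RDD is produced per batch interval (5 seconds here, an arbitrary choice)
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> bootstrapServers,
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "my-consumer-group",   // hypothetical group id
      "auto.offset.reset"  -> "latest"
    )

    // ds is a DStream of ConsumerRecord[String, String]; each batch becomes one
    // RDD, which is what foreachRDD below iterates over. In practice you would
    // map the records first (e.g. ds.map(_.value)) and parse them into columns
    // before converting to a DataFrame.
    val ds = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq(topicName), kafkaParams))

Remember that with this API you also need ssc.start() and ssc.awaitTermination() after wiring up the foreachRDD.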
I hope this helps. Something along these lines:

    ds.foreachRDD { rdd =>
      // Note: rdd.toDF requires import spark.implicits._ in scope
      val newNames = Seq("Field1", "Field2", "Field3")
      val mydataDF = rdd.toDF(newNames: _*)
      mydataDF.createOrReplaceTempView("myTempTable")

      // Run SQL over this batch, adding a timestamp column, then save it
      // (mode and output are defined elsewhere)
      val wordCountsDataFrame = spark.sql("select *, now() as TStamp from myTempTable")
      wordCountsDataFrame.write.mode(mode).save(output)

      val lines = wordCountsDataFrame.count().toInt
      // wordCountsDataFrame.show(20, false)
      println("Total entries in this batch: " + lines)
    }

> On 16 May 2017, at 09:36, kant kodali <kanth...@gmail.com> wrote:
>
> Hi All,
>
> I have the following code.
>
>     val ds = sparkSession.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServers)
>       .option("subscribe", topicName)
>       .option("checkpointLocation", hdfsCheckPointDir)
>       .load()
>     val ds1 = ds.select($"value")
>     val query = ds1.writeStream.outputMode("append").format("console").start()
>     query.awaitTermination()
>
> There are no errors when I execute this code, but I don't see any data
> being printed to the console. When I run my standalone test Kafka consumer
> jar, I can see that it is receiving messages, so I am not sure what is going
> on with the code above. Any ideas?
>
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain:  +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia