As far as I know, you would have to iterate over each RDD. When you read
from the stream, Spark Streaming collects the incoming data as a mini RDD
(a micro-batch) for each batch interval.

I hope this helps. Something along these lines should work:
// Assumes `spark` is the active SparkSession and that the elements of ds
// (a DStream) are tuples or case classes with three fields; `mode` and
// `output` are defined elsewhere in your job.
import spark.implicits._

ds.foreachRDD { rdd =>
  val newNames = Seq("Field1", "Field2", "Field3")
  val mydataDF = rdd.toDF(newNames: _*)
  mydataDF.createOrReplaceTempView("myTempTable")
  // Add a timestamp column via SQL and write this batch out
  val wordCountsDataFrame =
    spark.sql("select *, current_timestamp() as TStamp from myTempTable")
  wordCountsDataFrame.write.mode(mode).save(output)
  val lines = wordCountsDataFrame.count()
  // wordCountsDataFrame.show(20, false)
  println("Total entries in this batch: " + lines)
}
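
For completeness, here is a minimal sketch of how a DStream like `ds` above
could be wired up from Kafka with the spark-streaming-kafka-0-10 integration.
The broker address, group id, and the comma-split mapping of each record into
a three-field tuple are assumptions for illustration, not part of your setup:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Hypothetical consumer settings; adjust to your cluster
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// One micro-batch (mini RDD) is produced every 10 seconds
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(10))
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq(topicName), kafkaParams))

// Split each message value into three fields so rdd.toDF(newNames: _*)
// applies; the comma-separated layout is an assumption
val ds = stream.map(_.value.split(",")).map(a => (a(0), a(1), a(2)))

// After attaching the foreachRDD block above:
ssc.start()
ssc.awaitTermination()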

> On 16 May 2017, at 09:36, kant kodali <kanth...@gmail.com> wrote:
> 
> Hi All,
> 
> I have the following code.
> 
>  val ds = sparkSession.readStream
>                 .format("kafka")
>                 .option("kafka.bootstrap.servers", bootstrapServers)
>                 .option("subscribe", topicName)
>                 .option("checkpointLocation", hdfsCheckPointDir)
>                 .load()
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
> There are no errors when I execute this code, but I don't see any data 
> being printed to the console. When I run my standalone test Kafka consumer 
> jar I can see that it is receiving messages, so I am not sure what is going 
> on with the above code. Any ideas?
> 
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain:     +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia

