I have already tried readStream and writeStream, but they throw a "Uri without authority: hdfs:/data/_spark_metadata" exception, which I do not see in normal read mode. The parquet path I specified is hdfs:///data
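For reference, a minimal sketch of the streaming variant, assuming Spark 2.x with the spark-sql-kafka-0-10 connector; the namenode host:port and the checkpoint path below are placeholders, not values from the thread:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("KafkaToParquet").getOrCreate()
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()

    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .writeStream
      .format("parquet")
      // If the _spark_metadata error really is about the missing authority,
      // spell it out: "hdfs://namenode:8020/data" (placeholder host:port),
      // or a bare "/data" resolved through fs.defaultFS, should both work.
      .option("path", "hdfs://namenode:8020/data")
      .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/data")
      .start()

    query.awaitTermination()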
Regards,
Junfeng Chen

On Thu, Mar 8, 2018 at 9:38 AM, naresh Goud <nareshgoud.du...@gmail.com> wrote:

> Change it to readStream instead of read, as below:
>
> val df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
>
> Check whether this is helpful:
> https://github.com/ndulam/KafkaSparkStreams/blob/master/SampleStreamApp/src/main/scala/com/naresh/org/SensorDataSave.scala
>
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
> On Wed, Mar 7, 2018 at 7:33 PM Junfeng Chen <darou...@gmail.com> wrote:
>
>> I am struggling to read data from Kafka and save it to a parquet file
>> on HDFS with Spark Streaming, following this post:
>> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>>
>> My code is similar to the following:
>>
>> val df = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>   .option("subscribe", "topic1")
>>   .load()
>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>>   .as[(String, String)]
>>   .write.parquet("hdfs://data.parquet")
>>
>> The difference is that I am writing in Java.
>>
>> In practice, this code runs once and then exits gracefully. Although
>> it produces the parquet file successfully and no exception is thrown,
>> it runs like a normal Spark program rather than a Spark Streaming
>> program.
>>
>> What should I do if I want to read from Kafka and save to parquet in
>> batches?
>>
>> Regards,
>> Junfeng Chen
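P.S. On the batch question at the end of the quoted thread: one way to run
a one-shot Kafka-to-parquet job is a plain batch read with explicit offset
bounds. A minimal sketch, assuming Spark 2.2+ with the same Kafka connector
(the output path authority is again a placeholder):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("KafkaBatchToParquet").getOrCreate()
    import spark.implicits._

    // Batch semantics: consume whatever sits between the two offset bounds,
    // write a single parquet output, then exit.
    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .write
      .parquet("hdfs://namenode:8020/data.parquet") // placeholder host:port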