I am struggling to read data from Kafka and save it to Parquet files on HDFS
with Spark Streaming, following this post:
https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet

My code is similar to the following:

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .write.parquet("hdfs://data.parquet")


The only difference is that I am writing it in Java.

In practice, this code runs once and then exits gracefully. Although it
produces the Parquet file successfully and no exception is thrown, it behaves
like a normal Spark batch job rather than a Spark streaming job.

What should I do if I want to keep reading from Kafka and saving the data to
Parquet in (micro-)batches, instead of stopping after a single run?
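
From the Structured Streaming guide, I suspect the fix is to switch from
read/write to readStream/writeStream and supply a checkpoint location, roughly
like the sketch below. The output path, checkpoint directory, and trigger
interval here are placeholders I made up, not values from the original post:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("KafkaToParquet").getOrCreate()
import spark.implicits._

// readStream (not read) creates an unbounded streaming source
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///data/parquet")                            // output directory (placeholder)
  .option("checkpointLocation", "hdfs:///checkpoints/kafka-parquet") // required by streaming file sinks (placeholder)
  .trigger(Trigger.ProcessingTime("60 seconds"))                     // micro-batch interval (assumption)
  .start()
  .awaitTermination() // block so the job keeps running instead of exiting

Is that the right direction? My understanding is that awaitTermination() is
what keeps the application alive between micro-batches.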

Regards,
Junfeng Chen
