It writes data every 5 seconds. Under the checkpoint directory gs://testdata/raw_chk, what do you see?
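You can list it from the same Spark session with the Hadoop FileSystem API. A rough sketch (the path is the one from your snippet below, and it assumes the GCS connector your query already uses is on the classpath):

    import org.apache.hadoop.fs.Path

    // checkpoint location used by the query
    val chkPath = new Path("gs://testdata/raw_chk")
    val fs = chkPath.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // print every entry directly under the checkpoint directory
    fs.listStatus(chkPath).foreach(s => println(s.getPath.getName))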
You should have four entries under the checkpoint directory, for example:

/mnt/gs/prices/chkpt> ls -ltr
total 1
-rw-r--rwx. 1 hduser hadoop   45 May  4 07:38 metadata
drwxr-xrwx. 3 hduser hadoop 4096 May  4 07:38 sources
drwxr-xrwx. 2 hduser hadoop 4096 May  5 10:30 offsets
drwxr-xrwx. 2 hduser hadoop 4096 May  5 10:30 commits

Check this article of mine: Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>

On Fri, 22 Apr 2022 at 15:58, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Hello all,
>
> I'm just trying to build a pipeline that reads data from a streaming
> source and writes it to ORC files, but I don't see any files written to
> the file system, nor any exceptions.
>
> Here is an example:
>
>     val df = spark.readStream.format("...")
>       .option(
>         "Topic",
>         "Some topic"
>       )
>       .load()
>     val q = df.writeStream.format("orc")
>       .option("path", "gs://testdata/raw")
>       .option("checkpointLocation", "gs://testdata/raw_chk")
>       .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
>       .start()
>     q.awaitTermination(1200000)
>     q.stop()
>
> I couldn't find any file until the 1200 seconds were over.
> Does that mean all the data is cached in memory? If I keep the pipeline
> running, no file is ever flushed to the file system.
>
> How do I control how often Spark Streaming writes to disk?
>
> Thanks!
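Regarding the awaitTermination(1200000) in the snippet above: rather than blocking blindly for 20 minutes and only checking at the end, you can poll the query's progress to confirm that micro-batches are actually being committed every 5 seconds. A rough sketch using the standard StreamingQuery progress API, with df being the DataFrame from your snippet (if numInputRows stays at 0, the source is simply not delivering any data):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger

    val q = df.writeStream.format("orc")
      .option("path", "gs://testdata/raw")
      .option("checkpointLocation", "gs://testdata/raw_chk")
      .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
      .start()

    // poll progress roughly every 10 seconds instead of waiting blindly
    while (q.isActive) {
      val p = q.lastProgress   // null until the first micro-batch completes
      if (p != null) println(s"batch=${p.batchId} inputRows=${p.numInputRows}")
      Thread.sleep(10000)
    }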