It writes data every 5 seconds, as set by your processing-time trigger. Under the
checkpoint directory gs://testdata/raw_chk, what do you see?
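
Independent of what lands on GCS, you can also ask the running query itself
whether micro-batches are firing. A minimal sketch, reusing the q handle from
your own snippet (poll it before awaitTermination blocks, or from another
thread):

    // StreamingQuery exposes status and per-batch progress
    println(q.status)        // isDataAvailable, isTriggerActive, message
    println(q.lastProgress)  // last completed micro-batch as JSON (null before the first one)
    q.recentProgress.foreach { p =>
      println(s"batch ${p.batchId}: ${p.numInputRows} rows in, sink = ${p.sink.description}")
    }

If lastProgress stays null, no micro-batch has completed yet and nothing will
show up under the output path.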

You should see four entries under the checkpoint directory, for example:

/mnt/gs/prices/chkpt> ls -ltr
total 1
-rw-r--rwx. 1 hduser hadoop   45 May  4 07:38 metadata
drwxr-xrwx. 3 hduser hadoop 4096 May  4 07:38 sources
drwxr-xrwx. 2 hduser hadoop 4096 May  5 10:30 offsets
drwxr-xrwx. 2 hduser hadoop 4096 May  5 10:30 commits
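
Each completed micro-batch writes a numbered file under offsets (and, once the
batch is committed, under commits), so those two directories tell you how far
the query has progressed. A rough sketch to list them straight from the
driver, assuming the same spark session and the gs:// checkpoint path from
your code (plain Hadoop FileSystem API, nothing Spark-specific):

    import org.apache.hadoop.fs.Path

    // list the per-batch files the streaming query has written so far
    val chk = new Path("gs://testdata/raw_chk")
    val fs  = chk.getFileSystem(spark.sparkContext.hadoopConfiguration)
    Seq("offsets", "commits").foreach { dir =>
      println(s"--- $dir ---")
      fs.listStatus(new Path(chk, dir)).foreach(st => println(st.getPath.getName))
    }

If offsets has entries but commits stays empty, batches are starting but never
finishing; if both are empty, no trigger has fired at all.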

Check this article of mine

Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>

On Fri, 22 Apr 2022 at 15:58, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Hello all,
>
> I’m just trying to build a pipeline that reads data from a streaming source
> and writes to ORC files, but I don’t see any files written to the file
> system, nor any exceptions.
>
> Here is an example
>
> val df = spark.readStream.format(“...")
>       .option(
>         “Topic",
>         "Some topic"
>       )
>       .load()
>     val q = df.writeStream.format("orc").option("path",
> "gs://testdata/raw")
>       .option("checkpointLocation",
> "gs://testdata/raw_chk").trigger(Trigger.ProcessingTime(5,
> TimeUnit.SECONDS)).start
>     q.awaitTermination(1200000)
>     q.stop()
>
>
> I couldn’t find any files until the 1200 seconds were over.
> Does that mean all the data is cached in memory? If I keep the pipeline
> running, I see no files being flushed to the file system.
>
> How do I control how often Spark streaming writes to disk?
>
> Thanks!
>
