In SSS
writeStream. \
outputMode('append'). \
option("truncate", "false"). \
* foreachBatch(SendToBigQuery). \*
option('checkpointLocation', checkpoint_path). \
so this writeStream will call foreachBatch(<function_name>)
"""
"foreachBatch" performs custom write logic on each
micro-batch through SendToBigQuery function
foreachBatch(SendToBigQuery) expects 2 parameters, first:*
micro-batch as DataFrame or Dataset and second: unique id for each batch*
Using foreachBatch, we write each micro batch to storage
defined in our custom logic. In this case, we store the output of our
streaming application to Google BigQuery table
that does this
def SendToBigQuery(df, batchId):
if(len(df.take(1))) > 0:
print(batchId)
# do your logic
else:
print("DataFrame is empty")
You should also have it in
option('checkpointLocation', checkpoint_path).
See this article on mine
Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=QuTPDwvXSqWKWsAi7z611Q%3D%3D>
HTH
Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom
view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
On Mon, 26 Jun 2023 at 06:01, Anil Dasari <[email protected]> wrote:
> Hi,
> I am using spark 3.3.1 distribution and spark stream in my application. Is
> there a way to add a microbatch id to all logs generated by spark and spark
> applications ?
>
> Thanks.
>