Hi Gabriela,

I don't know about the data lake side, but this is really a Spark Structured Streaming question. Are both readStream and writeStream working OK? For example, can you do df.printSchema() after the read?
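A quick way to check the read on its own (a minimal sketch; the format, schema and path below are placeholders rather than your actual settings):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# assumes an existing SparkSession named spark
schema = StructType([StructField("ticker", StringType(), True),
                     StructField("price", DoubleType(), True)])

df = (spark
      .readStream
      .format("json")
      .schema(schema)
      .load("file:///tmp/prices/in/"))

df.printSchema()         # prints the schema without starting a query
print(df.isStreaming)    # True confirms this is a streaming DataFrame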
It is advisable to wrap the logic inside a try/except block. Here is an example of doing that (note that sendToSink has to be defined before it is passed to foreachBatch):

import sys
from pyspark.sql.functions import col, from_json

data_path = "file:///mnt/gs/prices/data/"
checkpoint_path = "file:///mnt/gs/prices/chkpt/"

def sendToSink(df, batchId):
    # foreachBatch handler: receives the micro-batch DataFrame and its batch id
    if len(df.take(1)) > 0:
        print(f"""batchId is {batchId}""")
        rows = df.count()
        print(f"""Total records processed in this run = {rows}""")
    else:
        print("DataFrame is empty")

try:
    # schema is the StructType describing the JSON payload held in each line
    streamingDataFrame = self.spark \
        .readStream \
        .option('newFilesOnly', 'true') \
        .option('header', 'true') \
        .option('maxFilesPerTrigger', 1000) \
        .option('latestFirst', 'false') \
        .text(data_path) \
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

    streamingDataFrame.printSchema()

    result = streamingDataFrame.select( \
                 col("parsed_value.rowkey").alias("rowkey") \
               , col("parsed_value.ticker").alias("ticker") \
               , col("parsed_value.timeissued").alias("timeissued") \
               , col("parsed_value.price").alias("price")). \
             writeStream. \
             outputMode('append'). \
             option("truncate", "false"). \
             foreachBatch(sendToSink). \
             queryName('trailFiles'). \
             trigger(once=True). \
             option('checkpointLocation', checkpoint_path). \
             start(data_path)
except Exception as e:
    print(f"""{e}, quitting""")
    sys.exit(1)

result.awaitTermination()

HTH

View my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.


On Thu, 27 Jan 2022 at 12:57, Gabriela Dvořáková <gabri...@monthio.com.invalid> wrote:

> Hi,
>
> I am writing to ask for advice regarding the cleanSource option of the
> DataStreamReader. I am using pyspark with Spark 3.1 via Azure Synapse. To
> my knowledge, the cleanSource option was introduced in Spark version 3. I
> have spent a significant amount of time trying to configure this option
> with both the "archive" and "delete" settings, but the streaming job only
> seems to process data in the source data lake storage account container
> and store it in the sink storage account data lake container. The archive
> folder is never created, nor are any of the processed source files
> removed. None of the forums or Stack Overflow have been of any help so
> far, so I am reaching out to you in case you have any tips on how to get
> it running. Here is my code:
>
> Reading:
>
> df = (spark
>     .readStream
>     .option("sourceArchiveDir",
>             f'abfss://{TRANSIENT_DATA_LAKE_CONTAINER_NAME}@{DATA_LAKE_ACCOUNT_NAME}.dfs.core.windows.net/budget-app/budgetOutput/archived-v5')
>     .option("cleanSource", "archive")
>     .format('json')
>     .schema(schema)
>     .load(TRANSIENT_DATA_LAKE_PATH))
>
> ...Processing...
>
> Writing:
>
> (
>     df.writeStream
>     .format("delta")
>     .outputMode('append')
>     .option("checkpointLocation", RAW_DATA_LAKE_CHECKPOINT_PATH)
>     .trigger(once=True)
>     .partitionBy("Year", "Month", "clientId")
>     .start(RAW_DATA_LAKE_PATH)
>     .awaitTermination()
> )
>
> Thank you very much for your help,
> Gabriela
>
> _____________________________________
>
> Med venlig hilsen / Best regards
>
> Gabriela Dvořáková
>
> Developer | monthio
>
> M: +421902480757
> E: gabri...@monthio.com
> W: www.monthio.com
>
> Monthio Aps, Ragnagade 7, 2100 Copenhagen
>
> "Create personal wealth and a healthy economy
> for people by changing the ways of banking"
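On the cleanSource question itself: the sketch below shows how the file-source clean-up options are typically wired up, based on the Structured Streaming file-source documentation. The paths, schema and variable names are placeholders, not the actual ADLS locations from the thread, so treat it as an illustration rather than a drop-in fix.

# Minimal sketch of the file-source clean-up options (placeholder paths;
# assumes an existing SparkSession named spark and a StructType named schema)
archive_dir = "file:///mnt/landing/archive"        # should not sit under the source path

df = (spark
      .readStream
      .format("json")
      .schema(schema)
      .option("cleanSource", "archive")            # "archive", "delete" or "off"
      .option("sourceArchiveDir", archive_dir)     # required when cleanSource is "archive"
      .load("file:///mnt/landing/in/"))

One thing worth checking against the docs for your Spark version: the clean-up appears to be applied to files completed in earlier micro-batches, so with trigger(once=True) the archiving or deletion may not show up until a later run.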