This message is in two parts.

Hi,
I did some tests on these. The idea is to run Spark Structured Streaming (SSS) on the collection of records that have arrived since the last run, and then shut the SSS job down. Parts of this approach are described in the Databricks blog Running Streaming Jobs Once a Day For 10x Cost Savings <https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>. However, real life is more complicated than that.

Let us look at a typical example, as depicted in my lousy diagram attached:

sources (trail files) --> Kafka --> Flume --> write to Cloud storage (mounted locally) --> SSS --> BigQuery

What I have here is a typical example of *trail files* produced by a source. This could be a CDC tool like Oracle GoldenGate shipping committed logs, or anything else that writes to files. We use Kafka to ingest these files and Apache Flume <http://flume.apache.org/> to move them onto Google Cloud Storage (gs://), *mounted as a local file system* on the edge node through Cloud Storage Fuse <https://cloud.google.com/storage/docs/gcs-fuse>.

The advantage of this storage arrangement is that both on-premises and cloud applications can work with it. For on-premises use the mount point would be, say, data_path = "file:///mnt/gs/prices/data/", where gs://etcbucket is mounted as /mnt/gs. For the cloud reference it would be data_path = "etcbucket/prices/data/". We use Flume's file_roll sink type to write into this storage.

This is an event-driven architecture, and our interest is in processing these trail files through SSS at a predetermined interval or on demand. The caveat is that the volume accumulated between runs has to remain containable, meaning SSS can process it within a single run.

The code base in PySpark is as follows (schema, self.spark, s and config are defined elsewhere in the enclosing class):

from pyspark.sql.functions import col, from_json
import sys

data_path = "etcbucket/prices/data/"
checkpoint_path = "etcbucket/prices/chkpt/"

try:
    streamingDataFrame = self.spark \
        .readStream \
        .option('newFilesOnly', 'true') \
        .option('header', 'true') \
        .option('maxFilesPerTrigger', 10000) \
        .option('latestFirst', 'false') \
        .text(data_path) \
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

    streamingDataFrame.printSchema()

    result = streamingDataFrame.select(
              col("parsed_value.rowkey").alias("rowkey")
            , col("parsed_value.ticker").alias("ticker")
            , col("parsed_value.timeissued").alias("timeissued")
            , col("parsed_value.price").alias("price")) \
        .writeStream \
        .outputMode('append') \
        .option("truncate", "false") \
        .foreachBatch(sendToSink) \
        .queryName('trailFiles') \
        .trigger(once=True) \
        .option('checkpointLocation', checkpoint_path) \
        .start()
except Exception as e:
    print(f"""{e}, quitting""")
    sys.exit(1)

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"""batchId is {batchId}""")
        df.show(100, False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
        print(f"""wrote to DB""")
    else:
        print("DataFrame is empty")

For anyone interested, a minimal sketch of this kind of BigQuery write with the Spark BigQuery connector is included at the end of this message.

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
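P.S. Below is a minimal sketch of the sort of append into BigQuery that a helper such as writeTableToBQ above could perform, using the open source Spark BigQuery connector. It is a sketch of mine, not the actual writeTableToBQ implementation: the function name, the temporary GCS bucket and the dataset/table values are placeholders, and the connector jar has to be on the Spark classpath, e.g. via --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:<version>.

def write_to_bq(df, mode, dataset, table):
    """Write a DataFrame to a BigQuery table using the spark-bigquery connector.

    mode is a standard Spark save mode, e.g. "append" or "overwrite".
    The indirect write path stages data through a temporary GCS bucket,
    so temporaryGcsBucket must name a bucket the job can write to.
    """
    df.write \
      .format("bigquery") \
      .option("table", f"{dataset}.{table}") \
      .option("temporaryGcsBucket", "tmp-bq-staging-bucket") \
      .mode(mode) \
      .save()

From sendToSink this would be called as something like write_to_bq(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable']).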
On Sun, 2 May 2021 at 10:42, Mail Delivery Subsystem <mailer-dae...@googlemail.com> wrote:

> Message too large
> Your message couldn't be delivered to user@spark.apache.org because it
> exceeds the size limit. Try reducing the message size and then resending it.
> The response from the remote server was:
>
> 552 5.3.4 Message size exceeds fixed limit
>
> ---------- Forwarded message ----------
> From: Mich Talebzadeh <mich.talebza...@gmail.com>
> To:
> Cc: user <user@spark.apache.org>
> Bcc:
> Date: Sun, 2 May 2021 10:42:09 +0100
> Subject: Re: Spark Streaming with Files
> ----- Message truncated -----