Part 2

In this case we are simply counting the number of rows ingested in each trigger-once run before SSS terminates. This is shown in the sendToSink() method from Part 1 (quoted further down).
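A minimal sketch (not the exact code) of a sendToSink() variant that also logs the per-run record count would look something like this; s, config and writeTableToBQ are the helpers from the quoted Part 1 code, and the count() line is an assumption about how the "Total records processed in this run" figure below is produced:

    # Sketch of a counting foreachBatch sink for the trigger-once run.
    # `s`, `config` and writeTableToBQ come from the quoted Part 1 code;
    # the count() call is assumed to be the source of the per-run total.
    def sendToSink(df, batchId):
        if len(df.take(1)) > 0:                      # cheap non-empty check
            print(f"batchId is {batchId}")
            df.persist()                             # df is used twice: count, then write
            print(f"Total records processed in this run = {df.count()}")
            # append this micro-batch to the BigQuery table
            s.writeTableToBQ(df, "append",
                             config['MDVariables']['targetDataset'],
                             config['MDVariables']['targetTable'])
            df.unpersist()
            print("wrote to DB")
        else:
            print("DataFrame is empty")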
batchId is 0
Total records processed in this run = 3107
wrote to DB

So it shows the batchId (0) and the total record count, writes to the BigQuery table and terminates. Wait and start it again; it should pick up from the next batchId:

batchId is 1
Total records processed in this run = 80
wrote to DB

hduser@rhes76: /home/hduser/dba/bin/python/DSBQ/src>

*What the checkpoint directory has*

/mnt/gs/prices/chkpt> ltr
total 1
-rw-r--r--. 1 hduser hadoop 45 May 2 09:35 metadata
drwxr-xr-x. 1 hduser hadoop 0 May 2 09:35 offsets
drwxr-xr-x. 1 hduser hadoop 0 May 2 09:35 commits
drwxr-xr-x. 1 hduser hadoop 0 May 2 09:35 sources

cat metadata
{"id":"cc3a9459-2a9d-4740-a280-e5b5d333d098"}

cd offsets/
/mnt/gs/prices/chkpt/offsets> ltr
total 2
-rw-r--r--. 1 hduser hadoop 529 May 2 09:35 0
-rw-r--r--. 1 hduser hadoop 529 May 2 09:39 1

cat 0
v1
{"batchWatermarkMs":0,"batchTimestampMs":1619944526698,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}

cat 1
v1
{"batchWatermarkMs":0,"batchTimestampMs":1619944796208,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":1}

So there are two offsets, one for each run (0 and 1), with different "batchTimestampMs" values, namely "batchTimestampMs":1619944526698 and "batchTimestampMs":1619944796208 respectively.

*How to trigger SSS for each run*

The SSS job can be triggered in many ways: a simple cron on-prem, AutoSys, Airflow on-prem, or Cloud Composer in the cloud. If there is nothing in the queue (say the source has stopped), SSS will come back with "DataFrame is empty" and terminate. This logic is already built into the sendToSink() method.
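As a purely illustrative sketch, this is roughly how the trigger-once job could be scheduled from Airflow or Cloud Composer, assuming Airflow 2.x with a BashOperator issuing the spark-submit; the DAG id, schedule and script name are hypothetical:

    # Hypothetical Airflow DAG that launches the trigger-once SSS job twice a day.
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="sss_trail_files_once",              # hypothetical name
        start_date=datetime(2021, 5, 1),
        schedule_interval="0 6,18 * * *",           # e.g. 06:00 and 18:00
        catchup=False,
    ) as dag:
        run_sss = BashOperator(
            task_id="run_sss_trigger_once",
            # the job uses trigger(once=True), so it drains whatever the checkpoint
            # directory says is outstanding and then exits on its own
            bash_command="spark-submit /home/hduser/dba/bin/python/DSBQ/src/trail_files_to_bq.py",
        )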
*Conclusion*

I am not sure that not running compute continuously with SSS is going to save a lot. Surely the compute process will only run as needed and that saves some dollars, but the whole infrastructure has to be there and the lion's share of the cost goes there. If the idea is to run CDC once or twice a day, then it is equally fine to schedule SSS to start at certain intervals. The important thing to realise is that SSS will pick up from the records it left off at, through the checkpoint directory. If the checkpoint directory is lost or its contents are deleted, SSS will process all the records again from batchId 0.

HTH,

Mich

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.


On Sun, 2 May 2021 at 10:45, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> This message is in two parts
>
> Hi,
>
> I did some tests on these. The idea being to run Spark Structured
> Streaming (SSS) on the collection of records that arrived since the last
> run of SSS and then shut the SSS job down.
>
> Some parts of this approach have been described in the following
> Databricks blog:
>
> Running Streaming Jobs Once a Day For 10x Cost Savings
> <https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>
>
> However, real life is more complicated than that.
>
> Let us look at a typical example as depicted in my lousy diagram attached:
>
> sources (trail files) --> Kafka --> Flume --> write to Cloud storage
> (mounted locally) --> SSS --> BigQuery
>
> What I have here is a typical example of *trail files* produced by a
> source. This could be a CDC tool like Oracle GoldenGate sending committed
> logs, or anything else that writes to files. We use Kafka to ingest these
> files and we use Apache Flume <http://flume.apache.org/> to move them onto
> Google Cloud Storage (gs://) *mounted as a local file system* on the edge
> node through Cloud Storage Fuse
> <https://cloud.google.com/storage/docs/gcs-fuse>
>
> The advantage of this type of storage is that both on-premise and cloud
> applications can work with it.
>
> For on-premise access the mount point would be, say, data_path =
> "file:///mnt/gs/prices/data/" where gs://etcbucket is mounted as /mnt/gs
>
> For cloud reference it would be data_path = "etcbucket/prices/data/"
>
> We use Flume's file_roll sink type for this storage.
>
> This is an event driven architecture and our interest is to process these
> trail files through SSS at a predetermined interval or when needed.
> However, the caveat is that the volume should be containable, meaning SSS
> can process it as required. The code base in PySpark is as follows:
>
>     from pyspark.sql.functions import col, from_json
>     import sys
>
>     data_path = "etcbucket/prices/data/"
>     checkpoint_path = "etcbucket/prices/chkpt/"
>
>     try:
>         streamingDataFrame = self.spark \
>             .readStream \
>             .option('newFilesOnly', 'true') \
>             .option('header', 'true') \
>             .option('maxFilesPerTrigger', 10000) \
>             .option('latestFirst', 'false') \
>             .text(data_path) \
>             .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
>         streamingDataFrame.printSchema()
>
>         result = streamingDataFrame.select(
>                       col("parsed_value.rowkey").alias("rowkey")
>                     , col("parsed_value.ticker").alias("ticker")
>                     , col("parsed_value.timeissued").alias("timeissued")
>                     , col("parsed_value.price").alias("price")). \
>                  writeStream. \
>                  outputMode('append'). \
>                  option("truncate", "false"). \
>                  foreachBatch(sendToSink). \
>                  queryName('trailFiles'). \
>                  trigger(once=True). \
>                  option('checkpointLocation', checkpoint_path). \
>                  start()
>
>         result.awaitTermination()
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
>     def sendToSink(df, batchId):
>         if len(df.take(1)) > 0:
>             print(f"""batchId is {batchId}""")
>             df.show(100, False)
>             df.persist()
>             # write to BigQuery batch table
>             s.writeTableToBQ(df, "append",
>                              config['MDVariables']['targetDataset'],
>                              config['MDVariables']['targetTable'])
>             df.unpersist()
>             print(f"""wrote to DB""")
>         else:
>             print("DataFrame is empty")
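As an aside, for readers without the helper class, a writeTableToBQ-style method could look roughly like the sketch below using the open-source spark-bigquery connector; the helper signature and the temporary GCS bucket name are assumptions, not the implementation actually used above:

    # Illustrative sketch only: one possible shape for a writeTableToBQ-style
    # helper built on the spark-bigquery connector. The bucket name and the
    # signature are assumptions; the real helper used in the thread is not shown.
    def writeTableToBQ(df, mode, dataset, table):
        df.write \
          .format("bigquery") \
          .mode(mode) \
          .option("temporaryGcsBucket", "etcbucket-tmp") \
          .save(f"{dataset}.{table}")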
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.