This message is in two parts

Hi,

I did some tests on these. The idea is to run a Spark Structured
Streaming (SSS) job over the collection of records that have arrived since
the last run, and then shut the SSS job down.

Parts of this approach are described in the following Databricks blog:

Running Streaming Jobs Once a Day For 10x Cost Savings
<https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>
However, real life is more complicated than that.
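
For context, here is a minimal sketch of that trigger-once pattern in PySpark.
It is illustrative only (the paths, text format and console sink are placeholders,
not the actual job shown further down): each run processes whatever files have
arrived since the last checkpoint and then stops on its own.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("runOnce").getOrCreate()

    # pick up whatever has landed since the last checkpoint (placeholder path)
    df = spark.readStream.format("text").load("gs://some-bucket/incoming/")

    query = df.writeStream \
        .format("console") \
        .option("checkpointLocation", "gs://some-bucket/chkpt/") \
        .trigger(once=True) \
        .start()

    query.awaitTermination()   # returns once the single micro-batch has finished
    spark.stop()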

Let us look at a typical example as depicted in my lousy diagram attached

sources (trail files) --> Kafka --> Flume --> write to Cloud storage
(mounted locally) --> SSS --> BigQuery


What we have here is a typical example of *trail files* produced by a
source. This could be CDC output, such as Oracle GoldenGate sending
committed logs, or anything else that writes to files. We use Kafka to
ingest these files and Apache Flume
<http://flume.apache.org/>
to move them onto Google Cloud Storage (gs://), *mounted as a local
file system* on the edge node through Cloud Storage Fuse
<https://cloud.google.com/storage/docs/gcs-fuse>


The advantage of this type of storage is that both on-premises and cloud
applications can access it.


For on-premises access the mount point would be, say, data_path =
"file:///mnt/gs/prices/data/", where gs://etcbucket is mounted as /mnt/gs


When referenced from the cloud it would be data_path = "etcbucket/prices/data/"
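
For reference, the Fuse mount itself is a one-off step on the edge node,
roughly along these lines (the bucket name and mount point are the
illustrative ones above, not our actual values):

    mkdir -p /mnt/gs
    gcsfuse --implicit-dirs etcbucket /mnt/gs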


We use Flume's file_roll sink type to write to this storage.
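
For illustration, a minimal Flume agent wiring a Kafka source to the file_roll
sink might look like the sketch below. The agent name, topic, broker and roll
interval are assumptions for the example, not our actual configuration:

    agent1.sources  = kafka-src
    agent1.channels = mem-ch
    agent1.sinks    = gcs-sink

    agent1.sources.kafka-src.type = org.apache.flume.source.kafka.KafkaSource
    agent1.sources.kafka-src.kafka.bootstrap.servers = broker1:9092
    agent1.sources.kafka-src.kafka.topics = prices
    agent1.sources.kafka-src.channels = mem-ch

    agent1.channels.mem-ch.type = memory

    # file_roll writes rolled files into the locally mounted gs:// directory
    agent1.sinks.gcs-sink.type = file_roll
    agent1.sinks.gcs-sink.sink.directory = /mnt/gs/prices/data
    agent1.sinks.gcs-sink.sink.rollInterval = 30
    agent1.sinks.gcs-sink.channel = mem-ch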


This is an event-driven architecture, and our interest is in processing these
trail files through SSS at a predetermined interval or on demand. The caveat
is that the volume per run must remain containable, meaning SSS can process
it as required. The PySpark code is as follows:


      data_path = "etcbucket/prices/data/"

      checkpoint_path = "etcbucket/prices/chkpt/"

        try:

            streamingDataFrame = self.spark \

                .readStream \

                .option('newFilesOnly', 'true') \

                .option('header', 'true') \

                .option('maxFilesPerTrigger', 10000) \

                .option('latestFirst', 'false') \

                .text(data_path) \

                .select(from_json(col("value").cast("string"),
schema).alias("parsed_value"))


            streamingDataFrame.printSchema()

            result = streamingDataFrame.select( \

                     col("parsed_value.rowkey").alias("rowkey") \

                   , col("parsed_value.ticker").alias("ticker") \

                   , col("parsed_value.timeissued").alias("timeissued") \

                   , col("parsed_value.price").alias("price")). \

                     writeStream. \

                     outputMode('append'). \

                     option("truncate", "false"). \

                     *foreachBatch(sendToSink). \*

                     queryName('trailFiles'). \

   *                  trigger(once = True). \*

  *                   option('checkpointLocation', checkpoint_path). \*

                     start(data_path)

        except Exception as e:

                print(f"""{e}, quitting""")

                sys.exit(1)

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"""batchId is {batchId}""")
        df.show(100, False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
                         config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
        print(f"""wrote to DB""")
    else:
        print("DataFrame is empty")



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 2 May 2021 at 10:42, Mail Delivery Subsystem <
mailer-dae...@googlemail.com> wrote:

> Message too large
> Your message couldn't be delivered to *user@spark.apache.org* because it
> exceeds the size limit. Try reducing the message size and then resending
> it.
> The response from the remote server was:
>
> 552 5.3.4 Message size exceeds fixed limit
>
>
>
> ---------- Forwarded message ----------
> From: Mich Talebzadeh <mich.talebza...@gmail.com>
> To:
> Cc: user <user@spark.apache.org>
> Bcc:
> Date: Sun, 2 May 2021 10:42:09 +0100
> Subject: Re: Spark Streaming with Files
> ----- Message truncated -----
