Well, you will need to set up a pilot, test these scenarios and come up
with a Minimum Viable Product (MVP).

It will be difficult to give accurate answers, but I would have thought that
you could write your processed (enriched) DataFrame to a database
table partitioned by hour and then go from there.
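
As a rough sketch only, a foreachBatch handler along these lines could append each micro-batch into hourly partitions, so the trigger interval can stay small. The column names event_time and device_id, the derived event_hour column and the output path are assumptions for illustration; none of them come from your pipeline, and Parquet on S3 merely stands in for whatever table you end up using.

```python
from datetime import datetime

# Hypothetical names for illustration only; adjust to the real schema.
EVENT_TIME_COL = "event_time"   # assumed timestamp column on each message
DEVICE_COL = "device_id"        # sort key mentioned in the thread
HOUR_COL = "event_hour"         # derived partition column (assumption)
OUTPUT_PATH = "s3a://my-bucket/enriched"  # assumed output location

HOUR_FORMAT_PY = "%Y-%m-%d-%H"       # Python strftime pattern
HOUR_FORMAT_SPARK = "yyyy-MM-dd-HH"  # equivalent Spark datetime pattern


def hour_partition(ts: datetime) -> str:
    """Return the directory name that partitionBy(HOUR_COL) produces for ts."""
    return f"{HOUR_COL}={ts.strftime(HOUR_FORMAT_PY)}"


def send_to_sink(df, batch_id):
    """foreachBatch handler: append one micro-batch into hourly partitions."""
    # Imported here so hour_partition() can be used without Spark installed.
    from pyspark.sql import functions as F

    (df.withColumn(HOUR_COL,
                   F.date_format(F.col(EVENT_TIME_COL), HOUR_FORMAT_SPARK))
       .repartition(HOUR_COL)                       # group rows of one hour together
       .sortWithinPartitions(HOUR_COL, DEVICE_COL)  # sort by device within each file
       .write
       .mode("append")
       .partitionBy(HOUR_COL)                       # one directory per hour
       .parquet(OUTPUT_PATH))
```

You would pass send_to_sink to foreachBatch() with a short processingTime trigger. Note that the sort order only holds within each file written by a batch, so an hour made up of many small batches would likely still need a compaction step before it is fully sorted by device_id.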

HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 29 Apr 2021 at 20:17, Eric Beabes <mailinglist...@gmail.com> wrote:

> Correct. The question you've asked seems to be the one we're looking for
> an answer to.
>
> If we set the processingTime to 60 minutes that will require tons of
> memory, right? What happens if the batch fails? Reprocess for the same
> hour? Not sure if this is the right approach.
>
> That's why we're thinking we will keep processing time small but use Kafka
> Connector to copy messages to hourly partitions.
>
> Other ideas are welcome!
>
>
>
> On Thu, Apr 29, 2021 at 11:30 AM Mich Talebzadeh <
> mich.talebza...@gmail.com>
> wrote:
>
> > Let's say that you have your readStream created in SSS with socket format.
> >
> > Now you want to process these files/messages:
> >
> >            result = streamingDataFrame.select( \
> >                      .....
> >                      writeStream. \
> >                      outputMode('append'). \
> >                      option("truncate", "false"). \
> >                      foreachBatch(sendToSink). \
> >                      trigger(processingTime='x seconds'). \
> >                      queryName('process name'). \
> >                      start()
> >
> >
> > The crucial ones are the foreachBatch and trigger lines. The sendToSink()
> > function takes two important parameters:
> >
> > def sendToSink(df, batchId):
> >
> > df is the DataFrame carrying the messages collected within that
> > triggering interval (x seconds). batchId is a monotonically increasing
> > number giving the running batch ID. Within sendToSink(), once you have
> > the DataFrame you can do all sorts of slicing and dicing and write to a
> > database (in our case BigQuery) or to cloud storage (S3 in your case).
> > However, the crucial question is whether you will be able to have a
> > processing time of an hour, so that you can write directly to hourly
> > created partitions in S3.
> >
> > HTH
> >
> > On Thu, 29 Apr 2021 at 18:56, Eric Beabes <mailinglist...@gmail.com>
> > wrote:
> >
> > > Source (devices) are sending messages to AWS SQS (not Kafka). Each
> > > message contains the path of the file on S3. (We have no control over
> > > the source. They won't change the way it's being done.)
> > >
> > > SSS will be listening to the SQS queue. We are thinking SSS will read
> > > each SQS message, get the file location, read the file, clean up as
> > > needed & build a message that will be written to Kafka.
> > >
> > > Let's just say millions of messages will come in from SQS per minute
> > > into SSS. As for SSS, we are thinking the batch window size could be
> > > 5 seconds (configurable).
> > >
> > >
> > > On Thu, Apr 29, 2021 at 10:43 AM Mich Talebzadeh <
> > > mich.talebza...@gmail.com>
> > > wrote:
> > >
> > > > Ok, thanks for the info.
> > > >
> > > > One question I forgot to ask: what is the interval at which the
> > > > source is sending messages to Kafka to be processed inside SSS? For
> > > > example, are these market data etc.?
> > > >
> > > > HTH
> > > >
> > > > On Thu, 29 Apr 2021 at 18:35, Eric Beabes <mailinglist...@gmail.com>
> > > > wrote:
> > > >
> > > > > We're thinking Kafka will allow us to scale to billions of messages
> > > > > in a day. That's the promise of Kafka, right? No other reason
> > > > > really. The main goal is to "batch" the messages per hour and create
> > > > > file(s) on S3 which are sorted by device_id, so that we can do more
> > > > > aggregations which can later be sliced & diced using a UI.
> > > > >
> > > > > Feel free to suggest alternatives. Thanks.
> > > > >
> > > > >
> > > > > On Thu, Apr 29, 2021 at 10:22 AM Mich Talebzadeh <
> > > > > mich.talebza...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Eric,
> > > > > >
> > > > > > On your second point, "Is there a better way to do this?"
> > > > > >
> > > > > > You are going to use Spark Structured Streaming (SSS) to clean and
> > > > > > enrich the data and then push the messages to Kafka.
> > > > > >
> > > > > > I assume you will be using foreachBatch in this case. What purpose
> > > > > > is there for Kafka to receive the enriched data from SSS? Any other
> > > > > > reason except hourly partitioning of your data?
> > > > > >
> > > > > > HTH
> > > > > >
> > > > > > On Thu, 29 Apr 2021 at 18:07, Eric Beabes <
> > mailinglist...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > We’ve a use case where lots of messages will come in via AWS SQS
> > > > > > > from various devices. We’re thinking of reading these messages
> > > > > > > using Spark Structured Streaming, cleaning them up as needed &
> > > > > > > saving each message on Kafka. Later we’re thinking of using Kafka
> > > > > > > S3 Connector to push them to S3 on an hourly basis, meaning there
> > > > > > > will be a different directory for each hour. The challenge is that,
> > > > > > > within this hourly “partition”, the messages need to be “sorted
> > > > > > > by” a certain field (let’s say device_id). The reason is that
> > > > > > > we’re planning to create an EXTERNAL table on it with BUCKETS on
> > > > > > > device_id. This will speed up the subsequent aggregation jobs.
> > > > > > >
> > > > > > > Questions:
> > > > > > >
> > > > > > > 1) Does Kafka S3 Connector allow messages to be sorted by a
> > > > > > > particular field within a partition, or do we need to extend it?
> > > > > > > 2) Is there a better way to do this?