Yes, of course. If 'sort by within a partition' is not available in the
Connector we will start without it, but I was wondering whether it is
available, or whether someone has a better idea.

On Thu, Apr 29, 2021 at 12:32 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Well, you will need to set up a pilot, test these scenarios and come up
> with a Minimum Viable Product (MVP).
>
> It will be difficult to give accurate answers, but I would have thought
> that you could write your processed (enriched) dataframe to a database
> table partitioned by hour and then go from there.
>
> HTH
>
> On Thu, 29 Apr 2021 at 20:17, Eric Beabes <mailinglist...@gmail.com>
> wrote:
>
> > Correct. The question you've asked is exactly the one we're looking for
> > an answer to.
> >
> > If we set the processingTime to 60 minutes, that will require tons of
> > memory, right? And what happens if the batch fails? Do we reprocess the
> > whole hour? Not sure if this is the right approach.
> >
> > That's why we're thinking we will keep the processing time small but use
> > the Kafka Connector to copy messages to hourly partitions.
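> >
> > For reference, this is roughly the kind of S3 sink connector config we
> > have in mind for the hourly directories. It is only a sketch based on the
> > Confluent S3 sink docs; the topic, bucket, region and flush values are
> > made-up placeholders:
> >
> >     name=s3-hourly-sink
> >     connector.class=io.confluent.connect.s3.S3SinkConnector
> >     topics=enriched-events
> >     s3.bucket.name=my-bucket
> >     s3.region=us-east-1
> >     storage.class=io.confluent.connect.s3.storage.S3Storage
> >     format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
> >     partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
> >     path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
> >     partition.duration.ms=3600000
> >     locale=en-US
> >     timezone=UTC
> >     flush.size=100000
> >
> > As far as we can tell, the connector preserves Kafka-partition order but
> > does not sort records within a directory, hence the original question.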
> >
> > Other ideas are welcomed!
> >
> >
> >
> > On Thu, Apr 29, 2021 at 11:30 AM Mich Talebzadeh <
> > mich.talebza...@gmail.com>
> > wrote:
> >
> > > Let's say that you have your readStream created in SSS with socket
> > > format.
> > >
> > > Now you want to process these files/messages
> > >
> > >            result = streamingDataFrame.select(...). \
> > >                      writeStream. \
> > >                      outputMode('append'). \
> > >                      option("truncate", "false"). \
> > >                      foreachBatch(sendToSink). \
> > >                      trigger(processingTime='x seconds'). \
> > >                      queryName('process name'). \
> > >                      start()
> > >
> > >
> > > The crucial ones are foreachBatch() and trigger(). The sendToSink()
> > > function will have two important parameters:
> > >
> > > def sendToSink(df, batchId):
> > >
> > > df is the DataFrame carrying your messages collected within that
> > > triggering interval (x seconds). batchId is a monotonically increasing
> > > number giving the running batch ID. Within sendToSink(), once you have
> > > the DataFrame, you can do all sorts of slicing and dicing and write to a
> > > database (in our case BigQuery) or to cloud storage (S3 in your case,
> > > etc.). However, the crucial question is: will you be able to have a
> > > processing time of an hour, so that you can write directly to hourly
> > > created partitions in S3?
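> > >
> > > As an illustration only, a minimal sketch of what sendToSink() could
> > > look like for your case. The event_time column name and the bucket path
> > > are assumptions, not something you gave us:
> > >
> > >     def sendToSink(df, batchId):
> > >         # sketch: write each micro-batch to hourly directories on S3,
> > >         # keeping rows sorted by device_id within each output partition
> > >         from pyspark.sql import functions as F
> > >         hourly = df.withColumn(
> > >             "hour", F.date_format(F.col("event_time"), "yyyy-MM-dd-HH"))
> > >         hourly.sortWithinPartitions("device_id") \
> > >               .write \
> > >               .partitionBy("hour") \
> > >               .mode("append") \
> > >               .parquet("s3a://my-bucket/enriched/")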
> > >
> > > HTH
> > >
> > > On Thu, 29 Apr 2021 at 18:56, Eric Beabes <mailinglist...@gmail.com>
> > > wrote:
> > >
> > > > The sources (devices) are sending messages to AWS SQS (not Kafka).
> > > > Each message contains the path of a file on S3. (We have no control
> > > > over the source. They won't change the way it's being done.)
> > > >
> > > > SSS will be listening to the SQS queue. We are thinking SSS will read
> > > > each SQS message, get the file location, read the file, clean up as
> > > > needed & build a message that will be written to Kafka.
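> > > >
> > > > As far as we know Spark has no built-in SQS source, so we'd do the SQS
> > > > polling with plain boto3 and feed the file paths to Spark. A rough
> > > > sketch, where the queue URL and the "s3_path" field are made up and
> > > > spark is an existing SparkSession:
> > > >
> > > >     import json
> > > >     import boto3
> > > >
> > > >     # poll SQS for messages that carry S3 file paths
> > > >     sqs = boto3.client("sqs", region_name="us-east-1")
> > > >     queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
> > > >     resp = sqs.receive_message(QueueUrl=queue_url,
> > > >                                MaxNumberOfMessages=10,
> > > >                                WaitTimeSeconds=5)
> > > >     for msg in resp.get("Messages", []):
> > > >         s3_path = json.loads(msg["Body"])["s3_path"]  # assumed field name
> > > >         df = spark.read.json(s3_path)  # read the file; clean up & enrich here
> > > >         # ... build and send the Kafka message here ...
> > > >         sqs.delete_message(QueueUrl=queue_url,
> > > >                            ReceiptHandle=msg["ReceiptHandle"])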
> > > >
> > > > Let's just say millions of messages will come in from SQS per minute
> > > > into SSS. As for SSS, we are thinking batch window size could be 5
> > > > seconds (configurable).
> > > >
> > > >
> > > > On Thu, Apr 29, 2021 at 10:43 AM Mich Talebzadeh <
> > > > mich.talebza...@gmail.com>
> > > > wrote:
> > > >
> > > > > OK, thanks for the info.
> > > > >
> > > > > One question I forgot to ask: what is the streaming interval at which
> > > > > the source is sending messages to Kafka to be processed inside SSS?
> > > > > For example, is this market data etc.?
> > > > >
> > > > > HTH
> > > > >
> > > > > On Thu, 29 Apr 2021 at 18:35, Eric Beabes <
> mailinglist...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > We're thinking Kafka will allow us to scale to billions of
> > > > > > messages in a day. That's the promise of Kafka, right? No other
> > > > > > reason really. The main goal is to "batch" the messages per hour
> > > > > > and create file(s) on S3 which are sorted by device_id, so that we
> > > > > > can do more aggregations which can later be sliced & diced using a
> > > > > > UI.
> > > > > >
> > > > > > Feel free to suggest alternatives. Thanks.
> > > > > >
> > > > > >
> > > > > > On Thu, Apr 29, 2021 at 10:22 AM Mich Talebzadeh <
> > > > > > mich.talebza...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Eric,
> > > > > > >
> > > > > > > On your second point, "Is there a better way to do this?":
> > > > > > >
> > > > > > > You are going to use Spark Structured Streaming (SSS) to clean and
> > > > > > > enrich the data and then push the messages to Kafka.
> > > > > > >
> > > > > > > I assume you will be using foreachBatch in this case. What purpose
> > > > > > > is there for Kafka to receive the enriched data from SSS? Any other
> > > > > > > reason except the hourly partitioning of your data?
> > > > > > >
> > > > > > > HTH
> > > > > > >
> > > > > > > On Thu, 29 Apr 2021 at 18:07, Eric Beabes <
> > > mailinglist...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > We’ve a use case where lots of messages will come in via AWS
> > > > > > > > SQS from various devices. We’re thinking of reading these
> > > > > > > > messages using Spark Structured Streaming, cleaning them up as
> > > > > > > > needed & saving each message on Kafka. Later we’re thinking of
> > > > > > > > using the Kafka S3 Connector to push them to S3 on an hourly
> > > > > > > > basis, meaning there will be a different directory for each
> > > > > > > > hour. The challenge is that, within this hourly “partition”,
> > > > > > > > the messages need to be “sorted by” a certain field (let’s say
> > > > > > > > device_id), because we’re planning to create an EXTERNAL table
> > > > > > > > on it with BUCKETS on device_id (roughly the kind of table
> > > > > > > > sketched below). This will speed up the subsequent Aggregation
> > > > > > > > jobs.
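> > > > > > > >
> > > > > > > > For illustration only; the schema, bucket count and location
> > > > > > > > are placeholders (Hive DDL run through Spark SQL with Hive
> > > > > > > > support):
> > > > > > > >
> > > > > > > >     spark.sql("""
> > > > > > > >         CREATE EXTERNAL TABLE IF NOT EXISTS events (
> > > > > > > >             device_id STRING,
> > > > > > > >             payload   STRING
> > > > > > > >         )
> > > > > > > >         PARTITIONED BY (hr STRING)
> > > > > > > >         CLUSTERED BY (device_id) SORTED BY (device_id)
> > > > > > > >             INTO 64 BUCKETS
> > > > > > > >         STORED AS PARQUET
> > > > > > > >         LOCATION 's3://my-bucket/events/'
> > > > > > > >     """)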
> > > > > > > >
> > > > > > > > Questions:
> > > > > > > >
> > > > > > > > 1) Does Kafka S3 Connector allow messages to be sorted by a
> > > > > > > >    particular field within a partition – or – do we need to
> > > > > > > >    extend it?
> > > > > > > > 2) Is there a better way to do this?
> > > > > > > >
