Yes, of course. If "sort by within a partition" is not available in the Connector we will start without it, but I was wondering whether it is - or - whether someone has a better idea.
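For context, a minimal sketch of what sorting by device_id within hourly output could look like if it were done in Spark itself rather than in the S3 sink connector. The event_time and device_id column names, the S3 paths and the output layout are assumptions for illustration only, not details from this thread:

    # Sketch: one S3 directory per hour, rows sorted by device_id within each
    # output file. Column names and paths are illustrative assumptions.
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("hourly-sorted-output").getOrCreate()

    enriched = spark.read.parquet("s3a://my-bucket/enriched/")   # hypothetical input

    hourly = enriched.withColumn(
        "event_hour", F.date_format("event_time", "yyyy-MM-dd-HH"))  # e.g. 2021-04-29-13

    (hourly
        .repartition("event_hour", "device_id")            # co-locate rows per device
        .sortWithinPartitions("event_hour", "device_id")   # keep device_id order inside each file
        .write
        .mode("append")
        .partitionBy("event_hour")                         # one directory per hour
        .parquet("s3a://my-bucket/hourly/"))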
On Thu, Apr 29, 2021 at 12:32 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Well, you will need to set up a pilot, test these scenarios and come up
with a Minimum Viable Product (MVP).

It will be difficult to give accurate answers, but I would have thought
that you could write your processed (enriched) dataframe to a database
table partitioned by hour and then go from there.

HTH

On Thu, 29 Apr 2021 at 20:17, Eric Beabes <mailinglist...@gmail.com> wrote:

Correct. The question you've asked seems to be the one we're looking for
an answer to.

If we set the processingTime to 60 minutes, that will require tons of
memory, right? And what happens if the batch fails? Reprocess the same
hour? Not sure if this is the right approach.

That's why we're thinking we will keep the processing time small but use
the Kafka Connector to copy messages to hourly partitions.

Other ideas are welcome!

On Thu, Apr 29, 2021 at 11:30 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Let's say that you have your readStream created in SSS with socket format.
Now you want to process these files/messages:

    result = streamingDataFrame.select( \
                 ..... ). \
             writeStream. \
             outputMode('append'). \
             option("truncate", "false"). \
             foreachBatch(sendToSink). \
             trigger(processingTime='x seconds'). \
             queryName('process name'). \
             start()

The crucial calls are foreachBatch(sendToSink) and trigger(processingTime).
The sendToSink() function takes two important parameters:

    def sendToSink(df, batchId):

df is the DataFrame carrying your messages collected within that trigger
interval (x seconds); batchId is a monotonically increasing number giving
the running batch ID. Within sendToSink(), once you have the DataFrame you
can do all sorts of slicing and dicing and write to a database (in our case
BigQuery) or to cloud storage (S3 in your case). However, the crucial
question is whether you will be able to have a processing time of an hour,
so that you can write directly to hourly created partitions in S3.

HTH
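For illustration, a minimal sketch of what a sendToSink() along these lines might look like, appending each micro-batch into hourly partitions on S3. The event_time column, the output path and the layout are assumptions for the sketch rather than details from this thread:

    from pyspark.sql import functions as F

    def sendToSink(df, batchId):
        # df is the static DataFrame for this micro-batch; batchId is the
        # monotonically increasing batch number (useful for logging/idempotency).
        if df.rdd.isEmpty():
            return                                       # nothing arrived in this trigger
        hourly = df.withColumn(
            "event_hour", F.date_format("event_time", "yyyy-MM-dd-HH"))  # assumed column
        (hourly
            .write
            .mode("append")
            .partitionBy("event_hour")                   # one S3 directory per hour
            .parquet("s3a://my-bucket/enriched/"))       # hypothetical destination

Whether something like this replaces the Kafka S3 Connector or merely complements it comes down to whether a per-micro-batch append into the current hour's directory is acceptable for the downstream jobs.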
On Thu, 29 Apr 2021 at 18:56, Eric Beabes <mailinglist...@gmail.com> wrote:

Source (devices) are sending messages to AWS SQS (not Kafka). Each message
contains the path of the file on S3. (We have no control over the source;
they won't change the way it's being done.)

SSS will be listening to the SQS queue. We are thinking SSS will read each
SQS message, get the file location, read the file, clean it up as needed &
build a message that will be written to Kafka.

Let's just say millions of messages will come in from SQS per minute into
SSS. As for SSS, we are thinking the batch window size could be 5 seconds
(configurable).

On Thu, Apr 29, 2021 at 10:43 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

OK, thanks for the info.

One question I forgot to ask: what is the streaming interval at which the
source is sending messages to Kafka to be processed inside SSS? For
example, is this market data etc.?

HTH

On Thu, 29 Apr 2021 at 18:35, Eric Beabes <mailinglist...@gmail.com> wrote:

We're thinking Kafka will allow us to scale to billions of messages in a
day. That's the promise of Kafka, right? No other reason really. The main
goal is to "batch" the messages per hour and create file(s) on S3 which are
sorted by device_id, so that we can do more aggregations which can later be
sliced & diced using a UI.

Feel free to suggest alternatives. Thanks.

On Thu, Apr 29, 2021 at 10:22 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi Eric,

On your second point, "Is there a better way to do this":

You are going to use Spark Structured Streaming (SSS) to clean and enrich
the data and then push the messages to Kafka.

I assume you will be using foreachBatch in this case. What purpose is there
for Kafka to receive the enriched data from SSS? Any other reason except
hourly partitioning of your data?

HTH
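A rough sketch of the kind of foreachBatch body described in this exchange: pick up the S3 paths delivered via SQS, read and clean the referenced files, and publish the result to Kafka. The s3_path and device_id column names, the JSON file format, the clean-up step, the broker list and the topic are all placeholders:

    from pyspark.sql import SparkSession

    def processBatch(df, batchId):
        # df holds one micro-batch of SQS notifications; each row is assumed to
        # carry the S3 path of a file to ingest.
        spark = SparkSession.getActiveSession()        # foreachBatch runs on the driver
        paths = [row.s3_path for row in df.select("s3_path").collect()]
        if not paths:
            return

        raw = spark.read.json(paths)                   # file format assumed for the sketch
        cleaned = raw.dropna(subset=["device_id"])     # illustrative clean-up only

        (cleaned
            .selectExpr("CAST(device_id AS STRING) AS key",
                        "to_json(struct(*)) AS value")
            .write
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker1:9092")   # placeholder brokers
            .option("topic", "enriched-events")                  # placeholder topic
            .save())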
On Thu, 29 Apr 2021 at 18:07, Eric Beabes <mailinglist...@gmail.com> wrote:

We've a use case where lots of messages will come in via AWS SQS from
various devices. We're thinking of reading these messages using Spark
Structured Streaming, cleaning them up as needed & saving each message on
Kafka. Later we're thinking of using the Kafka S3 Connector to push them to
S3 on an hourly basis, meaning there will be a different directory for each
hour. The challenge is that, within this hourly "partition", the messages
need to be "sorted by" a certain field (let's say device_id). The reason
is that we're planning to create an EXTERNAL table on it with BUCKETS on
device_id. This will speed up the subsequent aggregation jobs.

Questions:

1) Does the Kafka S3 Connector allow messages to be sorted by a particular
   field within a partition - or - do we need to extend it?
2) Is there a better way to do this?
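For reference, a sketch of the kind of external, hourly-partitioned table bucketed on device_id that this describes, expressed as Spark SQL. The table name, columns, bucket count and location are made up for illustration, and the DDL assumes a Hive-compatible metastore:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # Hypothetical external table over the hourly S3 directories, clustered and
    # sorted by device_id so later aggregation jobs can exploit the bucketing.
    spark.sql("""
        CREATE EXTERNAL TABLE IF NOT EXISTS device_events (
            device_id STRING,
            payload   STRING
        )
        PARTITIONED BY (event_hour STRING)
        CLUSTERED BY (device_id) SORTED BY (device_id) INTO 256 BUCKETS
        STORED AS PARQUET
        LOCATION 's3a://my-bucket/hourly/'
    """)

The bucketing only pays off if the files within each hourly directory really are clustered by device_id, which is exactly the property the Connector question above is about.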