You are right. Another question on migration: is there a way to get the
microbatch id during the microbatch dataset `transform` operation, like in
the RDD `transform`? I am attempting to implement the following pseudo
functionality with Structured Streaming. In this approach,
recordCategoriesMetadata is fetched, and RDD metrics such as RDD size etc.
are emitted, using the microbatch id in the transform operation.

```code
val rddQueue = new mutable.Queue[RDD[Int]]()

// source components
val sources = Seq.empty[String]

val consolidatedDstream = sources
  .map(source => {
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.transform((rdd, ts) => {
      // emit metrics of microbatch ts: rdd size etc.
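      // (Illustrative only, not part of the original pseudocode.) One way to emit
      // per-microbatch metrics here is to tag them with the batch time `ts`, e.g.
      // with a hypothetical metrics client:
      //   metricsClient.gauge(s"$source.${ts.milliseconds}.recordCount", rdd.count())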
      val recordCategories = rdd.map(..).collect()
      val recordCategoriesMetadata = ...
      rdd
        .map(r => {
          val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
          (source, customRecord)
        })
    })
  })
  .reduceLeft(_ union _)

consolidatedDstream
  .foreachRDD((rdd, ts) => {
    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of the given source
    pipes.foreach(pipe => {
      val pipeSource = null // get from pipe variable
      val psRDD = rdd
        .filter { case (source, sourceRDD) => source.equals(pipeSource) }
      // apply pipe transformation and sink
    })
  })
```

In Structured Streaming, it could look like this:

```code
val consolidatedDstream = sources
  .map(source => {
    val inputStream = ... // for each source
    inputStream
  })
  .reduceLeft(_ union _)

consolidatedDstream
  .writeStream
  .foreachBatch((ds, batchId) => {
    val newDS = ds.transform(internalDS => {
      // emit metrics of microbatch batchId: size etc.
      val recordCategories = internalDS.map(..).collect()
      val recordCategoriesMetadata = ...
      internalDS
        .map(r => {
          val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
          (source, customRecord)
        }) // map needs an implicit encoder for (source, customRecord)
    })

    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of the given source
    pipes.foreach(pipe => {
      val pipeSource = null // get from pipe variable
      val psDS = newDS
        .filter { case (source, sourceDS) => source.equals(pipeSource) }
      // apply pipe transformation and sink
    })
  })
```

The above is just pseudocode, and I am still not sure whether it works. Let
me know your suggestions, if any. Thanks.

On Wed, May 22, 2024 at 8:34 AM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> The right way to associate microbatches when committing to external
> storage is to use the microbatch id that you can get in foreachBatch.
> That microbatch id guarantees that the data produced in the batch is
> always the same no matter any recomputations (assuming all processing
> logic is deterministic). So you can commit the batch id + batch data
> together, and then async commit the batch id + offsets.
>
> On Wed, May 22, 2024 at 11:27 AM Anil Dasari <adas...@guidewire.com>
> wrote:
>
>> Thanks Das, Mitch.
>>
>> Mitch,
>> We process data from Kafka and write it to S3 in Parquet format using
>> DStreams. To ensure exactly-once delivery and prevent data loss, our
>> process records micro-batch offsets to an external storage at the end
>> of each micro-batch in foreachRDD, which is then used when the job
>> restarts.
>>
>> Das,
>> Thanks for sharing the details. I will look into them. Unfortunately,
>> the listener process is async and can't guarantee a happens-before
>> association with the microbatch to commit offsets to external storage.
>> But they will still work. Is there a way to access lastProgress in
>> foreachBatch?
>>
>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you want to find what offset ranges are present in a microbatch in
>>> Structured Streaming, you have to look at StreamingQuery.lastProgress
>>> or use the StreamingQueryListener
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>> Both of these approaches give you access to the SourceProgress
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>>> which gives the Kafka offsets as a JSON string.
>>>
>>> Hope this helps!
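A minimal sketch of the StreamingQueryListener approach described above, for
reference. This is an illustration rather than code from the thread: the
class name and the println sink are placeholders, and it assumes a Spark 3.x
Structured Streaming query (with a Kafka source, the offsets appear as JSON
strings).

```code
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

class OffsetTrackingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Each SourceProgress carries the microbatch's start/end offsets as a JSON
    // string (for Kafka, a map of topic -> partition -> offset).
    event.progress.sources.foreach { source =>
      println(s"batchId=${event.progress.batchId} " +
        s"source=${source.description} " +
        s"startOffset=${source.startOffset} endOffset=${source.endOffset}")
    }
  }
}

// Register it before starting the query:
// spark.streams.addListener(new OffsetTrackingListener())
```

The same StreamingQueryProgress object is returned by
StreamingQuery.lastProgress, so these fields can also be read by polling
lastProgress on the running query.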
>>>
>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> OK, to understand better: your current model relies on streaming data
>>>> input through a Kafka topic, Spark does some ETL, and you send the
>>>> result to a sink, a database or file storage like HDFS etc.?
>>>>
>>>> Your current architecture relies on Direct Streams (DStream) and
>>>> RDDs, and you want to move to Spark Structured Streaming based on
>>>> dataframes and datasets?
>>>>
>>>> You have not specified your sink.
>>>>
>>>> With regard to your question:
>>>>
>>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>>> streaming to get the microbatch end offsets to the checkpoint in our
>>>> external checkpoint store ?"
>>>>
>>>> There is no direct equivalent of DStream HasOffsetRanges in Spark
>>>> Structured Streaming. However, Structured Streaming provides
>>>> mechanisms to achieve similar functionality.
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>>>> London
>>>> United Kingdom
>>>>
>>>> View my LinkedIn profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>> that, as with any advice, "one test result is worth one-thousand
>>>> expert opinions" (Wernher von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>
>>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> What options are you considering yourself?
>>>>>
>>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>>> adas...@guidewire.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> We are on Spark 3.x, using Spark DStream + Kafka, and planning to
>>>>> move to Structured Streaming + Kafka.
>>>>> Is there an equivalent of DStream HasOffsetRanges in Structured
>>>>> Streaming to get the microbatch end offsets for checkpointing in our
>>>>> external checkpoint store? Thanks in advance.
>>>>>
>>>>> Regards
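For reference, a rough sketch of the foreachBatch pattern Tathagata
described at the top of the thread: commit the batch data keyed by the batch
id, then record the batch id plus offsets externally. The rate source, the
output path, and the commitOffsetsToStore helper are placeholders, not
anything from the original messages.

```code
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("offset-commit-sketch").getOrCreate()

// Stand-in for the Kafka source discussed in the thread.
val streamDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

val query = streamDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // batchId is stable across recomputations of the same microbatch (assuming
    // deterministic logic), so an idempotent sink can key its writes on it.
    batchDF.write.mode("overwrite").parquet(s"/tmp/output/batch_id=$batchId")

    // After the data is written, record batchId (and any offset metadata, e.g.
    // from lastProgress or a StreamingQueryListener) in the external store.
    // commitOffsetsToStore(batchId)   // hypothetical helper
  }
  .start()

query.awaitTermination()
```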