With regard to this sentence:
*Offset Tracking with Structured Streaming:* While storing offsets in an
external store was necessary with DStreams, SSS handles this automatically
through checkpointing. The checkpoints include the offsets processed by each
micro-batch. However, you can still access the most recent offsets through
lastProgress on your StreamingQuery object for monitoring purposes, if you
need them.

In essence, with SSS and checkpointing in place, you can rely on the
automatic offset management provided by the framework, *eliminating the need
for the custom offset storage you had with DStreams.* (If you want to watch
the offsets as the query runs, there is a small lastProgress sketch at the
bottom of this mail.)

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, "one test result is worth one-thousand expert opinions"
(Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Wed, 22 May 2024 at 19:49, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Anil,
>
> OK, let us put the complete picture here.
>
> *Current DStreams Setup:*
>
>    - Data Source: Kafka
>    - Processing Engine: Spark DStreams
>    - Data Transformation: Spark
>    - Sink: S3
>    - Data Format: Parquet
>    - Exactly-Once Delivery (Attempted): You are attempting exactly-once
>    delivery by recording micro-batch offsets in an external store using
>    foreachRDD at the end of each micro-batch, so that the job can be
>    restarted from the last processed offsets after a failure.
>    - Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
>    offer limited built-in support for exactly-once delivery guarantees.
>
> *Moving to Spark Structured Streaming (SSS)*
>
> Everything stays the same except the following:
>
>    - Exactly-Once Delivery: SSS provides exactly-once guarantees, given a
>    replayable source such as Kafka and an idempotent sink.
>    - Checkpointing: Enable checkpointing by setting the checkpointLocation
>    option in writeStream. Spark will periodically checkpoint the state of
>    the streaming query, including offsets, to a designated location
>    (e.g. HDFS, cloud storage or SSD).
>    - Offset Tracking with Structured Streaming: While storing offsets in
>    an external store was necessary with DStreams, SSS handles this
>    automatically through checkpointing. The checkpoints include the
>    offsets processed by each micro-batch. However, you can still access
>    the most recent offsets through lastProgress on your StreamingQuery
>    object for monitoring purposes, if you need them.
>
> Have a look at this article of mine about structured streaming and
> checkpointing:
>
> Processing Change Data Capture with Spark Structured Streaming
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>
>
> In your case, briefly:
>
> from pyspark.sql.functions import col
>
> def store_offsets_to_checkpoint(df, batchId):
>     if len(df.take(1)) > 0:
>         df.persist()
>         # Extract offsets from the DataFrame (assuming the Kafka source
>         # columns 'partition' and 'offset' are still present)
>         offset_rows = df.select(col('partition'), col('offset')).rdd.collect()
>         # Build (partition, fromOffset, toOffset) ranges from the extracted offsets
>         offsets = [(row.partition, row.offset, row.offset + 1)
>                    for row in offset_rows]
>         # Logic to store offsets in your external checkpoint store
>         ......
>         df.unpersist()
>     else:
>         print("DataFrame is empty")
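>
> If collecting every row just to read its offset is heavy for large
> micro-batches, a lighter variant of the same idea is to aggregate the end
> offset per partition. This is only a sketch, assuming the Kafka source
> columns 'topic', 'partition' and 'offset' are still present on the batch
> DataFrame and with an illustrative function name:
>
> from pyspark.sql import functions as F
>
> def store_end_offsets(df, batchId):
>     # One row per topic/partition with the highest offset seen in this micro-batch
>     rows = (df.groupBy("topic", "partition")
>               .agg(F.max("offset").alias("end_offset"))
>               .collect())
>     end_offsets = {(r["topic"], r["partition"]): r["end_offset"] + 1 for r in rows}
>     # Write 'end_offsets' together with batchId to your external store here
>     print(batchId, end_offsets)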
>
> # Define your Structured Streaming application with Kafka source and sink
>
> """
> "foreach" performs custom write logic on each row, and
> "foreachBatch" performs custom write logic on each micro-batch, here
> through the store_offsets_to_checkpoint function.
> foreachBatch(store_offsets_to_checkpoint) expects a function taking 2
> parameters: first, the micro-batch as a DataFrame or Dataset, and second,
> a unique id for each batch.
> Using foreachBatch, we write each micro-batch to the storage defined in
> our custom logic.
> """
>
> streaming = spark.readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("subscribe", "topic_name") \
>     .load()
>
> # Custom sink function to store offsets in the checkpoint
> # (foreachBatch itself acts as the sink, so no format() is needed here)
> query = streaming.writeStream \
>     .option("checkpointLocation", "/path/to/checkpoint/store") \
>     .foreachBatch(store_offsets_to_checkpoint) \
>     .start()
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer | Generative AI | FinCrime
> London
> United Kingdom
>
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand expert
> opinions" (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
>
> On Wed, 22 May 2024 at 16:27, Anil Dasari <adas...@guidewire.com> wrote:
>
>> Thanks Das, Mich.
>>
>> Mich,
>> We process data from Kafka and write it to S3 in Parquet format using
>> DStreams. To ensure exactly-once delivery and prevent data loss, our
>> process records micro-batch offsets to an external store at the end of
>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>
>> Das,
>> Thanks for sharing the details. I will look into them.
>> Unfortunately, the listener process is async and can't guarantee a
>> happens-before association with the micro-batch when committing offsets
>> to external storage. But they will still work. Is there a way to access
>> lastProgress in foreachBatch?
>>
>>
>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you want to find what offset ranges are present in a micro-batch in
>>> Structured Streaming, you have to look at StreamingQuery.lastProgress
>>> or use a StreamingQueryListener
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>> Both of these approaches give you access to the SourceProgress
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>>> which gives Kafka offsets as a JSON string.
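>>>
>>> For example, a bare-bones listener along those lines might look like
>>> the sketch below (an illustration only; it assumes the PySpark listener
>>> API, which needs Spark 3.4+, and an ambient SparkSession named 'spark'):
>>>
>>> from pyspark.sql.streaming import StreamingQueryListener
>>>
>>> class OffsetLogger(StreamingQueryListener):
>>>     def onQueryStarted(self, event):
>>>         pass
>>>
>>>     def onQueryProgress(self, event):
>>>         # One SourceProgress per source; for the Kafka source,
>>>         # endOffset is a JSON string of topic -> partition -> offset
>>>         for s in event.progress.sources:
>>>             print(s.description, s.endOffset)
>>>
>>>     def onQueryTerminated(self, event):
>>>         pass
>>>
>>> # 'spark' is your SparkSession
>>> spark.streams.addListener(OffsetLogger())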
>>>
>>> Hope this helps!
>>>
>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> OK, to understand your current model better: it relies on streaming
>>>> data input through a Kafka topic, Spark does some ETL, and you send the
>>>> result to a sink such as a database or file storage like HDFS?
>>>>
>>>> Your current architecture relies on Direct Streams (DStream) and RDDs,
>>>> and you want to move to Spark Structured Streaming based on DataFrames
>>>> and Datasets?
>>>>
>>>> You have not specified your sink.
>>>>
>>>> With regard to your question:
>>>>
>>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>>> streaming to get the microbatch end offsets to the checkpoint in our
>>>> external checkpoint store ?"
>>>>
>>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>>> Structured Streaming. However, Structured Streaming provides mechanisms
>>>> to achieve similar functionality.
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>> view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>> that, as with any advice, "one test result is worth one-thousand expert
>>>> opinions" (Wernher von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>
>>>>
>>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> What options are you considering yourself?
>>>>>
>>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>>> adas...@guidewire.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> We are on Spark 3.x, using Spark DStream + Kafka, and planning to move
>>>>> to Structured Streaming + Kafka.
>>>>> Is there an equivalent of DStream HasOffsetRanges in Structured
>>>>> Streaming to get the micro-batch end offsets into the checkpoint in
>>>>> our external checkpoint store? Thanks in advance.
>>>>>
>>>>> Regards
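
PS. If all you need is to watch the offsets each micro-batch ended on, the
lastProgress route mentioned at the top of this mail is enough. A small
sketch only, assuming your StreamingQuery handle is called 'query';
lastProgress is None until the first micro-batch completes:

import json
import time

# Poll the running query and print the end offsets of the last completed micro-batch
while query.isActive:
    progress = query.lastProgress          # a dict in PySpark, None before the first batch
    if progress:
        for source in progress["sources"]:
            end = source["endOffset"]
            # depending on the source this arrives as a nested dict or a JSON string
            end = json.loads(end) if isinstance(end, str) else end
            print(progress["batchId"], source["description"], end)
    time.sleep(30)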