With regard to this sentence

*Offset Tracking with Structured Streaming: While storing offsets in an
external storage was necessary with DStreams, SSS handles this
automatically through checkpointing. The checkpoints include the offsets
processed by each micro-batch. However, if you need them for monitoring,
you can still access the most recent offsets through the lastProgress
property on your StreamingQuery object.*

In essence, with SSS and checkpointing in place, you can rely on the
automatic offset management provided by the framework,
*eliminating the need for the custom offset storage you had with DStreams.*
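
As a minimal monitoring sketch (assuming a started StreamingQuery named
query; note that in PySpark lastProgress is returned as a dict, and the
endOffset field may arrive as a nested dict or a JSON string depending on
the version):

import json

progress = query.lastProgress  # most recent StreamingQueryProgress, if any
if progress is not None:
    for source in progress["sources"]:
        end_offsets = source["endOffset"]  # Kafka offsets, e.g. {"topic_name": {"0": 1234}}
        if isinstance(end_offsets, str):   # hedge: some versions give a JSON string
            end_offsets = json.loads(end_offsets)
        print(source["description"], end_offsets)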

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Wed, 22 May 2024 at 19:49, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi  Anil,
>
> OK, let us put the complete picture here:
>
> *Current DStreams Setup:*
>
>    - Data Source: Kafka
>    - Processing Engine: Spark DStreams
>    - Data Transformation with Spark
>    - Sink: S3
>    - Data Format: Parquet
>    - Exactly-Once Delivery (Attempted): You are attempting exactly-once
>    delivery by recording micro-batch offsets in external storage using
>    foreachRDD at the end of each micro-batch, which allows you to restart
>    the job from the last processed offset in case of failures.
>    - Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
>    offer limited built-in support for exactly-once delivery guarantees.
>
>
> *Moving to Spark Structured Streaming (SSS):*
>
> Everything stays the same except the following:
>
>
>    - Exactly-Once Delivery: guaranteed by SSS, provided the source is
>    replayable (Kafka is) and the sink is idempotent
>    - Checkpointing: Enable checkpointing by setting the
>    checkpointLocation option in writeStream. Spark will periodically
>    checkpoint the state of the streaming query, including offsets, to a
>    designated location (e.g., HDFS or cloud storage); see the layout
>    sketch after this list.
>    - Offset Tracking with Structured Streaming: While storing offsets in
>    an external storage was necessary with DStreams, SSS handles this
>    automatically through checkpointing. The checkpoints include the offsets
>    processed by each micro-batch. However, if you need them for monitoring,
>    you can still access the most recent offsets through the lastProgress
>    property on your StreamingQuery object.
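>
> For reference, a sketch of what Spark keeps under that checkpointLocation
> (the exact layout may vary slightly across Spark versions):
>
> /path/to/checkpoint/store/
>     metadata      # the unique query id
>     offsets/      # one file per micro-batch with the planned end offsets
>     commits/      # one marker file per completed micro-batch
>     sources/      # initial metadata for each source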
>
> Have a look at this article of mine about Structured Streaming and
> checkpointing:
>
> Processing Change Data Capture with Spark Structured Streaming
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>
>
> In your case, briefly:
>
> from pyspark.sql.functions import col
>
> def store_offsets_to_checkpoint(df, batchId):
>     if len(df.take(1)) > 0:
>         df.persist()
>         # Extract offsets from the micro-batch; the Kafka source exposes
>         # 'topic', 'partition' and 'offset' columns on every row
>         offset_rows = df.select(col('topic'), col('partition'), col('offset')).collect()
>         # Keep the highest offset seen per topic/partition (the old DStream
>         # OffsetRange class is not available in Structured Streaming)
>         offsets = {}
>         for row in offset_rows:
>             key = (row.topic, row.partition)
>             offsets[key] = max(offsets.get(key, -1), row.offset)
>         # Logic to store offsets in your external checkpoint store
>         # ......
>         df.unpersist()
>     else:
>         print("DataFrame is empty")
>
> # Define your Structured Streaming application with Kafka source and sink
> #
> # "foreach" performs custom write logic on each row, while "foreachBatch"
> # performs custom write logic on each micro-batch, here through the
> # store_offsets_to_checkpoint function.
> # foreachBatch(store_offsets_to_checkpoint) expects a function taking 2
> # parameters: first the micro-batch as a DataFrame or Dataset, and second
> # a unique id for each batch.
> # Using foreachBatch, we write each micro-batch to storage as defined in
> # our custom logic.
>
> streaming = spark.readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("subscribe", "topic_name") \
>     .load()
>
> # Custom sink function to store offsets in the external checkpoint store
> query = streaming.writeStream \
>     .option("checkpointLocation", "/path/to/checkpoint/store") \
>     .foreachBatch(store_offsets_to_checkpoint) \
>     .start()
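>
> Note that foreachBatch itself acts as the sink here, so a separate
> format() such as "memory" is not needed. Spark still maintains its own
> offset and commit logs under checkpointLocation; the foreachBatch function
> simply keeps your external copy of the offsets on top of that, and you can
> also read query.lastProgress on the started query for monitoring.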
>
> HTH
>
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Wed, 22 May 2024 at 16:27, Anil Dasari <adas...@guidewire.com> wrote:
>
>> Thanks Das, Mich.
>>
>> Mitch,
>> We process data from Kafka and write it to S3 in Parquet format using
>> DStreams. To ensure exactly-once delivery and prevent data loss, our
>> process records micro-batch offsets to an external storage at the end of
>> each micro-batch in foreachRDD, which is then used when the job restarts.
>>
>> Das,
>> Thanks for sharing the details. I will look into them.
>> Unfortunately, the listener processing is async and can't guarantee a
>> happens-before association with the micro-batch when committing offsets
>> to external storage. But they will still work. Is there a way to access
>> lastProgress in foreachBatch?
>>
>>
>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you want to find what offset ranges are present in a microbatch in
>>> Structured Streaming, you have to look at the
>>> StreamingQuery.lastProgress or use the QueryProgressListener
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>> Both of these approaches give you access to the SourceProgress
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>
>>> which gives Kafka offsets as a JSON string.
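>>>
>>> As a minimal listener sketch (assuming PySpark 3.4+, where
>>> StreamingQueryListener is exposed in Python; the topic name in the
>>> comment is a placeholder):
>>>
>>> from pyspark.sql.streaming import StreamingQueryListener
>>>
>>> class OffsetListener(StreamingQueryListener):
>>>     def onQueryStarted(self, event):
>>>         pass
>>>     def onQueryProgress(self, event):
>>>         # each SourceProgress carries the Kafka offsets as a JSON
>>>         # string, e.g. '{"topic_name":{"0":1234}}'
>>>         for source in event.progress.sources:
>>>             print(source.description, source.endOffset)
>>>     def onQueryTerminated(self, event):
>>>         pass
>>>
>>> spark.streams.addListener(OffsetListener())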
>>>
>>> Hope this helps!
>>>
>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> OK, to understand better: your current model relies on streaming data
>>>> input through a Kafka topic, Spark does some ETL, and you send the
>>>> result to a sink, a database or file storage like HDFS, etc.?
>>>>
>>>> Your current architecture relies on Discretized Streams (DStreams) and
>>>> RDDs, and you want to move to Spark Structured Streaming based on
>>>> DataFrames and Datasets?
>>>>
>>>> You have not specified your sink
>>>>
>>>> With regard to your question:
>>>>
>>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>>> streaming to get the microbatch end offsets to the checkpoint in our
>>>> external checkpoint store ?"
>>>>
>>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>>> Structured Streaming. However, Structured Streaming provides mechanisms
>>>> to achieve similar functionality, chiefly checkpointing and the
>>>> StreamingQuery progress APIs.
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>> expert opinions (Wernher von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>
>>>>
>>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> what options are you considering yourself?
>>>>>
>>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>>> adas...@guidewire.com> wrote:
>>>>>
>>>>>
>>>>> Hello,
>>>>>
>>>>> We are on Spark 3.x, using Spark DStreams + Kafka, and planning to
>>>>> move to Structured Streaming + Kafka.
>>>>> Is there an equivalent of the DStream HasOffsetRanges in Structured
>>>>> Streaming to get the micro-batch end offsets to checkpoint in our
>>>>> external checkpoint store? Thanks in advance.
>>>>>
>>>>> Regards
>>>>>
>>>>>
