Hi Anil,
OK, let us put the complete picture together here.
*Current DStreams Setup:*
- Data Source: Kafka
- Processing Engine: Spark DStreams
- Data Transformation with Spark
- Sink: S3
- Data Format: Parquet
- Exactly-Once Delivery (Attempted): You are attempting exactly-once
delivery by recording micro-batch offsets in external storage using
foreachRDD at the end of each micro-batch. This allows you to restart
the job from the last processed offset in case of failure.
- Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
offer limited built-in support for exactly-once delivery guarantees.
*Moving to Spark Structured Streaming (SSS):*
Everything stays the same, except for the points below:
- Exactly-Once Delivery: SSS gives you end-to-end exactly-once
semantics, provided the source is replayable (Kafka is) and the sink is
idempotent, which the built-in Parquet file sink is.
- Checkpointing: Enable checkpointing by setting the checkpointLocation
option in writeStream. Spark will periodically checkpoint the state of the
streaming query, including offsets, to a designated location (e.g., HDFS or
cloud storage such as S3), as in the sketch right after this list.
- Offset Tracking with Structured Streaming: While storing offsets in
external storage was necessary with DStreams, SSS handles this
automatically through checkpointing. The checkpoints include the offsets
processed by each micro-batch. However, you can still access the most
recent offsets through the lastProgress attribute of your StreamingQuery
object for monitoring purposes, should you need it (a small lastProgress
sketch follows the code further down).
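For illustration, a minimal sketch of your own pipeline (Kafka in, Parquet
on S3 out) with checkpointing enabled could look like the below. The topic
name, S3 paths and trigger interval are placeholders, so adjust them to
your setup.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka_to_s3_parquet").getOrCreate()

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .option("startingOffsets", "latest") \
    .load()

# The built-in file sink is idempotent, so together with checkpointing it
# gives end-to-end exactly-once for the Parquet output on S3
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("parquet") \
    .option("path", "s3a://your-bucket/landing/") \
    .option("checkpointLocation", "s3a://your-bucket/checkpoints/kafka_to_s3/") \
    .trigger(processingTime="60 seconds") \
    .start()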
Have a look at this article of mine about Structured Streaming and
checkpointing:
Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>
In your case, briefly:
from pyspark.sql.functions import col

def store_offsets_to_checkpoint(df, batchId):
    if len(df.take(1)) > 0:
        df.persist()
        # Extract offsets from the DataFrame (the Kafka source exposes
        # 'partition' and 'offset' columns on every row)
        offset_rows = df.select(col("partition"), col("offset")).collect()
        # Build simple offset-range records from the extracted offsets
        # (the old DStream OffsetRange class is not needed here)
        offsets = [dict(partition=row.partition,
                        fromOffset=row.offset,
                        toOffset=row.offset + 1)
                   for row in offset_rows]
        # Logic to store offsets in your external checkpoint store
        # ......
        df.unpersist()
    else:
        print("DataFrame is empty")
# Define your Structured Streaming application with Kafka source and sink
"""
"foreach" performs custom write logic on each row, while
"foreachBatch" performs custom write logic on each micro-batch through the
store_offsets_to_checkpoint function.
foreachBatch(store_offsets_to_checkpoint) takes a function with two
parameters, first: the micro-batch as a DataFrame or Dataset and second: a
unique id for each batch.
Using foreachBatch, we write each micro-batch to the storage
defined in our custom logic.
"""
streaming = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

# Custom sink via foreachBatch, which stores offsets in the checkpoint store
query = streaming.writeStream \
    .option("checkpointLocation", "/path/to/checkpoint/store") \
    .foreachBatch(store_offsets_to_checkpoint) \
    .start()
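Once the query is running you can also watch the Kafka offsets it has
reached through lastProgress, as Das mentioned. A small monitoring sketch
(the sleep is only there to let at least one micro-batch complete):

import time

time.sleep(70)  # give the query time to complete a micro-batch
progress = query.lastProgress
if progress is not None:
    for source in progress["sources"]:
        # endOffset carries the Kafka offsets, e.g. {topic: {partition: offset}}
        print(source["description"], source["endOffset"])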
HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom
view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).
On Wed, 22 May 2024 at 16:27, Anil Dasari <[email protected]> wrote:
> Thanks Das, Mich.
>
> Mich,
> We process data from Kafka and write it to S3 in Parquet format using
> Dstreams. To ensure exactly-once delivery and prevent data loss, our
> process records micro-batch offsets to an external storage at the end of
> each micro-batch in foreachRDD, which is then used when the job restarts.
>
> Das,
> Thanks for sharing the details. I will look into them.
> Unfortunately, the listener process is async and can't guarantee a
> happens-before association with the micro-batch when committing offsets to
> external storage.
> But still they will work. Is there a way to access lastProgress in
> foreachBatch ?
>
>
> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <[email protected]>
> wrote:
>
>> If you want to find what offset ranges are present in a microbatch in
>> Structured Streaming, you have to look at the StreamingQuery.lastProgress or
>> use the QueryProgressListener
>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>> Both of these approaches give you access to the SourceProgress
>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>
>> which gives Kafka offsets as a JSON string.
>>
>> Hope this helps!
>>
>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>> [email protected]> wrote:
>>
>>> OK, to understand better: your current model relies on streaming data
>>> input through a Kafka topic, Spark does some ETL and you send it to a sink,
>>> a database or file storage like HDFS etc?
>>>
>>> Your current architecture relies on Direct Streams (DStream) and RDDs
>>> and you want to move to Spark Structured Streaming based on dataframes and
>>> datasets?
>>>
>>> You have not specified your sink
>>>
>>> With regard to your question:
>>>
>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>> streaming to get the microbatch end offsets to the checkpoint in our
>>> external checkpoint store ?"
>>>
>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>> Structured Streaming. However, Structured Streaming provides mechanisms to
>>> achieve similar functionality:
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>>> London
>>> United Kingdom
>>>
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>
>>>
>>> On Wed, 22 May 2024 at 10:32, [email protected]
>>> <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> what options are you considering yourself?
>>>>
>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>> [email protected]> wrote:
>>>>
>>>>
>>>> Hello,
>>>>
>>>> We are on Spark 3.x and using Spark dstream + kafka and planning to use
>>>> structured streaming + Kafka.
>>>> Is there an equivalent of Dstream HasOffsetRanges in structure
>>>> streaming to get the microbatch end offsets to the checkpoint in our
>>>> external checkpoint store ? Thanks in advance.
>>>>
>>>> Regards
>>>>
>>>>