You are right.
- Another question on migration: is there a way to get the microbatch id
during the microbatch dataset `transform` operation, like in the RDD
transform? I am attempting to implement the following pseudo functionality
with Structured Streaming. In this approach, recordCategoriesMetadata is
fetched and RDD metrics (such as RDD size) are emitted using the microbatch
id in the transform operation.
```code
val rddQueue = new mutable.Queue[RDD[Int]]()
// source components
val sources = Seq.empty[String]
val consolidatedDstream = sources
  .map { source =>
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.transform { (rdd, ts) =>
      // emit metrics of microbatch ts: rdd size etc.
      val recordCategories = rdd.map(..).collect()
      val recordCategoriesMetadata = ...
      rdd.map { r =>
        val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
        (source, customRecord)
      }
    }
  }
  .reduceLeft(_ union _)

consolidatedDstream
  .foreachRDD { (rdd, ts) =>
    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of given source
    pipes.foreach { pipe =>
      val pipeSource = null // get from pipe variable
      val psRDD = rdd.filter { case (source, record) => source.equals(pipeSource) }
      // apply pipe transformation and sink
    }
  }
```

In Structured Streaming, it could look like this:

```code
val consolidatedDstream = sources
  .map { source =>
    val inputStream = ... // (for each source)
    inputStream
  }
  .reduceLeft(_ union _)

consolidatedDstream
  .writeStream
  .foreachBatch { (ds, batchId) => // batchId is the microbatch id
    val newDS = ds.transform { internalDS =>
      // emit metrics of microbatch batchId: size etc.
      val recordCategories = internalDS.map(..).collect()
      val recordCategoriesMetadata = ...
      internalDS.map { r =>
        val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
        (source, customRecord)
      }(... <encoder>)
    }

    // get pipes for each source
    val pipes = Seq.empty[String] // pipes of given source
    pipes.foreach { pipe =>
      val pipeSource = null // get from pipe variable
      val psDS = newDS.filter { case (source, record) => source.equals(pipeSource) }
      // apply pipe transformation and sink
    }
  }
  .start()
```
The above is just pseudocode and I am still not sure whether it works. Let
me know your suggestions, if any. Thanks.
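For context, here is a minimal, self-contained sketch of how I understand the
batch id could be captured: the second argument of foreachBatch is the
microbatch id, so it can be closed over by any per-batch transformation. The
rate source and the noop sink below are stand-ins for the real Kafka sources
and pipe sinks.

```code
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object BatchIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("batch-id-sketch").getOrCreate()

    // Stand-in source; in practice this would be the per-source Kafka readStream.
    val input: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    // The second argument of foreachBatch is the microbatch id.
    def processBatch(batch: DataFrame, batchId: Long): Unit = {
      // emit metrics of microbatch batchId: size etc.
      println(s"microbatch=$batchId size=${batch.count()}")

      // any transformation applied here can close over batchId
      val enriched = batch.withColumn("batch_id", lit(batchId))

      // stand-in sink; the real pipes would filter per source and write out
      enriched.write.format("noop").mode("append").save()
    }

    input.writeStream
      .foreachBatch(processBatch _)
      .start()
      .awaitTermination()
  }
}
```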

On Wed, May 22, 2024 at 8:34 AM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> The right way to associate microbatches when committing to external
> storage is to use the microbatch id that you can get in foreachBatch. That
> microbatch id guarantees that the data produced in the batch is always
> the same regardless of any recomputations (assuming all processing logic
> is deterministic). So you can commit the batch id + batch data together,
> and then async commit the batch id + offsets.
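
A minimal sketch of this commit pattern, as I understand it (the S3 output
path and the offsetStore helper below are hypothetical placeholders):

```code
import org.apache.spark.sql.DataFrame

// Sketch of "commit batch id + data, then batch id + offsets".
// The S3 path and offsetStore helper are hypothetical placeholders.
def commitBatch(batch: DataFrame, batchId: Long): Unit = {
  // Write the batch to a location keyed by the batch id; replaying the same
  // batch id overwrites the same location, so the write stays idempotent.
  batch.write
    .mode("overwrite")
    .parquet(s"s3://my-bucket/output/batch_id=$batchId/")

  // Then (possibly async) record batch id + source offsets in the external
  // store; offsets can come from StreamingQuery.lastProgress or a listener.
  // offsetStore.commit(batchId, offsetsJson)
}

// usage: stream.writeStream.foreachBatch(commitBatch _).start()
```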
>
> On Wed, May 22, 2024 at 11:27 AM Anil Dasari <adas...@guidewire.com>
> wrote:
>
>> Thanks Das, Mitch.
>>
>> Mitch,
>> We process data from Kafka and write it to S3 in Parquet format using
>> DStreams. To ensure exactly-once delivery and prevent data loss, our
>> process records micro-batch offsets to external storage at the end of
>> each micro-batch in foreachRDD, which is then used when the job restarts.
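
For reference, a condensed sketch of that DStream pattern (the stream
creation and the offset-store call are placeholders):

```code
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// Placeholder: created via KafkaUtils.createDirectStream(...)
val directStream: InputDStream[ConsumerRecord[String, String]] = ???

directStream.foreachRDD { rdd =>
  // Capture offset ranges from the source RDD before any shuffle.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... transform and write the micro-batch to S3 as Parquet ...

  // At the end of the micro-batch, persist the end offsets to the external
  // store so a restarted job can resume from them.
  // offsetStore.save(offsetRanges.map(o => (o.topic, o.partition, o.untilOffset)))
}
```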
>>
>> Das,
>> Thanks for sharing the details. I will look into them.
>> Unfortunately, the listener process is async and can't guarantee a
>> happens-before association with the microbatch when committing offsets to
>> external storage. But they will still work. Is there a way to access
>> lastProgress in foreachBatch?
>>
>>
>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> If you want to find what offset ranges are present in a microbatch in
>>> Structured Streaming, you have to look at StreamingQuery.lastProgress
>>> or use the StreamingQueryListener
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>> Both of these approaches give you access to the SourceProgress
>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>>> which gives Kafka offsets as a JSON string.
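
A small sketch of both approaches (the println is a stand-in for wherever
the offsets would actually be written):

```code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryListener}
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Option 1: poll the last progress of an already-started query.
def logLastOffsets(query: StreamingQuery): Unit = {
  Option(query.lastProgress).foreach { p =>
    p.sources.foreach { s =>
      // endOffset is a JSON string of Kafka offsets, e.g. {"topic":{"0":42,"1":17}}
      println(s"batch=${p.batchId} source=${s.description} endOffset=${s.endOffset}")
    }
  }
}

// Option 2: register a listener that is notified after every completed microbatch.
def registerOffsetListener(spark: SparkSession): Unit = {
  spark.streams.addListener(new StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit = ()
    override def onQueryProgress(event: QueryProgressEvent): Unit =
      event.progress.sources.foreach { s =>
        println(s"batch=${event.progress.batchId} endOffset=${s.endOffset}")
      }
    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  })
}
```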
>>>
>>> Hope this helps!
>>>
>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> OK, to understand better: your current model relies on streaming data
>>>> input through a Kafka topic, Spark does some ETL, and you send the
>>>> results to a sink, such as a database or file storage like HDFS?
>>>>
>>>> Your current architecture relies on Direct Streams (DStream) and RDDs,
>>>> and you want to move to Spark Structured Streaming based on DataFrames
>>>> and Datasets?
>>>>
>>>> You have not specified your sink.
>>>>
>>>> With regard to your question:
>>>>
>>>> "Is there an equivalent of Dstream HasOffsetRanges in structure
>>>> streaming to get the microbatch end offsets to the checkpoint in our
>>>> external checkpoint store ?"
>>>>
>>>> There is no direct equivalent of DStream HasOffsetRanges in Spark
>>>> Structured Streaming. However, Structured Streaming provides mechanisms
>>>> to achieve similar functionality.
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>> expert opinions" (Wernher von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>
>>>>
>>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> what options are you considering yourself?
>>>>>
>>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>>> adas...@guidewire.com> wrote:
>>>>>
>>>>>
>>>>> Hello,
>>>>>
>>>>> We are on Spark 3.x, using Spark DStream + Kafka, and planning to
>>>>> move to Structured Streaming + Kafka.
>>>>> Is there an equivalent of DStream HasOffsetRanges in Structured
>>>>> Streaming to get the microbatch end offsets to checkpoint in our
>>>>> external checkpoint store? Thanks in advance.
>>>>>
>>>>> Regards
>>>>>
>>>>>
