Catching up a bit late on this. I mentioned optimising RocksDB in my
earlier message in this thread, specifically:

    # Add RocksDB configurations here
    spark.conf.set("spark.sql.streaming.stateStore.providerClass",
                   "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    # Changelog checkpointing; in Spark 3.4+ the key is changelogCheckpointing.enabled
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
                   "true")
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
                   "64")  # Example configuration
    # The two compaction keys below are not in the documented Spark option
    # list as far as I can tell; verify they take effect on your runtime
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style",
                   "level")
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase",
                   "67108864")  # 64 MB

   - A bit more clarity may be useful here, although the trade-offs are
   subjective and open to debate.


   1. spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled
   - Enable Changelog Checkpointing:
      - Benefit: Each commit uploads only the state changes made since the
      last checkpoint rather than a full snapshot, which can reduce
      checkpoint size and duration. The logged changes can be replayed in
      case of failures.
      - Drawback: The changelog can consume additional storage, especially
      when dealing with frequent state updates.
   2. spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB - Write Buffer
   Size:
      - Benefit: A larger write buffer can improve write performance and
      reduce write amplification.
      - Drawback: It may lead to increased memory usage. The optimal size
      depends on the characteristics of your workload and available
      resources.
   3. spark.sql.streaming.stateStore.rocksdb.compaction.style - Compaction
   Style:
      - Benefit: Choosing an appropriate compaction style (e.g., level)
      can improve read and write performance.
      - Drawback: Different compaction styles have trade-offs, and the
      optimal choice depends on factors like read vs. write workload and
      available storage.
   4. spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase
   - Target File Size for Level-based Compaction: the target size of the
   files written by level compaction (67108864 bytes = 64 MB in the example
   above). ** There is another compaction style called "universal" which
   can be beneficial in scenarios with a heavy write workload **
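To put a rough number on the memory drawback of a larger write buffer, here is a back-of-envelope sketch. It assumes one RocksDB instance per (stateful operator, shuffle partition) pair, spread evenly across executors, and that every write buffer is full. The function name, its parameters and the cluster figures are hypothetical, and block cache and other RocksDB memory are not counted.

```python
import math


def estimate_memtable_mb(write_buffer_mb, max_write_buffers,
                         shuffle_partitions, stateful_operators, executors):
    """Rough upper bound on RocksDB memtable memory per executor, in MB.

    Assumes one RocksDB instance per (stateful operator, shuffle partition)
    pair, spread evenly across executors, with every write buffer full.
    """
    instances = shuffle_partitions * stateful_operators
    instances_per_executor = math.ceil(instances / executors)
    return instances_per_executor * write_buffer_mb * max_write_buffers


# Hypothetical cluster: 4 executors, 8 shuffle partitions (2x the workers),
# 6 stateful operators across all queries, 64 MB buffers, 2 buffers each.
print(estimate_memtable_mb(64, 2, 8, 6, 4))
```

Doubling the write buffer size doubles this estimate, so it is worth checking against executor memory before raising the value.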


   - Borrowing from colloquial English, your mileage may vary, as usual.
   However, since you said you may consider tuning RocksDB, I add the
   following if I may.
   - The choice of RocksDB and its configurations is often dependent on the
   specific requirements and characteristics of your streaming application.
   RocksDB can provide good performance for stateful streaming applications
   with proper tuning. However, memory usage is a crucial consideration,
   especially when dealing with large state sizes. Also note that changelog
   checkpointing supports fault tolerance but may introduce additional
   storage overhead.
   - The optimal configuration depends on factors like the volume and
   characteristics of the streaming data, the frequency of state updates, and
   available resources. In summary, what works well for one workload might
   not be optimal for another.
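One way to judge whether a setting suits a given workload is to look at the state operator figures that each streaming query reports. The sketch below is a minimal example, assuming the progress report is available as a dict with a "stateOperators" list, as PySpark 3.5 returns from query.lastProgress. The helper name and the sample values are hypothetical.

```python
def summarise_state_operators(progress):
    """Return one small summary dict per stateful operator in a progress report."""
    summaries = []
    # progress can be None before the first micro-batch has completed
    for op in (progress or {}).get("stateOperators", []):
        summaries.append({
            "operator": op.get("operatorName", "unknown"),
            "rows_total": op.get("numRowsTotal", 0),
            "rows_updated": op.get("numRowsUpdated", 0),
            "memory_mb": round(op.get("memoryUsedBytes", 0) / (1024 * 1024), 1),
        })
    return summaries


# Hand-written sample shaped like a progress report; the numbers are made up.
sample_progress = {
    "batchId": 42,
    "stateOperators": [
        {
            "operatorName": "stateStoreSave",
            "numRowsTotal": 120000,
            "numRowsUpdated": 3500,
            "memoryUsedBytes": 67108864,
        }
    ],
}

for row in summarise_state_operators(sample_progress):
    print(row)
```

With a running query, passing query.lastProgress before and after a configuration change gives a like-for-like comparison of state size and memory use.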

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 10 Jan 2024 at 15:42, Andrzej Zera <andrzejz...@gmail.com> wrote:

> Yes, I agree. But apart from maintaining this state internally (in memory
> or in memory+disk as in case of RocksDB), every trigger it saves some
> information about this state in a checkpoint location. I'm afraid we can't
> do much about this checkpointing operation. I'll continue looking for
> information on how I can decrease the number of LIST requests (ListBucket
> operations) made in this process.
>
> Thank you for your input so far!
> Andrzej
>
> On Wed, 10 Jan 2024 at 16:33, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> You may have a point on scenario 2.
>>
>> Caching Streaming DataFrames: In Structured Streaming, each micro-batch of
>> data is processed incrementally, so the typical caching we discussed may
>> not apply. Instead, Spark has its own mechanisms to manage and optimize the
>> processing of streaming data. To keep partial results, one usually relies
>> on maintaining state through stateful operations (see below) on Structured
>> Streaming DataFrames. In such scenarios, Spark maintains state internally
>> based on the operations performed. For example, if you are doing a groupBy
>> followed by an aggregation, Spark will manage the state of the keys and
>> update them incrementally.
>>
>> Just to clarify, in the context of Spark Structured Streaming, a stateful
>> operation is one that maintains and updates some form of state across
>> batches of streaming data. Unlike stateless operations, which process each
>> batch independently, stateful operations retain information from previous
>> batches and use it to produce results for the current batch.
>>
>> So, bottom line, while one may not explicitly cache a streaming data
>> frame, Spark internally optimizes the processing by maintaining the
>> necessary state.
>> HTH
>>
>>
>> On Wed, 10 Jan 2024 at 14:20, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Yes, that's how I understood it (scenario 1). However, I'm not sure if
>>> scenario 2 is possible. I think cache on a streaming DataFrame is supported
>>> only in foreachBatch (in which it's actually no longer a streaming DF).
>>>
>>> On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>  With regard to your point
>>>>
>>>> - Caching: Can you please explain what you mean by caching? I know that
>>>> when you have batch and streaming sources in a streaming query, then you
>>>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>>>> you mean, and I don't know how to apply what you suggest to streaming data.
>>>>
>>>> Let us visit this
>>>>
>>>> The purpose of caching in Structured Streaming is to store frequently
>>>> accessed data in memory or on disk for faster retrieval, reducing repeated
>>>> reads from sources.
>>>>
>>>> - Types:
>>>>
>>>>    - Memory Caching: Stores data in memory for extremely fast access.
>>>>    - Disk Caching: Stores data on disk for larger datasets or
>>>>    persistence across triggers
>>>>
>>>>
>>>> - Scenarios:
>>>>
>>>>    - Joining Streaming Data with Static Data: Cache static datasets
>>>>    (e.g., reference tables) to avoid repeated reads for each micro-batch.
>>>>    - Reusing Intermediate Results: Cache intermediate dataframes that
>>>>    are expensive to compute and used multiple times within the query.
>>>>    - Window Operations: Cache data within a window to avoid re-reading
>>>>    for subsequent aggregations or calculations within that window.
>>>>
>>>> - Benefits:
>>>>
>>>>    - Performance: Faster query execution by reducing I/O operations
>>>>    and computation overhead.
>>>>    - Cost Optimization: Reduced reads from external sources can lower
>>>>    costs, especially for cloud-based sources.
>>>>    - Scalability: Helps handle higher data volumes and throughput by
>>>>    minimizing expensive re-computations.
>>>>
>>>>
>>>> Example code
>>>>
>>>> scenario 1
>>>>
>>>> static_data = spark.read.load("path/to/static/data")
>>>> static_data.cache()
>>>> streaming_data = spark.readStream.format("...").load()
>>>> # Static data is cached for efficient joins
>>>> joined_data = streaming_data.join(static_data, ...)
>>>>
>>>> scenario 2
>>>>
>>>> intermediate_df = streaming_data.groupBy(...).count()
>>>> intermediate_df.cache()
>>>> # Use cached intermediate_df for further transformations or actions
>>>>
>>>> HTH
>>>>
>>>>
>>>> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera <andrzejz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank you very much for your suggestions. Yes, my main concern is
>>>>> checkpointing costs.
>>>>>
>>>>> I went through your suggestions and here're my comments:
>>>>>
>>>>> - Caching: Can you please explain what you mean by caching? I know
>>>>> that when you have batch and streaming sources in a streaming query, then
>>>>> you can try to cache batch ones to save on reads. But I'm not sure if it's
>>>>> what you mean, and I don't know how to apply what you suggest to streaming
>>>>> data.
>>>>>
>>>>> - Optimize Checkpointing Frequency: I'm already using changelog
>>>>> checkpointing with RocksDB and increased trigger interval to a maximum
>>>>> acceptable value.
>>>>>
>>>>> - Minimize LIST Request: That's where I can get most savings. My LIST
>>>>> requests account for ~70% of checkpointing costs. From what I see, LIST
>>>>> requests are ~2.5x the number of PUT requests. Unfortunately, when I
>>>>> changed the checkpoint location to DBFS, it didn't help with minimizing
>>>>> LIST requests. They are roughly at the same level. From what I see, the S3
>>>>> Optimized Committer is EMR-specific so I can't use it in Databricks. The
>>>>> fact that I don't see a difference between S3 and DBFS checkpoint
>>>>> locations suggests that both must implement the same or similar committer.
>>>>>
>>>>> - Optimizing RocksDB: I still need to do this but I don't suspect it
>>>>> will help much. From what I understand, these settings shouldn't have a
>>>>> significant impact on the number of requests to S3.
>>>>>
>>>>> Any other ideas how to limit the number of LIST requests are
>>>>> appreciated
>>>>>
>>>>> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> OK I assume that your main concern is checkpointing costs.
>>>>>>
>>>>>> - Caching: If your queries read the same data multiple times,
>>>>>> caching the data might reduce the amount of data that needs to be
>>>>>> checkpointed.
>>>>>>
>>>>>> - Optimize Checkpointing Frequency, i.e.
>>>>>>
>>>>>>    - Consider Changelog Checkpointing with RocksDB. This can
>>>>>>    potentially reduce checkpoint size and duration by only storing state
>>>>>>    changes, rather than the entire state.
>>>>>>    - Adjust Trigger Interval (if possible): While not ideal for your
>>>>>>    near-real time requirement, even a slight increase in the trigger
>>>>>>    interval (e.g., to 7-8 seconds) can reduce checkpoint frequency and
>>>>>>    costs.
>>>>>>
>>>>>> - Minimize LIST Requests:
>>>>>>
>>>>>>    - Enable S3 Optimized Committer, or as you stated consider DBFS
>>>>>>
>>>>>> You can also optimise RocksDB. Set your state backend to RocksDB, if
>>>>>> not already. Here is what I use
>>>>>>
>>>>>> # Add RocksDB configurations here
>>>>>> spark.conf.set("spark.sql.streaming.stateStore.providerClass",
>>>>>>                "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>>>>>> # Changelog checkpointing; in Spark 3.4+ the key is changelogCheckpointing.enabled
>>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
>>>>>>                "true")
>>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
>>>>>>                "64")  # Example configuration
>>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style",
>>>>>>                "level")
>>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase",
>>>>>>                "67108864")
>>>>>>
>>>>>> These configurations provide a starting point for tuning RocksDB.
>>>>>> Depending on your specific use case and requirements, of course, your
>>>>>> mileage may vary.
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>>
>>>>>> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera <andrzejz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Usually one or two topics per query. Each query has its own
>>>>>>> checkpoint directory. Each topic has a few partitions.
>>>>>>>
>>>>>>> Performance-wise I don't experience any bottlenecks in terms of
>>>>>>> checkpointing. It's all about the number of requests (including a high
>>>>>>> number of LIST requests) and the associated cost.
>>>>>>>
>>>>>>> On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> How many topics and checkpoint directories are you dealing with?
>>>>>>>>
>>>>>>>> Does each topic have its own checkpoint on S3?
>>>>>>>>
>>>>>>>> All these checkpoints are sequential writes so even SSD would not
>>>>>>>> really help
>>>>>>>>
>>>>>>>> HTH
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera <andrzejz...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> I'm running a few Structured Streaming jobs (with Spark 3.5.0)
>>>>>>>>> that require near-real time accuracy with trigger intervals on the
>>>>>>>>> order of 5-10 seconds. I usually run 3-6 streaming queries as part of
>>>>>>>>> the job and each query includes at least one stateful operation (and
>>>>>>>>> usually two or more). My checkpoint location is an S3 bucket and I use
>>>>>>>>> RocksDB as a state store. Unfortunately, checkpointing costs are quite
>>>>>>>>> high. It's the main cost item of the system and it's roughly 4-5 times
>>>>>>>>> the cost of compute.
>>>>>>>>>
>>>>>>>>> To save on compute costs, the following things are usually
>>>>>>>>> recommended:
>>>>>>>>>
>>>>>>>>>    - increase trigger interval (as mentioned, I don't have much
>>>>>>>>>    space here)
>>>>>>>>>    - decrease the number of shuffle partitions (I have 2x the
>>>>>>>>>    number of workers)
>>>>>>>>>
>>>>>>>>> I'm looking for some other recommendations that I can use to save
>>>>>>>>> on checkpointing costs. I saw that most requests are LIST requests.
>>>>>>>>> Can we cut them down somehow? I'm using Databricks. If I replace the S3
>>>>>>>>> bucket with DBFS, will it help in any way?
>>>>>>>>>
>>>>>>>>> Thank you!
>>>>>>>>> Andrzej
>>>>>>>>>
>>>>>>>>>
