Hi, You may have a point on scenario 2.
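To make the scenario 2 point concrete, here is a minimal sketch of the foreachBatch route raised in this thread. Inside foreachBatch each micro-batch arrives as an ordinary (non-streaming) DataFrame, so persist() and unpersist() can be used on it. The function name process_batch, the column name "key" and the variable streaming_df are hypothetical and only there for illustration.

```python
def process_batch(batch_df, batch_id):
    """Handle one micro-batch; batch_df is a regular DataFrame at this point."""
    # Cache the micro-batch so the two actions below do not recompute it.
    batch_df.persist()
    try:
        total_rows = batch_df.count()  # first action on the cached batch
        # Second action on the same batch; "key" is a hypothetical column name.
        distinct_keys = batch_df.select("key").distinct().count()
    finally:
        # Release the cache before the next micro-batch arrives.
        batch_df.unpersist()
    print(f"batch {batch_id}: rows={total_rows}, distinct keys={distinct_keys}")
    return total_rows, distinct_keys


# Wiring it into a query (streaming_df is assumed to come from spark.readStream):
# query = streaming_df.writeStream.foreachBatch(process_batch).start()
```

The cache here lives only for the duration of one micro-batch. It does not replace the state that Spark keeps for stateful operations across batches.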
Caching streaming DataFrames: in Structured Streaming each micro-batch is processed incrementally, so the batch-style caching we discussed does not really apply. Structured Streaming has its own mechanisms for managing and optimising the processing of streaming data.

For keeping partial results, one normally relies on stateful operations (explained below) on Structured Streaming DataFrames rather than on an explicit cache. In such scenarios Spark maintains the state internally, based on the operations performed. For example, if you do a groupBy followed by an aggregation, Spark keeps the state for each key and updates it incrementally.

Just to clarify, in the context of Spark Structured Streaming a stateful operation is one that maintains and updates some form of state across micro-batches of streaming data. Unlike stateless operations, which process each micro-batch independently, stateful operations retain information from previous batches and use it to produce the results for the current batch.

So, bottom line: while one may not explicitly cache a streaming DataFrame, Spark internally optimises the processing by maintaining the necessary state.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Wed, 10 Jan 2024 at 14:20, Andrzej Zera <andrzejz...@gmail.com> wrote:

> Hey,
>
> Yes, that's how I understood it (scenario 1). However, I'm not sure if
> scenario 2 is possible.
> I think cache on a streaming DataFrame is supported only in foreachBatch
> (in which it's actually no longer a streaming DF).
>
> On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> With regard to your point
>>
>> - Caching: Can you please explain what you mean by caching? I know that
>> when you have batch and streaming sources in a streaming query, then you
>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>> you mean, and I don't know how to apply what you suggest to streaming data.
>>
>> Let us visit this.
>>
>> The purpose of caching in Structured Streaming is to store frequently
>> accessed data in memory or on disk for faster retrieval, reducing repeated
>> reads from sources.
>>
>> - Types:
>>
>>    - Memory Caching: Stores data in memory for extremely fast access.
>>    - Disk Caching: Stores data on disk for larger datasets or
>>    persistence across triggers.
>>
>> - Scenarios:
>>
>>    - Joining Streaming Data with Static Data: Cache static datasets
>>    (e.g., reference tables) to avoid repeated reads for each micro-batch.
>>    - Reusing Intermediate Results: Cache intermediate DataFrames that
>>    are expensive to compute and used multiple times within the query.
>>    - Window Operations: Cache data within a window to avoid re-reading
>>    for subsequent aggregations or calculations within that window.
>>
>> - Benefits:
>>
>>    - Performance: Faster query execution by reducing I/O operations and
>>    computation overhead.
>>    - Cost Optimization: Reduced reads from external sources can lower
>>    costs, especially for cloud-based sources.
>>    - Scalability: Helps handle higher data volumes and throughput by
>>    minimizing expensive re-computations.
>>
>> Example code
>>
>> scenario 1
>>
>> static_data = spark.read.load("path/to/static/data")
>> static_data.cache()
>> streaming_data = spark.readStream.format("...").load()
>> joined_data = streaming_data.join(static_data, ...)
>> # Static data is cached for efficient joins
>>
>> scenario 2
>>
>> intermediate_df = streaming_data.groupBy(...).count()
>> intermediate_df.cache()
>> # Use cached intermediate_df for further transformations or actions
>>
>> HTH
>>
>> Mich Talebzadeh
>>
>> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>
>>> Thank you very much for your suggestions. Yes, my main concern is
>>> checkpointing costs.
>>>
>>> I went through your suggestions and here are my comments:
>>>
>>> - Caching: Can you please explain what you mean by caching? I know that
>>> when you have batch and streaming sources in a streaming query, then you
>>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>>> you mean, and I don't know how to apply what you suggest to streaming data.
>>>
>>> - Optimize Checkpointing Frequency: I'm already using changelog
>>> checkpointing with RocksDB and have increased the trigger interval to the
>>> maximum acceptable value.
>>>
>>> - Minimize LIST Requests: That's where I can get most savings. My LIST
>>> requests account for ~70% of checkpointing costs. From what I see, LIST
>>> requests are ~2.5x the number of PUT requests. Unfortunately, when I
>>> changed the checkpoint location to DBFS, it didn't help with minimizing
>>> LIST requests. They are roughly at the same level.
>>> From what I see, the S3 Optimized Committer is EMR-specific, so I can't
>>> use it in Databricks. The fact that I don't see a difference between the
>>> S3 and DBFS checkpoint locations suggests that both must implement the
>>> same or a similar committer.
>>>
>>> - Optimizing RocksDB: I still need to do this but I don't suspect it
>>> will help much. From what I understand, these settings shouldn't have a
>>> significant impact on the number of requests to S3.
>>>
>>> Any other ideas on how to limit the number of LIST requests are
>>> appreciated.
>>>
>>> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>> OK, I assume that your main concern is checkpointing costs.
>>>>
>>>> - Caching: If your queries read the same data multiple times, caching
>>>> the data might reduce the amount of data that needs to be checkpointed.
>>>>
>>>> - Optimize Checkpointing Frequency, i.e.
>>>>
>>>>    - Consider Changelog Checkpointing with RocksDB. This can
>>>>    potentially reduce checkpoint size and duration by only storing state
>>>>    changes, rather than the entire state.
>>>>    - Adjust Trigger Interval (if possible): While not ideal for your
>>>>    near-real-time requirement, even a slight increase in the trigger
>>>>    interval (e.g., to 7-8 seconds) can reduce checkpoint frequency and
>>>>    costs.
>>>>
>>>> - Minimize LIST Requests:
>>>>
>>>>    - Enable the S3 Optimized Committer or, as you stated, consider DBFS.
>>>>
>>>> You can also optimise RocksDB. Set your state backend to RocksDB, if
>>>> not already.
>>>> Here is what I use:
>>>>
>>>> # Add RocksDB configurations here
>>>> spark.conf.set("spark.sql.streaming.stateStore.providerClass",
>>>>     "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>>>>
>>>> # Changelog checkpointing; this is the key documented for Spark 3.4+
>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
>>>>
>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
>>>>     "64")  # Example configuration
>>>>
>>>> # Note: the two compaction keys below do not appear among the RocksDB
>>>> # options documented for Spark 3.5; verify that your runtime honours them
>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style",
>>>>     "level")
>>>>
>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase",
>>>>     "67108864")
>>>>
>>>> These configurations provide a starting point for tuning RocksDB.
>>>> Depending on your specific use case and requirements, of course, your
>>>> mileage may vary.
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh
>>>>
>>>> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera <andrzejz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Usually one or two topics per query. Each query has its own checkpoint
>>>>> directory. Each topic has a few partitions.
>>>>>
>>>>> Performance-wise I don't experience any bottlenecks in terms of
>>>>> checkpointing. It's all about the number of requests (including a high
>>>>> number of LIST requests) and the associated cost.
>>>>>
>>>>> On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> How many topics and checkpoint directories are you dealing with?
>>>>>>
>>>>>> Does each topic have its own checkpoint on S3?
>>>>>>
>>>>>> All these checkpoints are sequential writes, so even SSD would not
>>>>>> really help.
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Mich Talebzadeh
>>>>>>
>>>>>> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera <andrzejz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
>>>>>>> require near-real-time accuracy, with trigger intervals on the order
>>>>>>> of 5-10 seconds. I usually run 3-6 streaming queries as part of the
>>>>>>> job and each query includes at least one stateful operation (and
>>>>>>> usually two or more). My checkpoint location is an S3 bucket and I use
>>>>>>> RocksDB as a state store. Unfortunately, checkpointing costs are quite
>>>>>>> high. It's the main cost item of the system and it's roughly 4-5 times
>>>>>>> the cost of compute.
>>>>>>>
>>>>>>> To save on compute costs, the following things are usually
>>>>>>> recommended:
>>>>>>>
>>>>>>>    - increase the trigger interval (as mentioned, I don't have much
>>>>>>>    room here)
>>>>>>>    - decrease the number of shuffle partitions (I have 2x the
>>>>>>>    number of workers)
>>>>>>>
>>>>>>> I'm looking for some other recommendations that I can use to save on
>>>>>>> checkpointing costs. I saw that most requests are LIST requests. Can
>>>>>>> we cut them down somehow? I'm using Databricks. If I replace the S3
>>>>>>> bucket with DBFS, will it help in any way?
>>>>>>>
>>>>>>> Thank you!
>>>>>>> Andrzej
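
As a rough illustration of why the trigger interval matters for checkpointing cost in this thread, the arithmetic can be sketched as below. It only counts how many micro-batches (and hence checkpoint commits) can happen per day. It assumes every trigger actually fires a micro-batch, so it is an upper bound, and it does not model how many S3 requests a single commit generates. The function name max_microbatches_per_day is hypothetical.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def max_microbatches_per_day(trigger_seconds, num_queries=1):
    """Upper bound on micro-batches per day across num_queries streaming queries."""
    if trigger_seconds <= 0:
        raise ValueError("trigger_seconds must be positive")
    # Each query can start at most one micro-batch per trigger interval.
    return int(SECONDS_PER_DAY // trigger_seconds) * num_queries


# Compare the intervals mentioned in the thread for a job with 6 queries.
for interval in (5, 8, 10):
    batches = max_microbatches_per_day(interval, num_queries=6)
    print(f"{interval}s trigger, 6 queries: up to {batches} micro-batches/day")
```

Under these assumptions, moving from a 5-second to a 10-second trigger halves the number of checkpoint commits, which is why the trigger interval is usually the first knob suggested.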