[ 
https://issues.apache.org/jira/browse/SPARK-51823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-51823:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add option to not persist state stores on executors
> ---------------------------------------------------
>
>                 Key: SPARK-51823
>                 URL: https://issues.apache.org/jira/browse/SPARK-51823
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.5.5
>            Reporter: Adam Binford
>            Priority: Major
>              Labels: pull-request-available
>
> Currently when using stateful streams, the state is loaded on executors when 
> needed, and then kept on the executors assuming it will be needed again soon. 
> This makes sense for very low latency applications or those dealing with a 
> small but frequent volume of data. But this does not work well for high 
> latency, high volume applications that need to optimize resource usage.
>
> Originally there was only the HDFSBackedStateStore, which keeps state in 
> memory. Users frequently hit OOM errors trying to use this for large queries. 
> Typically in Spark, you can run a query of nearly any size, with the 
> appropriate number of shuffle partitions, and it would finish eventually 
> depending on the amount of compute resources you have. For stateful streams, 
> all state data is kept in memory indefinitely on the executors you have, so 
> your resources _had_ to scale with the data; there was no option for "fewer 
> resources but take longer".
>
> The RocksDBStateStore was created to try to alleviate these issues, but it 
> does not do it completely. It moves the Java heap resource requirement to 
> native memory and disk, but still has the same scaling issue. In its default 
> setup, it can accumulate unbounded native memory. The bounded memory setting 
> was added to try to alleviate that, but it still has issues with scaling. 
> Currently L0 blocks are pinned in the cache, so any RocksDB instance open on 
> an executor will keep its L0 blocks in memory even when they are not used, 
> with no option to evict them. Additionally all the RocksDB instances share a 
> background threadpool that defaults to 2 threads for doing flushing and 
> compaction, so any open RocksDB instance, even when not being actively used 
> by an executor, can lead to resource contention with other open instances.
>
> The real solution to scaling stateful queries is to have the option to not 
> persist state stores on executors when they are not being used. This does 
> have some tradeoffs, as the state will have to be reloaded from its source 
> on each batch, but this is an acceptable tradeoff for a lot of use cases. 
> Specifically, for streams that may have long periods between batches or 
> utilize dynamic allocation, it is likely the loaded state stores will 
> eventually be unloaded when the executor is deallocated before being used for 
> a second time anyway. In this case keeping the state stores on the executors 
> provides no benefit and just wastes resources.
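
For reference, the existing mitigation the description mentions (the RocksDB provider with the bounded memory setting) can be sketched as a set of Spark SQL configs. This is a minimal sketch, not part of the original issue: the helper names `rocksdb_state_store_conf` and `to_submit_args` are hypothetical, the numeric values are placeholders, and the config keys are the ones documented for Spark 3.5. It does not address the pinned L0 blocks or the idle open instances described above, and it does not show the option this issue proposes.

```python
# Sketch only: helper names and numeric values are illustrative assumptions.
# The config keys are the documented Spark 3.5 state store settings.

ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)


def rocksdb_state_store_conf(max_memory_mb: int, shuffle_partitions: int) -> dict:
    """Return settings that select RocksDB and cap its native memory per executor."""
    if max_memory_mb <= 0 or shuffle_partitions <= 0:
        raise ValueError("max_memory_mb and shuffle_partitions must be positive")
    return {
        # Use RocksDB instead of the default HDFS-backed provider.
        "spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER,
        # Share one bounded memory budget across RocksDB instances on an executor.
        "spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage": "true",
        "spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB": str(max_memory_mb),
        # More partitions mean smaller state stores, but more of them.
        "spark.sql.shuffle.partitions": str(shuffle_partitions),
    }


def to_submit_args(conf: dict) -> list:
    """Render a config dict as a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(rocksdb_state_store_conf(500, 200))))
```

Even with these settings, every state store an executor has loaded stays open between batches, which is the behavior the issue asks to make optional.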



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
