[ https://issues.apache.org/jira/browse/SPARK-51823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-51823:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add option to not persist state stores on executors
> ---------------------------------------------------
>
>                 Key: SPARK-51823
>                 URL: https://issues.apache.org/jira/browse/SPARK-51823
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.5.5
>            Reporter: Adam Binford
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, when using stateful streams, the state is loaded on executors when
> needed and then kept on the executors on the assumption that it will be needed
> again soon. This makes sense for very low latency applications or those
> dealing with a small but frequent volume of data. But this does not work well
> for high latency, high volume applications that need to optimize resource
> usage.
>
> Originally there was only the HDFSBackedStateStore, which keeps state in
> memory. Users frequently hit OOM errors trying to use this for large queries.
> Typically in Spark, you can run a query of nearly any size with the
> appropriate number of shuffle partitions, and it will finish eventually,
> depending on the amount of compute resources you have. For stateful streams,
> all state data is kept in memory indefinitely on the executors you have, so
> your resources _had_ to scale with the data; there was no option for "fewer
> resources but take longer".
>
> The RocksDBStateStore was created to try to alleviate these issues, but it
> does not do so completely. It moves the resource requirement from the Java
> heap to native memory and disk, but still has the same scaling issue. In its
> default setup, it can accumulate unbounded native memory. The bounded memory
> setting was added to try to alleviate that, but it still has issues with
> scaling.
> Currently, L0 blocks are pinned in the cache, so any RocksDB instance open on
> an executor will keep its L0 blocks in memory even when they are not used,
> with no option to evict them. Additionally, all the RocksDB instances share a
> background thread pool that defaults to 2 threads for flushing and
> compaction, so any open RocksDB instance, even when not being actively used
> by an executor, can lead to resource contention with other open instances.
>
> The real solution to scaling stateful queries is to have the option to not
> persist state stores on executors when they are not being used. This does
> have some tradeoffs, since the state will have to be reloaded from its source
> on each batch, but this is an acceptable tradeoff for a lot of use cases.
> Specifically, for streams that may have long periods between batches or that
> utilize dynamic allocation, it is likely that the loaded state stores will
> eventually be unloaded when the executor is deallocated, before being used a
> second time anyway. In this case, keeping the state stores on the executors
> provides no benefit and just wastes resources.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
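
For readers unfamiliar with the settings the description refers to, the RocksDB provider with the bounded memory setting can be sketched as a plain set of Spark configuration entries. This is only an illustrative sketch: the configuration keys are the ones I understand the Structured Streaming guide to document for Spark 3.5, the helper names (`rocksdb_bounded_memory_conf`, `to_submit_args`) and the example values are hypothetical, and the option proposed in this issue is deliberately not shown because the description does not name it.

```python
from typing import Dict, List

# State store provider class used to switch from the default
# HDFS-backed store to RocksDB.
ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)


def rocksdb_bounded_memory_conf(max_memory_mb: int,
                                shuffle_partitions: int) -> Dict[str, str]:
    """Build Spark conf entries for RocksDB state with bounded memory.

    Hypothetical helper for illustration; the values passed in are
    placeholders, not tuning advice.
    """
    if max_memory_mb <= 0 or shuffle_partitions <= 0:
        raise ValueError("max_memory_mb and shuffle_partitions must be positive")
    return {
        "spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER,
        # Cap the native memory shared by the RocksDB instances on a node.
        "spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage": "true",
        "spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB": str(max_memory_mb),
        # One state store instance is created per shuffle partition per
        # stateful operator, so this controls how many stores exist.
        "spark.sql.shuffle.partitions": str(shuffle_partitions),
    }


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render conf entries as spark-submit style `--conf key=value` arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args.extend(["--conf", "{}={}".format(key, value)])
    return args


if __name__ == "__main__":
    for arg in to_submit_args(rocksdb_bounded_memory_conf(500, 200)):
        print(arg)
```

Even with a cap like this in place, every state store that has been loaded stays open on its executor, which is the scaling problem the description is about.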