Adam Binford created SPARK-51823:
------------------------------------

Summary: Add option to not persist state stores on executors
Key: SPARK-51823
URL: https://issues.apache.org/jira/browse/SPARK-51823
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 3.5.5
Reporter: Adam Binford
Currently, when using stateful streams, state is loaded on executors when needed and then kept on those executors on the assumption that it will be needed again soon. This makes sense for very low-latency applications, or for those processing a small but frequent volume of data. It does not work well for high-latency, high-volume applications that need to optimize resource usage.

Originally there was only the HDFSBackedStateStore, which keeps state in memory, and users frequently hit OOM errors trying to use it for large queries. Typically in Spark, you can run a query of nearly any size with the appropriate number of shuffle partitions, and it will eventually finish; how long it takes depends on the compute resources you have. For stateful streams, all state data is kept in memory indefinitely on the executors you have, so your resources _had_ to scale with the data. There was no option for "fewer resources, but take longer".

The RocksDBStateStore was created to alleviate these issues, but it does not do so completely. It moves the resource requirement from the Java heap to native memory and disk, but it still has the same scaling issue. In its default setup, it can accumulate unbounded native memory. The bounded memory setting was added to address that, but it still has scaling issues. Currently, L0 blocks are pinned in the cache, so any RocksDB instance open on an executor keeps its L0 blocks in memory even when they are not being used, with no way to evict them. Additionally, all RocksDB instances share a background thread pool, which defaults to 2 threads, for flushing and compaction, so any open RocksDB instance, even one that is not being actively used by an executor, can cause resource contention with the other open instances.

The real solution to scaling stateful queries is an option to not persist state stores on executors when they are not being used.
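The difference between keeping loaded state stores on an executor and unloading them after use can be illustrated with a toy Python model. This is not Spark code: `Executor`, `run_task`, `deallocate` and `simulate` are hypothetical names, tasks are assumed to run one at a time on a single executor core, and a "load" stands for fetching one partition's state from its checkpoint location. Deallocation under dynamic allocation is modeled as clearing the executor's cache while keeping the counters.

```python
from dataclasses import dataclass, field


@dataclass
class Executor:
    """Toy model of one executor's cache of loaded state stores (hypothetical, not Spark code)."""

    persist_stores: bool  # True: keep stores after use (current); False: unload (proposed)
    loaded: set[int] = field(default_factory=set)  # partition ids whose state is resident
    loads: int = 0  # times a partition's state was (re)loaded from checkpoint storage
    peak_resident: int = 0  # most state stores resident at the same time

    def run_task(self, partition: int) -> None:
        """Process one partition's task, loading its state store on a cache miss."""
        if partition not in self.loaded:
            self.loads += 1
            self.loaded.add(partition)
        self.peak_resident = max(self.peak_resident, len(self.loaded))
        if not self.persist_stores:
            self.loaded.discard(partition)  # release the store as soon as the task ends

    def deallocate(self) -> None:
        """Dynamic allocation removed the executor, so every store it held is lost."""
        self.loaded.clear()


def simulate(persist_stores: bool, batches: int, partitions: int,
             dealloc_between_batches: bool) -> Executor:
    """Run every partition of every batch on one executor, one task at a time."""
    executor = Executor(persist_stores)
    for _ in range(batches):
        for partition in range(partitions):
            executor.run_task(partition)
        if dealloc_between_batches:
            # Model the replacement executor: empty cache, counters carried over.
            executor.deallocate()
    return executor


if __name__ == "__main__":
    for persist in (True, False):
        for dealloc in (False, True):
            e = simulate(persist, batches=3, partitions=4, dealloc_between_batches=dealloc)
            print(f"persist={persist} dealloc={dealloc} loads={e.loads} peak_resident={e.peak_resident}")
```

With 3 batches of 4 partitions, persisting stores costs 4 loads but holds up to 4 stores at once, while unloading after each task costs 12 loads with at most 1 store resident. When the executor is deallocated between batches, persisting also ends up with 12 loads while still peaking at 4 resident stores, so the retained state buys nothing.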
This does have tradeoffs, since the state has to be reloaded from its source in every batch, but that is acceptable for many use cases. In particular, for streams that have long periods between batches or that use dynamic allocation, a loaded state store is likely to be unloaded anyway, when its executor is deallocated, before it is used a second time. In that case, keeping state stores on the executors provides no benefit and just wastes resources.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)