Adam Binford created SPARK-51823:
------------------------------------

             Summary: Add option to not persist state stores on executors
                 Key: SPARK-51823
                 URL: https://issues.apache.org/jira/browse/SPARK-51823
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.5.5
            Reporter: Adam Binford


Currently, when using stateful streams, state is loaded onto executors when 
needed and then kept there on the assumption that it will be needed again soon. 
This makes sense for very low latency applications, or for those processing 
small volumes of data that arrive frequently. But it does not work well for 
high latency, high volume applications that need to optimize resource usage.

Originally there was only the HDFSBackedStateStore, which keeps state in 
memory. Users frequently hit OOM errors when using it for large queries. 
Typically in Spark, you can run a query of nearly any size with the 
appropriate number of shuffle partitions, and it will finish eventually; how 
long it takes depends on the amount of compute resources you have. For stateful 
streams, however, all state data is kept in memory indefinitely on the available 
executors, so your resources _had_ to scale with the data. There was no option 
for "fewer resources, but take longer".

The RocksDBStateStore was created to alleviate these issues, but it does not 
fully solve them. It moves the Java heap requirement to native memory and disk, 
but it still has the same scaling issue. In its default setup, it can 
accumulate unbounded native memory. The bounded memory setting was added to 
address that, but it still has scaling issues. Currently L0 blocks are pinned 
in the cache, so any RocksDB instance open on an executor keeps its L0 blocks 
in memory even when they are not being used, with no option to evict them. 
Additionally, all RocksDB instances share a background thread pool, which 
defaults to 2 threads, for flushing and compaction. As a result, any open 
RocksDB instance, even one not actively being used by the executor, can cause 
resource contention with the other open instances.

The real solution to scaling stateful queries is an option to not persist 
state stores on executors when they are not being used. This does have some 
tradeoffs, since the state must be reloaded from its source on every batch, 
but that is an acceptable tradeoff for many use cases. In particular, for 
streams that may have long periods between batches or that use dynamic 
allocation, a loaded state store is likely to be unloaded anyway, when its 
executor is deallocated, before it is ever used a second time. In this case, 
keeping the state stores on the executors provides no benefit and just wastes 
resources.