Hi Spark devs, I have a new feature to propose and would like to hear the community's opinions on it. I'm not sure the change is big enough to warrant going through an SPIP, so I'm posting to the dev mailing list instead.
> Feature

Reconfigurable number of partitions on state operators in Structured Streaming.

> Rationale

Today, state in Structured Streaming is stored per partition according to the configuration "spark.sql.shuffle.partitions", and this configuration cannot be modified after the query has run once. One contributor already submitted a patch [1] without knowing why this restriction exists. The restriction is necessary because state is distributed by a hash function applied to the key columns, but as a side effect, we cannot change the number of partitions of stateful operators. End users have varied workloads and varied SLAs (and SLAs can change), so restricting them to a fixed partition count will not satisfy their needs. Moreover, it is not easy for end users to pick the right value before running the query, and they only discover that it cannot be modified when they try to change it.

> Proposal

This feature proposes decoupling data partitions from operator partitions by introducing "key groups" for state, enabling scalability of operator partitions while the state data partitions remain the same (so there is no issue with existing state data). The approach is inspired by how Flink supports rescaling with partitioned state.

The concept itself is simple. Instead of applying the current partitioning expression (simplified) to the key columns:

hash(key columns) % number of state operator partitions

we would apply the expression below, so that state remains distributed across state data partitions, but each state operator partition can handle multiple state data partitions:

(hash(key columns) % number of state key groups) % number of state operator partitions

The state data itself will still not be scalable, so the number of state key groups becomes a new hard limit (we should forbid modifying it once the query has run). But we can change the number of stateful operator partitions afterwards.
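To make the two-level mapping concrete, here is a minimal Scala sketch of the proposed scheme, contrasted with the current one. All names here (`KeyGroupPartitioning`, `nonNegativeMod`, etc.) are illustrative assumptions, not existing Spark APIs; the non-negative modulo mirrors how a hash-based partitioner must handle negative hash values on the JVM.

```scala
// Illustrative sketch of the proposed two-level partitioning; names are
// hypothetical and do not correspond to actual Spark internals.
object KeyGroupPartitioning {
  // JVM's % can return negative values for negative hashes, so normalize.
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Current behavior: the key hash maps directly to an operator partition,
  // so the partition count is frozen once any state has been written.
  def currentPartition(keyHash: Int, numOperatorPartitions: Int): Int =
    nonNegativeMod(keyHash, numOperatorPartitions)

  // Proposed behavior: the key hash first maps to a fixed key group
  // (the new hard limit, chosen once), ...
  def keyGroup(keyHash: Int, numKeyGroups: Int): Int =
    nonNegativeMod(keyHash, numKeyGroups)

  // ... and the key group then maps to an operator partition, whose count
  // can be changed between runs without moving state data.
  def operatorPartition(keyGroupId: Int, numOperatorPartitions: Int): Int =
    nonNegativeMod(keyGroupId, numOperatorPartitions)
}
```

The key point is that a key's group id depends only on the (fixed) number of key groups, so rescaling the operator partitions only remaps whole key groups rather than individual keys.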
The number of stateful operator partitions should be equal to or smaller than the number of state key groups. (It doesn't make sense for a partition to be assigned no state key group and sit idle.)

> Possible Risks

Performance might be affected, because one of the following has to happen:

1. Each partition calculates the key group id per key.
2. The key group id is calculated and inserted into the row before it passes through the state operator (shuffle), and removed after the state operator.

There is another performance concern: committing multiple states within a partition when the number of operator partitions is smaller than the number of state key groups. But we could run the commits concurrently (at least for the HDFS state store), and this is actually already an issue today (all tasks may not be launched together). Additional code complexity would be introduced, as expected.

> Limitations

Initially, the feature will not support dynamic reconfiguration, i.e. changing the value while the query is running. That could actually be achieved simply by unloading all the state providers in executors before running the next batch, but it would invalidate all state caches and may incur high latency to reload the state from the previous batch. We could adopt it later if the benefit of reconfiguring stateful operator partitions outweighs the cost of reloading state.

> Rejected alternatives

* Offline physical repartitioning of state data (loading all state, recalculating the new partition id per key, and re-saving): I considered this but discarded it, since repartitioning would take non-trivial time once state grows huge. It is also not easy to implement such a tool efficiently (the whole state may not fit in memory, it would have to be processed concurrently, etc.).

Please share your opinion on this proposal: whether to accept or decline it, things to correct in my mail, any suggestions for improvement, etc. Please also let me know if it would be better to move this to a Google Doc or PDF along with filing a JIRA issue.
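To illustrate the multi-commit concern above: with the modulo mapping, when there are fewer operator partitions than key groups, each partition owns several key groups and would have to commit one state store per owned group. The sketch below (hypothetical names, not Spark code) computes that ownership and enforces the "partitions <= key groups" constraint.

```scala
// Illustrative sketch (not Spark internals): which key groups each
// operator partition owns under the proposed modulo mapping.
object KeyGroupAssignment {
  def groupsForPartition(partition: Int,
                         numKeyGroups: Int,
                         numOperatorPartitions: Int): Seq[Int] = {
    // Proposed constraint: more partitions than key groups would leave
    // some partitions with no key group, i.e. permanently idle.
    require(numOperatorPartitions <= numKeyGroups,
      "operator partitions must not exceed key groups")
    // A key group g lands on partition (g % numOperatorPartitions).
    (0 until numKeyGroups).filter(_ % numOperatorPartitions == partition)
  }
}
```

For example, with 8 key groups and 4 operator partitions, partition 1 owns key groups 1 and 5, so it commits two state stores per batch; halving the partitions to 2 doubles the groups (and commits) per partition without touching the stored state.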
Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://github.com/apache/spark/pull/21718