Can you share this in a Google doc to make the discussion easier?
Thanks for coming up with ideas to improve upon the current restrictions of the SS state store. If I understood correctly, the plan is to introduce a logical partitioning scheme for state storage (based on keys), independent of Spark's partitioning, so that the number of Spark partitions can be varied. My 2 cents:

1. Partitioning is already a kind of logical entity in Spark. Maybe this can be leveraged to over-partition in advance (similar to setting the number of state key groups in your proposal) while making it easy to run more than one task (partition) per core (I am not sure how easy this is currently). Then we can continue to leverage the existing state implementation. This has limitations similar to what you pointed out (the max number of partitions has to be fixed upfront), but once over-provisioning of partitions is made easy, it could be leveraged even for non-stateful operations.

2. Decouple the state from the partition completely and associate it only with the keys. This would be the most flexible option, and we could scale the partitions up/down as we wish. This needs a scalable distributed state store implementation supporting fast lookups/storage by key.

Thanks,
Arun

On 2 August 2018 at 23:45, Jungtaek Lim <kabh...@gmail.com> wrote:

> Hi Spark devs,
>
> I have a new feature to propose and would like to hear the community's opinion. I'm not sure it is a big enough change to be worth going through an SPIP, so I'm posting to the dev mailing list instead.
>
> Feature
>
> Reconfigurable number of partitions on state operators in Structured Streaming
>
> Rationalization
>
> Nowadays, state in Structured Streaming is stored per partition according to the configuration "spark.sql.shuffle.partitions", and that configuration cannot be modified after the query has run once. One contributor already submitted a patch [1] without knowing why such a restriction came into play.
>
> The restriction on state is necessary because state is distributed by a hash function applied to the key columns, but as a side effect of the restriction, we can't change the number of partitions of stateful operators. End users have various workloads and various SLAs (and SLAs can change), so restricting them to a specific partition count will not satisfy their needs. Moreover, it is not easy for end users to choose the configuration before they run the query, and they only realize they can't modify it when they try to.
>
> Proposal
>
> The feature proposes decoupling data partitions and operator partitions by introducing key groups to state, enabling scalability of operator partitions while the state data partitions remain the same (so no issue with state data). This approach is inspired by how Flink supports scalability with partitioned state.
>
> The concept itself is simple. Whereas we currently apply the following partitioning expression to the key columns (simplified):
>
> hash(key columns) % number of state operator partitions
>
> it will instead apply the partitioning expression below, so that data is still distributed across state data partitions but each state operator partition can handle multiple state data partitions:
>
> (hash(key columns) % number of state key groups) % number of state operator partitions
>
> The state data itself will still not be scalable, so the number of state key groups becomes a new hard limit (we should restrict modifying it once the query has run), but we can change the number of stateful operator partitions afterwards. The number of stateful operator partitions should be equal to or smaller than the number of state key groups. (It doesn't make sense for partitions to be assigned no state key group and sit idle.)
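For illustration, a minimal standalone sketch of the two-level expression above. This is plain Scala, not Spark code; the names (KeyGroupSketch, numKeyGroups, numOperatorPartitions) are made up for the example. It shows that a key's group id stays stable across runs while its owning operator partition may change when the operator partition count is reconfigured:

// Illustrative sketch only: plain Scala, not Spark internals. Names are hypothetical.
object KeyGroupSketch {
  // Key group id: stable for the lifetime of the query (numKeyGroups is the hard limit).
  def keyGroupId(keyHash: Int, numKeyGroups: Int): Int =
    Math.floorMod(keyHash, numKeyGroups)

  // Operator partition owning a key group: may change between runs,
  // as long as numOperatorPartitions <= numKeyGroups.
  def operatorPartitionId(groupId: Int, numOperatorPartitions: Int): Int =
    Math.floorMod(groupId, numOperatorPartitions)

  def main(args: Array[String]): Unit = {
    val numKeyGroups = 128   // fixed once the query has run
    val firstRun = 32        // initial number of operator partitions
    val secondRun = 64       // reconfigured number of operator partitions

    val keyHash = "user-42".hashCode
    val group = keyGroupId(keyHash, numKeyGroups)
    // The key keeps the same key group; only its owning operator partition changes.
    println(s"group=$group, partition(run 1)=${operatorPartitionId(group, firstRun)}, " +
      s"partition(run 2)=${operatorPartitionId(group, secondRun)}")
  }
}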
> Possible Risk
>
> Performance might be affected, because one of the following must be performed:
>
> 1. each partition calculates the key group id per key, or
> 2. the key group id is calculated and inserted into the row before it passes the state operators (shuffle), and removed after passing the state operators
>
> There is another performance concern: committing multiple states in a partition when the number of operator partitions < the number of state key groups. But we could run those commits concurrently (at least for the HDFS state store), and it is actually already an issue today (all tasks may not be launched together).
>
> Some code complexity will be introduced, as expected.
>
> Limitation
>
> Initially it will not support dynamic reconfiguration, i.e. changing the value while the query is running. That could actually be achieved simply by unloading all the state providers in executors before running the next batch, but it would invalidate all state caches and may incur high latency to reload the state cache from the previous batch. We could adopt it later if reconfiguring the partitions of stateful operators turns out to be worth more than the cost of reloading state.
>
> Rejected alternatives
>
> * Offline physical repartitioning of state data (loading all state, recalculating the new partition id per key, and re-saving)
>
> I thought about this but discarded it, since repartitioning would take non-trivial time once the state grows huge. It is also not easy to implement such a tool to run efficiently (the whole state may not fit in memory, it has to be processed concurrently, etc.).
>
> Please share your opinion about this proposal: whether to accept or decline it, things to correct in my mail, any suggestions for improvement, etc.
>
> Please also let me know if it would be better to move this to a Google doc or PDF and file a JIRA issue.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://github.com/apache/spark/pull/21718