coalesce might work. Say "spark.sql.shuffle.partitions" = 200; then input.readStream.map.filter.groupByKey(..).coalesce(2)... would still create 200 state instances but execute just 2 tasks.
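A rough, purely illustrative sketch of that shape (the rate source, the toy key expression and the count are placeholders, and whether coalesce actually ends up at the right point of the physical plan is exactly the Dataframe API question Joseph raises below):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("coalesce-sketch").getOrCreate()
    import spark.implicits._

    // State would still be laid out across spark.sql.shuffle.partitions (200 here)...
    spark.conf.set("spark.sql.shuffle.partitions", "200")

    val counts = spark.readStream
      .format("rate").load()            // placeholder source
      .selectExpr("value % 10 AS key")  // placeholder key
      .as[Long]
      .groupByKey(identity)             // stateful aggregation: 200 state partitions
      .count()
      .coalesce(2)                      // ...but ideally only 2 tasks executing them

    counts.writeStream
      .format("console")
      .outputMode("update")
      .start()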
However, I think further groupByKey operations downstream would need a similar coalesce, and this assumes the user sets the right number of shuffle partitions upfront. It may be worth bundling this pattern as some builtin API so that it can be transparent to the user. I am not sure how you were planning to expose the state key groups at the API level and whether it would be transparent.

IMO, decoupling the state from the partitions and making it key based would still be worth exploring to support dynamic state rebalancing. Maybe the default HDFS based implementation can maintain the state partition-wise and not support it, but there could be implementations based on a distributed k-v store which do support this.

Thanks,
Arun

On 3 August 2018 at 08:21, Joseph Torres <joseph.tor...@databricks.com> wrote:

> A coalesced RDD will definitely maintain any within-partition invariants that the original RDD maintained. It pretty much just runs its input partitions sequentially.
>
> There'd still be some Dataframe API work needed to get the coalesce operation where you want it to be, but this is much simpler than introducing a new concept of state key groups. As far as I can tell, state key groups are just the same thing that we currently call partitions of the aggregate RDD.
>
> On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> I'm afraid I don't know the details of coalesce(), but from some reading up on it, it looks like it helps reduce the actual number of partitions.
>>
>> For streaming aggregation, state for all partitions (by default, 200) must be initialized and committed even if it is unchanged. Otherwise an error occurs when reading a partition which was excluded from the query previously. Moreover, it can't find an existing row in state, or stores the row in the wrong partition, if the partition id doesn't match the id expected from the hashing function.
>>
>> Could you verify that coalesce() meets such requirements?
>>
>> On Fri, 3 Aug 2018 at 22:23 Joseph Torres <joseph.tor...@databricks.com> wrote:
>>
>>> Scheduling multiple partitions in the same task is basically what coalesce() does. Is there a reason that doesn't work here?
>>>
>>> On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>
>>>> Here's a link for Google docs (anyone can comment): https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>>>>
>>>> Please note that I just copied the content into the Google doc, so someone could point out a lack of detail. I would like to start with an explanation of the concept, and once we are in agreement on going forward, I could add more detail to the doc, or even just start working so that detail can be shared via POC code or even a WIP patch.
>>>>
>>>> Answers inlined for Arun's comments:
>>>>
>>>> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan <ar...@apache.org> wrote:
>>>>
>>>>> Can you share this in a Google doc to make the discussion easier?
>>>>>
>>>>> Thanks for coming up with ideas to improve upon the current restrictions with the SS state store.
>>>>>
>>>>> If I understood correctly, the plan is to introduce a logical partitioning scheme for state storage (based on keys) independent of Spark's partitioning so that the number of Spark partitions can be varied.
>>>>>
>>>>> My 2 cents:
>>>>>
>>>>> 1. The partitioning is already a kind of a logical entity in Spark.
>>>>> Maybe this can be leveraged to over-partition in advance (similar to setting the number of state key groups in your proposal) but make it easy to run more than one task (partition) per core (I am not sure how easy this is currently). Then we can continue to leverage the existing state implementation. This has similar limitations to what you pointed out (the max number of partitions has to be fixed upfront), but once over-provisioning of partitions is made easy, it could be leveraged even for non-stateful operations.
>>>>>
>>>> If we could allow assigning multiple partitions to a task (say, parallelism or maximum concurrency), maybe we could achieve it a bit more easily. I'm not that familiar with the core of Spark, so I can't imagine how we would do it. In addition, partitions for downstream operators will be affected unless we shuffle afterwards.
>>>>
>>>>> 2. Decouple the state from the partition completely and associate it only with the keys. This would be the most flexible option and we can scale the partitions up/down as we wish. This needs a scalable distributed state store implementation supporting fast lookups/storage by key.
>>>>>
>>>> It could be achieved with external storage like Redis or HBase, but I would avoid a step which requires end users to maintain another system as well. Spark is coupled to a specific version of Hadoop, so we can expect that end users can run and maintain HDFS.
>>>>
>>>>> Thanks,
>>>>> Arun
>>>>>
>>>>> On 2 August 2018 at 23:45, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>
>>>>>> Hi Spark devs,
>>>>>>
>>>>>> I have a new feature to propose and would like to hear opinions from the community. I'm not sure it is a big enough change to warrant a SPIP, so I'm posting to the dev mailing list instead.
>>>>>>
>>>>>> > Feature
>>>>>>
>>>>>> Reconfigurable number of partitions on state operators in Structured Streaming
>>>>>>
>>>>>> > Rationalization
>>>>>>
>>>>>> Nowadays, state in Structured Streaming is stored per partition, with the partition count given by the configuration "spark.sql.shuffle.partitions", and that configuration cannot be modified after the query has run once. One contributor already submitted a patch [1] without knowing why such a restriction came into play.
>>>>>>
>>>>>> Such a restriction on state is necessary because state is distributed by a hash function applied to the key columns, but as a side effect of the restriction, we can't change the number of partitions of stateful operators. End users have various workloads and various SLAs (and an SLA can change), so restricting them to a specific partition count will not satisfy their needs. Moreover, it is not easy for end users to settle on the configuration before they run the query, and they only realize they can't modify it when they try to.
>>>>>>
>>>>>> > Proposal
>>>>>>
>>>>>> The feature proposes decoupling data partitions and operator partitions by introducing key groups to state, enabling scalability of operator partitions while state data partitions remain the same (so no issue with state data). This approach is inspired by how Flink supports scalability with partitioned state.
>>>>>>
>>>>>> The concept itself is simple: whereas we currently apply this partitioning expression to the key columns (simplified):
>>>>>>
>>>>>> hash(key columns) % number of state operator partitions
>>>>>>
>>>>>> it would instead apply the partitioning expression below, so that data is still distributed by state data partitions but each state operator partition can handle multiple state data partitions:
>>>>>>
>>>>>> (hash(key columns) % number of state key groups) % number of state operator partitions
>>>>>>
>>>>>> The state data itself will still not be scalable, so the number of state key groups becomes the new hard limit (we should restrict modifying it once the query has run). But we can change the number of stateful operator partitions afterwards. The number of stateful operator partitions should be equal to or smaller than the number of state key groups. (It doesn't make sense for partitions to be assigned no state key group and sit idle.)
>>>>>>
>>>>>> > Possible Risk
>>>>>>
>>>>>> Performance might be affected, because one of the following has to be performed:
>>>>>>
>>>>>> 1. each partition calculates the key group id per key
>>>>>> 2. the key group id is calculated and inserted into the row before it passes the state operators (shuffle), and removed after it passes the state operators
>>>>>>
>>>>>> There's another performance concern: committing multiple states in a partition when the number of operator partitions < the number of state key groups. But we could run those commits concurrently (at least for the HDFS state store), and it is actually already an issue today (all tasks may not be launched together).
>>>>>>
>>>>>> Code complexity would be introduced, as expected.
>>>>>>
>>>>>> > Limitation
>>>>>>
>>>>>> Initially, it will not support dynamic reconfiguration, i.e. changing the value while the query is running. It could actually be achieved simply by unloading all the state providers in executors before running the next batch, but that would invalidate all state caches and may incur high latency to reload the state cache from the previous batch. I guess we could adopt it if we see more merit in reconfiguring the partitions of stateful operators than the cost of reloading state.
>>>>>>
>>>>>> > Rejected alternatives
>>>>>>
>>>>>> * Offline physical repartitioning of state data (loading all state, recalculating the new partition id per key, and re-saving)
>>>>>>
>>>>>> I thought about it but discarded it, since it would take non-trivial time to repartition once the state becomes huge. It is also not easy to implement such a tool efficiently (the whole state may not fit in memory, it has to be handled concurrently, etc.).
>>>>>>
>>>>>> Please share your opinion about this proposal: whether to accept or decline it, things to correct in my mail, any suggestions for improvement, etc.
>>>>>>
>>>>>> Please also let me know if it would be better to move this to a Google doc or PDF and file a JIRA issue.
>>>>>>
>>>>>> Thanks,
>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>
>>>>>> 1. https://github.com/apache/spark/pull/21718
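A minimal sketch of the two-level mapping in the proposal above (the hash and the names here are illustrative stand-ins, not Spark internals; the number of key groups is the fixed hard limit, and only the operator partition count changes on rescale):

    // Illustrative only: key -> state key group (stable) -> operator partition (rescalable).
    def stateKeyGroup(keyHash: Int, numKeyGroups: Int): Int =
      Math.floorMod(keyHash, numKeyGroups)

    def operatorPartition(keyHash: Int, numKeyGroups: Int, numOperatorPartitions: Int): Int =
      Math.floorMod(stateKeyGroup(keyHash, numKeyGroups), numOperatorPartitions)

    // Example: 8 key groups fixed at the first run; operator partitions rescaled from 4 to 2.
    val kh = "someKey".hashCode
    val group  = stateKeyGroup(kh, 8)         // stays the same across reconfigurations
    val before = operatorPartition(kh, 8, 4)  // task assignment when running 4 partitions
    val after  = operatorPartition(kh, 8, 2)  // task assignment when running 2 partitions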