Hi Spark devs,

I would like to propose a new feature and hear the community's opinions on
it. I'm not sure it is a big enough change to warrant a SPIP, so I'm posting
to the dev mailing list instead.

> Feature

Reconfigurable number of partitions on state operators in Structured
Streaming

> Rationale

Currently, state in Structured Streaming is stored separately per partition,
where the number of partitions is given by the configuration
"spark.sql.shuffle.partitions", and this configuration cannot be modified
once the query has run. One contributor already submitted a patch [1]
without knowing why this restriction came into play.

This restriction is necessary because state is distributed by a hash
function applied to the key columns, but as a side effect we can't change
the number of partitions of stateful operators. End users have various
workloads and various SLAs (and SLAs can change), so restricting them to a
fixed number of partitions will not satisfy their needs. Moreover, it is not
easy for end users to pick the right value before they run the query, and
they only realize they can't modify it when they try to.

> Proposal

The feature proposes decoupling data partitions from operator partitions by
introducing key groups to state, so that operator partitions can be scaled
while state data partitions remain the same (so no issue with state data).
This approach is inspired by how Flink supports scalability with partitioned
state.

The concept itself is simple. Currently we apply the following partitioning
expression to the key columns (simplified):

hash(key columns) % number of state operator partitions

With this proposal, the following partitioning expression is applied
instead, so that rows are distributed by state data partition (key group),
while each state operator partition can handle multiple state data
partitions:

(hash(key columns) % number of state key groups) % number of state operator partitions

The state data itself will still not be scalable, so the number of state key
groups becomes the new hard limit (we should prevent it from being modified
once the query has run). But we can change the number of stateful operator
partitions afterwards. The number of stateful operator partitions should be
equal to or smaller than the number of state key groups. (It doesn't make
sense for a partition to have no state key group assigned and sit idle.)
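
To make the mapping concrete, here is a rough sketch in Python (not Spark
code). It assumes zlib.crc32 as a stand-in for Spark's hash function, and
NUM_KEY_GROUPS, key_group_id, operator_partition and key_groups_of are
hypothetical names used only for illustration. It shows that a key's key
group stays the same when the number of operator partitions changes, and
that one operator partition can own several key groups.

```python
import zlib
from typing import List

# Hypothetical value; in the proposal this is fixed once the query has run.
NUM_KEY_GROUPS = 12


def key_group_id(key: str, num_key_groups: int = NUM_KEY_GROUPS) -> int:
    """State data partition (key group) of a key.

    zlib.crc32 is only a deterministic stand-in for Spark's hash function.
    """
    return zlib.crc32(key.encode("utf-8")) % num_key_groups


def operator_partition(key: str, num_operator_partitions: int,
                       num_key_groups: int = NUM_KEY_GROUPS) -> int:
    """(hash(key) % number of key groups) % number of operator partitions."""
    if not 1 <= num_operator_partitions <= num_key_groups:
        raise ValueError(
            "number of operator partitions must be between 1 and "
            "the number of key groups")
    return key_group_id(key, num_key_groups) % num_operator_partitions


def key_groups_of(partition: int, num_operator_partitions: int,
                  num_key_groups: int = NUM_KEY_GROUPS) -> List[int]:
    """Key groups that a given operator partition is responsible for."""
    return [g for g in range(num_key_groups)
            if g % num_operator_partitions == partition]


# With 12 key groups and 4 operator partitions, partition 0 owns 3 key groups.
print(key_groups_of(0, 4))  # [0, 4, 8]
# After rescaling to 5 operator partitions, the key group of a key is unchanged;
# only the assignment of key groups to operator partitions moves.
print(key_group_id("user-1"), operator_partition("user-1", 4),
      operator_partition("user-1", 5))
```

Note that when the number of key groups is not a multiple of the number of
operator partitions, some partitions own one more key group than others.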

> Possible Risk

Performance might be affected, because one of the following has to be done:

1. each partition calculates the key group id per key
2. the key group id is calculated and inserted into the row before it is
passed to state operators (shuffle), and removed after it has passed through
them
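
Option 2 could look roughly like the following sketch, written in plain
Python rather than against Spark's API. The row shape (key, value), the
helper names tag_rows, shuffle and untag_rows, and the use of zlib.crc32 as
the hash are all assumptions made for illustration.

```python
import zlib
from typing import Dict, List, Tuple

Row = Tuple[str, int]             # (key, value); hypothetical row shape
TaggedRow = Tuple[int, str, int]  # (key group id, key, value)


def tag_rows(rows: List[Row], num_key_groups: int) -> List[TaggedRow]:
    """Compute the key group id once and prepend it to each row."""
    return [(zlib.crc32(key.encode("utf-8")) % num_key_groups, key, value)
            for key, value in rows]


def shuffle(tagged: List[TaggedRow],
            num_operator_partitions: int) -> Dict[int, List[TaggedRow]]:
    """Route each row by its precomputed key group id."""
    partitions: Dict[int, List[TaggedRow]] = {
        p: [] for p in range(num_operator_partitions)}
    for row in tagged:
        partitions[row[0] % num_operator_partitions].append(row)
    return partitions


def untag_rows(tagged: List[TaggedRow]) -> List[Row]:
    """Drop the key group id after the rows have passed the state operator."""
    return [(key, value) for _, key, value in tagged]


rows = [("a", 1), ("b", 2), ("a", 3)]
tagged = tag_rows(rows, num_key_groups=8)
by_partition = shuffle(tagged, num_operator_partitions=3)
# The state operator can read the key group id from row[0] instead of
# hashing the key again; the extra column is removed afterwards.
print(untag_rows(tagged) == rows)  # True
```

The trade-off is one extra column per row during the shuffle in exchange
for not recomputing the hash inside the state operator.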

There are other performance concerns, such as committing multiple states in
a partition when the number of operator partitions < number of state key
groups, but we could run the commits concurrently (at least for the HDFS
state store), and this is actually already an issue today (all tasks may not
be launched together).
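
As a minimal sketch of committing concurrently, assuming a hypothetical
commit_one(key_group_id) callable that commits one key group's state and
returns the new version (this is not the state store API), a thread pool
could be used like this:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def commit_all(key_groups: List[int],
               commit_one: Callable[[int], int],
               max_workers: int = 4) -> Dict[int, int]:
    """Commit the state of every key group owned by one operator partition.

    commit_one is a hypothetical callback; commits run in parallel threads
    and any exception from a commit is re-raised here.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        versions = list(pool.map(commit_one, key_groups))
    return dict(zip(key_groups, versions))


# Dummy commit that pretends every key group moves to version 1.
print(commit_all([0, 4, 8], lambda key_group: 1))  # {0: 1, 4: 1, 8: 1}
```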

As expected, this would also add code complexity.

> Limitation

Initially, it will not support dynamic reconfiguration, i.e. changing the
value while the query is running. This could actually be achieved simply by
unloading all the state providers in executors before running the next
batch, but that would invalidate all state caches and may incur high latency
to reload the state cache for the previous batch. But I guess we could adopt
it if we feel the merit of reconfiguring partitions of stateful operators
outweighs the cost of reloading state.

> Rejected alternatives

* Offline physical repartitioning of state data (loading all state,
recalculating the new partition id per key, and saving it again)

I thought about it but discarded it, since repartitioning would take
non-trivial time once state grows huge. It is also not easy to implement a
tool that runs efficiently. (The whole state may not fit in memory, the tool
has to process the state concurrently, etc.)

Please share your opinion about this proposal: whether to accept or decline
it, things to correct in my mail, any suggestions for improvement, etc.

Please also let me know if it would be better to move this to a Google doc
or a PDF and file a JIRA issue.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://github.com/apache/spark/pull/21718
