Dear Flink devs,

I'd like to start a discussion on FLIP-411: Chaining-agnostic Operator ID 
generation for improved state compatibility on parallelism change [1].
Currently, when users do not explicitly set operator UIDs, chaining behavior 
still affects state compatibility, because the generated operator ID depends 
on the operator's chained output nodes. For example, a simple source->sink DAG 
with source and sink chained together is state-incompatible with an otherwise 
identical DAG where source and sink are unchained (either because their 
parallelisms are changed so that they differ, or because chaining is 
disabled). This greatly limits the flexibility to break or build chains for 
performance tuning.
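To illustrate the problem, here is a simplified, standalone sketch loosely modeled on how I understand StreamGraphHasherV2 to work: a node's hash covers its position in the deterministic traversal, that position is hashed again once per chainable output edge, and upstream hashes are then mixed in. Everything here is hypothetical and for illustration only: the class and method names are made up, MD5 stands in for Flink's actual 128-bit hash function, and this is not the proposed V3 implementation.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

public class OperatorIdSketch {

    // Hypothetical helper: computes a node hash from its traversal index.
    // If chainingAware is true (V2-style), the index is hashed again once per
    // chainable output edge, so chaining changes the result.
    static byte[] nodeHash(int traversalIndex, int chainableOutputs,
                           boolean chainingAware, List<byte[]> inputHashes) {
        MessageDigest md;
        try {
            // MD5 is only a stand-in for Flink's real hash function.
            md = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        md.update(ByteBuffer.allocate(4).putInt(traversalIndex).array());
        if (chainingAware) {
            for (int i = 0; i < chainableOutputs; i++) {
                md.update(ByteBuffer.allocate(4).putInt(traversalIndex).array());
            }
        }
        byte[] hash = md.digest();
        // Mix in upstream hashes so that a node's ID also depends on its inputs.
        for (byte[] other : inputHashes) {
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ other[j]);
            }
        }
        return hash;
    }

    public static void main(String[] args) {
        // Source at traversal index 0, with one chainable output (the sink)
        // when chained and none when unchained.
        byte[] awareChained = nodeHash(0, 1, true, List.of());
        byte[] awareUnchained = nodeHash(0, 0, true, List.of());
        byte[] agnosticChained = nodeHash(0, 1, false, List.of());
        byte[] agnosticUnchained = nodeHash(0, 0, false, List.of());

        System.out.println("chaining-aware IDs equal: "
                + Arrays.equals(awareChained, awareUnchained));
        System.out.println("chaining-agnostic IDs equal: "
                + Arrays.equals(agnosticChained, agnosticUnchained));
    }
}
```

In the chaining-aware variant the source's ID changes when the chain is broken, and because the sink mixes in the source's hash, the sink's ID changes as well. Dropping the per-chained-output step makes both IDs independent of chaining.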

The dependency on chained output nodes for operator ID generation can be traced 
back to Flink 1.2. It is unclear at this point why chained output nodes are 
involved in the algorithm, but the following historical background might be 
relevant: prior to Flink 1.3, the Flink runtime stored snapshots under the 
operator ID of the first vertex in a chain, so it made some sense to include 
chained output nodes in the algorithm, since chain-breaking/building was 
expected to break state compatibility anyway.

Given that operator-level state recovery within a chain has been supported 
since Flink 1.3, I propose to introduce a StreamGraphHasherV3 that is agnostic 
of the chaining behavior of operators, so that users are free to tune the 
parallelism of individual operators without worrying about state 
incompatibility. To preserve backwards compatibility, we can make the V3 
hasher an opt-in choice in Flink 1.19 and make it the default hasher in 2.0.

Looking forward to your suggestions, thanks~

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change

Best,
Zhanghao Chen
