Dear Flink devs,

I'd like to start a discussion on FLIP-411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change [1].
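Here is a concrete illustration of the problem this FLIP targets. The Java sketch below is a toy model that I wrote for illustration. It is not Flink's StreamGraphHasherV2 code, and the class name `OperatorIdSketch`, the `operatorIds` method and the SplitMix64-style `mix` function are all hypothetical. In the model, an operator's ID is derived from its traversal position and its upstream operator's ID. In the chaining-dependent variant, each chained output edge is also mixed into the node-local hash. Breaking the source->sink chain therefore changes both IDs in that variant, while the chaining-agnostic variant produces identical IDs either way:

```java
import java.util.Arrays;

/** Hypothetical toy model of deterministic operator-ID generation (not Flink code). */
public class OperatorIdSketch {

    // SplitMix64-style mixing step; deterministic across runs and JVMs.
    private static long mix(long h, long v) {
        long z = h ^ (v + 0x9E3779B97F4A7C15L);
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    /**
     * Computes IDs for a linear pipeline op0 -> op1 -> ... -> op(n-1).
     *
     * @param numOperators  number of operators in the pipeline
     * @param chainedToNext chainedToNext[i] is true if op i is chained to op i+1
     * @param chainAgnostic if true, chaining is ignored; if false, a chained
     *                      output edge is mixed into the node-local hash
     */
    public static long[] operatorIds(int numOperators, boolean[] chainedToNext, boolean chainAgnostic) {
        long[] ids = new long[numOperators];
        for (int i = 0; i < numOperators; i++) {
            long h = mix(0L, i); // node-local part: traversal position
            boolean hasChainedOutput = i < numOperators - 1 && chainedToNext[i];
            if (!chainAgnostic && hasChainedOutput) {
                h = mix(h, i); // chained output alters this operator's hash
            }
            if (i > 0) {
                h = mix(h, ids[i - 1]); // upstream operator's ID propagates downstream
            }
            ids[i] = h;
        }
        return ids;
    }

    public static void main(String[] args) {
        boolean[] chained = {true};    // source -> sink, chained together
        boolean[] unchained = {false}; // same DAG with the chain broken

        boolean dependentEqual = Arrays.equals(
                operatorIds(2, chained, false), operatorIds(2, unchained, false));
        boolean agnosticEqual = Arrays.equals(
                operatorIds(2, chained, true), operatorIds(2, unchained, true));

        System.out.println("chaining-dependent IDs equal: " + dependentEqual);
        System.out.println("chaining-agnostic IDs equal:  " + agnosticEqual);
    }
}
```

Running `main` prints `false` for the chaining-dependent comparison and `true` for the chaining-agnostic one. Because the upstream ID feeds into the downstream hash, the sink's ID changes too, even though the sink itself has no chained outputs.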
Currently, when users do not explicitly set operator UIDs, chaining behavior still affects state compatibility, because the generated Operator ID depends on the operator's chained output nodes. For example, a simple source->sink DAG in which the source and sink are chained together is state-incompatible with an otherwise identical DAG in which they are not chained (either because the two operators were given different parallelisms or because chaining was disabled). This greatly limits the flexibility to break or build chains for performance tuning.

The dependency on chained output nodes in Operator ID generation dates back to Flink 1.2. It is unclear why chained output nodes were included in the algorithm, but the following historical background may be relevant: prior to Flink 1.3, the Flink runtime stored snapshots under the operator ID of the first vertex in a chain. It therefore made some sense to include chained output nodes in the algorithm, since breaking or building a chain was expected to break state compatibility anyway.

Given that operator-level state recovery within a chain has been supported since Flink 1.3, I propose introducing a StreamGraphHasherV3 that is agnostic of operator chaining. Users would then be free to tune the parallelism of individual operators without worrying about state incompatibility. To preserve backwards compatibility, we can make the V3 hasher an opt-in choice in Flink 1.19 and make it the default hasher in Flink 2.0.

Looking forward to your suggestions, thanks~

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change

Best,
Zhanghao Chen