Hi Zhanghao,

Thanks for the FLIP. What you're proposing makes a lot of sense, +1.

Have you thought about how this works with unaligned checkpoints when a job goes from unchained to chained? I think it should be fine, because this should only apply to forward/rebalance scenarios where, as far as I recall, we force alignment anyway, so there should be no exchanges to snapshot. It might just work, but it is something to double-check. Maybe @Piotr Nowojski <piotr.nowoj...@gmail.com> could confirm it.

Best,
D.

On Wed, Jan 3, 2024 at 7:10 AM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:

> Dear Flink devs,
>
> I'd like to start a discussion on FLIP 411: Chaining-agnostic Operator ID
> generation for improved state compatibility on parallelism change [1].
>
> Currently, when user does not explicitly set operator UIDs, the chaining
> behavior will still affect state compatibility, as the generation of the
> Operator ID is dependent on its chained output nodes. For example, a simple
> source->sink DAG with source and sink chained together is state
> incompatible with an otherwise identical DAG with source and sink unchained
> (either because the parallelisms of the two ops are changed to be unequal
> or chaining is disabled). This greatly limits the flexibility to perform
> chain-breaking/building for performance tuning.
>
> The dependency on chained output nodes for Operator ID generation can be
> traced back to Flink 1.2. It is unclear at this point on why chained output
> nodes are involved in the algorithm, but the following history background
> might be related: prior to Flink 1.3, Flink runtime takes the snapshots by
> the operator ID of the first vertex in a chain, so it somewhat makes sense
> to include chained output nodes into the algorithm as
> chain-breaking/building is expected to break state-compatibility anyway.
>
> Given that operator-level state recovery within a chain has long been
> supported since Flink 1.3, I propose to introduce StreamGraphHasherV3 that
> is agnostic of the chaining behavior of operators, so that users are free
> to tune the parallelism of individual operators without worrying about
> state incompatibility. We can make the V3 hasher an optional choice in
> Flink 1.19, and make it the default hasher in 2.0 for backwards
> compatibility.
>
> Looking forward to your suggestions on it, thanks~
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
>
> Best,
> Zhanghao Chen
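
For anyone following the thread, here is a toy sketch of the compatibility issue described in the quoted proposal. It is not Flink's actual hasher code: the function `toy_operator_id`, its arguments, and the use of MD5 are all hypothetical and chosen only for illustration. It shows that when chained output nodes feed into an operator's ID, the same source gets a different ID depending on whether the sink is chained to it, while a chaining-agnostic variant keeps the ID stable.

```python
import hashlib


def toy_operator_id(node_name, chained_outputs, include_chained_outputs):
    """Toy stand-in for operator ID generation (NOT Flink's real algorithm).

    node_name: hypothetical stable description of the operator itself.
    chained_outputs: names of downstream operators chained to this one.
    include_chained_outputs: True mimics the chaining-dependent behavior
        described in the proposal; False mimics a chaining-agnostic hasher.
    """
    digest = hashlib.md5()
    digest.update(node_name.encode("utf-8"))
    if include_chained_outputs:
        for output_name in chained_outputs:
            digest.update(output_name.encode("utf-8"))
    return digest.hexdigest()


# Chaining-dependent: the source ID changes once the sink is unchained.
dependent_chained = toy_operator_id("source", ["sink"], True)
dependent_unchained = toy_operator_id("source", [], True)

# Chaining-agnostic: the source ID is the same in both topologies.
agnostic_chained = toy_operator_id("source", ["sink"], False)
agnostic_unchained = toy_operator_id("source", [], False)

print("chaining-dependent IDs match:", dependent_chained == dependent_unchained)
print("chaining-agnostic IDs match: ", agnostic_chained == agnostic_unchained)
```

In this toy model, state keyed by the chaining-dependent ID would no longer be found after breaking the chain, which is the incompatibility the FLIP aims to remove.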