Hi David,

Thanks for the comments. AFAIK, unaligned checkpoints are disabled for pointwise connections according to [1]; let's wait for Piotr to confirm. The issue is also not directly related to this proposal: if a user manually specifies UIDs for each of the chained operators and has unaligned checkpoints enabled, we would encounter the same issue if they decide to break the chain on a later restart and try to recover from a retained checkpoint.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/

Best,
Zhanghao Chen
________________________________
From: David Morávek <d...@apache.org>
Sent: Wednesday, January 10, 2024 6:26
To: dev@flink.apache.org <dev@flink.apache.org>; Piotr Nowojski <piotr.nowoj...@gmail.com>
Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change

Hi Zhanghao,

Thanks for the FLIP. What you're proposing makes a lot of sense, +1.

Have you thought about how this works with unaligned checkpoints in case you go from unchained to chained? I think it should be fine because this scenario should only apply to forward/rebalance scenarios where we, as far as I recall, force alignment anyway, so there should be no exchanges to snapshot. It might just work, but it is something to double-check. Maybe @Piotr Nowojski <piotr.nowoj...@gmail.com> could confirm it.

Best,
D.

On Wed, Jan 3, 2024 at 7:10 AM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:

> Dear Flink devs,
>
> I'd like to start a discussion on FLIP 411: Chaining-agnostic Operator ID
> generation for improved state compatibility on parallelism change [1].
>
> Currently, when a user does not explicitly set operator UIDs, the chaining
> behavior will still affect state compatibility, as the generation of the
> Operator ID is dependent on its chained output nodes. For example, a simple
> source->sink DAG with source and sink chained together is state
> incompatible with an otherwise identical DAG with source and sink unchained
> (either because the parallelisms of the two ops are changed to be unequal
> or chaining is disabled). This greatly limits the flexibility to perform
> chain-breaking/building for performance tuning.
>
> The dependency on chained output nodes for Operator ID generation can be
> traced back to Flink 1.2. It is unclear at this point why chained output
> nodes are involved in the algorithm, but the following historical background
> might be related: prior to Flink 1.3, the Flink runtime took snapshots by
> the operator ID of the first vertex in a chain, so it somewhat makes sense
> to include chained output nodes in the algorithm, as
> chain-breaking/building was expected to break state compatibility anyway.
>
> Given that operator-level state recovery within a chain has been
> supported since Flink 1.3, I propose to introduce StreamGraphHasherV3 that
> is agnostic of the chaining behavior of operators, so that users are free
> to tune the parallelism of individual operators without worrying about
> state incompatibility. We can make the V3 hasher an optional choice in
> Flink 1.19, and make it the default hasher in 2.0 for backwards
> compatibility.
>
> Looking forward to your suggestions on it, thanks~
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
>
> Best,
> Zhanghao Chen
>
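
To make the source->sink example from the proposal concrete, here is a toy model of the effect. It is only a sketch under loud assumptions: it is not Flink's actual hasher, and the names `operator_id` and `source_sink_ids`, the use of SHA-256, and the way chained outputs are mixed into the hash are all hypothetical, chosen purely to illustrate why a chaining-aware ID changes when a chain is broken while a chaining-agnostic ID does not.

```python
import hashlib


def operator_id(position, input_ids, chained_outputs, chaining_aware):
    """Toy operator ID: a hash over the node's position and its inputs' IDs.

    Hypothetical model, not Flink's algorithm. When `chaining_aware` is True,
    the number of chained output nodes is also mixed into the hash.
    """
    h = hashlib.sha256()
    h.update(str(position).encode())
    if chaining_aware:
        # One extra update per chained output node, so chaining changes the ID.
        for _ in range(chained_outputs):
            h.update(b"chained-output")
    for input_id in input_ids:
        h.update(input_id.encode())
    return h.hexdigest()[:16]


def source_sink_ids(chained, chaining_aware):
    """Return (source_id, sink_id) for a two-node source->sink DAG."""
    source = operator_id(0, [], 1 if chained else 0, chaining_aware)
    sink = operator_id(1, [source], 0, chaining_aware)
    return source, sink


if __name__ == "__main__":
    for aware in (True, False):
        chained = source_sink_ids(chained=True, chaining_aware=aware)
        unchained = source_sink_ids(chained=False, chaining_aware=aware)
        label = "chaining-aware" if aware else "chaining-agnostic"
        print(label, "IDs stable across chain break:", chained == unchained)
```

In this toy model the chaining-aware IDs differ between the chained and unchained DAGs, so state keyed by those IDs could not be matched after breaking the chain, whereas the chaining-agnostic IDs stay the same.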