Hi David,

Thanks for the comments. AFAIK, unaligned checkpoints are disabled for
pointwise connections according to [1]; let's wait for Piotr to confirm. The
issue itself is also not directly related to this proposal: if a user
manually specifies UIDs for each of the chained operators and has unaligned
checkpoints enabled, we will encounter the same issue if they decide to break
the chain on a later restart and try to recover from a retained checkpoint.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/


Best,
Zhanghao Chen
________________________________
From: David Morávek <d...@apache.org>
Sent: Wednesday, January 10, 2024 6:26
To: dev@flink.apache.org <dev@flink.apache.org>; Piotr Nowojski 
<piotr.nowoj...@gmail.com>
Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
improved state compatibility on parallelism change

Hi Zhanghao,

Thanks for the FLIP. What you're proposing makes a lot of sense, +1.

Have you thought about how this works with unaligned checkpoints in case
you go from unchained to chained? I think it should be fine because this
scenario should only apply to forward/rebalance scenarios where we, as far
as I recall, force alignment anyway, so there should be no exchanges to
snapshot. It might just work, but something to double-check. Maybe @Piotr
Nowojski <piotr.nowoj...@gmail.com> could confirm it.

Best,
D.

On Wed, Jan 3, 2024 at 7:10 AM Zhanghao Chen <zhanghao.c...@outlook.com>
wrote:

> Dear Flink devs,
>
> I'd like to start a discussion on FLIP 411: Chaining-agnostic Operator ID
> generation for improved state compatibility on parallelism change [1].
>
> Currently, when a user does not explicitly set operator UIDs, the chaining
> behavior still affects state compatibility, because the generation of an
> Operator ID depends on the operator's chained output nodes. For example, a simple
> source->sink DAG with source and sink chained together is state
> incompatible with an otherwise identical DAG with source and sink unchained
> (either because the parallelisms of the two ops are changed to be unequal
> or chaining is disabled). This greatly limits the flexibility to perform
> chain-breaking/building for performance tuning.
>
> The dependency on chained output nodes for Operator ID generation can be
> traced back to Flink 1.2. It is unclear at this point why chained output
> nodes are involved in the algorithm, but the following historical background
> might be related: prior to Flink 1.3, the Flink runtime took snapshots by
> the operator ID of the first vertex in a chain, so it somewhat made sense
> to include chained output nodes in the algorithm, as
> chain-breaking/building was expected to break state compatibility anyway.
>
> Given that operator-level state recovery within a chain has been
> supported since Flink 1.3, I propose to introduce StreamGraphHasherV3, which
> is agnostic of the chaining behavior of operators, so that users are free
> to tune the parallelism of individual operators without worrying about
> state incompatibility. To preserve backwards compatibility, we can make the
> V3 hasher an optional choice in Flink 1.19 and make it the default hasher
> in 2.0.
>
> Looking forward to your suggestions on it, thanks~
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
>
> Best,
> Zhanghao Chen
>
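
To illustrate the chaining dependence described in the proposal above, here
is a minimal toy sketch in Java. It is not Flink's actual hasher: the class
ChainingHashSketch, the method operatorId, and the hash inputs are all
hypothetical and only mimic the idea that a generated ID changes when chained
output nodes are mixed into the hash, and stays stable when they are not.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Toy illustration only; names and hash inputs are assumptions, not Flink code.
final class ChainingHashSketch {

    // Derives a 32-character hex ID for a node from its position in the graph.
    // When chainingAgnostic is false, one extra token per chained output node
    // is mixed into the hash, so the ID depends on the chaining decision.
    static String operatorId(int nodeIndex, int chainedOutputs, boolean chainingAgnostic) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            md.update(("node:" + nodeIndex).getBytes(StandardCharsets.UTF_8));
            if (!chainingAgnostic) {
                for (int i = 0; i < chainedOutputs; i++) {
                    md.update("chained-output".getBytes(StandardCharsets.UTF_8));
                }
            }
            byte[] digest = md.digest();
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < 16; i++) { // keep the first 16 bytes only
                hex.append(String.format("%02x", digest[i]));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this is unexpected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Source with the sink chained (1 chained output) vs. unchained (0).
        String chained = operatorId(0, 1, false);
        String unchained = operatorId(0, 0, false);
        System.out.println("chaining-dependent IDs equal: " + chained.equals(unchained));

        String agnosticChained = operatorId(0, 1, true);
        String agnosticUnchained = operatorId(0, 0, true);
        System.out.println("chaining-agnostic IDs equal: "
                + agnosticChained.equals(agnosticUnchained));
    }
}
```

In this sketch the source's ID differs between the chained and unchained
layouts only when chained outputs feed the hash, which is the kind of
incompatibility a chaining-agnostic hasher is meant to avoid.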
