Hi Yu,

I haven't thought much about the compatibility design so far. By the nature
of the problem, it's impossible to make V3 compatible with V2. What we can do
is inform users more clearly when they switch the hasher, but I don't have
any good ideas yet. Do you have any suggestions on this?

Best,
Zhanghao Chen
________________________________
From: Yu Chen <[email protected]>
Sent: Thursday, January 11, 2024 13:52
To: [email protected] <[email protected]>
Cc: Piotr Nowojski <[email protected]>; [email protected] 
<[email protected]>
Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
improved state compatibility on parallelism change

Hi Zhanghao,

Thanks for driving this. It is really painful for us whenever we need to
switch the `pipeline.operator-chaining` config.

But I have a concern: according to the FLIP description, modifying the
`isChainable`-related code in `StreamGraphHasherV2` will change the generated
operator IDs, which will leave users unable to recover from the old state
(old and new Operator IDs can't be mapped).
Switching the hasher strategy (V2->V3 or V3->V2) will therefore lead to an
incompatibility. Has any compatibility design been considered for this?

Best,
Yu Chen

On Jan 10, 2024, at 10:25, Zhanghao Chen <[email protected]> wrote:

Hi David,

Thanks for the comments. AFAIK, unaligned checkpoints are disabled for
pointwise connections according to [1]; let's wait for Piotr to confirm. The
issue itself is also not specific to this proposal. If a user manually
specifies UIDs for each of the chained operators and has unaligned
checkpoints enabled, we will encounter the same issue if they decide to break
the chain on a later restart and try to recover from a retained checkpoint.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/


Best,
Zhanghao Chen
________________________________
From: David Morávek <[email protected]>
Sent: Wednesday, January 10, 2024 6:26
To: [email protected] <[email protected]>; Piotr Nowojski 
<[email protected]>
Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
improved state compatibility on parallelism change

Hi Zhanghao,

Thanks for the FLIP. What you're proposing makes a lot of sense, +1.

Have you thought about how this works with unaligned checkpoints in case
you go from unchained to chained? I think it should be fine because this
scenario should only apply to forward/rebalance scenarios where we, as far
as I recall, force alignment anyway, so there should be no exchanges to
snapshot. It might just work, but something to double-check. Maybe @Piotr
Nowojski <[email protected]> could confirm it.

Best,
D.

On Wed, Jan 3, 2024 at 7:10 AM Zhanghao Chen <[email protected]>
wrote:

Dear Flink devs,

I'd like to start a discussion on FLIP 411: Chaining-agnostic Operator ID
generation for improved state compatibility on parallelism change [1].

Currently, when a user does not explicitly set operator UIDs, the chaining
behavior still affects state compatibility, because the generation of an
Operator ID depends on its chained output nodes. For example, a simple
source->sink DAG with source and sink chained together is state
incompatible with an otherwise identical DAG with source and sink unchained
(either because the parallelisms of the two ops are changed to be unequal
or chaining is disabled). This greatly limits the flexibility to perform
chain-breaking/building for performance tuning.
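
To make the effect concrete, here is a minimal toy sketch in Python. It is
not Flink's actual hashing code: the function `toy_operator_id` and its
parameters are hypothetical, and the only assumption it models is that a
chaining-aware hash mixes in something extra for each chained output node.

```python
import hashlib


def toy_operator_id(position, input_ids, chained_outputs, chaining_aware):
    """Toy stand-in for an auto-generated operator ID (NOT Flink's real hasher)."""
    h = hashlib.sha256()
    # Mix in the node's position in the graph traversal.
    h.update(position.to_bytes(4, "big"))
    # Mix in the IDs of all input nodes.
    for input_id in input_ids:
        h.update(bytes.fromhex(input_id))
    # Assumption for illustration: a chaining-aware hash adds one extra
    # marker per chained output node; a chaining-agnostic hash skips this.
    if chaining_aware:
        for _ in range(chained_outputs):
            h.update(b"chained-output")
    return h.hexdigest()[:32]


# source -> sink DAG: the source is node 0 and has no inputs.
chained_old = toy_operator_id(0, [], chained_outputs=1, chaining_aware=True)
unchained_old = toy_operator_id(0, [], chained_outputs=0, chaining_aware=True)
chained_new = toy_operator_id(0, [], chained_outputs=1, chaining_aware=False)
unchained_new = toy_operator_id(0, [], chained_outputs=0, chaining_aware=False)

# Chaining-aware: the source ID changes once the chain is broken.
print(chained_old != unchained_old)
# Chaining-agnostic: the source ID stays the same either way.
print(chained_new == unchained_new)
```

In this toy model the chained source also gets a different ID under the two
schemes, which is the same kind of mismatch that prevents state written under
one scheme from being matched to operators under the other.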

The dependency on chained output nodes for Operator ID generation can be
traced back to Flink 1.2. It is unclear at this point why chained output
nodes are involved in the algorithm, but the following historical background
might be related: prior to Flink 1.3, the Flink runtime took snapshots by
the operator ID of the first vertex in a chain, so it made some sense
to include chained output nodes in the algorithm, as
chain-breaking/building was expected to break state compatibility anyway.

Given that operator-level state recovery within a chain has been
supported since Flink 1.3, I propose introducing StreamGraphHasherV3, which
is agnostic of the chaining behavior of operators, so that users are free
to tune the parallelism of individual operators without worrying about
state incompatibility. We can make the V3 hasher opt-in in Flink 1.19 and
only make it the default hasher in 2.0, to preserve backwards
compatibility.

Looking forward to your suggestions on it, thanks~

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change

Best,
Zhanghao Chen

