After offline discussion with @Yu Chen<mailto:yuchen.e...@gmail.com>, I've 
updated the FLIP [1] to include a design that allows for compatible hasher 
upgrade by adding StreamGraphHasherV2 to the legacy hasher list, which is 
actually a revival of the idea from FLIP-5290 [2] when StreamGraphHasherV2 was 
introduced in Flink 1.2. We're targeting to make V3 the default hasher in Flink 
1.20 given that state-compatibility is no longer an issue. Take a review when 
you have a chance, and I'd like to especially thank @Yu 
Chen<mailto:yuchen.e...@gmail.com> for the through offline discussion and code 
debugging help to make this possible.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
[2] https://issues.apache.org/jira/browse/FLINK-5290

Best,
Zhanghao Chen
________________________________
From: Zhanghao Chen <zhanghao.c...@outlook.com>
Sent: Friday, January 12, 2024 10:46
To: Piotr Nowojski <pnowoj...@apache.org>; Yu Chen <yuchen.e...@gmail.com>
Cc: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
improved state compatibility on parallelism change

Thanks for the input, Piotr. It might still be possible to make it compatible 
with the old snapshots, following the direction of 
FLINK-5290<https://issues.apache.org/jira/browse/FLINK-5290> suggested by Yu. 
I'll discuss with Yu on more details.

Best,
Zhanghao Chen
________________________________
From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Friday, January 12, 2024 1:55
To: Yu Chen <yuchen.e...@gmail.com>
Cc: Zhanghao Chen <zhanghao.c...@outlook.com>; dev@flink.apache.org 
<dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
improved state compatibility on parallelism change

Hi,

Using unaligned checkpoints is orthogonal to this FLIP.

Yes, unaligned checkpoints are not supported for pointwise connections, so most 
of the cases go away anyway.
It is possible to switch from unchained to chained subtasks by removing a keyBy 
exchange, and this would be
a problem, but that's just one of the things that we claim that unaligned 
checkpoints do not support [1]. But as
I stated above, this is an orthogonal issue to this FLIP.

Regarding the proposal itself, generally speaking it makes sense to me as well. 
However I'm quite worried about
the compatibility and/or migration path. The:
> (v2.0) Make HasherV3 the default hasher, mark HasherV2 deprecated.

step would break the compatibility with Flink 1.xx snapshots. But as this is 
for v2.0, maybe that's not the end of
the world?

Best,
Piotrek

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/#capabilities-and-limitations

czw., 11 sty 2024 o 12:10 Yu Chen 
<yuchen.e...@gmail.com<mailto:yuchen.e...@gmail.com>> napisał(a):
Hi Zhanghao,

Actually, Stefan has done similar compatibility work in the early 
FLINK-5290[1], where he introduced the legacyStreamGraphHashers list for hasher 
backward compatibility.

We have attempted to implement a similar feature in the internal version of 
FLINK and tried to include the new hasher as part of the 
legacyStreamGraphHashers,
which would ensure that the corresponding Operator State could be found at 
restore while ignoring the chaining condition(without changing the default 
hasher).

However, we have found that such a solution may lead to some unexpected 
situations in some cases. While I have no time to find out the root cause 
recently.

If you're interested, I'd be happy to discuss it with you and try to solve the 
problem.

[1] https://issues.apache.org/jira/browse/FLINK-5290

Best,
Yu Chen



2024年1月11日 15:07,Zhanghao Chen 
<zhanghao.c...@outlook.com<mailto:zhanghao.c...@outlook.com>> 写道:

Hi Yu,

I haven't thought too much about the compatibility design before. By the nature 
of the problem, it's impossible to make V3 compatible with V2, what we can do 
is to somewhat better inform users when switching the hasher, but I don't have 
any good idea so far. Do you have any suggestions on this?

Best,
Zhanghao Chen
________________________________
From: Yu Chen <yuchen.e...@gmail.com<mailto:yuchen.e...@gmail.com>>
Sent: Thursday, January 11, 2024 13:52
To: dev@flink.apache.org<mailto:dev@flink.apache.org> 
<dev@flink.apache.org<mailto:dev@flink.apache.org>>
Cc: Piotr Nowojski <piotr.nowoj...@gmail.com<mailto:piotr.nowoj...@gmail.com>>; 
zhanghao.c...@outlook.com<mailto:zhanghao.c...@outlook.com> 
<zhanghao.c...@outlook.com<mailto:zhanghao.c...@outlook.com>>
Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
improved state compatibility on parallelism change

Hi Zhanghao,

Thanks for driving this, that’s really painful for us when we need to switch 
config `pipeline.operator-chaining`.

But I have a Concern, according to FLIP description, modifying `isChainable` 
related code in `StreamGraphHasherV2` will cause the generated operator id to 
be changed, which will result in the user unable to recover from the old state 
(old and new Operator IDs can't be mapped).
Therefore switching Hasher strategy (V2->V3 or V3->V2) will lead to an 
incompatibility, is there any relevant compatibility design considered?

Best,
Yu Chen

2024年1月10日 10:25,Zhanghao Chen 
<zhanghao.c...@outlook.com<mailto:zhanghao.c...@outlook.com>> 写道:

Hi David,

Thanks for the comments. AFAIK, unaligned checkpoints are disabled for 
pointwise connections according to [1], let's wait Piotr for confirmation. The 
issue itself is not directly related to this proposal as well. If a user 
manually specifies UIDs for each of the chained operators and has unaligned 
checkpoints enabled, we will encounter the same issue if they decide to break 
the chain on a later restart and try to recover from a retained cp.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/


Best,
Zhanghao Chen
________________________________
From: David Morávek <d...@apache.org<mailto:d...@apache.org>>
Sent: Wednesday, January 10, 2024 6:26
To: dev@flink.apache.org<mailto:dev@flink.apache.org> 
<dev@flink.apache.org<mailto:dev@flink.apache.org>>; Piotr Nowojski 
<piotr.nowoj...@gmail.com<mailto:piotr.nowoj...@gmail.com>>
Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
improved state compatibility on parallelism change

Hi Zhanghao,

Thanks for the FLIP. What you're proposing makes a lot of sense +1

Have you thought about how this works with unaligned checkpoints in case
you go from unchained to chained? I think it should be fine because this
scenario should only apply to forward/rebalance scenarios where we, as far
as I recall, force alignment anyway, so there should be no exchanges to
snapshot. It might just work, but something to double-check. Maybe @Piotr
Nowojski <piotr.nowoj...@gmail.com<mailto:piotr.nowoj...@gmail.com>> could 
confirm it.

Best,
D.

On Wed, Jan 3, 2024 at 7:10 AM Zhanghao Chen 
<zhanghao.c...@outlook.com<mailto:zhanghao.c...@outlook.com>>
wrote:

Dear Flink devs,

I'd like to start a discussion on FLIP 411: Chaining-agnostic Operator ID
generation for improved state compatibility on parallelism change [1].

Currently, when user does not explicitly set operator UIDs, the chaining
behavior will still affect state compatibility, as the generation of the
Operator ID is dependent on its chained output nodes. For example, a simple
source->sink DAG with source and sink chained together is state
incompatible with an otherwise identical DAG with source and sink unchained
(either because the parallelisms of the two ops are changed to be unequal
or chaining is disabled). This greatly limits the flexibility to perform
chain-breaking/building for performance tuning.

The dependency on chained output nodes for Operator ID generation can be
traced back to Flink 1.2. It is unclear at this point on why chained output
nodes are involved in the algorithm, but the following history background
might be related: prior to Flink 1.3, Flink runtime takes the snapshots by
the operator ID of the first vertex in a chain, so it somewhat makes sense
to include chained output nodes into the algorithm as
chain-breaking/building is expected to break state-compatibility anyway.

Given that operator-level state recovery within a chain has long been
supported since Flink 1.3, I propose to introduce StreamGraphHasherV3 that
is agnostic of the chaining behavior of operators, so that users are free
to tune the parallelism of individual operators without worrying about
state incompatibility. We can make the V3 hasher an optional choice in
Flink 1.19, and make it the default hasher in 2.0 for backwards
compatibility.

Looking forward to your suggestions on it, thanks~

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change

Best,
Zhanghao Chen

Reply via email to