Hi,

> However, compiled plan is still too complicated for Flink newbies from my 
> point of view.

I don't think that the compiled plan was ever positioned to be a
simple solution. If you want to have an easy approach, we have a
declarative solution in place with SQL and/or the Table API imho.

Best regards,

Martijn

On Thu, Feb 8, 2024 at 9:14 AM Zhanghao Chen <[email protected]> wrote:
>
> Hi Piotr,
>
> Thanks for the comment. I agree that the compiled plan is the ultimate tool
> for Flink SQL if one wants to make any changes to the query later, and this
> FLIP is indeed not essential in this sense. However, compiled plan is still
> too complicated for Flink newbies from my point of view. As I mentioned
> previously, our internal platform provides a visual tool for editing the
> compiled plan, but most users still find it complex. Therefore, the FLIP can
> still benefit users with better usability, and the proposed changes are
> actually quite lightweight (just copying the hasher into a new one with 2
> lines deleted + extending the OperatorIdPair data structure), without much
> extra effort.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Piotr Nowojski <[email protected]>
> Sent: Thursday, February 8, 2024 14:50
> To: Zhanghao Chen <[email protected]>
> Cc: Chesnay Schepler <[email protected]>; [email protected] 
> <[email protected]>; Yu Chen <[email protected]>
> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
> improved state compatibility on parallelism change
>
> Hey
>
> > AFAIK, there's no way to set UIDs for a SQL job,
>
> AFAIK you can't set UIDs manually, but Flink SQL generates a compiled plan
> of a query with embedded UIDs. As I understand it, using a compiled plan is
> the preferred (only?) way for Flink SQL if one wants to make any changes to
> the query later on, or support Flink's runtime upgrades, without losing the
> state.
>
> If that's the case, what would be the usefulness of this FLIP? Only for
> DataStream API for users that didn't know that they should have manually
> configured UIDs? But they have the workaround to actually post-factum add
> the UIDs anyway, right? So maybe indeed Chesnay is right that this FLIP is
> not that helpful/worth the extra effort?
>
> Best,
> Piotrek
>
> On Thu, Feb 8, 2024 at 03:55, Zhanghao Chen <[email protected]> wrote:
>
> > Hi Chesnay,
> >
> > AFAIK, there's no way to set UIDs for a SQL job. It would be great if you
> > could share how you allow UID setting for SQL jobs. We've explored
> > providing a visual DAG editor for SQL jobs that allows UID setting on our
> > internal platform, but most users found it too complicated to use. Another
> > possible way is to use SQL hints, but that's complicated as well. From
> > our experience, many SQL users are not familiar with Flink. What they want
> > is an experience similar to writing normal SQL in MySQL, without
> > involving many extra concepts like the DAG and the UID. In fact, some
> > DataStream and PyFlink users share the same concern.
> >
> > On the other hand, some performance tuning is inevitable for
> > long-running jobs in production, and parallelism tuning is among the most
> > common techniques. FLIP-367 [1] and FLIP-146 [2] allow users to tune the
> > parallelism of sources and sinks, and both were well received in their
> > discussion threads. Users definitely don't want to lose state after
> > parallelism tuning, which is highly risky at present.
> >
> > Putting these together, I think the FLIP has high value in production.
> > Through offline discussion, I learned that multiple companies have
> > developed or are trying to develop similar hasher changes in their internal
> > distributions, including ByteDance, Xiaohongshu, and Bilibili. It would be
> > great if we could improve the SQL experience for all community users as
> > well. WDYT?
> >
> > Best,
> > Zhanghao Chen
> > ------------------------------
> > *From:* Chesnay Schepler <[email protected]>
> > *Sent:* Thursday, February 8, 2024 2:01
> > *To:* [email protected] <[email protected]>; Zhanghao Chen <
> > [email protected]>; Piotr Nowojski <[email protected]>; Yu
> > Chen <[email protected]>
> > *Subject:* Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> > generation for improved state compatibility on parallelism change
> >
> > The FLIP is a bit weird to be honest. It only applies in cases where
> > users haven't set UIDs, but that goes against best practices and, as far
> > as I'm told, SQL also sets UIDs everywhere.
> >
> > I'm wondering if this is really worth the effort.
> >
> > On 07/02/2024 10:23, Zhanghao Chen wrote:
> > > After offline discussion with @Yu Chen, I've updated the FLIP [1] to
> > > include a design that allows for a compatible hasher upgrade by adding
> > > StreamGraphHasherV2 to the legacy hasher list, which is actually a
> > > revival of the idea from FLINK-5290 [2], when StreamGraphHasherV2 was
> > > introduced in Flink 1.2. We're aiming to make V3 the default hasher in
> > > Flink 1.20, given that state compatibility is no longer an issue. Please
> > > take a look when you have a chance. I'd especially like to thank
> > > @Yu Chen for the thorough offline discussion and code debugging help
> > > that made this possible.
> > >
> > > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
> > > [2] https://issues.apache.org/jira/browse/FLINK-5290
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Zhanghao Chen <[email protected]>
> > > Sent: Friday, January 12, 2024 10:46
> > > To: Piotr Nowojski <[email protected]>; Yu Chen <
> > [email protected]>
> > > Cc: [email protected] <[email protected]>
> > > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> > generation for improved state compatibility on parallelism change
> > >
> > > Thanks for the input, Piotr. It might still be possible to make it
> > > compatible with the old snapshots, following the direction of FLINK-5290
> > > <https://issues.apache.org/jira/browse/FLINK-5290> suggested by Yu. I'll
> > > discuss the details with Yu.
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Piotr Nowojski <[email protected]>
> > > Sent: Friday, January 12, 2024 1:55
> > > To: Yu Chen <[email protected]>
> > > Cc: Zhanghao Chen <[email protected]>; [email protected] <
> > [email protected]>
> > > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> > generation for improved state compatibility on parallelism change
> > >
> > > Hi,
> > >
> > > Using unaligned checkpoints is orthogonal to this FLIP.
> > >
> > > Yes, unaligned checkpoints are not supported for pointwise connections,
> > so most of the cases go away anyway.
> > > It is possible to switch from unchained to chained subtasks by removing
> > a keyBy exchange, and this would be
> > > a problem, but that's just one of the things that we claim that
> > unaligned checkpoints do not support [1]. But as
> > > I stated above, this is an orthogonal issue to this FLIP.
> > >
> > > Regarding the proposal itself, generally speaking it makes sense to me
> > as well. However I'm quite worried about
> > > the compatibility and/or migration path. The:
> > >> (v2.0) Make HasherV3 the default hasher, mark HasherV2 deprecated.
> > > step would break the compatibility with Flink 1.xx snapshots. But as
> > this is for v2.0, maybe that's not the end of
> > > the world?
> > >
> > > Best,
> > > Piotrek
> > >
> > > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/#capabilities-and-limitations
> > >
> > > On Thu, Jan 11, 2024 at 12:10, Yu Chen <[email protected]> wrote:
> > > Hi Zhanghao,
> > >
> > > Actually, Stefan did similar compatibility work in the earlier
> > > FLINK-5290 [1], where he introduced the legacyStreamGraphHashers list
> > > for hasher backward compatibility.
> > >
> > > We have attempted to implement a similar feature in our internal version
> > > of Flink and tried to include the new hasher as part of the
> > > legacyStreamGraphHashers, which would ensure that the corresponding
> > > operator state could be found at restore while ignoring the chaining
> > > condition (without changing the default hasher).
> > >
> > > However, we found that such a solution may lead to unexpected behavior
> > > in some cases, and I have not yet had time to find the root cause.
> > >
> > > If you're interested, I'd be happy to discuss it with you and try to
> > solve the problem.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-5290
> > >
> > > Best,
> > > Yu Chen
> > >
> > >
> > >
> > > On Jan 11, 2024, at 15:07, Zhanghao Chen <[email protected]> wrote:
> > >
> > > Hi Yu,
> > >
> > > I haven't thought much about the compatibility design before. By the
> > > nature of the problem, it's impossible to make V3 compatible with V2.
> > > What we can do is better inform users when switching the hasher, but
> > > I don't have any good ideas so far. Do you have any suggestions on this?
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Yu Chen <[email protected]<mailto:[email protected]>>
> > > Sent: Thursday, January 11, 2024 13:52
> > > To: [email protected]<mailto:[email protected]> <
> > [email protected]<mailto:[email protected]>>
> > > Cc: Piotr Nowojski <[email protected]<mailto:
> > [email protected]>>; [email protected]<mailto:
> > [email protected]> <[email protected]<mailto:
> > [email protected]>>
> > > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> > generation for improved state compatibility on parallelism change
> > >
> > > Hi Zhanghao,
> > >
> > > Thanks for driving this. It's really painful for us when we need to
> > > switch the `pipeline.operator-chaining` config.
> > >
> > > But I have a concern: according to the FLIP description, modifying the
> > > `isChainable`-related code in `StreamGraphHasherV2` will change the
> > > generated operator IDs, which will leave users unable to recover from
> > > the old state (old and new operator IDs can't be mapped).
> > > Therefore, switching the hasher strategy (V2->V3 or V3->V2) will lead to
> > > an incompatibility. Has any compatibility design been considered?
> > >
> > > Best,
> > > Yu Chen
> > >
> > > On Jan 10, 2024, at 10:25, Zhanghao Chen <[email protected]> wrote:
> > >
> > > Hi David,
> > >
> > > Thanks for the comments. AFAIK, unaligned checkpoints are disabled for
> > > pointwise connections according to [1]; let's wait for Piotr to confirm.
> > > The issue itself is not directly related to this proposal either. If a
> > > user manually specifies UIDs for each of the chained operators and has
> > > unaligned checkpoints enabled, we will encounter the same issue if they
> > > decide to break the chain on a later restart and try to recover from a
> > > retained checkpoint.
> > >
> > > [1]
> > https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/
> > >
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: David Morávek <[email protected]<mailto:[email protected]>>
> > > Sent: Wednesday, January 10, 2024 6:26
> > > To: [email protected]<mailto:[email protected]> <
> > [email protected]<mailto:[email protected]>>; Piotr Nowojski <
> > [email protected]<mailto:[email protected]>>
> > > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> > generation for improved state compatibility on parallelism change
> > >
> > > Hi Zhanghao,
> > >
> > > Thanks for the FLIP. What you're proposing makes a lot of sense, +1.
> > >
> > > Have you thought about how this works with unaligned checkpoints in case
> > > you go from unchained to chained? I think it should be fine because this
> > > scenario should only apply to forward/rebalance scenarios where we, as
> > > far as I recall, force alignment anyway, so there should be no exchanges
> > > to snapshot. It might just work, but it's something to double-check.
> > > Maybe @Piotr Nowojski could confirm it.
> > >
> > > Best,
> > > D.
> > >
> > > On Wed, Jan 3, 2024 at 7:10 AM Zhanghao Chen <[email protected]> wrote:
> > >
> > > Dear Flink devs,
> > >
> > > I'd like to start a discussion on FLIP 411: Chaining-agnostic Operator ID
> > > generation for improved state compatibility on parallelism change [1].
> > >
> > > Currently, when users do not explicitly set operator UIDs, the chaining
> > > behavior still affects state compatibility, as the generation of the
> > > Operator ID depends on its chained output nodes. For example, a simple
> > > source->sink DAG with source and sink chained together is state
> > > incompatible with an otherwise identical DAG with source and sink
> > > unchained (either because the parallelisms of the two operators are
> > > changed to be unequal or because chaining is disabled). This greatly
> > > limits the flexibility to perform chain-breaking/building for
> > > performance tuning.
> > >
> > > The dependency on chained output nodes for Operator ID generation can be
> > > traced back to Flink 1.2. It is unclear at this point why chained output
> > > nodes are involved in the algorithm, but the following historical
> > > background might be related: prior to Flink 1.3, the Flink runtime took
> > > snapshots by the operator ID of the first vertex in a chain, so it
> > > somewhat made sense to include chained output nodes in the algorithm, as
> > > chain-breaking/building was expected to break state compatibility anyway.
> > >
> > > Given that operator-level state recovery within a chain has been
> > > supported since Flink 1.3, I propose to introduce StreamGraphHasherV3,
> > > which is agnostic of the chaining behavior of operators, so that users
> > > are free to tune the parallelism of individual operators without
> > > worrying about state incompatibility. We can make the V3 hasher an
> > > optional choice in Flink 1.19, and make it the default hasher in 2.0
> > > for backwards compatibility.
> > >
> > > Looking forward to your suggestions on it, thanks~
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
> > >
> > > Best,
> > > Zhanghao Chen
> > >
> >
> >
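
For readers skimming the thread, the core issue from the original proposal
(auto-generated operator IDs that depend on chained outputs) can be shown
with a toy model. This is only a sketch under loud assumptions: it is NOT
Flink's StreamGraphHasherV2 nor the proposed V3 hasher, and the class name,
method names, and hash inputs (node name, position, number of chained
outputs) are all hypothetical, chosen purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Toy model only; not Flink code. All names and hash inputs are hypothetical. */
final class ChainingHashSketch {

    /** Hypothetical chaining-aware ID: the number of chained outputs feeds the hash. */
    static String chainingAwareId(String nodeName, int position, int chainedOutputs) {
        return sha256Hex(nodeName + "#" + position + "#chained=" + chainedOutputs);
    }

    /** Hypothetical chaining-agnostic ID: chaining decisions are not part of the input. */
    static String chainingAgnosticId(String nodeName, int position) {
        return sha256Hex(nodeName + "#" + position);
    }

    /** Hex-encoded SHA-256 of the given string. */
    static String sha256Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same source node, first chained to the sink, then unchained.
        String chained = chainingAwareId("source", 0, 1);
        String unchained = chainingAwareId("source", 0, 0);
        System.out.println("chaining-aware IDs match: " + chained.equals(unchained));

        // The agnostic variant has no chaining input, so the ID cannot change.
        System.out.println("chaining-agnostic ID: " + chainingAgnosticId("source", 0));
    }
}
```

In this toy model, breaking the chain changes the chaining-aware ID, so state
stored under the old ID could no longer be looked up, while the agnostic ID
stays the same. Explicitly set UIDs avoid the problem entirely, which is the
point raised by Chesnay and Piotr above.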
