Hi,

> However, compiled plan is still too complicated for Flink newbies from my
> point of view.

I don't think that the compiled plan was ever positioned to be a simple solution. If you want an easy approach, we already have a declarative solution in SQL and/or the Table API, imho.

Best regards,

Martijn

On Thu, Feb 8, 2024 at 9:14 AM Zhanghao Chen <[email protected]> wrote:
>
> Hi Piotr,
>
> Thanks for the comment. I agree that the compiled plan is the ultimate tool for Flink SQL if one wants to make any changes to a query later, and in this sense the FLIP is indeed not essential. However, compiled plan is still too complicated for Flink newbies from my point of view. As I mentioned previously, our internal platform provides a visualized tool for editing the compiled plan, but most users still find it complex. Therefore, the FLIP can still benefit users with better usability, and the proposed changes are quite lightweight (just copying a new hasher with 2 lines deleted, plus extending the OperatorIdPair data structure), so they need little extra effort.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Piotr Nowojski <[email protected]>
> Sent: Thursday, February 8, 2024 14:50
> To: Zhanghao Chen <[email protected]>
> Cc: Chesnay Schepler <[email protected]>; [email protected] <[email protected]>; Yu Chen <[email protected]>
> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change
>
> Hey
>
> > AFAIK, there's no way to set UIDs for a SQL job,
>
> AFAIK you can't set UIDs manually, but Flink SQL generates a compiled plan of a query with embedded UIDs. As I understand it, using a compiled plan is the preferred (only?) way for Flink SQL if one wants to make any changes to the query later on, or to support Flink runtime upgrades, without losing the state.
>
> If that's the case, what would be the usefulness of this FLIP? Only for DataStream API users who didn't know that they should have manually configured UIDs?
> But they have the workaround of adding the UIDs post factum anyway, right? So maybe Chesnay is indeed right that this FLIP is not that helpful/worth the extra effort?
>
> Best,
> Piotrek
>
> On Thu, Feb 8, 2024 at 03:55 Zhanghao Chen <[email protected]> wrote:
>
> > Hi Chesnay,
> >
> > AFAIK, there's no way to set UIDs for a SQL job; it'd be great if you could share how you allow UID setting for SQL jobs. On our internal platform, we've explored providing a visualized DAG editor for SQL jobs that allows UID setting, but most users found it too complicated to use. Another possible way is to use SQL hints, but that's complicated as well. From our experience, many SQL users are not familiar with Flink; what they want is an experience similar to writing a normal SQL query in MySQL, without involving extra concepts like the DAG and the UID. In fact, some DataStream and PyFlink users share the same concern.
> >
> > On the other hand, some performance tuning is inevitable for long-running jobs in production, and parallelism tuning is among the most common techniques. FLIP-367 [1] and FLIP-146 [2] allow users to tune the parallelism of sources and sinks, and both were well received in their discussion threads. Users definitely don't want to lose state after parallelism tuning, which is highly risky at present.
> >
> > Putting these together, I think the FLIP has high value in production. Through offline discussion, I learned that multiple companies have developed, or are trying to develop, similar hasher changes in their internal distributions, including ByteDance, Xiaohongshu, and Bilibili. It'd be great if we could improve the SQL experience for all community users as well, WDYT?
> >
> > Best,
> > Zhanghao Chen
> > ------------------------------
> > *From:* Chesnay Schepler <[email protected]>
> > *Sent:* Thursday, February 8, 2024 2:01
> > *To:* [email protected] <[email protected]>; Zhanghao Chen <[email protected]>; Piotr Nowojski <[email protected]>; Yu Chen <[email protected]>
> > *Subject:* Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change
> >
> > The FLIP is a bit weird, to be honest. It only applies in cases where users haven't set UIDs, but that goes against best practices, and as far as I'm told, SQL also sets UIDs everywhere.
> >
> > I'm wondering if this is really worth the effort.
> >
> > On 07/02/2024 10:23, Zhanghao Chen wrote:
> > > After an offline discussion with @Yu Chen <[email protected]>, I've updated the FLIP [1] to include a design that allows for a compatible hasher upgrade by adding StreamGraphHasherV2 to the legacy hasher list. This is actually a revival of the idea from FLINK-5290 [2], when StreamGraphHasherV2 was introduced in Flink 1.2. Since state compatibility would then no longer be an issue, we're aiming to make V3 the default hasher in Flink 1.20. Please take a look when you have a chance. I'd especially like to thank @Yu Chen <[email protected]> for the thorough offline discussion and the code-debugging help that made this possible.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
> > > [2] https://issues.apache.org/jira/browse/FLINK-5290
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Zhanghao Chen <[email protected]>
> > > Sent: Friday, January 12, 2024 10:46
> > > To: Piotr Nowojski <[email protected]>; Yu Chen <[email protected]>
> > > Cc: [email protected] <[email protected]>
> > > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change
> > >
> > > Thanks for the input, Piotr. It might still be possible to make it compatible with old snapshots, following the direction of FLINK-5290 <https://issues.apache.org/jira/browse/FLINK-5290> suggested by Yu. I'll discuss the details with Yu.
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Piotr Nowojski <[email protected]>
> > > Sent: Friday, January 12, 2024 1:55
> > > To: Yu Chen <[email protected]>
> > > Cc: Zhanghao Chen <[email protected]>; [email protected] <[email protected]>
> > > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change
> > >
> > > Hi,
> > >
> > > Using unaligned checkpoints is orthogonal to this FLIP.
> > >
> > > Yes, unaligned checkpoints are not supported for pointwise connections, so most of the cases go away anyway. It is possible to switch from unchained to chained subtasks by removing a keyBy exchange, and this would be a problem, but that's just one of the cases that, as we state in [1], unaligned checkpoints do not support. But as I stated above, this is an orthogonal issue to this FLIP.
> > >
> > > Regarding the proposal itself, generally speaking it makes sense to me as well.
> > > However, I'm quite worried about the compatibility and/or migration path. The step
> > >> (v2.0) Make HasherV3 the default hasher, mark HasherV2 deprecated.
> > > would break compatibility with Flink 1.x snapshots. But as this is for v2.0, maybe that's not the end of the world?
> > >
> > > Best,
> > > Piotrek
> > >
> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/#capabilities-and-limitations
> > >
> > > On Thu, Jan 11, 2024 at 12:10 Yu Chen <[email protected]> wrote:
> > > Hi Zhanghao,
> > >
> > > Actually, Stefan did similar compatibility work early on in FLINK-5290 [1], where he introduced the legacyStreamGraphHashers list for hasher backward compatibility.
> > >
> > > We attempted to implement a similar feature in our internal version of Flink by including the new hasher in legacyStreamGraphHashers. This would ensure that the corresponding operator state could be found at restore time regardless of the chaining condition (without changing the default hasher).
> > >
> > > However, we found that this solution may lead to unexpected behavior in some cases, and I haven't had time to find the root cause yet.
> > >
> > > If you're interested, I'd be happy to discuss it with you and try to solve the problem.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-5290
> > >
> > > Best,
> > > Yu Chen
> > >
> > > On Jan 11, 2024, at 15:07, Zhanghao Chen <[email protected]> wrote:
> > >
> > > Hi Yu,
> > >
> > > I haven't thought much about the compatibility design yet. By the nature of the problem, it's impossible to make V3 compatible with V2; what we can do is better inform users when they switch the hasher, but I don't have any good ideas so far. Do you have any suggestions?
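Yu's description of the FLINK-5290 mechanism (which Zhanghao's updated FLIP design later reuses by putting StreamGraphHasherV2 on the legacy list) amounts to a lookup with fallbacks. The sketch below is a simplified Python model, not Flink's restore code. It assumes that each operator carries an ordered list of candidate IDs, with the default hasher's ID first and the legacy hashers' IDs after it, and that restore picks the first candidate that has state in the snapshot. The function names and ID strings are hypothetical. The `allow_non_restored_state` flag only loosely mirrors Flink's `--allowNonRestoredState` restore option.

```python
from typing import Dict, List, Optional


def find_restored_state(candidate_ids: List[str], snapshot: Dict[str, bytes]) -> Optional[str]:
    """Return the first candidate operator ID that has state in the snapshot.

    candidate_ids is ordered by priority: the ID from the default hasher
    first, then the IDs from each legacy hasher (hypothetical model).
    """
    for operator_id in candidate_ids:
        if operator_id in snapshot:
            return operator_id
    return None


def assign_state(
    operators: Dict[str, List[str]],
    snapshot: Dict[str, bytes],
    allow_non_restored_state: bool = False,
) -> Dict[str, str]:
    """Map each operator name to the snapshot entry it restores from.

    Raises ValueError if some snapshot state is not claimed by any operator,
    unless allow_non_restored_state is set.
    """
    assignment: Dict[str, str] = {}
    for name, candidate_ids in operators.items():
        matched = find_restored_state(candidate_ids, snapshot)
        if matched is not None:
            assignment[name] = matched
    unclaimed = set(snapshot) - set(assignment.values())
    if unclaimed and not allow_non_restored_state:
        raise ValueError(f"cannot map state for operator IDs {sorted(unclaimed)}")
    return assignment


# A savepoint taken while the job still used the old (V2-style) IDs.
old_snapshot = {"v2-source": b"offsets", "v2-sink": b"pending-txn"}

# The upgraded job tries V3-style IDs first and keeps V2-style IDs as legacy candidates.
operators = {
    "source": ["v3-source", "v2-source"],
    "sink": ["v3-sink", "v2-sink"],
}
print(assign_state(operators, old_snapshot))
# {'source': 'v2-source', 'sink': 'v2-sink'}
```

In this model, a savepoint written with V2-style IDs stays restorable after the default switches to V3, because every operator keeps its V2-style ID as a fallback candidate. Without the legacy candidates, the same restore fails. The model does not attempt to reproduce the unexpected cases Yu mentions, whose root cause the thread leaves open.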
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Yu Chen <[email protected]>
> > > Sent: Thursday, January 11, 2024 13:52
> > > To: [email protected] <[email protected]>
> > > Cc: Piotr Nowojski <[email protected]>; [email protected] <[email protected]>
> > > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change
> > >
> > > Hi Zhanghao,
> > >
> > > Thanks for driving this; it's really painful for us when we need to switch the `pipeline.operator-chaining` config.
> > >
> > > But I have a concern: according to the FLIP description, modifying the `isChainable`-related code in `StreamGraphHasherV2` will change the generated operator IDs, leaving users unable to recover from old state (old and new operator IDs can't be mapped). Therefore, switching the hasher strategy (V2->V3 or V3->V2) will lead to an incompatibility. Has any compatibility design been considered?
> > >
> > > Best,
> > > Yu Chen
> > >
> > > On Jan 10, 2024, at 10:25, Zhanghao Chen <[email protected]> wrote:
> > >
> > > Hi David,
> > >
> > > Thanks for the comments. AFAIK, unaligned checkpoints are disabled for pointwise connections according to [1]; let's wait for Piotr to confirm. The issue is also not directly related to this proposal: if a user manually specifies UIDs for each of the chained operators and has unaligned checkpoints enabled, we will encounter the same issue if they decide to break the chain on a later restart and try to recover from a retained checkpoint.
> > >
> > > [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: David Morávek <[email protected]>
> > > Sent: Wednesday, January 10, 2024 6:26
> > > To: [email protected] <[email protected]>; Piotr Nowojski <[email protected]>
> > > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change
> > >
> > > Hi Zhanghao,
> > >
> > > Thanks for the FLIP. What you're proposing makes a lot of sense, +1.
> > >
> > > Have you thought about how this works with unaligned checkpoints in case you go from unchained to chained? I think it should be fine, because this scenario should only apply to forward/rebalance connections, where, as far as I recall, we force alignment anyway, so there should be no exchanges to snapshot. It might just work, but it's something to double-check. Maybe @Piotr Nowojski <[email protected]> could confirm it.
> > >
> > > Best,
> > > D.
> > >
> > > On Wed, Jan 3, 2024 at 7:10 AM Zhanghao Chen <[email protected]> wrote:
> > >
> > > Dear Flink devs,
> > >
> > > I'd like to start a discussion on FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change [1].
> > >
> > > Currently, when users do not explicitly set operator UIDs, chaining behavior still affects state compatibility, because the generated operator ID depends on the operator's chained output nodes.
> > > For example, a simple source->sink DAG with the source and sink chained together is state-incompatible with an otherwise identical DAG in which they are unchained (either because the two operators' parallelisms are changed to be unequal or because chaining is disabled). This greatly limits the flexibility to break or build chains for performance tuning.
> > >
> > > The dependency on chained output nodes in operator ID generation can be traced back to Flink 1.2. It is unclear at this point why chained output nodes are involved in the algorithm, but the following historical background might be related: prior to Flink 1.3, the Flink runtime took snapshots by the operator ID of the first vertex in a chain. It therefore made some sense to include chained output nodes in the algorithm, since breaking or building a chain was expected to break state compatibility anyway.
> > >
> > > Given that operator-level state recovery within a chain has been supported since Flink 1.3, I propose introducing a StreamGraphHasherV3 that is agnostic of the operators' chaining behavior, so that users are free to tune the parallelism of individual operators without worrying about state incompatibility. For backwards compatibility, we can make the V3 hasher an optional choice in Flink 1.19 and make it the default hasher in 2.0.
> > >
> > > Looking forward to your suggestions, thanks~
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
> > >
> > > Best,
> > > Zhanghao Chen
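The source->sink example in the original proposal can be made concrete with a toy model. What follows is a simplified Python sketch, not Flink's StreamGraphHasherV2, and all names in it are hypothetical. It assumes the following behavior. An auto-generated ID is hashed from the node's traversal position. A "V2-style" variant feeds that position in once more for every chainable output edge. Upstream IDs are mixed into downstream ones. An explicit UID determines the ID on its own. SHA-256 stands in for the real hash function, and the chaining rule is cut down to "equal parallelism, single input".

```python
import hashlib
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for a stream graph node (not Flink's StreamNode)."""
    name: str
    parallelism: int
    uid: Optional[str] = None
    inputs: List["Node"] = field(default_factory=list, repr=False)
    outputs: List["Node"] = field(default_factory=list, repr=False)


def connect(upstream: Node, downstream: Node) -> None:
    upstream.outputs.append(downstream)
    downstream.inputs.append(upstream)


def is_chainable(upstream: Node, downstream: Node, chaining_enabled: bool) -> bool:
    # Heavily simplified rule; Flink's real check considers more conditions.
    return (chaining_enabled
            and upstream.parallelism == downstream.parallelism
            and len(downstream.inputs) == 1)


def operator_ids(nodes: List[Node], chaining_enabled: bool,
                 chaining_agnostic: bool) -> Dict[str, bytes]:
    """Assign a 16-byte ID to each node; nodes must be in topological order."""
    ids: Dict[str, bytes] = {}
    for node in nodes:
        hasher = hashlib.sha256()  # stand-in for the real hash function
        if node.uid is not None:
            # A user-specified UID alone determines the ID.
            hasher.update(node.uid.encode("utf-8"))
            ids[node.name] = hasher.digest()[:16]
            continue
        position = len(ids).to_bytes(4, "big")  # traversal position, not the name
        hasher.update(position)
        if not chaining_agnostic:
            # V2-style: each chainable output edge feeds the position in again,
            # so building or breaking a chain changes this node's ID.
            for downstream in node.outputs:
                if is_chainable(node, downstream, chaining_enabled):
                    hasher.update(position)
        digest = bytearray(hasher.digest()[:16])
        # Mix in upstream IDs so that an upstream change propagates downstream.
        for upstream in node.inputs:
            other = ids[upstream.name]
            for i in range(16):
                digest[i] = ((digest[i] * 37) ^ other[i]) & 0xFF
        ids[node.name] = bytes(digest)
    return ids


def source_to_sink(source_parallelism: int, sink_parallelism: int,
                   uids: bool = False) -> List[Node]:
    source = Node("source", source_parallelism, "my-source" if uids else None)
    sink = Node("sink", sink_parallelism, "my-sink" if uids else None)
    connect(source, sink)
    return [source, sink]


# Changing the sink's parallelism breaks the chain in this model.
v2_chained = operator_ids(source_to_sink(2, 2), chaining_enabled=True, chaining_agnostic=False)
v2_unchained = operator_ids(source_to_sink(2, 4), chaining_enabled=True, chaining_agnostic=False)
print(v2_chained == v2_unchained)  # False

v3_chained = operator_ids(source_to_sink(2, 2), chaining_enabled=True, chaining_agnostic=True)
v3_unchained = operator_ids(source_to_sink(2, 4), chaining_enabled=True, chaining_agnostic=True)
print(v3_chained == v3_unchained)  # True
```

In this model, both V2-style IDs change once the chain is broken: the source's own hash changes, and the sink inherits the change through the upstream mixing step. The chaining-agnostic IDs stay the same. Passing `uids=True` makes the IDs independent of chaining in either mode, which matches Chesnay's remark that the FLIP only matters when users haven't set UIDs.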
