Hi everybody,

last week I discussed with Piotr the semantics of the configuration parameter and a strategy for implementing it, and this weekend I did the implementation and some tests.

A short summary of what I discussed and recapped with Piotr:

- The configuration parameter allows (and tolerates) the use of `SourceReader`s that do not implement the `pauseOrResumeSplits` method. (The exception is ignored in `SourceOperator`.)
- The configuration parameter allows (and tolerates) the use of `SplitReader`s that do not implement the `pauseOrResumeSplits` method. (The exception is ignored in the `PauseResumeSplitsTask` of the `SplitFetcher`.)

In particular, consider a `SourceReader` with two `SplitReader`s, where one does not implement `pauseOrResumeSplits` and the other does: the parameter allows the use of the one that doesn't and, nevertheless, still attempts to pause/resume the other. (Consequently, if the one that doesn't support pausing is ahead, it simply cannot be paused; but if the other one is ahead, it will be paused until the watermarks are aligned.)

There is one flaw that I don't really like, but which I accept as an outcome of the discussion and which I will add to the FLIP: if any mechanism other than watermark alignment attempts to pause or resume `SplitReader`s, it will have side effects and potentially unexpected behavior when one or more `SplitReader`s do not implement `pauseOrResumeSplits` and the user has set the configuration parameter to allow/tolerate this for split-level watermark alignment. (The reason is simply that we cannot tell which mechanism attempts to pause/resume, i.e., whether it is used for watermark alignment or for something else.) Given that this configuration parameter is meant to be an intermediate fallback, this is acceptable to me, but it must be changed at the latest when some other mechanism starts using `pauseOrResumeSplits`.

As for the parameter name, I have implemented it as follows (because there already is a parameter `pipeline.auto-watermark-interval`):

pipeline.watermark-alignment.allow-unaligned-source-splits (default: false)

Status: I have implemented the configuration parameter (and an IT case).
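To make the intended semantics concrete, here is a minimal, self-contained Java sketch. It is an illustration under stated assumptions, not Flink's actual implementation: `AlignmentFallbackSketch`, `PausableReader`, `SupportingReader`, `LegacyReader`, and `pauseOrResumeAll` are hypothetical names, and, as a simplification, every reader receives the same split IDs, whereas in Flink each `SplitReader` only handles its own splits. Only the configuration key and its default value come from the description above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: these types are hypothetical stand-ins, not Flink's
// SourceReader/SplitReader/SourceOperator classes.
class AlignmentFallbackSketch {

    // Configuration key and default (false) as described in the message above.
    static final String ALLOW_UNALIGNED_KEY =
            "pipeline.watermark-alignment.allow-unaligned-source-splits";

    // Reads the flag from a plain key/value map; an absent key means false.
    static boolean allowUnaligned(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(ALLOW_UNALIGNED_KEY, "false"));
    }

    // Hypothetical reader abstraction; the default signals "pausing not supported",
    // mirroring the default behavior discussed for pauseOrResumeSplits.
    interface PausableReader {
        default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
            throw new UnsupportedOperationException("pauseOrResumeSplits is not supported");
        }
    }

    // A reader that supports pausing and tracks which split IDs are currently paused.
    static final class SupportingReader implements PausableReader {
        final List<String> paused = new ArrayList<>();

        @Override
        public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
            paused.addAll(toPause);
            paused.removeAll(toResume);
        }
    }

    // A reader that keeps the default, i.e. it cannot pause its splits.
    static final class LegacyReader implements PausableReader {}

    // Forwards one pause/resume request to every reader. If allowUnaligned is true,
    // an UnsupportedOperationException is swallowed and the remaining readers are
    // still paused/resumed; otherwise the exception propagates to the caller.
    // Returns how many readers could not honor the request.
    static int pauseOrResumeAll(List<? extends PausableReader> readers,
                                Collection<String> toPause,
                                Collection<String> toResume,
                                boolean allowUnaligned) {
        int unsupported = 0;
        for (PausableReader reader : readers) {
            try {
                reader.pauseOrResumeSplits(toPause, toResume);
            } catch (UnsupportedOperationException e) {
                if (!allowUnaligned) {
                    throw e;
                }
                unsupported++;
            }
        }
        return unsupported;
    }

    public static void main(String[] args) {
        SupportingReader supporting = new SupportingReader();
        List<PausableReader> readers = List.of(new LegacyReader(), supporting);
        boolean allow = allowUnaligned(Map.of(ALLOW_UNALIGNED_KEY, "true"));
        int skipped = pauseOrResumeAll(readers, List.of("split-1"), List.of(), allow);
        // prints "skipped=1, paused=[split-1]"
        System.out.println("skipped=" + skipped + ", paused=" + supporting.paused);
    }
}
```

With a `LegacyReader` listed before a `SupportingReader` and the parameter set to true, the request to the legacy reader is skipped while the supporting reader is still paused, which corresponds to the mixed `SplitReader` case described above; with the parameter left at its default of false, the `UnsupportedOperationException` propagates.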
I still need to update the FLIP and will ping you (tomorrow or so) when I'm done. Please review my description above and let me know if you see any problems with it.

Thanks a lot and regards,
Sebastian

On Wed, Jul 20, 2022 at 11:24 PM Thomas Weise <t...@apache.org> wrote:
> Hi Sebastian,
>
> Thank you for updating the FLIP and sorry for my delayed response. As Piotr pointed out, we would need to incorporate the fallback flag into the design to reflect the outcome of the previous discussion.
>
> Based on the current FLIP and as detailed by Becket, the SourceOperator coordinates the alignment. It is responsible for the pause/resume decision and knows how many splits are assigned. Therefore shouldn't it have all the information needed to efficiently handle the case of UnsupportedOperationException thrown by a reader?
>
> Although the fallback requires some extra implementation effort, I think that is more than offset by not surprising users and offering a smoother migration path. Yes, the flag is a temporary feature that will become obsolete in perhaps 2-3 releases (can we please also include that into the FLIP?). But since it would be just a configuration property that can be ignored at that point (for which there is precedence), no code change will be forced on users.
>
> As for the property name, perhaps the following would be even more descriptive?
>
> coarse.grained.wm.alignment.fallback.enabled
>
> Thanks!
> Thomas
>
>
> On Wed, Jul 13, 2022 at 10:59 AM Becket Qin <becket....@gmail.com> wrote:
> >
> > Thanks for the explanation, Sebastian. I understand your concern now.
> >
> > 1. About the major concern. Personally I'd consider the coarse grained watermark alignment as a special case for backward compatibility. In the future, if for whatever reason we want to pause a split and that is not supported, it seems the only thing that makes sense is throwing an exception, instead of pausing the entire source reader.
> > Regarding this FLIP, if the logic that determines which split should be paused is in the SourceOperator, the SourceOperator actually knows the reason why it pauses a split. It also knows whether there are more than one split assigned to the source reader. So it can just fallback to the coarse grained watermark alignment, without affecting other reasons of pausing a split, right? And in the future, if there are more purposes for pausing / resuming a split, the SourceOperator still needs to understand each of the reasons in order to resume the splits after all the pausing conditions are no longer met.
> >
> > 2. Naming wise, would "coarse.grained.watermark.alignment.enabled" address your concern?
> >
> > The only concern I have for Option A is that people may not be able to benefit from split level WM alignment until all the sources they need have that implemented. This seems unnecessarily delaying the adoption of a new feature, which looks like a more substantive downside compared with the "coarse.grained.wm.alignment.enabled" option.
> >
> > BTW, the SourceOperator doesn't need to invoke the pauseOrResumeSplit() method and catch the UnsupportedOperation every time. A flag can be set so it doesn't attempt to pause the split after the first time it sees the exception.
> >
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Wed, Jul 13, 2022 at 5:11 PM Sebastian Mattheis <sebast...@ververica.com> wrote:
> >>
> >> Hi Becket, Hi Thomas, Hi Piotrek,
> >>
> >> Thanks for the feedback. I would like to highlight some concerns:
> >>
> >> Major: A configuration parameter like "allow coarse grained alignment" defines a semantic that mixes two contexts conditionally as follows: "ignore incapability to pause splits in SourceReader/SplitReader" IF (conditional) we "allow coarse grained watermark alignment". At the same time we said that there is no way to check the capability of SourceReader/SplitReader to pause/resume other than observing a UnsupportedOperationException during runtime such that we cannot disable the trigger for watermark split alignment in the SourceOperator. Instead, we can only ignore the incapability of SourceReader/SplitReader during execution of a pause/resume attempt which, consequently, requires to check the "allow coarse grained alignment " parameter value (to implement the conditional semantic). However, during this execution we actually don't know whether the attempt was executed for the purpose of watermark alignment or for some other purpose such that the check actually depends on who triggered the pause/resume attempt and hides the exception potentially unexpectedly for some other use case. Of course, currently there is no other purpose and, hence, no other trigger than watermark alignment. However, this breaks, in my perspective, the idea of having pauseOrResumeSplits (re)usable for other use cases.
> >> Minor: I'm not aware of any configuration parameter in the format like `allow.*` as you suggested with `allow.coarse.grained.watermark.alignment`. Would that still be okay to do?
> >>
> >> As we have agreed to not have a "supportsPausableSplits" method because of potential inconsistencies between return value of this method and the actual implementation (and also the difficulty to have a meaningful return value where the support actually depends on SourceReader AND the assigned SplitReaders), I don't want to bring up the discussion about the "supportsPauseableSplits" method again. Instead, I see the following options:
> >>
> >> Option A: I would drop the idea of "allow coarse grained alignment" semantic of the parameter but implement a parameter to "enable/disable split watermark alignment" which we can easily use in the SourceOperator to disable the trigger of split alignment. This is indeed more static and less flexible, because it disables split alignment unconditionally, but it is "context-decoupled" and more straight-forward to use. This would also address the use case of disabling split alignment for the purpose of runtime behavior evaluation, as mentioned by Thomas (if I remember correctly.) I would implement the parameter with a default where watermark split alignment is enabled which requires users to check their application when upgrading to 1.16 if a) there is a source that reads from multiple splits and b), if yes, all splits of that source support pause/resume. If a) yes and b) no, the user must take action to disable watermark split alignment (which disables the trigger of split alignment only for the purpose).
> >>
> >> Option B: If we ignore my concern, I would simply check the "allow coarse grained watermark alignment" parameter value on every attempt to execute pause/resume in the SourceReader and in the SplitReader and will not throw UnsupportedOperationException if the parameter value is set to true.
> >>
> >> Please note that the parameter is also used only for some kind of migration phase. Therefore, I wonder if we need to overcomplicate things.
> >>
> >> @Piotrek, @Becket, @Thomas: I would recommend/favour option A. Please let me know your feedback and/or concerns as soon as possible, if possible. :)
> >>
> >> Regards,
> >> Sebastian
> >>
> >>
> >> On Wed, Jul 13, 2022 at 9:37 AM Becket Qin <becket....@gmail.com> wrote:
> >>>
> >>> Hi Sebastian,
> >>>
> >>> Thanks for updating the FLIP wiki.
> >>>
> >>> Just to double confirm, I was thinking of a configuration like "allow.coarse.grained.watermark.alignment". This will allow the coarse grained watermark alignment as a fallback instead of bubbling up an exception if split pausing is not supported in some Sources in a Flink job. And this will only affect the Sources that do not support split pausing, but not the Sources that have split pausing supported.
> >>>
> >>> This seems slightly different from a <knob> enables / disables split alignment. This sounds like a global thing, and it seems not necessary to disable the split alignment, as long as the coarse grained alignment can be a fallback.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On Wed, Jul 13, 2022 at 2:46 PM Sebastian Mattheis <sebast...@ververica.com> wrote:
> >>>>
> >>>> Hi Piotrek,
> >>>>
> >>>> Sorry I've read it and forgot it when I was ripping out the supportsPauseOrResume method again. Thanks for pointing that out. I will add it as follows: The <knob> enables/disables split alignment in the SourceOperator where the default is that split alignment is enabled. (And I will add the note: "In future releases, the <knob> may be ignored such that split alignment is always enabled.")
> >>>>
> >>>> Cheers,
> >>>> Sebastian
> >>>>
> >>>> On Tue, Jul 12, 2022 at 11:14 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>>>>
> >>>>> Hi Sebastian,
> >>>>>
> >>>>> Thanks for picking this up.
> >>>>>
> >>>>> > 5. There is NO configuration option to enable watermark alignment of splits or disable pause/resume capabilities.
> >>>>>
> >>>>> Isn't this contradicting what we actually agreed on?
> >>>>>
> >>>>> > we are planning to have a configuration based way to revert to the previous behavior
> >>>>>
> >>>>> I think what we agreed in the last couple of emails was to add a configuration toggle, that would allow Flink 1.15 users, that are using watermark alignment with multiple splits per source operator, to continue using it with the old 1.15 semantic, even if their source doesn't support pausing/resuming splits. It seems to me like the current FLIP and implementation proposal would always throw an exception in that case?
> >>>>>
> >>>>> Best,
> >>>>> Piotrek
> >>>>>
> >>>>> On Tue, Jul 12, 2022 at 10:18 AM Sebastian Mattheis <sebast...@ververica.com> wrote:
> >>>>>
> >>>>> > Hi all,
> >>>>> >
> >>>>> > I have updated FLIP-217 [1] to the proposed specification and adapted the current implementation [2] respectively.
> >>>>> >
> >>>>> > This means both, FLIP and implementation, are ready for review from my side. (I would revise commit history and messages for the final PR but left it as is for now and the records of this discussion.)
> >>>>> >
> >>>>> > 1. Please review the updated version of FLIP-217 [1]. If there are no further concerns, I would initiate the voting.
> >>>>> > (2. If you want to speed up things, please also have a look into the updated implementation [2].)
> >>>>> >
> >>>>> > Please consider the following updated specification in the current status of FLIP-217 where the essence is as follows:
> >>>>> >
> >>>>> > 1. A method pauseOrResumeSplits is added to SourceReader with default implementation that throws UnsupportedOperationException.
> >>>>> > 2. method pauseOrResumeSplits is added to SplitReader with default implementation that throws UnsupportedOperationException.
> >>>>> > 3. SourceOperator initiates split alignment only if more than one split is assigned to the source (and, of course, only if withSplitAlignment is used).
> >>>>> > 4. There is NO "supportsPauseOrResumeSplits" method at any place (to indicate if the implementation supports pause/resume capabilities).
> >>>>> > 5. There is NO configuration option to enable watermark alignment of splits or disable pause/resume capabilities.
> >>>>> >
> >>>>> > *Note:* If the SourceReader or some SplitReader do not override pauseOrResumeSplits but it is required in the application, an exception is thrown at runtime when an split alignment attempt is executed (not during startup or any time earlier).
> >>>>> >
> >>>>> > Also, I have revised the compatibility/migration section to describe a bit of a rationale for the default implementation with exception throwing behavior.
> >>>>> >
> >>>>> > Regards,
> >>>>> > Sebastian
> >>>>> >
> >>>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
> >>>>> > [2] https://github.com/smattheis/flink/tree/flip-217-split-wm-alignment
> >>>>> >
> >>>>> > On Mon, Jul 4, 2022 at 3:59 AM Thomas Weise <t...@apache.org> wrote:
> >>>>> >
> >>>>> >> Hi,
> >>>>> >>
> >>>>> >> Thank you Becket and Piotr for ironing out the "case 2" behavior. Strictly speaking we are introducing a regression by allowing an exception to bubble up that did not exist in the previous release, regardless how suboptimal the behavior may be. However, given that the feature is still experimental and we are planning to have a configuration based way to revert to the previous behavior, I think this is a good solution.
> >>>>> >>
> >>>>> >> +1 from my side
> >>>>> >>
> >>>>> >> Thomas
> >>>>> >>
> >>>>> >> On Wed, Jun 29, 2022 at 2:43 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>>>> >> >
> >>>>> >> > +1 :)
> >>>>> >> >
> >>>>> >> > On Wed, Jun 29, 2022 at 5:23 PM Becket Qin <becket....@gmail.com> wrote:
> >>>>> >> >
> >>>>> >> > > Thanks for the explanation, Piotr.
> >>>>> >> > >
> >>>>> >> > > So it looks like we have a conclusion here.
> >>>>> >> > >
> >>>>> >> > > 1. Regarding the supportsPausingSplits() method, I feel it brings more confusion while the benefit is marginal, so I prefer not having that if possible. It would be good to also hear @Thomas Weise <t...@apache.org>'s opinion as he mentioned some concern earlier.
> >>>>> >> > > 2. Let's add the feature knob then. In the future we can simply ignore the configuration when deprecating it.
> >>>>> >> > >
> >>>>> >> > > Thanks,
> >>>>> >> > >
> >>>>> >> > > Jiangjie (Becket) Qin
> >>>>> >> > >
> >>>>> >> > > On Wed, Jun 29, 2022 at 10:19 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>>>> >> > >
> >>>>> >> > > > Hi,
> >>>>> >> > > >
> >>>>> >> > > > I mean I'm fine with throwing an exception by default in Flink 1.16 in the "Case 2", but I think we need to provide a way to workaround it for example via a feature toggle, if it's an easy thing to do. And it seems to be a simple thing.
> >>>>> >> > > >
> >>>>> >> > > > However this is orthogonal to the `supportsPausingSplits()` issue. I don't have a big preference whether
> >>>>> >> > > > a) the exception should originate on JM, using `default boolean supportsPausingSplits() { return false; }` (as currently proposed in the FLIP),
> >>>>> >> > > > b) or on the TM from `pauseOrResumeSplits()` throwing `UnsupportedOperationException` as you are proposing.
> >>>>> >> > > >
> >>>>> >> > > > a) fails earlier, so it's more user friendly from this perspective, but it provides more possibilities for bugs/inconsistencies for connector developers, since `supportsPausingSplits()` would have to be kept in sync with `pauseOrResumeSplits()`.
> >>>>> >> > > >
> >>>>> >> > > > Best,
> >>>>> >> > > > Piotrek
> >>>>> >> > > >
> >>>>> >> > > > On Wed, Jun 29, 2022 at 3:27 PM Becket Qin <becket....@gmail.com> wrote:
> >>>>> >> > > >
> >>>>> >> > > > > Hi Piotr,
> >>>>> >> > > > >
> >>>>> >> > > > > Just to make sure we are on the same page. There are two cases for the existing FLIP-182 users:
> >>>>> >> > > > >
> >>>>> >> > > > > Case 1: Each source reader only has one split assigned. This is the targeted case for FLIP-182.
> >>>>> >> > > > > Case 2: Each source reader has multiple splits assigned. This is the flaky case that may or may not work.
> >>>>> >> > > > >
> >>>>> >> > > > > With solution 1, the users of case 1 won't be impacted. The users in case 2 will receive an exception which they won't get at the moment.
> >>>>> >> > > > >
> >>>>> >> > > > > Do you mean we should not throw an exception in case 2? Personally I feel that is OK and could have been done in FLIP-182 itself because it's not a designed use case. As a user I may see a big variation of the job state sizes from time to time and I am not able to rely on this feature to plan my resources and uphold the SLA.
> >>>>> >> > > > >
> >>>>> >> > > > > That said, if you have a strong opinion on this, I am fine with having the configuration like "allow.coarse-grained.watermark.alignment" with the default value set to false, given that a configuration is much easier to deprecate than a method.
> >>>>> >> > > > >
> >>>>> >> > > > > Thanks,
> >>>>> >> > > > >
> >>>>> >> > > > > Jiangjie (Becket) Qin