Hi Sebastian,

Thank you for updating the FLIP page. It looks good and I think you can start a VOTE.
Thomas

On Tue, Jul 26, 2022 at 10:57 AM Sebastian Mattheis <sebast...@ververica.com> wrote:
>
> Hi everybody,
>
> I have updated FLIP-217 [1] and have implemented the respective changes in [2]. Please review. If there are no concerns, I would initiate the voting on Thursday.
>
> Best regards,
> Sebastian
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
> [2] https://github.com/smattheis/flink/tree/flip-217-split-wm-alignment
>
> On Mon, Jul 25, 2022 at 9:19 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Thanks for the update Sebastian :)
> >
> > Best,
> > Piotrek
> >
> > On Mon, Jul 25, 2022 at 08:12 Sebastian Mattheis <sebast...@ververica.com> wrote:
> >
> >> Hi everybody,
> >>
> >> Last week I discussed the semantics and an implementation strategy for the configuration parameter with Piotr, and I did the implementation and some tests this weekend.
> >>
> >> A short summary of what I discussed and recapped with Piotr:
> >>
> >> - The configuration parameter allows (and tolerates) the use of `SourceReader`s that do not implement the `pauseOrResumeSplits` method. (The resulting `UnsupportedOperationException` is ignored in the `SourceOperator`.)
> >> - The configuration parameter allows (and tolerates) the use of `SplitReader`s that do not implement the `pauseOrResumeSplits` method. (The exception is ignored in the `PauseResumeSplitsTask` of the `SplitFetcher`.)
> >>
> >> In particular, this means that for a `SourceReader` with two `SplitReader`s, where one implements `pauseOrResumeSplits` and the other does not, the parameter allows the use of the one that doesn't and, nevertheless, still attempts to pause/resume the other. (Consequently, if the one that doesn't support pausing is ahead, it simply cannot pause that `SplitReader`; but if the other one is ahead, it will be paused until watermarks are aligned.)
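The tolerance semantics summarized above can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Flink's implementation: `SimpleSplitReader`, `PausingReader`, `LegacyReader`, and `PauseResumeTask` are invented names, and the boolean `allowUnalignedSourceSplits` merely stands in for the `pipeline.watermark-alignment.allow-unaligned-source-splits` option named in this thread. It shows a reader that supports pausing still being paused, while the `UnsupportedOperationException` from a reader that does not is swallowed when the flag is set and rethrown otherwise.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model; none of these types are Flink classes.
class ToleratedPauseDemo {
    public static void main(String[] args) {
        PausingReader pausing = new PausingReader(Set.of("split-a"));
        LegacyReader legacy = new LegacyReader(Set.of("split-b"));
        List<SimpleSplitReader> readers = List.of(pausing, legacy);

        // Both splits are ahead; with the flag set, only the legacy reader's exception is swallowed.
        List<String> skipped =
                PauseResumeTask.run(readers, List.of("split-a", "split-b"), List.of(), true);
        System.out.println("paused=" + pausing.paused + " skipped=" + skipped);

        // Without the flag, the exception from the legacy reader propagates.
        try {
            PauseResumeTask.run(readers, List.of("split-b"), List.of(), false);
        } catch (UnsupportedOperationException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}

interface SimpleSplitReader {
    Set<String> ownedSplits();

    // Mirrors the proposed default: a reader that does not override this cannot pause.
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException(getClass().getSimpleName() + " cannot pause splits");
    }
}

// A reader that supports pausing by tracking the IDs of its paused splits.
class PausingReader implements SimpleSplitReader {
    private final Set<String> owned;
    final Set<String> paused = new TreeSet<>();

    PausingReader(Set<String> owned) { this.owned = owned; }

    @Override
    public Set<String> ownedSplits() { return owned; }

    @Override
    public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        paused.addAll(toPause);
        paused.removeAll(toResume);
    }
}

// A reader that keeps the throwing default, e.g. one written before the FLIP.
class LegacyReader implements SimpleSplitReader {
    private final Set<String> owned;

    LegacyReader(Set<String> owned) { this.owned = owned; }

    @Override
    public Set<String> ownedSplits() { return owned; }
}

final class PauseResumeTask {
    // Hands each reader only the splits it owns; returns the splits that could not be paused.
    static List<String> run(List<SimpleSplitReader> readers, Collection<String> toPause,
                            Collection<String> toResume, boolean allowUnalignedSourceSplits) {
        List<String> skipped = new ArrayList<>();
        for (SimpleSplitReader reader : readers) {
            List<String> pause = new ArrayList<>(toPause);
            pause.retainAll(reader.ownedSplits());
            List<String> resume = new ArrayList<>(toResume);
            resume.retainAll(reader.ownedSplits());
            if (pause.isEmpty() && resume.isEmpty()) {
                continue;
            }
            try {
                reader.pauseOrResumeSplits(pause, resume);
            } catch (UnsupportedOperationException e) {
                if (!allowUnalignedSourceSplits) {
                    throw e; // default: surface the missing capability
                }
                skipped.addAll(pause); // tolerated: these splits keep running unaligned
            }
        }
        return skipped;
    }
}
```

Running `main` should print `paused=[split-a] skipped=[split-b]` followed by `error: LegacyReader cannot pause splits`. Note that `PauseResumeTask.run` receives no information about *why* a pause was requested, which is exactly the limitation Sebastian describes in the flaw he accepts.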
> >>
> >> There is one flaw that I don't really like but which I accept as an outcome of the discussion, and which I will add to the FLIP:
> >> If any other mechanism (i.e., other than watermark alignment) attempts to pause or resume `SplitReader`s, there will be side effects and potentially unexpected behavior if one or more `SplitReader`s do not implement `pauseOrResumeSplits` and the user has set the configuration parameter to allow/tolerate this for split-level watermark alignment. (The reason is simply that we cannot tell which mechanism attempts to pause/resume, i.e., whether it is used for watermark alignment or for something else.) Given that this configuration parameter is supposed to be an intermediate fallback, this is acceptable to me, but it must be changed at the latest when some other mechanism uses `pauseOrResumeSplits`.
> >>
> >> As for the parameter naming, I have implemented it as follows (reason: there already exists a parameter `pipeline.auto-watermark-interval`):
> >>
> >> pipeline.watermark-alignment.allow-unaligned-source-splits (default: false)
> >>
> >> Status: I have implemented the configuration parameter (and an IT case). I still need to update the FLIP and will ping you (tomorrow or so) when I'm done with that. Please check my description above and let me know if you see any problems with it.
> >>
> >> Thanks a lot and regards,
> >> Sebastian
> >>
> >>
> >> On Wed, Jul 20, 2022 at 11:24 PM Thomas Weise <t...@apache.org> wrote:
> >>
> >>> Hi Sebastian,
> >>>
> >>> Thank you for updating the FLIP and sorry for my delayed response. As Piotr pointed out, we would need to incorporate the fallback flag into the design to reflect the outcome of the previous discussion.
> >>>
> >>> Based on the current FLIP and as detailed by Becket, the SourceOperator coordinates the alignment. It is responsible for the pause/resume decision and knows how many splits are assigned.
> >>> Therefore, shouldn't it have all the information needed to efficiently handle the case of an UnsupportedOperationException thrown by a reader?
> >>>
> >>> Although the fallback requires some extra implementation effort, I think that is more than offset by not surprising users and offering a smoother migration path. Yes, the flag is a temporary feature that will become obsolete in perhaps 2-3 releases (can we please also include that in the FLIP?). But since it would be just a configuration property that can be ignored at that point (for which there is precedent), no code change will be forced on users.
> >>>
> >>> As for the property name, perhaps the following would be even more descriptive?
> >>>
> >>> coarse.grained.wm.alignment.fallback.enabled
> >>>
> >>> Thanks!
> >>> Thomas
> >>>
> >>>
> >>> On Wed, Jul 13, 2022 at 10:59 AM Becket Qin <becket....@gmail.com> wrote:
> >>> >
> >>> > Thanks for the explanation, Sebastian. I understand your concern now.
> >>> >
> >>> > 1. About the major concern: personally, I'd consider the coarse-grained watermark alignment a special case for backward compatibility. In the future, if for whatever reason we want to pause a split and that is not supported, it seems the only thing that makes sense is throwing an exception, instead of pausing the entire source reader. Regarding this FLIP, if the logic that determines which split should be paused is in the SourceOperator, the SourceOperator actually knows the reason why it pauses a split. It also knows whether more than one split is assigned to the source reader. So it can just fall back to the coarse-grained watermark alignment without affecting other reasons for pausing a split, right?
> >>> > And in the future, if there are more purposes for pausing/resuming a split, the SourceOperator still needs to understand each of the reasons in order to resume the splits once none of the pausing conditions holds any longer.
> >>> >
> >>> > 2. Naming-wise, would "coarse.grained.watermark.alignment.enabled" address your concern?
> >>> >
> >>> > The only concern I have with Option A is that people may not be able to benefit from split-level WM alignment until all the sources they need have implemented it. This seems to unnecessarily delay the adoption of a new feature, which looks like a more substantive downside compared with the "coarse.grained.wm.alignment.enabled" option.
> >>> >
> >>> > BTW, the SourceOperator doesn't need to invoke the pauseOrResumeSplits() method and catch the UnsupportedOperationException every time. A flag can be set so that it doesn't attempt to pause the split again after the first time it sees the exception.
> >>> >
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Jiangjie (Becket) Qin
> >>> >
> >>> >
> >>> >
> >>> > On Wed, Jul 13, 2022 at 5:11 PM Sebastian Mattheis <sebast...@ververica.com> wrote:
> >>> >>
> >>> >> Hi Becket, Hi Thomas, Hi Piotrek,
> >>> >>
> >>> >> Thanks for the feedback. I would like to highlight some concerns:
> >>> >>
> >>> >> Major: A configuration parameter like "allow coarse grained alignment" defines a semantic that mixes two contexts conditionally, as follows: "ignore the inability to pause splits in SourceReader/SplitReader" IF (conditional) we "allow coarse grained watermark alignment". At the same time, we said that there is no way to check the capability of SourceReader/SplitReader to pause/resume other than observing an UnsupportedOperationException at runtime, so we cannot disable the trigger for watermark split alignment in the SourceOperator.
> >>> >> Instead, we can only ignore the inability of SourceReader/SplitReader while executing a pause/resume attempt, which consequently requires checking the "allow coarse grained alignment" parameter value (to implement the conditional semantic). However, during this execution we don't actually know whether the attempt was made for the purpose of watermark alignment or for some other purpose. So the check actually depends on who triggered the pause/resume attempt, and it potentially hides the exception unexpectedly for some other use case. Of course, currently there is no other purpose and, hence, no trigger other than watermark alignment. However, from my perspective, this breaks the idea of having pauseOrResumeSplits (re)usable for other use cases.
> >>> >>
> >>> >> Minor: I'm not aware of any configuration parameter in the format `allow.*`, as you suggested with `allow.coarse.grained.watermark.alignment`. Would that still be okay?
> >>> >>
> >>> >> As we have agreed not to have a "supportsPausableSplits" method because of potential inconsistencies between the return value of this method and the actual implementation (and also the difficulty of having a meaningful return value when the support actually depends on the SourceReader AND the assigned SplitReaders), I don't want to reopen the discussion about the "supportsPausableSplits" method. Instead, I see the following options:
> >>> >>
> >>> >> Option A: I would drop the "allow coarse grained alignment" semantic of the parameter and instead implement a parameter to "enable/disable split watermark alignment", which we can easily use in the SourceOperator to disable the trigger of split alignment.
> >>> >> This is indeed more static and less flexible, because it disables split alignment unconditionally, but it is "context-decoupled" and more straightforward to use. This would also address the use case of disabling split alignment for the purpose of runtime behavior evaluation, as mentioned by Thomas (if I remember correctly). I would implement the parameter with a default where watermark split alignment is enabled, which requires users upgrading to 1.16 to check their application for whether a) there is a source that reads from multiple splits and, b) if so, all splits of that source support pause/resume. If a) is yes and b) is no, the user must take action to disable watermark split alignment (which disables the trigger of split alignment only for this purpose).
> >>> >>
> >>> >> Option B: If we ignore my concern, I would simply check the "allow coarse grained watermark alignment" parameter value on every attempt to execute pause/resume in the SourceReader and in the SplitReader, and not throw an UnsupportedOperationException if the parameter value is set to true.
> >>> >>
> >>> >> Please also note that the parameter is only used for some kind of migration phase. Therefore, I wonder whether we need to overcomplicate things.
> >>> >>
> >>> >> @Piotrek, @Becket, @Thomas: I would recommend/favour option A. Please let me know your feedback and/or concerns as soon as possible. :)
> >>> >>
> >>> >> Regards,
> >>> >> Sebastian
> >>> >>
> >>> >>
> >>> >> On Wed, Jul 13, 2022 at 9:37 AM Becket Qin <becket....@gmail.com> wrote:
> >>> >>>
> >>> >>> Hi Sebastian,
> >>> >>>
> >>> >>> Thanks for updating the FLIP wiki.
> >>> >>>
> >>> >>> Just to double-check: I was thinking of a configuration like "allow.coarse.grained.watermark.alignment".
> >>> >>> This will allow the coarse-grained watermark alignment as a fallback, instead of bubbling up an exception, if split pausing is not supported by some Sources in a Flink job. And this will only affect the Sources that do not support split pausing, not the Sources that do support it.
> >>> >>>
> >>> >>> This seems slightly different from a <knob> that enables/disables split alignment. That sounds like a global switch, and it seems unnecessary to disable split alignment as long as the coarse-grained alignment can serve as a fallback.
> >>> >>>
> >>> >>> Thanks,
> >>> >>>
> >>> >>> Jiangjie (Becket) Qin
> >>> >>>
> >>> >>> On Wed, Jul 13, 2022 at 2:46 PM Sebastian Mattheis <sebast...@ververica.com> wrote:
> >>> >>>>
> >>> >>>> Hi Piotrek,
> >>> >>>>
> >>> >>>> Sorry, I had read it and then forgot about it when I was ripping out the supportsPauseOrResume method again. Thanks for pointing that out. I will add it as follows: the <knob> enables/disables split alignment in the SourceOperator, where the default is that split alignment is enabled. (And I will add the note: "In future releases, the <knob> may be ignored such that split alignment is always enabled.")
> >>> >>>>
> >>> >>>> Cheers,
> >>> >>>> Sebastian
> >>> >>>>
> >>> >>>> On Tue, Jul 12, 2022 at 11:14 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>> >>>>>
> >>> >>>>> Hi Sebastian,
> >>> >>>>>
> >>> >>>>> Thanks for picking this up.
> >>> >>>>>
> >>> >>>>> > 5. There is NO configuration option to enable watermark alignment of splits or disable pause/resume capabilities.
> >>> >>>>>
> >>> >>>>> Isn't this contradicting what we actually agreed on?
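The per-source fallback Becket describes (fall back to coarse-grained alignment instead of bubbling up an exception), combined with his remark elsewhere in the thread that the SourceOperator only needs to see the exception once, might look roughly like the Java sketch below. Everything here is hypothetical: `PausableReader`, `CountingLegacyReader`, `AlignmentCoordinator`, and `allowCoarseGrainedFallback` are illustrative stand-ins for the SourceReader, the SourceOperator, and the proposed "allow.coarse.grained.watermark.alignment" option, and the sketch does not model how the whole reader is paused in the coarse-grained (FLIP-182) mode.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical sketch; these are not Flink classes.
class StickyFallbackDemo {
    public static void main(String[] args) {
        CountingLegacyReader reader = new CountingLegacyReader();
        AlignmentCoordinator coordinator = new AlignmentCoordinator(reader, true);
        for (int i = 0; i < 3; i++) {
            coordinator.alignSplits(List.of("split-1"), List.of());
        }
        // Prints "calls=1 coarseGrained=true": the reader was asked only once.
        System.out.println("calls=" + reader.calls
                + " coarseGrained=" + coordinator.usesCoarseGrainedFallback());
    }
}

interface PausableReader {
    // Same default as proposed in the FLIP: not supported unless overridden.
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException("split pausing not supported");
    }
}

// Keeps the throwing default but counts how often it is invoked.
class CountingLegacyReader implements PausableReader {
    int calls = 0;

    @Override
    public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        calls++;
        PausableReader.super.pauseOrResumeSplits(toPause, toResume);
    }
}

// Stand-in for the SourceOperator side that decides when to pause/resume splits.
class AlignmentCoordinator {
    private final PausableReader reader;
    private final boolean allowCoarseGrainedFallback;
    private boolean splitPausingUnsupported = false;

    AlignmentCoordinator(PausableReader reader, boolean allowCoarseGrainedFallback) {
        this.reader = reader;
        this.allowCoarseGrainedFallback = allowCoarseGrainedFallback;
    }

    void alignSplits(Collection<String> toPause, Collection<String> toResume) {
        if (splitPausingUnsupported) {
            return; // already fell back; do not call the reader again
        }
        try {
            reader.pauseOrResumeSplits(toPause, toResume);
        } catch (UnsupportedOperationException e) {
            if (!allowCoarseGrainedFallback) {
                throw e; // the new default: fail instead of silently degrading
            }
            splitPausingUnsupported = true; // remember it; rely on coarse-grained alignment only
        }
    }

    boolean usesCoarseGrainedFallback() {
        return splitPausingUnsupported;
    }
}
```

Keeping the flag in the coordinator rather than in the reader reflects Becket's point that the SourceOperator is the component that knows why a split is being paused.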
> >>> >>>>>
> >>> >>>>> > we are planning to have a configuration based way to revert to the previous behavior
> >>> >>>>>
> >>> >>>>> I think what we agreed on in the last couple of emails was to add a configuration toggle that would allow Flink 1.15 users who are using watermark alignment with multiple splits per source operator to continue using it with the old 1.15 semantics, even if their source doesn't support pausing/resuming splits. It seems to me that the current FLIP and implementation proposal would always throw an exception in that case?
> >>> >>>>>
> >>> >>>>> Best,
> >>> >>>>> Piotrek
> >>> >>>>>
> >>> >>>>> On Tue, Jul 12, 2022 at 10:18 Sebastian Mattheis <sebast...@ververica.com> wrote:
> >>> >>>>>
> >>> >>>>> > Hi all,
> >>> >>>>> >
> >>> >>>>> > I have updated FLIP-217 [1] to the proposed specification and adapted the current implementation [2] accordingly.
> >>> >>>>> >
> >>> >>>>> > This means both the FLIP and the implementation are ready for review from my side. (I will revise the commit history and messages for the final PR, but have left them as they are for now, for the record of this discussion.)
> >>> >>>>> >
> >>> >>>>> > 1. Please review the updated version of FLIP-217 [1]. If there are no further concerns, I would initiate the voting.
> >>> >>>>> > (2. If you want to speed things up, please also have a look at the updated implementation [2].)
> >>> >>>>> >
> >>> >>>>> > Please consider the following updated specification in the current status of FLIP-217, whose essence is as follows:
> >>> >>>>> >
> >>> >>>>> > 1. A method pauseOrResumeSplits is added to SourceReader with a default implementation that throws UnsupportedOperationException.
> >>> >>>>> > 2. A method pauseOrResumeSplits is added to SplitReader with a default implementation that throws UnsupportedOperationException.
> >>> >>>>> > 3. SourceOperator initiates split alignment only if more than one split is assigned to the source (and, of course, only if withSplitAlignment is used).
> >>> >>>>> > 4. There is NO "supportsPauseOrResumeSplits" method at any place (to indicate whether the implementation supports pause/resume capabilities).
> >>> >>>>> > 5. There is NO configuration option to enable watermark alignment of splits or disable pause/resume capabilities.
> >>> >>>>> >
> >>> >>>>> > *Note:* If the SourceReader or some SplitReader does not override pauseOrResumeSplits but it is required in the application, an exception is thrown at runtime when a split alignment attempt is executed (not during startup or any time earlier).
> >>> >>>>> >
> >>> >>>>> > Also, I have revised the compatibility/migration section to give some rationale for the default implementation with the exception-throwing behavior.
> >>> >>>>> >
> >>> >>>>> > Regards,
> >>> >>>>> > Sebastian
> >>> >>>>> >
> >>> >>>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
> >>> >>>>> > [2] https://github.com/smattheis/flink/tree/flip-217-split-wm-alignment
> >>> >>>>> >
> >>> >>>>> > On Mon, Jul 4, 2022 at 3:59 AM Thomas Weise <t...@apache.org> wrote:
> >>> >>>>> >
> >>> >>>>> >> Hi,
> >>> >>>>> >>
> >>> >>>>> >> Thank you Becket and Piotr for ironing out the "case 2" behavior. Strictly speaking, we are introducing a regression by allowing an exception to bubble up that did not exist in the previous release, regardless of how suboptimal the behavior may be.
> >>> >>>>> >> However, given that the feature is still experimental and we are planning to have a configuration-based way to revert to the previous behavior, I think this is a good solution.
> >>> >>>>> >>
> >>> >>>>> >> +1 from my side
> >>> >>>>> >>
> >>> >>>>> >> Thomas
> >>> >>>>> >>
> >>> >>>>> >> On Wed, Jun 29, 2022 at 2:43 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>> >>>>> >> >
> >>> >>>>> >> > +1 :)
> >>> >>>>> >> >
> >>> >>>>> >> > On Wed, Jun 29, 2022 at 17:23 Becket Qin <becket....@gmail.com> wrote:
> >>> >>>>> >> >
> >>> >>>>> >> > > Thanks for the explanation, Piotr.
> >>> >>>>> >> > >
> >>> >>>>> >> > > So it looks like we have a conclusion here.
> >>> >>>>> >> > >
> >>> >>>>> >> > > 1. Regarding the supportsPausingSplits() method, I feel it brings more confusion while the benefit is marginal, so I prefer not having it if possible. It would be good to also hear @Thomas Weise <t...@apache.org>'s opinion, as he mentioned some concerns earlier.
> >>> >>>>> >> > > 2. Let's add the feature knob then. In the future, we can simply ignore the configuration when deprecating it.
> >>> >>>>> >> > >
> >>> >>>>> >> > > Thanks,
> >>> >>>>> >> > >
> >>> >>>>> >> > > Jiangjie (Becket) Qin
> >>> >>>>> >> > >
> >>> >>>>> >> > > On Wed, Jun 29, 2022 at 10:19 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>> >>>>> >> > >
> >>> >>>>> >> > > > Hi,
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > I mean I'm fine with throwing an exception by default in Flink 1.16 in "Case 2", but I think we need to provide a way to work around it, for example via a feature toggle, if it's an easy thing to do.
> >>> >>>>> >> > > > And it seems to be a simple thing.
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > However, this is orthogonal to the `supportsPausingSplits()` issue. I don't have a strong preference as to whether
> >>> >>>>> >> > > > a) the exception should originate on the JM, using `default boolean supportsPausingSplits() { return false; }` (as currently proposed in the FLIP), or
> >>> >>>>> >> > > > b) on the TM, from `pauseOrResumeSplits()` throwing `UnsupportedOperationException`, as you are proposing.
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > a) fails earlier, so it's more user-friendly from this perspective, but it creates more opportunities for bugs/inconsistencies for connector developers, since `supportsPausingSplits()` would have to be kept in sync with `pauseOrResumeSplits()`.
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > Best,
> >>> >>>>> >> > > > Piotrek
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > On Wed, Jun 29, 2022 at 15:27 Becket Qin <becket....@gmail.com> wrote:
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > > Hi Piotr,
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Just to make sure we are on the same page: there are two cases for the existing FLIP-182 users:
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Case 1: Each source reader has only one split assigned. This is the targeted case for FLIP-182.
> >>> >>>>> >> > > > > Case 2: Each source reader has multiple splits assigned. This is the flaky case that may or may not work.
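Piotr's trade-off between options a) and b) can be made concrete with a hypothetical interface that carries both a `supportsPausingSplits()` flag and a throwing `pauseOrResumeSplits` default. `ConnectorReader` and `ForgetfulReader` are invented names for illustration only; this is not Flink's API. The reader below implements pausing but forgets to override the flag, so an early check in the style of option a) would reject a reader that actually works. That is the kind of inconsistency Piotr warns about; with option b) there is only one method to keep correct.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical illustration of options a) and b); not Flink's API.
class CapabilityDriftDemo {
    public static void main(String[] args) {
        ConnectorReader reader = new ForgetfulReader();
        // Option a): an early capability check would reject this reader...
        boolean rejectedUpFront = !reader.supportsPausingSplits();
        // ...although option b), actually calling the method, works fine.
        reader.pauseOrResumeSplits(List.of("split-1"), List.of());
        System.out.println("rejected up front: " + rejectedUpFront + ", pause call succeeded");
    }
}

interface ConnectorReader {
    // Option a): explicit capability flag, which could be checked early (e.g. on the JM).
    default boolean supportsPausingSplits() {
        return false;
    }

    // Option b): the operation itself reports missing support at runtime (on the TM).
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException("pausing splits is not supported");
    }
}

// Implements pausing but forgets to override the flag, so a) and b) disagree.
class ForgetfulReader implements ConnectorReader {
    @Override
    public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        // real pausing logic would go here
    }
}
```

Running `main` should print `rejected up front: true, pause call succeeded`.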
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > With solution 1, the users of case 1 won't be impacted. The users in case 2 will receive an exception that they don't get at the moment.
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Do you mean we should not throw an exception in case 2? Personally, I feel that throwing is OK and could have been done in FLIP-182 itself, because it's not a designed use case. As a user, I may see a big variation in the job state sizes from time to time, and I am not able to rely on this feature to plan my resources and uphold the SLA.
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > That said, if you have a strong opinion on this, I am fine with having a configuration like "allow.coarse-grained.watermark.alignment" with the default value set to false, given that a configuration is much easier to deprecate than a method.
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Thanks,
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Jiangjie (Becket) Qin
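Becket's two cases, together with item 3 of the specification (split alignment is only initiated when more than one split is assigned), reduce to a small decision table. The Java sketch below is hypothetical: `AlignmentOutcomes`, `Outcome`, and the boolean `allowCoarseGrained` (standing in for the proposed "allow.coarse-grained.watermark.alignment" option, default false) are illustrative names. Note also that `readerSupportsPausing` is not something the operator can query in the final design; as discussed in the thread, lack of support is only discovered when `pauseOrResumeSplits` throws at runtime.

```java
// Hypothetical summary of the outcomes discussed in this thread; not Flink code.
class AlignmentOutcomes {
    static Outcome outcome(int assignedSplits, boolean readerSupportsPausing, boolean allowCoarseGrained) {
        if (assignedSplits <= 1) {
            return Outcome.UNAFFECTED; // Case 1: split alignment is never triggered
        }
        if (readerSupportsPausing) {
            return Outcome.SPLIT_ALIGNMENT; // Case 2 with a reader that implements pausing
        }
        // Case 2 with a reader that keeps the throwing default
        return allowCoarseGrained ? Outcome.COARSE_GRAINED_FALLBACK : Outcome.EXCEPTION;
    }

    public static void main(String[] args) {
        for (int splits : new int[] {1, 2}) {
            for (boolean supports : new boolean[] {true, false}) {
                for (boolean allow : new boolean[] {false, true}) {
                    System.out.printf("splits=%d supportsPausing=%b allowCoarseGrained=%b -> %s%n",
                            splits, supports, allow, outcome(splits, supports, allow));
                }
            }
        }
    }
}

enum Outcome { UNAFFECTED, SPLIT_ALIGNMENT, COARSE_GRAINED_FALLBACK, EXCEPTION }
```

The table makes the migration concern visible: only the combination of multiple splits, a non-pausable reader, and the option left at its default produces the new exception.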