Thanks for the update Sebastian :)

Best,
Piotrek

On Mon, 25 Jul 2022 at 08:12, Sebastian Mattheis <sebast...@ververica.com> wrote:

> Hi everybody,
>
> Last week I discussed the semantics and an implementation strategy for the
> configuration parameter with Piotr, and I did the implementation and some
> tests this weekend.
>
> A short summary of what I discussed and recapped with Piotr:
>
> - The configuration parameter allows (and tolerates) the use of
>   `SourceReader`s that do not implement the `pauseOrResumeSplits` method.
>   (The exception is ignored in `SourceOperator`.)
> - The configuration parameter allows (and tolerates) the use of
>   `SplitReader`s that do not implement the `pauseOrResumeSplits` method.
>   (The exception is ignored in the `PauseResumeSplitsTask` of the
>   `SplitFetcher`.)
>
> In particular, consider a `SourceReader` with two `SplitReader`s where one
> does not implement `pauseOrResumeSplits` and the other does. The parameter
> allows the use of the one that doesn't and will, nevertheless, still
> attempt to pause/resume the other. (Consequently, if the one that doesn't
> support pausing is ahead, it simply cannot be paused; but if the other is
> ahead, it will be paused until the watermarks are aligned.)
>
> There is one flaw that I don't really like but which I accept as a result
> of the discussion and which I will add/update in the FLIP:
> If there is any other mechanism (i.e. other than watermark alignment) that
> attempts to pause or resume `SplitReader`s, it will have side effects and
> potentially unexpected behavior if one or more `SplitReader`s do not
> implement `pauseOrResumeSplits` and the user has set the configuration
> parameter to allow/tolerate this for split-level watermark alignment. (The
> reason is simply that we cannot differentiate which mechanism attempts to
> pause/resume, i.e., whether it is used for watermark alignment or something
> else.)
> Given that this configuration parameter is supposed to be an intermediate
> fallback, this is acceptable to me, but it must be changed at the latest
> when some other mechanism uses `pauseOrResumeSplits`.
>
> As for the parameter naming, I have implemented it the following way
> (reason: there already exists a parameter `pipeline.auto-watermark-interval`):
>
> pipeline.watermark-alignment.allow-unaligned-source-splits (default: false)
>
> Status: I have implemented the configuration parameter (and an IT case). I
> still need to update the FLIP and will ping you (tomorrow or so) when I'm
> done with that. Please check/review my description above and let me know
> if you see any problems with it.
>
> Thanks a lot and regards,
> Sebastian
>
>
> On Wed, Jul 20, 2022 at 11:24 PM Thomas Weise <t...@apache.org> wrote:
>
>> Hi Sebastian,
>>
>> Thank you for updating the FLIP and sorry for my delayed response. As
>> Piotr pointed out, we would need to incorporate the fallback flag into
>> the design to reflect the outcome of the previous discussion.
>>
>> Based on the current FLIP and as detailed by Becket, the
>> SourceOperator coordinates the alignment. It is responsible for the
>> pause/resume decision and knows how many splits are assigned.
>> Therefore, shouldn't it have all the information needed to efficiently
>> handle the case of an UnsupportedOperationException thrown by a reader?
>>
>> Although the fallback requires some extra implementation effort, I
>> think that is more than offset by not surprising users and offering a
>> smoother migration path. Yes, the flag is a temporary feature that
>> will become obsolete in perhaps 2-3 releases (can we please also
>> include that in the FLIP?). But since it would be just a
>> configuration property that can be ignored at that point (for which
>> there is precedent), no code change will be forced on users.
>>
>> As for the property name, perhaps the following would be even more
>> descriptive?
>>
>> coarse.grained.wm.alignment.fallback.enabled
>>
>> Thanks!
>> Thomas
>>
>>
>> On Wed, Jul 13, 2022 at 10:59 AM Becket Qin <becket....@gmail.com> wrote:
>> >
>> > Thanks for the explanation, Sebastian. I understand your concern now.
>> >
>> > 1. About the major concern. Personally I'd consider the coarse grained
>> > watermark alignment as a special case for backward compatibility. In the
>> > future, if for whatever reason we want to pause a split and that is not
>> > supported, it seems the only thing that makes sense is throwing an
>> > exception, instead of pausing the entire source reader. Regarding this
>> > FLIP, if the logic that determines which split should be paused is in the
>> > SourceOperator, the SourceOperator actually knows the reason why it pauses
>> > a split. It also knows whether there is more than one split assigned to
>> > the source reader. So it can just fall back to the coarse grained watermark
>> > alignment, without affecting other reasons for pausing a split, right? And
>> > in the future, if there are more purposes for pausing / resuming a split,
>> > the SourceOperator still needs to understand each of the reasons in order
>> > to resume the splits after all the pausing conditions are no longer met.
>> >
>> > 2. Naming-wise, would "coarse.grained.watermark.alignment.enabled"
>> > address your concern?
>> >
>> > The only concern I have for Option A is that people may not be able to
>> > benefit from split level WM alignment until all the sources they need have
>> > that implemented. This seems to unnecessarily delay the adoption of a new
>> > feature, which looks like a more substantive downside compared with the
>> > "coarse.grained.wm.alignment.enabled" option.
>> >
>> > BTW, the SourceOperator doesn't need to invoke the pauseOrResumeSplits()
>> > method and catch the UnsupportedOperationException every time. A flag can
>> > be set so it doesn't attempt to pause the split after the first time it
>> > sees the exception.
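
Becket's last point (set a flag after the first UnsupportedOperationException and stop retrying) can be sketched roughly as below. This is only an illustration under assumptions: `PausableReader` and `SplitAlignmentTrigger` are hypothetical stand-ins invented for this sketch, split IDs are plain strings, and none of this is the actual Flink `SourceOperator` code.

```java
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for a reader; NOT the Flink SourceReader interface. */
interface PausableReader {
    // Default mirrors the behaviour discussed in the thread: unsupported unless overridden.
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException("pause/resume of splits is not supported");
    }
}

/** Hypothetical helper that stops calling the reader once it has reported no support. */
final class SplitAlignmentTrigger {
    private final PausableReader reader;
    private boolean pauseUnsupported = false; // set after the first exception
    private int readerCalls = 0;              // how often the reader was actually invoked

    SplitAlignmentTrigger(PausableReader reader) {
        this.reader = reader;
    }

    /** Returns true if the reader handled the request, false if the call was skipped or unsupported. */
    boolean pauseOrResume(Collection<String> toPause, Collection<String> toResume) {
        if (pauseUnsupported) {
            return false; // known to be unsupported, do not try again
        }
        readerCalls++;
        try {
            reader.pauseOrResumeSplits(toPause, toResume);
            return true;
        } catch (UnsupportedOperationException e) {
            pauseUnsupported = true; // remember, so later attempts are skipped
            return false;
        }
    }

    int readerCalls() {
        return readerCalls;
    }
}

class SplitAlignmentTriggerDemo {
    public static void main(String[] args) {
        SplitAlignmentTrigger trigger = new SplitAlignmentTrigger(new PausableReader() {});
        trigger.pauseOrResume(List.of("split-0"), List.of());
        trigger.pauseOrResume(List.of("split-0"), List.of());
        System.out.println("reader calls: " + trigger.readerCalls());
    }
}
```

In this sketch the second attempt never reaches the reader, so the cost of the exception is paid once per reader rather than on every alignment attempt.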
>> >
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> >
>> > On Wed, Jul 13, 2022 at 5:11 PM Sebastian Mattheis <sebast...@ververica.com> wrote:
>> >>
>> >> Hi Becket, Hi Thomas, Hi Piotrek,
>> >>
>> >> Thanks for the feedback. I would like to highlight some concerns:
>> >>
>> >> Major: A configuration parameter like "allow coarse grained alignment"
>> >> defines a semantic that mixes two contexts conditionally as follows:
>> >> "ignore incapability to pause splits in SourceReader/SplitReader" IF
>> >> (conditional) we "allow coarse grained watermark alignment". At the same
>> >> time we said that there is no way to check the capability of
>> >> SourceReader/SplitReader to pause/resume other than observing an
>> >> UnsupportedOperationException at runtime, so we cannot disable the
>> >> trigger for watermark split alignment in the SourceOperator. Instead,
>> >> we can only ignore the incapability of SourceReader/SplitReader during
>> >> execution of a pause/resume attempt, which, consequently, requires
>> >> checking the "allow coarse grained alignment" parameter value (to
>> >> implement the conditional semantic). However, during this execution we
>> >> actually don't know whether the attempt was executed for the purpose of
>> >> watermark alignment or for some other purpose, so the check actually
>> >> depends on who triggered the pause/resume attempt and potentially hides
>> >> the exception unexpectedly for some other use case. Of course, currently
>> >> there is no other purpose and, hence, no other trigger than watermark
>> >> alignment. However, from my perspective, this breaks the idea of having
>> >> pauseOrResumeSplits (re)usable for other use cases.
>> >>
>> >> Minor: I'm not aware of any configuration parameter in a format like
>> >> `allow.*` as you suggested with `allow.coarse.grained.watermark.alignment`.
>> >> Would that still be okay to do?
>> >>
>> >> As we have agreed not to have a "supportsPausableSplits" method
>> >> because of potential inconsistencies between the return value of this
>> >> method and the actual implementation (and also the difficulty of having a
>> >> meaningful return value where the support actually depends on the
>> >> SourceReader AND the assigned SplitReaders), I don't want to bring up the
>> >> discussion about the "supportsPausableSplits" method again. Instead, I see
>> >> the following options:
>> >>
>> >> Option A: I would drop the idea of the "allow coarse grained alignment"
>> >> semantic of the parameter but implement a parameter to "enable/disable
>> >> split watermark alignment" which we can easily use in the SourceOperator
>> >> to disable the trigger of split alignment. This is indeed more static and
>> >> less flexible, because it disables split alignment unconditionally, but it
>> >> is "context-decoupled" and more straightforward to use. This would also
>> >> address the use case of disabling split alignment for the purpose of
>> >> runtime behavior evaluation, as mentioned by Thomas (if I remember
>> >> correctly). I would implement the parameter with a default where watermark
>> >> split alignment is enabled, which requires users to check their
>> >> application when upgrading to 1.16: a) is there a source that reads from
>> >> multiple splits and b), if yes, do all splits of that source support
>> >> pause/resume? If a) yes and b) no, the user must take action to disable
>> >> watermark split alignment (which disables the trigger of split alignment
>> >> only for that purpose).
>> >>
>> >> Option B: If we ignore my concern, I would simply check the "allow
>> >> coarse grained watermark alignment" parameter value on every attempt to
>> >> execute pause/resume in the SourceReader and in the SplitReader and would
>> >> not throw an UnsupportedOperationException if the parameter value is set
>> >> to true.
>> >>
>> >> Please note that the parameter is also used only for some kind of
>> >> migration phase.
>> >> Therefore, I wonder if we need to overcomplicate things.
>> >>
>> >> @Piotrek, @Becket, @Thomas: I would recommend/favour option A. Please
>> >> let me know your feedback and/or concerns as soon as possible, if
>> >> possible. :)
>> >>
>> >> Regards,
>> >> Sebastian
>> >>
>> >>
>> >> On Wed, Jul 13, 2022 at 9:37 AM Becket Qin <becket....@gmail.com> wrote:
>> >>>
>> >>> Hi Sebastian,
>> >>>
>> >>> Thanks for updating the FLIP wiki.
>> >>>
>> >>> Just to double confirm, I was thinking of a configuration like
>> >>> "allow.coarse.grained.watermark.alignment". This will allow the coarse
>> >>> grained watermark alignment as a fallback instead of bubbling up an
>> >>> exception if split pausing is not supported in some Sources in a Flink
>> >>> job. And this will only affect the Sources that do not support split
>> >>> pausing, but not the Sources that have split pausing supported.
>> >>>
>> >>> This seems slightly different from a <knob> that enables / disables
>> >>> split alignment. That sounds like a global thing, and it seems
>> >>> unnecessary to disable the split alignment, as long as the coarse
>> >>> grained alignment can be a fallback.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Jiangjie (Becket) Qin
>> >>>
>> >>> On Wed, Jul 13, 2022 at 2:46 PM Sebastian Mattheis <sebast...@ververica.com> wrote:
>> >>>>
>> >>>> Hi Piotrek,
>> >>>>
>> >>>> Sorry, I had read it but forgot it when I was ripping out the
>> >>>> supportsPauseOrResume method again. Thanks for pointing that out. I
>> >>>> will add it as follows: The <knob> enables/disables split alignment in
>> >>>> the SourceOperator where the default is that split alignment is
>> >>>> enabled. (And I will add the note: "In future releases, the <knob> may
>> >>>> be ignored such that split alignment is always enabled.")
>> >>>>
>> >>>> Cheers,
>> >>>> Sebastian
>> >>>>
>> >>>> On Tue, Jul 12, 2022 at 11:14 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> >>>>>
>> >>>>> Hi Sebastian,
>> >>>>>
>> >>>>> Thanks for picking this up.
>> >>>>>
>> >>>>> > 5. There is NO configuration option to enable watermark alignment of
>> >>>>> > splits or disable pause/resume capabilities.
>> >>>>>
>> >>>>> Isn't this contradicting what we actually agreed on?
>> >>>>>
>> >>>>> > we are planning to have a configuration based way to revert to the
>> >>>>> > previous behavior
>> >>>>>
>> >>>>> I think what we agreed on in the last couple of emails was to add a
>> >>>>> configuration toggle that would allow Flink 1.15 users who are using
>> >>>>> watermark alignment with multiple splits per source operator to
>> >>>>> continue using it with the old 1.15 semantic, even if their source
>> >>>>> doesn't support pausing/resuming splits. It seems to me like the
>> >>>>> current FLIP and implementation proposal would always throw an
>> >>>>> exception in that case?
>> >>>>>
>> >>>>> Best,
>> >>>>> Piotrek
>> >>>>>
>> >>>>> On Tue, 12 Jul 2022 at 10:18, Sebastian Mattheis <sebast...@ververica.com> wrote:
>> >>>>>
>> >>>>> > Hi all,
>> >>>>> >
>> >>>>> > I have updated FLIP-217 [1] to the proposed specification and
>> >>>>> > adapted the current implementation [2] accordingly.
>> >>>>> >
>> >>>>> > This means both the FLIP and the implementation are ready for review
>> >>>>> > from my side. (I would revise the commit history and messages for the
>> >>>>> > final PR but left them as they are for now and for the records of
>> >>>>> > this discussion.)
>> >>>>> >
>> >>>>> > 1. Please review the updated version of FLIP-217 [1]. If there are
>> >>>>> > no further concerns, I would initiate the voting.
>> >>>>> > (2. If you want to speed things up, please also have a look at the
>> >>>>> > updated implementation [2].)
>> >>>>> >
>> >>>>> > Please consider the following updated specification in the current
>> >>>>> > status of FLIP-217, where the essence is as follows:
>> >>>>> >
>> >>>>> > 1. A method pauseOrResumeSplits is added to SourceReader with a
>> >>>>> > default implementation that throws UnsupportedOperationException.
>> >>>>> > 2. A method pauseOrResumeSplits is added to SplitReader with a
>> >>>>> > default implementation that throws UnsupportedOperationException.
>> >>>>> > 3. SourceOperator initiates split alignment only if more than one
>> >>>>> > split is assigned to the source (and, of course, only if
>> >>>>> > withSplitAlignment is used).
>> >>>>> > 4. There is NO "supportsPauseOrResumeSplits" method at any place (to
>> >>>>> > indicate if the implementation supports pause/resume capabilities).
>> >>>>> > 5. There is NO configuration option to enable watermark alignment of
>> >>>>> > splits or disable pause/resume capabilities.
>> >>>>> >
>> >>>>> > *Note:* If the SourceReader or some SplitReader does not override
>> >>>>> > pauseOrResumeSplits but it is required in the application, an
>> >>>>> > exception is thrown at runtime when a split alignment attempt is
>> >>>>> > executed (not during startup or any time earlier).
>> >>>>> >
>> >>>>> > Also, I have revised the compatibility/migration section to describe
>> >>>>> > a bit of a rationale for the default implementation with
>> >>>>> > exception-throwing behavior.
>> >>>>> >
>> >>>>> > Regards,
>> >>>>> > Sebastian
>> >>>>> >
>> >>>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>> >>>>> > [2] https://github.com/smattheis/flink/tree/flip-217-split-wm-alignment
>> >>>>> >
>> >>>>> > On Mon, Jul 4, 2022 at 3:59 AM Thomas Weise <t...@apache.org> wrote:
>> >>>>> >
>> >>>>> >> Hi,
>> >>>>> >>
>> >>>>> >> Thank you Becket and Piotr for ironing out the "case 2" behavior.
>> >>>>> >> Strictly speaking we are introducing a regression by allowing an
>> >>>>> >> exception to bubble up that did not exist in the previous release,
>> >>>>> >> regardless of how suboptimal the behavior may be.
>> >>>>> >> However, given that the feature is still experimental and we are
>> >>>>> >> planning to have a configuration based way to revert to the
>> >>>>> >> previous behavior, I think this is a good solution.
>> >>>>> >>
>> >>>>> >> +1 from my side
>> >>>>> >>
>> >>>>> >> Thomas
>> >>>>> >>
>> >>>>> >> On Wed, Jun 29, 2022 at 2:43 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> >>>>> >> >
>> >>>>> >> > +1 :)
>> >>>>> >> >
>> >>>>> >> > On Wed, 29 Jun 2022 at 17:23, Becket Qin <becket....@gmail.com> wrote:
>> >>>>> >> >
>> >>>>> >> > > Thanks for the explanation, Piotr.
>> >>>>> >> > >
>> >>>>> >> > > So it looks like we have a conclusion here.
>> >>>>> >> > >
>> >>>>> >> > > 1. Regarding the supportsPausingSplits() method, I feel it brings
>> >>>>> >> > > more confusion while the benefit is marginal, so I prefer not
>> >>>>> >> > > having that if possible. It would be good to also hear
>> >>>>> >> > > @Thomas Weise <t...@apache.org>'s opinion as he mentioned some
>> >>>>> >> > > concern earlier.
>> >>>>> >> > > 2. Let's add the feature knob then. In the future we can simply
>> >>>>> >> > > ignore the configuration when deprecating it.
>> >>>>> >> > >
>> >>>>> >> > > Thanks,
>> >>>>> >> > >
>> >>>>> >> > > Jiangjie (Becket) Qin
>> >>>>> >> > >
>> >>>>> >> > > On Wed, Jun 29, 2022 at 10:19 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> >>>>> >> > >
>> >>>>> >> > > > Hi,
>> >>>>> >> > > >
>> >>>>> >> > > > I mean I'm fine with throwing an exception by default in Flink
>> >>>>> >> > > > 1.16 in "Case 2", but I think we need to provide a way to
>> >>>>> >> > > > work around it, for example via a feature toggle, if it's an
>> >>>>> >> > > > easy thing to do. And it seems to be a simple thing.
>> >>>>> >> > > >
>> >>>>> >> > > > However, this is orthogonal to the `supportsPausingSplits()` issue.
>> >>>>> >> > > > I don't have a big preference whether
>> >>>>> >> > > > a) the exception should originate on the JM, using `default boolean
>> >>>>> >> > > > supportsPausingSplits() { return false; }` (as currently proposed
>> >>>>> >> > > > in the FLIP),
>> >>>>> >> > > > b) or on the TM from `pauseOrResumeSplits()` throwing
>> >>>>> >> > > > `UnsupportedOperationException` as you are proposing.
>> >>>>> >> > > >
>> >>>>> >> > > > a) fails earlier, so it's more user friendly from this
>> >>>>> >> > > > perspective, but it provides more possibilities for
>> >>>>> >> > > > bugs/inconsistencies for connector developers, since
>> >>>>> >> > > > `supportsPausingSplits()` would have to be kept in sync with
>> >>>>> >> > > > `pauseOrResumeSplits()`.
>> >>>>> >> > > >
>> >>>>> >> > > > Best,
>> >>>>> >> > > > Piotrek
>> >>>>> >> > > >
>> >>>>> >> > > > On Wed, 29 Jun 2022 at 15:27, Becket Qin <becket....@gmail.com> wrote:
>> >>>>> >> > > >
>> >>>>> >> > > > > Hi Piotr,
>> >>>>> >> > > > >
>> >>>>> >> > > > > Just to make sure we are on the same page. There are two cases
>> >>>>> >> > > > > for the existing FLIP-182 users:
>> >>>>> >> > > > >
>> >>>>> >> > > > > Case 1: Each source reader only has one split assigned. This is
>> >>>>> >> > > > > the targeted case for FLIP-182.
>> >>>>> >> > > > > Case 2: Each source reader has multiple splits assigned. This is
>> >>>>> >> > > > > the flaky case that may or may not work.
>> >>>>> >> > > > >
>> >>>>> >> > > > > With solution 1, the users of case 1 won't be impacted. The users
>> >>>>> >> > > > > in case 2 will receive an exception which they don't get at the
>> >>>>> >> > > > > moment.
>> >>>>> >> > > > >
>> >>>>> >> > > > > Do you mean we should not throw an exception in case 2?
>> >>>>> >> > > > > Personally I feel that is OK and could have been done in
>> >>>>> >> > > > > FLIP-182 itself because it's not a designed use case. As a user
>> >>>>> >> > > > > I may see a big variation of the job state sizes from time to
>> >>>>> >> > > > > time and I am not able to rely on this feature to plan my
>> >>>>> >> > > > > resources and uphold the SLA.
>> >>>>> >> > > > >
>> >>>>> >> > > > > That said, if you have a strong opinion on this, I am fine with
>> >>>>> >> > > > > having the configuration like
>> >>>>> >> > > > > "allow.coarse-grained.watermark.alignment" with the default value
>> >>>>> >> > > > > set to false, given that a configuration is much easier to
>> >>>>> >> > > > > deprecate than a method.
>> >>>>> >> > > > >
>> >>>>> >> > > > > Thanks,
>> >>>>> >> > > > >
>> >>>>> >> > > > > Jiangjie (Becket) Qin
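
For readers skimming the thread, the outcome (a default `pauseOrResumeSplits` that throws, plus a configuration flag that tolerates the exception) could be sketched roughly as follows. This is a simplified illustration under assumptions, not the Flink implementation: `SketchSourceReader` and `SketchSourceOperator` are hypothetical names, split IDs are plain strings, and the configuration is modelled as a plain `Map`. Only the key `pipeline.watermark-alignment.allow-unaligned-source-splits` and its default of false are taken from the thread.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a source reader; NOT the Flink SourceReader interface. */
interface SketchSourceReader {
    // Default implementation throws, as described in the thread.
    default void pauseOrResumeSplits(Collection<String> splitsToPause,
                                     Collection<String> splitsToResume) {
        throw new UnsupportedOperationException("pauseOrResumeSplits is not implemented");
    }
}

/** Hypothetical operator that tolerates unsupported readers only if the flag is set. */
final class SketchSourceOperator {
    // Key and default (false) as named in the thread; the Map-based config is an assumption.
    static final String ALLOW_UNALIGNED_KEY =
            "pipeline.watermark-alignment.allow-unaligned-source-splits";

    private final SketchSourceReader reader;
    private final boolean allowUnalignedSplits;

    SketchSourceOperator(SketchSourceReader reader, Map<String, String> config) {
        this.reader = reader;
        this.allowUnalignedSplits =
                Boolean.parseBoolean(config.getOrDefault(ALLOW_UNALIGNED_KEY, "false"));
    }

    /** Returns true if splits were paused/resumed, false if the reader's lack of support was tolerated. */
    boolean alignSplits(Collection<String> toPause, Collection<String> toResume) {
        try {
            reader.pauseOrResumeSplits(toPause, toResume);
            return true;
        } catch (UnsupportedOperationException e) {
            if (!allowUnalignedSplits) {
                throw e; // default behaviour: surface the missing capability
            }
            return false; // fallback: splits of this reader stay unaligned
        }
    }
}

class SketchSourceOperatorDemo {
    public static void main(String[] args) {
        SketchSourceOperator tolerant = new SketchSourceOperator(
                new SketchSourceReader() {},
                Map.of(SketchSourceOperator.ALLOW_UNALIGNED_KEY, "true"));
        System.out.println("aligned: " + tolerant.alignSplits(List.of("split-0"), List.of()));
    }
}
```

As noted in the thread, a check of this kind cannot tell which mechanism triggered the pause/resume attempt, so the flag would also hide the exception for any future caller other than watermark alignment.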