Hi Sebastian,

Thank you for updating the FLIP page. It looks good and I think you
can start a VOTE.

Thomas

On Tue, Jul 26, 2022 at 10:57 AM Sebastian Mattheis
<sebast...@ververica.com> wrote:
>
> Hi everybody,
>
> I have updated FLIP-217 [1] and have implemented the respective changes in
> [2]. Please review. If there are no concerns, I would initiate the voting
> on Thursday.
>
> Best regards,
> Sebastian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
> [2] https://github.com/smattheis/flink/tree/flip-217-split-wm-alignment
>
> On Mon, Jul 25, 2022 at 9:19 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Thanks for the update Sebastian :)
> >
> > Best,
> > Piotrek
> >
> > On Mon, Jul 25, 2022 at 08:12, Sebastian Mattheis <sebast...@ververica.com>
> > wrote:
> >
> >> Hi everybody,
> >>
> >> Last week I discussed the semantics and an implementation strategy of the
> >> configuration parameter with Piotr, and I did the implementation and some
> >> tests this weekend.
> >>
> >> A short summary of what I discussed and recapped with Piotr:
> >>
> >>    - The configuration parameter allows (and tolerates) the use of
> >>    `SourceReader`s that do not implement the `pauseOrResumeSplits` method. (The
> >>    exception is ignored in `SourceOperator`.)
> >>    - The configuration parameter allows (and tolerates) the use of
> >>    `SplitReader`s that do not implement the `pauseOrResumeSplits` method.
> >>    (The exception is ignored in the `PauseResumeSplitsTask` of the
> >>    `SplitFetcher`.)
> >>
> >> In particular, consider a `SourceReader` with two `SplitReader`s where one
> >> does not implement `pauseOrResumeSplits` and the other does. The parameter
> >> allows the use of the one that doesn't and will, nevertheless, still
> >> attempt to pause/resume the other. (Consequently, if the one that doesn't
> >> support pausing is ahead, it simply cannot be paused; but if the other is
> >> ahead, it will be paused until watermarks are aligned.)
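
The mixed-reader behavior described above can be sketched roughly as follows. This is a minimal illustration, not the code from the FLIP-217 branch: `SketchSplitReader`, `PausableReader`, `LegacyReader` and `TolerantPauser` are hypothetical stand-ins, splits are reduced to plain `String` ids, and the boolean field merely mirrors the meaning of `pipeline.watermark-alignment.allow-unaligned-source-splits`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a split reader; NOT the actual Flink interface.
interface SketchSplitReader {
    // Mirrors the default discussed in the thread: unsupported unless overridden.
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException("pauseOrResumeSplits is not implemented");
    }
}

// A reader that supports pausing and tracks which splits are currently paused.
final class PausableReader implements SketchSplitReader {
    final List<String> paused = new ArrayList<>();

    @Override
    public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        paused.removeAll(toResume);
        paused.addAll(toPause);
    }
}

// A reader that keeps the throwing default.
final class LegacyReader implements SketchSplitReader {}

// Hypothetical wrapper around a single pause/resume attempt.
final class TolerantPauser {
    // Assumed to carry the value of the configuration parameter.
    private final boolean allowUnalignedSourceSplits;

    TolerantPauser(boolean allowUnalignedSourceSplits) {
        this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
    }

    // Returns true if the reader handled the request, false if the exception was tolerated.
    boolean pauseOrResume(
            SketchSplitReader reader, Collection<String> toPause, Collection<String> toResume) {
        try {
            reader.pauseOrResumeSplits(toPause, toResume);
            return true;
        } catch (UnsupportedOperationException e) {
            if (!allowUnalignedSourceSplits) {
                throw e; // parameter is false: surface the missing capability
            }
            return false; // parameter is true: this reader simply stays unaligned
        }
    }
}
```

With the parameter set to true, the pausable reader is still paused and resumed as usual, while the legacy reader is left running; with the parameter set to false, the first attempt against the legacy reader propagates the exception.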
> >>
> >> There is one flaw that I don't really like but which I accept based on the
> >> discussion and which I will add/update in the FLIP:
> >> If there is any other mechanism (i.e., other than watermark alignment)
> >> that attempts to pause or resume `SplitReader`s, it will have side effects
> >> and potentially unexpected behavior if one or more `SplitReader`s do not
> >> implement `pauseOrResumeSplits` and the user has set the configuration
> >> parameter to allow/tolerate it for split-level watermark alignment. (The
> >> reason is simply that we cannot differentiate which mechanism attempts to
> >> pause/resume, i.e., whether it is used for watermark alignment or something
> >> else.) Given that this configuration parameter is supposed to be an
> >> intermediate fallback, this is acceptable for me, but it must be changed at
> >> the latest when some other mechanism uses `pauseOrResumeSplits`.
> >>
> >> As for the parameter naming, I have implemented it the following way
> >> (reason: There exists a parameter `pipeline.auto-watermark-interval`.):
> >>
> >> pipeline.watermark-alignment.allow-unaligned-source-splits (default:
> >> false)
> >>
> >> Status: I have implemented the configuration parameter (and an IT case).
> >> I still need to update the FLIP and will ping you (tomorrow or so) when I'm
> >> done with that. Please review my description above and let me know if you
> >> see any problems with it.
> >>
> >> Thanks a lot and regards,
> >> Sebastian
> >>
> >>
> >> On Wed, Jul 20, 2022 at 11:24 PM Thomas Weise <t...@apache.org> wrote:
> >>
> >>> Hi Sebastian,
> >>>
> >>> Thank you for updating the FLIP and sorry for my delayed response. As
> >>> Piotr pointed out, we would need to incorporate the fallback flag into
> >>> the design to reflect the outcome of the previous discussion.
> >>>
> >>> Based on the current FLIP and as detailed by Becket, the
> >>> SourceOperator coordinates the alignment. It is responsible for the
> >>> pause/resume decision and knows how many splits are assigned.
> >>> Therefore shouldn't it have all the information needed to efficiently
> >>> handle the case of UnsupportedOperationException thrown by a reader?
> >>>
> >>> Although the fallback requires some extra implementation effort, I
> >>> think that is more than offset by not surprising users and offering a
> >>> smoother migration path. Yes, the flag is a temporary feature that
> >>> will become obsolete in perhaps 2-3 releases (can we please also
> >>> include that into the FLIP?). But since it would be just a
> >>> configuration property that can be ignored at that point (for which
> >>> there is precedent), no code change will be forced on users.
> >>>
> >>> As for the property name, perhaps the following would be even more
> >>> descriptive?
> >>>
> >>> coarse.grained.wm.alignment.fallback.enabled
> >>>
> >>> Thanks!
> >>> Thomas
> >>>
> >>>
> >>> On Wed, Jul 13, 2022 at 10:59 AM Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>> >
> >>> > Thanks for the explanation, Sebastian. I understand your concern now.
> >>> >
> >>> > 1. About the major concern. Personally I'd consider the coarse grained
> >>> watermark alignment as a special case for backward compatibility. In the
> >>> future, if for whatever reason we want to pause a split and that is not
> >>> supported, it seems the only thing that makes sense is throwing an
> >>> exception, instead of pausing the entire source reader. Regarding this
> >>> FLIP, if the logic that determines which split should be paused is in the
> >>> SourceOperator, the SourceOperator actually knows the reason why it pauses
> >>> a split. It also knows whether more than one split is assigned to
> >>> the source reader. So it can just fall back to the coarse grained watermark
> >>> alignment, without affecting other reasons for pausing a split, right? And
> >>> in the future, if there are more purposes for pausing / resuming a split,
> >>> the SourceOperator still needs to understand each of the reasons in order
> >>> to resume the splits after all the pausing conditions are no longer met.
> >>> >
> >>> > 2. Naming wise, would "coarse.grained.watermark.alignment.enabled"
> >>> address your concern?
> >>> >
> >>> > The only concern I have for Option A is that people may not be able to
> >>> benefit from split level WM alignment until all the sources they need have
> >>> that implemented. This seems to unnecessarily delay the adoption of a new
> >>> feature, which looks like a more substantive downside compared with the
> >>> "coarse.grained.wm.alignment.enabled" option.
> >>> >
> >>> > BTW, the SourceOperator doesn't need to invoke the
> >>> pauseOrResumeSplits() method and catch the UnsupportedOperationException
> >>> every time. A flag can be set so it doesn't attempt to pause the split after
> >>> the first time it sees the exception.
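
The "set a flag after the first exception" idea can be sketched as below. This is only an illustration under assumptions: `PauseAttemptGuard` is a hypothetical helper, the reader is modeled as a `BiConsumer` over split ids, and nothing here is taken from the actual `SourceOperator`.

```java
import java.util.Collection;
import java.util.function.BiConsumer;

// Hypothetical helper that stops calling a reader once it has reported
// that pausing is unsupported.
final class PauseAttemptGuard {
    private boolean pauseUnsupported = false;
    private int attempts = 0;

    // Returns true if the reader handled the request; false if pausing is
    // (now known to be) unsupported.
    boolean tryPauseOrResume(
            BiConsumer<Collection<String>, Collection<String>> reader,
            Collection<String> toPause,
            Collection<String> toResume) {
        if (pauseUnsupported) {
            return false; // skip the call; the reader already told us once
        }
        attempts++;
        try {
            reader.accept(toPause, toResume);
            return true;
        } catch (UnsupportedOperationException e) {
            pauseUnsupported = true; // remember, so later rounds do not retry
            return false;
        }
    }

    // Number of times the reader was actually invoked.
    int attempts() {
        return attempts;
    }
}
```

The guard keeps one flag per reader, so a reader that does support pausing is unaffected by another reader that does not.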
> >>> >
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Jiangjie (Becket) Qin
> >>> >
> >>> >
> >>> >
> >>> > On Wed, Jul 13, 2022 at 5:11 PM Sebastian Mattheis <
> >>> sebast...@ververica.com> wrote:
> >>> >>
> >>> >> Hi Becket, Hi Thomas, Hi Piotrek,
> >>> >>
> >>> >> Thanks for the feedback. I would like to highlight some concerns:
> >>> >>
> >>> >> Major: A configuration parameter like "allow coarse grained
> >>> alignment" defines a semantic that mixes two contexts conditionally as
> >>> follows: "ignore incapability to pause splits in SourceReader/SplitReader"
> >>> IF (conditional) we "allow coarse grained watermark alignment". At the same
> >>> time we said that there is no way to check the capability of
> >>> SourceReader/SplitReader to pause/resume other than observing an
> >>> UnsupportedOperationException during runtime such that we cannot disable
> >>> the trigger for watermark split alignment in the SourceOperator. Instead,
> >>> we can only ignore the incapability of SourceReader/SplitReader during
> >>> execution of a pause/resume attempt which, consequently, requires checking
> >>> the "allow coarse grained alignment" parameter value (to implement the
> >>> conditional semantic). However, during this execution we actually don't
> >>> know whether the attempt was executed for the purpose of watermark
> >>> alignment or for some other purpose such that the check actually depends on
> >>> who triggered the pause/resume attempt and hides the exception potentially
> >>> unexpectedly for some other use case. Of course, currently there is no
> >>> other purpose and, hence, no other trigger than watermark alignment.
> >>> However, this breaks, in my perspective, the idea of having
> >>> pauseOrResumeSplits (re)usable for other use cases.
> >>> >> Minor: I'm not aware of any configuration parameter in the format
> >>> like `allow.*` as you suggested with
> >>> `allow.coarse.grained.watermark.alignment`. Would that still be okay to do?
> >>> >>
> >>> >> As we have agreed to not have a "supportsPausableSplits" method
> >>> because of potential inconsistencies between return value of this method
> >>> and the actual implementation (and also the difficulty to have a meaningful
> >>> return value where the support actually depends on SourceReader AND the
> >>> assigned SplitReaders), I don't want to bring up the discussion about the
> >>> "supportsPauseableSplits" method again. Instead, I see the following
> >>> options:
> >>> >>
> >>> >> Option A: I would drop the idea of "allow coarse grained alignment"
> >>> semantic of the parameter but implement a parameter to "enable/disable
> >>> split watermark alignment" which we can easily use in the SourceOperator to
> >>> disable the trigger of split alignment. This is indeed more static and less
> >>> flexible, because it disables split alignment unconditionally, but it is
> >>> "context-decoupled" and more straight-forward to use. This would also
> >>> address the use case of disabling split alignment for the purpose of
> >>> runtime behavior evaluation, as mentioned by Thomas (if I remember
> >>> correctly.) I would implement the parameter with a default where watermark
> >>> split alignment is enabled which requires users to check their application
> >>> when upgrading to 1.16 if a) there is a source that reads from multiple
> >>> splits and b), if yes, all splits of that source support pause/resume. If
> >>> a) yes and b) no, the user must take action to disable watermark split
> >>> alignment (which disables the trigger of split alignment only for the
> >>> purpose).
> >>> >>
> >>> >> Option B: If we ignore my concern, I would simply check the "allow
> >>> coarse grained watermark alignment" parameter value on every attempt to
> >>> execute pause/resume in the SourceReader and in the SplitReader and will
> >>> not throw UnsupportedOperationException if the parameter value is set to
> >>> true.
> >>> >>
> >>> >> Please note that the parameter is also used only for some kind of
> >>> migration phase. Therefore, I wonder if we need to overcomplicate things.
> >>> >>
> >>> >> @Piotrek, @Becket, @Thomas: I would recommend/favour option A. Please
> >>> let me know your feedback and/or concerns as soon as possible, if possible. :)
> >>> >>
> >>> >> Regards,
> >>> >> Sebastian
> >>> >>
> >>> >>
> >>> >> On Wed, Jul 13, 2022 at 9:37 AM Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>> >>>
> >>> >>> Hi Sebastian,
> >>> >>>
> >>> >>> Thanks for updating the FLIP wiki.
> >>> >>>
> >>> >>> Just to double confirm, I was thinking of a configuration like
> >>> "allow.coarse.grained.watermark.alignment". This will allow the coarse
> >>> grained watermark alignment as a fallback instead of bubbling up an
> >>> exception if split pausing is not supported in some Sources in a Flink job.
> >>> And this will only affect the Sources that do not support split pausing,
> >>> but not the Sources that have split pausing supported.
> >>> >>>
> >>> >>> This seems slightly different from a <knob> that enables / disables
> >>> split alignment. That sounds like a global thing, and it seems not necessary
> >>> to disable the split alignment, as long as the coarse grained alignment can
> >>> be a fallback.
> >>> >>>
> >>> >>> Thanks,
> >>> >>>
> >>> >>> Jiangjie (Becket) Qin
> >>> >>>
> >>> >>> On Wed, Jul 13, 2022 at 2:46 PM Sebastian Mattheis <
> >>> sebast...@ververica.com> wrote:
> >>> >>>>
> >>> >>>> Hi Piotrek,
> >>> >>>>
> >>> >>>> Sorry I've read it and forgot it when I was ripping out the
> >>> supportsPauseOrResume method again. Thanks for pointing that out. I will
> >>> add it as follows: The <knob> enables/disables split alignment in the
> >>> SourceOperator where the default is that split alignment is enabled. (And I
> >>> will add the note: "In future releases, the <knob> may be ignored such that
> >>> split alignment is always enabled.")
> >>> >>>>
> >>> >>>> Cheers,
> >>> >>>> Sebastian
> >>> >>>>
> >>> >>>> On Tue, Jul 12, 2022 at 11:14 PM Piotr Nowojski <
> >>> pnowoj...@apache.org> wrote:
> >>> >>>>>
> >>> >>>>> Hi Sebastian,
> >>> >>>>>
> >>> >>>>> Thanks for picking this up.
> >>> >>>>>
> >>> >>>>> > 5. There is NO configuration option to enable watermark
> >>> alignment of
> >>> >>>>> splits or disable pause/resume capabilities.
> >>> >>>>>
> >>> >>>>> Isn't this contradicting what we actually agreed on?
> >>> >>>>>
> >>> >>>>> > we are planning to have a configuration based way to revert to
> >>> the
> >>> >>>>> previous behavior
> >>> >>>>>
> >>> >>>>> I think what we agreed in the last couple of emails was to add a
> >>> >>>>> configuration toggle, that would allow Flink 1.15 users, that are
> >>> using
> >>> >>>>> watermark alignment with multiple splits per source operator, to
> >>> continue
> >>> >>>>> using it with the old 1.15 semantic, even if their source doesn't
> >>> support
> >>> >>>>> pausing/resuming splits. It seems to me like the current FLIP and
> >>> >>>>> implementation proposal would always throw an exception in that
> >>> case?
> >>> >>>>>
> >>> >>>>> Best,
> >>> >>>>> Piotrek
> >>> >>>>>
> >>> >>>>> On Tue, Jul 12, 2022 at 10:18, Sebastian Mattheis <
> >>> sebast...@ververica.com>
> >>> >>>>> wrote:
> >>> >>>>>
> >>> >>>>> > Hi all,
> >>> >>>>> >
> >>> >>>>> > I have updated FLIP-217 [1] to the proposed specification and
> >>> adapted the
> >>> >>>>> > current implementation [2] respectively.
> >>> >>>>> >
> >>> >>>>> > This means both, FLIP and implementation, are ready for review
> >>> from my
> >>> >>>>> > side. (I would revise commit history and messages for the final
> >>> PR but left
> >>> >>>>> > it as is for now and the records of this discussion.)
> >>> >>>>> >
> >>> >>>>> > 1. Please review the updated version of FLIP-217 [1]. If there
> >>> are no
> >>> >>>>> > further concerns, I would initiate the voting.
> >>> >>>>> > (2. If you want to speed up things, please also have a look into
> >>> the
> >>> >>>>> > updated implementation [2].)
> >>> >>>>> >
> >>> >>>>> > Please consider the following updated specification in the
> >>> current status
> >>> >>>>> > of FLIP-217 where the essence is as follows:
> >>> >>>>> >
> >>> >>>>> > 1. A method pauseOrResumeSplits is added to SourceReader with
> >>> default
> >>> >>>>> > implementation that throws UnsupportedOperationException.
> >>> >>>>> > 2. A method pauseOrResumeSplits is added to SplitReader with
> >>> default
> >>> >>>>> > implementation that throws UnsupportedOperationException.
> >>> >>>>> > 3. SourceOperator initiates split alignment only if more than
> >>> one split is
> >>> >>>>> > assigned to the source (and, of course, only if
> >>> withSplitAlignment is used).
> >>> >>>>> > 4. There is NO "supportsPauseOrResumeSplits" method at any place
> >>> (to
> >>> >>>>> > indicate if the implementation supports pause/resume
> >>> capabilities).
> >>> >>>>> > 5. There is NO configuration option to enable watermark
> >>> alignment of
> >>> >>>>> > splits or disable pause/resume capabilities.
> >>> >>>>> >
> >>> >>>>> > *Note:* If the SourceReader or some SplitReader do not override
> >>> >>>>> > pauseOrResumeSplits but it is required in the application, an
> >>> exception is
> >>> >>>>> > thrown at runtime when a split alignment attempt is executed
> >>> (not during
> >>> >>>>> > startup or any time earlier).
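
Items 1 and 3 of the list above, together with the note, can be sketched as follows. This is a rough illustration under assumptions, not the proposed implementation: `SketchSourceReader` and `SketchSourceOperator` are hypothetical names, splits are plain `String` ids, and `splitAlignmentConfigured` only stands in for the effect of using `withSplitAlignment`.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the reader interface; NOT the actual Flink API.
interface SketchSourceReader {
    // Item 1: a default implementation that throws, so existing readers still compile.
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException("pauseOrResumeSplits is not implemented");
    }
}

// Hypothetical operator that decides whether to attempt split alignment at all.
final class SketchSourceOperator {
    private final SketchSourceReader reader;
    private final boolean splitAlignmentConfigured; // assumed to reflect withSplitAlignment
    private final List<String> assignedSplits;

    SketchSourceOperator(
            SketchSourceReader reader, boolean splitAlignmentConfigured, List<String> assignedSplits) {
        this.reader = reader;
        this.splitAlignmentConfigured = splitAlignmentConfigured;
        this.assignedSplits = assignedSplits;
    }

    // Returns true if an alignment attempt was made, false if it was skipped.
    boolean alignSplits(Collection<String> toPause, Collection<String> toResume) {
        // Item 3: only attempt alignment when configured and more than one split is assigned.
        if (!splitAlignmentConfigured || assignedSplits.size() <= 1) {
            return false;
        }
        // Note: a reader without an override fails here, at the first attempt,
        // not during startup.
        reader.pauseOrResumeSplits(toPause, toResume);
        return true;
    }
}
```

A reader with a single assigned split therefore never reaches the throwing default, which matches the "case 1" users discussed further down in the thread.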
> >>> >>>>> >
> >>> >>>>> > Also, I have revised the compatibility/migration section to
> >>> describe a bit
> >>> >>>>> > of a rationale for the default implementation with exception
> >>> throwing
> >>> >>>>> > behavior.
> >>> >>>>> >
> >>> >>>>> > Regards,
> >>> >>>>> > Sebastian
> >>> >>>>> >
> >>> >>>>> > [1]
> >>> >>>>> >
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
> >>> >>>>> > [2]
> >>> https://github.com/smattheis/flink/tree/flip-217-split-wm-alignment
> >>> >>>>> >
> >>> >>>>> > On Mon, Jul 4, 2022 at 3:59 AM Thomas Weise <t...@apache.org>
> >>> wrote:
> >>> >>>>> >
> >>> >>>>> >> Hi,
> >>> >>>>> >>
> >>> >>>>> >> Thank you Becket and Piotr for ironing out the "case 2"
> >>> behavior.
> >>> >>>>> >> Strictly speaking we are introducing a regression by allowing an
> >>> >>>>> >> exception to bubble up that did not exist in the previous
> >>> release,
> >>> >>>>> >> regardless of how suboptimal the behavior may be. However, given
> >>> that the
> >>> >>>>> >> feature is still experimental and we are planning to have a
> >>> >>>>> >> configuration based way to revert to the previous behavior, I
> >>> think
> >>> >>>>> >> this is a good solution.
> >>> >>>>> >>
> >>> >>>>> >> +1 from my side
> >>> >>>>> >>
> >>> >>>>> >> Thomas
> >>> >>>>> >>
> >>> >>>>> >> On Wed, Jun 29, 2022 at 2:43 PM Piotr Nowojski <
> >>> pnowoj...@apache.org>
> >>> >>>>> >> wrote:
> >>> >>>>> >> >
> >>> >>>>> >> > +1 :)
> >>> >>>>> >> >
> >>> >>>>> >> > On Wed, Jun 29, 2022 at 17:23, Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>> >>>>> >> >
> >>> >>>>> >> > >  Thanks for the explanation, Piotr.
> >>> >>>>> >> > >
> >>> >>>>> >> > > So it looks like we have a conclusion here.
> >>> >>>>> >> > >
> >>> >>>>> >> > > 1. Regarding the supportsPausingSplits() method, I feel it
> >>> brings more
> >>> >>>>> >> > > confusion while the benefit is marginal, so I prefer not
> >>> having that
> >>> >>>>> >> if
> >>> >>>>> >> > > possible. It would be good to also hear @Thomas Weise <
> >>> t...@apache.org
> >>> >>>>> >> >'s
> >>> >>>>> >> > > opinion as he mentioned some concern earlier.
> >>> >>>>> >> > > 2. Let's add the feature knob then. In the future we can
> >>> simply
> >>> >>>>> >> ignore the
> >>> >>>>> >> > > configuration when deprecating it.
> >>> >>>>> >> > >
> >>> >>>>> >> > > Thanks,
> >>> >>>>> >> > >
> >>> >>>>> >> > > Jiangjie (Becket) Qin
> >>> >>>>> >> > >
> >>> >>>>> >> > > On Wed, Jun 29, 2022 at 10:19 PM Piotr Nowojski <
> >>> pnowoj...@apache.org
> >>> >>>>> >> >
> >>> >>>>> >> > > wrote:
> >>> >>>>> >> > >
> >>> >>>>> >> > > > Hi,
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > I mean I'm fine with throwing an exception by default in
> >>> Flink 1.16
> >>> >>>>> >> in
> >>> >>>>> >> > > the
> >>> >>>>> >> > > > "Case 2", but I think we need to provide a way to
> >>> work around it for
> >>> >>>>> >> > > example
> >>> >>>>> >> > > > via a feature toggle, if it's an easy thing to do. And it
> >>> seems to
> >>> >>>>> >> be a
> >>> >>>>> >> > > > simple thing.
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > However this is orthogonal to the
> >>> `supportsPausingSplits()` issue. I
> >>> >>>>> >> > > don't
> >>> >>>>> >> > > > have a big preference whether
> >>> >>>>> >> > > >   a) the exception should originate on JM, using `default
> >>> boolean
> >>> >>>>> >> > > > supportsPausingSplits() { return false; }` (as currently
> >>> proposed
> >>> >>>>> >> in the
> >>> >>>>> >> > > > FLIP),
> >>> >>>>> >> > > >   b) or on the TM from `pauseOrResumeSplits()` throwing
> >>> >>>>> >> > > > `UnsupportedOperationException` as you are proposing.
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > a) fails earlier, so it's more user friendly from this
> >>> perspective,
> >>> >>>>> >> but
> >>> >>>>> >> > > it
> >>> >>>>> >> > > > provides more possibilities for bugs/inconsistencies for
> >>> connector
> >>> >>>>> >> > > > developers, since `supportsPausingSplits()` would have to
> >>> be kept
> >>> >>>>> >> in sync
> >>> >>>>> >> > > > with `pauseOrResumeSplits()`.
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > Best,
> >>> >>>>> >> > > > Piotrek
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > On Wed, Jun 29, 2022 at 15:27, Becket Qin <becket....@gmail.com
> >>> >
> >>> >>>>> >> wrote:
> >>> >>>>> >> > > >
> >>> >>>>> >> > > > > Hi Piotr,
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Just to make sure we are on the same page. There are
> >>> two cases
> >>> >>>>> >> for the
> >>> >>>>> >> > > > > existing FLIP-182 users:
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Case 1: Each source reader only has one split assigned.
> >>> This is
> >>> >>>>> >> the
> >>> >>>>> >> > > > > targeted case for FLIP-182.
> >>> >>>>> >> > > > > Case 2: Each source reader has multiple splits
> >>> assigned. This is
> >>> >>>>> >> the
> >>> >>>>> >> > > > flaky
> >>> >>>>> >> > > > > case that may or may not work.
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > With solution 1, the users of case 1 won't be impacted.
> >>> The users
> >>> >>>>> >> in
> >>> >>>>> >> > > > case 2
> >>> >>>>> >> > > > > will receive an exception which they won't get at the
> >>> moment.
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Do you mean we should not throw an exception in case 2?
> >>> >>>>> >> Personally I
> >>> >>>>> >> > > feel
> >>> >>>>> >> > > > > that is OK and could have been done in FLIP-182 itself
> >>> because
> >>> >>>>> >> it's
> >>> >>>>> >> > > not a
> >>> >>>>> >> > > > > designed use case. As a user I may see a big variation
> >>> of the job
> >>> >>>>> >> state
> >>> >>>>> >> > > > > sizes from time to time and I am not able to rely on
> >>> this feature
> >>> >>>>> >> to
> >>> >>>>> >> > > plan
> >>> >>>>> >> > > > > my resources and uphold the SLA.
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > That said, if you have a strong opinion on this, I am
> >>> fine with
> >>> >>>>> >> having
> >>> >>>>> >> > > > the
> >>> >>>>> >> > > > > configuration like
> >>> "allow.coarse-grained.watermark.alignment"
> >>> >>>>> >> with the
> >>> >>>>> >> > > > > default value set to false, given that a configuration
> >>> is much
> >>> >>>>> >> easier
> >>> >>>>> >> > > to
> >>> >>>>> >> > > > > deprecate than a method.
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Thanks,
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > > Jiangjie (Becket) Qin
> >>> >>>>> >> > > > >
> >>> >>>>> >> > > > >
> >>> >>>>> >>
> >>> >>>>> >
> >>>
> >>
