Hi Becket, Hi Thomas, Hi Piotrek,

Thanks for the feedback. I would like to highlight some concerns:

   1. Major: A configuration parameter like "allow coarse grained
   alignment" defines a semantic that mixes two contexts conditionally:
   "ignore the inability of SourceReader/SplitReader to pause splits" IF we
   "allow coarse grained watermark alignment". At the same time, we said that
   there is no way to check whether a SourceReader/SplitReader can
   pause/resume other than observing an UnsupportedOperationException at
   runtime, so we cannot disable the trigger for watermark split alignment in
   the SourceOperator. Instead, we can only ignore the inability of the
   SourceReader/SplitReader while executing a pause/resume attempt, which
   requires checking the "allow coarse grained alignment" parameter value at
   that point (to implement the conditional semantic). However, during this
   execution we don't know whether the attempt was made for watermark
   alignment or for some other purpose. The check therefore depends on who
   triggered the pause/resume attempt, and it may hide the exception
   unexpectedly for some other use case. Of course, there is currently no
   other purpose and, hence, no other trigger than watermark alignment.
   Still, from my perspective, this breaks the idea of keeping
   pauseOrResumeSplits (re)usable for other use cases.
   2. Minor: I'm not aware of any configuration parameter in the format
   like `allow.*` as you suggested with
   `allow.coarse.grained.watermark.alignment`. Would that still be okay to do?

As we have agreed to not have a "supportsPausableSplits" method because of
potential inconsistencies between the return value of this method and the
actual implementation (and also the difficulty of having a meaningful return
value where the support actually depends on the SourceReader AND the
assigned SplitReaders), I don't want to bring up the discussion about the
"supportsPausableSplits" method again. Instead, I see the following
options:

Option A: I would drop the "allow coarse grained alignment" semantic of the
parameter and instead implement a parameter to "enable/disable split
watermark alignment", which we can easily use in the SourceOperator to
disable the trigger of split alignment. This is indeed more static and less
flexible, because it disables split alignment unconditionally, but it is
"context-decoupled" and more straightforward to use. This would also
address the use case of disabling split alignment for the purpose of
runtime behavior evaluation, as mentioned by Thomas (if I remember
correctly). I would implement the parameter with a default where watermark
split alignment is enabled. This requires users to check their application
when upgrading to 1.16: a) is there a source that reads from multiple
splits and, b) if yes, do all splits of that source support pause/resume? If
a) yes and b) no, the user must take action to disable watermark split
alignment (which disables the trigger of split alignment only for that
purpose).
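
To make Option A a bit more concrete, here is a rough sketch of where the
check would live. It is not the actual SourceOperator code: the class, the
field splitAlignmentEnabled and the method maybeTriggerSplitAlignment are
made-up names for this mail, and the "more than one split" condition is
taken from point 3 of the FLIP summary quoted further down.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical, simplified stand-in for the SourceOperator; not Flink code. */
class SplitAlignmentTriggerSketch {
    // Assumed parameter from Option A; users would leave it enabled by default.
    private final boolean splitAlignmentEnabled;
    private final List<String> assignedSplits = new ArrayList<>();
    // Counts how often a pause/resume attempt would have been issued.
    private int pauseResumeAttempts = 0;

    SplitAlignmentTriggerSketch(boolean splitAlignmentEnabled) {
        this.splitAlignmentEnabled = splitAlignmentEnabled;
    }

    void addSplits(Collection<String> splits) {
        assignedSplits.addAll(splits);
    }

    /** Returns true if a pause/resume attempt was triggered. */
    boolean maybeTriggerSplitAlignment() {
        // The only check sits in the trigger; the readers never see the parameter.
        if (!splitAlignmentEnabled || assignedSplits.size() <= 1) {
            return false;
        }
        // A real operator would call pauseOrResumeSplits on the SourceReader here.
        pauseResumeAttempts++;
        return true;
    }

    int getPauseResumeAttempts() {
        return pauseResumeAttempts;
    }

    public static void main(String[] args) {
        SplitAlignmentTriggerSketch disabled = new SplitAlignmentTriggerSketch(false);
        disabled.addSplits(List.of("split-0", "split-1"));
        System.out.println(disabled.maybeTriggerSplitAlignment()); // false

        SplitAlignmentTriggerSketch enabled = new SplitAlignmentTriggerSketch(true);
        enabled.addSplits(List.of("split-0", "split-1"));
        System.out.println(enabled.maybeTriggerSplitAlignment()); // true
    }
}
```

Because the parameter is only consulted in the trigger, pauseOrResumeSplits
itself keeps throwing for readers that do not support it, no matter who
calls it.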

Option B: If we ignore my concern, I would simply check the "allow coarse
grained watermark alignment" parameter value on every attempt to execute
pause/resume in the SourceReader and in the SplitReader, and not throw
UnsupportedOperationException if the parameter value is set to true.
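
A rough sketch of what Option B amounts to, and of the concern from point 1.
The interface PausableReader and the helper pauseOrResume are simplified,
made-up stand-ins for this mail (not Flink's actual interfaces), and
allowCoarseGrained stands for the parameter value.

```java
import java.util.Collection;
import java.util.List;

/** Hypothetical, simplified stand-in for a reader; not Flink's interface. */
interface PausableReader {
    // Default behavior as in the FLIP summary: unsupported unless overridden.
    default void pauseOrResumeSplits(
            Collection<String> splitsToPause, Collection<String> splitsToResume) {
        throw new UnsupportedOperationException("pause/resume not supported");
    }
}

class CallerAgnosticFallbackSketch {
    /** Returns true if the reader handled the request, false if the failure was swallowed. */
    static boolean pauseOrResume(
            PausableReader reader,
            boolean allowCoarseGrained,
            Collection<String> splitsToPause,
            Collection<String> splitsToResume) {
        try {
            reader.pauseOrResumeSplits(splitsToPause, splitsToResume);
            return true;
        } catch (UnsupportedOperationException e) {
            if (allowCoarseGrained) {
                // The flag is about watermark alignment, but at this point
                // the code cannot tell who triggered the attempt.
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        // A reader that does not override pauseOrResumeSplits.
        PausableReader legacy = new PausableReader() {};
        // Triggered by watermark alignment: swallowing is the intended fallback.
        System.out.println(pauseOrResume(legacy, true, List.of("split-0"), List.of()));
        // Triggered by some hypothetical other feature: swallowed just the same.
        System.out.println(pauseOrResume(legacy, true, List.of("split-1"), List.of()));
    }
}
```

Both calls look identical from inside pauseOrResume, which is exactly why
the exception would also be hidden from a future, non-alignment caller.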

Please note that the parameter is only needed for a migration phase anyway.
Therefore, I wonder whether we need to overcomplicate things.

@Piotrek, @Becket, @Thomas: I would recommend/favour option A. Please let
me know your feedback and/or concerns as soon as possible, if possible. :)

Regards,
Sebastian


On Wed, Jul 13, 2022 at 9:37 AM Becket Qin <becket....@gmail.com> wrote:

> Hi Sebastian,
>
> Thanks for updating the FLIP wiki.
>
> Just to double confirm, I was thinking of a configuration like
> "allow.coarse.grained.watermark.alignment". This will allow the coarse
> grained watermark alignment as a fallback instead of bubbling up an
> exception if split pausing is not supported in some Sources in a Flink job.
> And this will only affect the Sources that do not support split pausing,
> but not the Sources that have split pausing supported.
>
> This seems slightly different from a <knob> that enables / disables split
> alignment. That sounds like a global thing, and it seems unnecessary to
> disable the split alignment, as long as the coarse grained alignment can be
> a fallback.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Jul 13, 2022 at 2:46 PM Sebastian Mattheis <
> sebast...@ververica.com> wrote:
>
>> Hi Piotrek,
>>
>> Sorry, I had read it but forgot about it when I was ripping out the
>> supportsPauseOrResume method again. Thanks for pointing that out. I will
>> add it as follows: The <knob> enables/disables split alignment in the
>> SourceOperator where the default is that split alignment is enabled. (And I
>> will add the note: "In future releases, the <knob> may be ignored such that
>> split alignment is always enabled.")
>>
>> Cheers,
>> Sebastian
>>
>> On Tue, Jul 12, 2022 at 11:14 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi Sebastian,
>>>
>>> Thanks for picking this up.
>>>
>>> > 5. There is NO configuration option to enable watermark alignment of
>>> > splits or disable pause/resume capabilities.
>>>
>>> Isn't this contradicting what we actually agreed on?
>>>
>>> > we are planning to have a configuration based way to revert to the
>>> > previous behavior
>>>
>>> I think what we agreed in the last couple of emails was to add a
>>> configuration toggle, that would allow Flink 1.15 users, that are using
>>> watermark alignment with multiple splits per source operator, to continue
>>> using it with the old 1.15 semantic, even if their source doesn't support
>>> pausing/resuming splits. It seems to me like the current FLIP and
>>> implementation proposal would always throw an exception in that case?
>>>
>>> Best,
>>> Piotrek
>>>
>>> On Tue, Jul 12, 2022 at 10:18 AM Sebastian Mattheis
>>> <sebast...@ververica.com> wrote:
>>>
>>> > Hi all,
>>> >
>>> > I have updated FLIP-217 [1] to the proposed specification and adapted the
>>> > current implementation [2] accordingly.
>>> >
>>> > This means both the FLIP and the implementation are ready for review from
>>> > my side. (I would revise the commit history and messages for the final PR
>>> > but left them as is for now, for the record of this discussion.)
>>> >
>>> > 1. Please review the updated version of FLIP-217 [1]. If there are no
>>> > further concerns, I would initiate the voting.
>>> > (2. If you want to speed up things, please also have a look into the
>>> > updated implementation [2].)
>>> >
>>> > Please consider the following updated specification in the current status
>>> > of FLIP-217 where the essence is as follows:
>>> >
>>> > 1. A method pauseOrResumeSplits is added to SourceReader with default
>>> > implementation that throws UnsupportedOperationException.
>>> > 2. A method pauseOrResumeSplits is added to SplitReader with default
>>> > implementation that throws UnsupportedOperationException.
>>> > 3. SourceOperator initiates split alignment only if more than one split is
>>> > assigned to the source (and, of course, only if withSplitAlignment is
>>> > used).
>>> > 4. There is NO "supportsPauseOrResumeSplits" method at any place (to
>>> > indicate if the implementation supports pause/resume capabilities).
>>> > 5. There is NO configuration option to enable watermark alignment of
>>> > splits or disable pause/resume capabilities.
>>> >
>>> > *Note:* If the SourceReader or some SplitReader does not override
>>> > pauseOrResumeSplits but it is required in the application, an exception is
>>> > thrown at runtime when a split alignment attempt is executed (not during
>>> > startup or any time earlier).
>>> >
>>> > Also, I have revised the compatibility/migration section to describe a bit
>>> > of a rationale for the default implementation with exception throwing
>>> > behavior.
>>> >
>>> > Regards,
>>> > Sebastian
>>> >
>>> > [1]
>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>>> > [2]
>>> > https://github.com/smattheis/flink/tree/flip-217-split-wm-alignment
>>> >
>>> > On Mon, Jul 4, 2022 at 3:59 AM Thomas Weise <t...@apache.org> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Thank you Becket and Piotr for ironing out the "case 2" behavior.
>>> >> Strictly speaking we are introducing a regression by allowing an
>>> >> exception to bubble up that did not exist in the previous release,
>>> >> regardless how suboptimal the behavior may be. However, given that the
>>> >> feature is still experimental and we are planning to have a
>>> >> configuration based way to revert to the previous behavior, I think
>>> >> this is a good solution.
>>> >>
>>> >> +1 from my side
>>> >>
>>> >> Thomas
>>> >>
>>> >> On Wed, Jun 29, 2022 at 2:43 PM Piotr Nowojski <pnowoj...@apache.org>
>>> >> wrote:
>>> >> >
>>> >> > +1 :)
>>> >> >
>>> >> > On Wed, Jun 29, 2022 at 5:23 PM Becket Qin <becket....@gmail.com>
>>> >> > wrote:
>>> >> >
>>> >> > > Thanks for the explanation, Piotr.
>>> >> > >
>>> >> > > So it looks like we have a conclusion here.
>>> >> > >
>>> >> > > 1. Regarding the supportsPausingSplits() method, I feel it brings more
>>> >> > > confusion while the benefit is marginal, so I prefer not having that if
>>> >> > > possible. It would be good to also hear @Thomas Weise <t...@apache.org>'s
>>> >> > > opinion as he mentioned some concern earlier.
>>> >> > > 2. Let's add the feature knob then. In the future we can simply ignore the
>>> >> > > configuration when deprecating it.
>>> >> > >
>>> >> > > Thanks,
>>> >> > >
>>> >> > > Jiangjie (Becket) Qin
>>> >> > >
>>> >> > > On Wed, Jun 29, 2022 at 10:19 PM Piotr Nowojski <pnowoj...@apache.org>
>>> >> > > wrote:
>>> >> > >
>>> >> > > > Hi,
>>> >> > > >
>>> >> > > > I mean I'm fine with throwing an exception by default in Flink 1.16 in
>>> >> > > > the "Case 2", but I think we need to provide a way to workaround it for
>>> >> > > > example via a feature toggle, if it's an easy thing to do. And it seems
>>> >> > > > to be a simple thing.
>>> >> > > >
>>> >> > > > However this is orthogonal to the `supportsPausingSplits()` issue. I
>>> >> > > > don't have a big preference whether
>>> >> > > >   a) the exception should originate on JM, using `default boolean
>>> >> > > > supportsPausingSplits() { return false; }` (as currently proposed in the
>>> >> > > > FLIP),
>>> >> > > >   b) or on the TM from `pauseOrResumeSplits()` throwing
>>> >> > > > `UnsupportedOperationException` as you are proposing.
>>> >> > > >
>>> >> > > > a) fails earlier, so it's more user friendly from this perspective, but
>>> >> > > > it provides more possibilities for bugs/inconsistencies for connector
>>> >> > > > developers, since `supportsPausingSplits()` would have to be kept in sync
>>> >> > > > with `pauseOrResumeSplits()`.
>>> >> > > >
>>> >> > > > Best,
>>> >> > > > Piotrek
>>> >> > > >
>>> >> > > > On Wed, Jun 29, 2022 at 3:27 PM Becket Qin <becket....@gmail.com>
>>> >> > > > wrote:
>>> >> > > >
>>> >> > > > > Hi Piotr,
>>> >> > > > >
>>> >> > > > > Just to make sure we are on the same page. There are two cases for the
>>> >> > > > > existing FLIP-182 users:
>>> >> > > > >
>>> >> > > > > Case 1: Each source reader only has one split assigned. This is the
>>> >> > > > > targeted case for FLIP-182.
>>> >> > > > > Case 2: Each source reader has multiple splits assigned. This is the
>>> >> > > > > flaky case that may or may not work.
>>> >> > > > >
>>> >> > > > > With solution 1, the users of case 1 won't be impacted. The users in
>>> >> > > > > case 2 will receive an exception which they won't get at the moment.
>>> >> > > > >
>>> >> > > > > Do you mean we should not throw an exception in case 2? Personally I
>>> >> > > > > feel that is OK and could have been done in FLIP-182 itself because it's
>>> >> > > > > not a designed use case. As a user I may see a big variation of the job
>>> >> > > > > state sizes from time to time and I am not able to rely on this feature
>>> >> > > > > to plan my resources and uphold the SLA.
>>> >> > > > >
>>> >> > > > > That said, if you have a strong opinion on this, I am fine with having
>>> >> > > > > the configuration like "allow.coarse-grained.watermark.alignment" with
>>> >> > > > > the default value set to false, given that a configuration is much
>>> >> > > > > easier to deprecate than a method.
>>> >> > > > >
>>> >> > > > > Thanks,
>>> >> > > > >
>>> >> > > > > Jiangjie (Becket) Qin
>>> >> > > > >
>>> >> > > > >
>>> >>
>>> >
>>>
>>
