+dev

Hi Sebastian,

Thank you for the summary. Please see the detailed replies inline. As a
recap of my suggestions.

1. Pausable splits API.
  a) Add default implementations to methods "pauseOrResumeSplits" in both
SourceReader and SplitReader where both default implementations throw
 UnsupportedOperationException.

2. User story.
    a) We tell users to enable the watermark alignment as they like. This
is exactly what the current Flink API is.
    b) We tell the source developers, please implement pausable splits,
otherwise bad things may happen. Think of it like you are expected to
implement SourceReader#snapshotState() properly, otherwise exceptions will
be thrown when users enable checkpointing.

Thanks,

Jiangjie (Becket) Qin

On Wed, May 11, 2022 at 4:45 PM Sebastian Mattheis <sebast...@ververica.com>
wrote:

> Hi Becket, Hi everybody,
>
> I'm sorry if I misread the messages but I could not derive an agreement
> from the mailing list. Nevertheless, if I understand you right the
> suggestion is:
>
> * Add default implementations to methods "pauseOrResumeSplits" in both
> SourceReader and SplitReader where both default implementations throw
> UnsupportedOperationException.
>
Yes.

* Add "supportsPauseOrResumeSplits" to the Source interface. (In the
> following, I refer to supporting this as "pausable splits".)
>
We may no longer need this if pausable splits are expected to be
implemented by the source developers, i.e. non-optional. Having this method
would then be somewhat misleading as it looks like the sources that do not
support pausable splits are also acceptable in the long term. So API wise,
I'd say maybe we should remove this for this FLIP, although I believe this
supportXXX pattern itself is still attractive for optional features.


>
> To make the conclusions explicit:
>
> 1. The implementation of pauseOrResumeSplits in both interfaces
> SourceReader and SplitReader are optional where the default is that it
> doesn't support it. (--> This means that the implementation is still
> optional for the source developer.)
>
It is optional for backwards compatibility with existing sources, as they
may still compile without code change. But starting from this FLIP, Flink
will always optimistically assume that all the sources support pausable
splits. If a source does not support pausable splits, it goes to an error
handling path when watermark alignment is enabled on it. This is different
from a usual optional feature, where no error is expected.


> 2. If watermark alignment is enabled in the application code by adding
> withWatermarkAlignment to the WatermarkStrategy while SourceReader or
> SplitReader do not support pausableSplits, we throw an
> UnsupportedOperationException.
>
Yes.


> 3. With regard to your statement:
>
>> [...] basically means watermark alignment is an non-optional feature to
>> the end users.
>
> You actually mean that "pausable splits" are non-optional for the app
> developer if watermark alignment is enabled. However, watermark alignment
> is optional and can be enabled/disabled.
>
Yes, watermark alignment can be enabled/disabled in individual sources in
Flink jobs, which basically means the code supporting watermark alignment
has to already be there. That again means the Source developers are also
expected to support pausable splits by default. So this way we essentially
tell the end users that you may enable / disable this feature as you wish,
and tell the source developers that you SHOULD implement this because the
end users may turn it on/off at will. And if the source does not support
pausable splits, that goes to an error handling path when watermark
alignment is enabled on it. So users know they have to explicitly exclude
this source.


>
> So far it's totally clear to me and I hope this is what you mean. I also
> agree with both statements:
>
> So making that expectation aligned with the source developers seems
>> reasonable.
>>
>
> I think this is a simple and clean solution from both the end user and
>> source developers' standpoint.
>>
>
> However, a last conclusion derives from 3. and is an open question for me:
>
> 4. The feature of "pausable splits" is now tightly bound to watermark
> alignment, i.e., if sources do not support "pausable splits" one can not
> enable watermark alignment for these sources. This dependency is not the
> current status of watermark alignment implementation because it is/was
> implemented without pausable splits. Do we want to introduce this
> dependency? (This is an open question. I cannot judge that.)
>
The watermark alignment basically relies on the pausable splits, right? So
personally I found it quite reasonable that if the source does not support
pausable splits, end users cannot enable watermark alignment on it.


> If something is wrong, please correct me.
>
> Regards,
> Sebastian
>
> On Wed, May 11, 2022 at 9:05 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Sebastian,
>>
>> Thanks for the reply and patient discussion. I agree this is a tricky
>> decision.
>>
>>
>>> Nevertheless, Piotr has valid concerns about Option c) which I see as
>>> follows:
>>> (1) An interface with default NOOP implementation makes the
>>> implementation optional. And in my opinion, a default implementation is and
>>> will remain a way of making implementation optional because even in future
>>> a developer can decide to implement the "old flavor" without support for
>>> pausable splits.
>>> (2) It may not be too critical but I also find it suboptimal that with a
>>> NOOP default implementation there is no way to check at runtime if
>>> SourceReader or SplitReader actually support pausing. (To do so, one would
>>> need a supportsX method which makes it again more complicated.)\
>>
>>
>> Based on the last few messages in the mailing list.  Piotr and I agreed
>> that the default implementation should just throw an
>> UnsupportedOperationException if the source is unpausable. So this
>> basically tells the Source developers that this feature is expected to be
>> supported. Because we cannot prevent end users from putting an unpausable
>> source into the watermark alignment group, that basically means watermark
>> alignment is an non-optional feature to the end users. So making that
>> expectation aligned with the source developers seems reasonable.  And if a
>> source does not support this feature, the end users should explicitly
>> remove that source from the watermark alignment group.
>>
>> Personally speaking I think this is a simple and clean solution from both
>> the end user and source developers' standpoint.
>>
>> Does this address your concerns?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, May 11, 2022 at 2:52 PM Sebastian Mattheis <
>> sebast...@ververica.com> wrote:
>>
>>> Hi Piotr, Hi Becket, Hi everybody,
>>>
>>> we, Dawid and I, discussed the various suggestions/options and we would
>>> be okay either way because we find neither solution is perfect just because
>>> of the already present complexity.
>>>
>>> Option c) Adding methods to the interfaces of SourceReader and
>>> SplitReader
>>> Option a) Adding decorative interfaces to be used by SourceReader and
>>> SplitReader
>>>
>>> As of the current status (v. 12) of the FLIP [1], it is based on Option
>>> c) which we find acceptable because the complexity added is only a single
>>> method.
>>>
>>> Nevertheless, Piotr has valid concerns about Option c) which I see as
>>> follows:
>>> (1) An interface with default NOOP implementation makes the
>>> implementation optional. And in my opinion, a default implementation is and
>>> will remain a way of making implementation optional because even in future
>>> a developer can decide to implement the "old flavor" without support for
>>> pausable splits.
>>> (2) It may not be too critical but I also find it suboptimal that with a
>>> NOOP default implementation there is no way to check at runtime if
>>> SourceReader or SplitReader actually support pausing. (To do so, one would
>>> need a supportsX method which makes it again more complicated.)
>>>
>>> However, we haven't changed it because Option a) is also not optimal or
>>> straight-forward:
>>> (1) We need to add two distinct yet similar decorative interfaces since,
>>> as mentioned, the signatures of the methods are different. For example, we
>>> would need decorative interfaces like `SplitReaderWithPausableSplits` and
>>> `SourceReaderWithPausableSplits`.
>>> (2) As a consequence, we would need to somehow document how/where to
>>> implement both interfaces and how this relates to each other. This we could
>>> solve by adding a note in the interface of SourceReader and SplitReader and
>>> reference to the decorative interfaces but it still increases complexity
>>> too.
>>>
>>> In summary, we see both as acceptable and preferred over other options.
>>> The question is if we can find a solution or compromise that is acceptable
>>> for everybody to reach consensus.
>>>
>>> Please let us know what you think because we would be happy if we can
>>> conclude the discussion to avoid dropping the initiative on this FLIP.
>>>
>>> Regards,
>>> Sebastian
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199540438
>>> (v. 12)
>>>
>>> On Thu, May 5, 2022 at 10:13 AM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi Guowei,
>>>>
>>>> as Dawid wrote a couple of messages back:
>>>>
>>>> > This is covered in the previous FLIP[1] which has been already
>>>> implemented in 1.15. In short, it must be enabled with the watermark
>>>> strategy which also configures drift and update interval
>>>>
>>>> So by default watermark alignment is disabled, regardless if a source
>>>> supports it or not.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> czw., 5 maj 2022 o 09:56 Guowei Ma <guowei....@gmail.com> napisał(a):
>>>>
>>>>> Hi,
>>>>>
>>>>> We know that in the case of Bounded input Flink supports the Batch
>>>>> execution mode. Currently in Batch execution mode, flink is executed
>>>>> on a
>>>>> stage-by-stage basis. In this way, perhaps watermark alignment might
>>>>> not
>>>>> gain much.
>>>>>
>>>>> So my question is: Is watermark alignment the default behavior(for
>>>>> implemented source only)? If so, have you considered evaluating the
>>>>> impact
>>>>> of this behavior on the Batch execution mode? Or thinks it is not
>>>>> necessary.
>>>>>
>>>>> Correct me if I miss something.
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Thu, May 5, 2022 at 1:01 PM Piotr Nowojski <
>>>>> piotr.nowoj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi Becket and Dawid,
>>>>> >
>>>>> > > I feel that no matter which option we choose this can not be solved
>>>>> > entirely in either of the options, because of the point above and
>>>>> because
>>>>> > the signature of SplitReader#pauseOrResumeSplits and
>>>>> > SourceReader#pauseOrResumeSplits are slightly different (one
>>>>> identifies
>>>>> > splits with splitId the other one passes the splits directly).
>>>>> >
>>>>> > Yes, that's a good point in this case and for features that need to
>>>>> be
>>>>> > implemented in more than one place.
>>>>> >
>>>>> > > Is there any reason for pausing reading from a split an optional
>>>>> feature,
>>>>> > > other than that this was not included in the original interface?
>>>>> >
>>>>> > An additional argument in favor of making it optional is to simplify
>>>>> source
>>>>> > implementation. But on its own I'm not sure if that would be enough
>>>>> to
>>>>> > justify making this feature optional. Maybe.
>>>>> >
>>>>> > > I think it would be way simpler and clearer to just let end users
>>>>> and
>>>>> > Flink
>>>>> > > assume all the connectors will implement this feature.
>>>>> >
>>>>> > As I wrote above that would be an interesting choice to make (ease of
>>>>> > implementation for new users, vs system consistency). Regardless of
>>>>> that,
>>>>> > yes, for me the main argument is the API backward compatibility. But
>>>>> let's
>>>>> > clear a couple of points:
>>>>> > - The current proposal adding methods to the base interface with
>>>>> default
>>>>> > implementations is an OPTIONAL feature. Same as the decorative
>>>>> version
>>>>> > would be.
>>>>> > - Decorative version could implement "throw
>>>>> UnsupportedOperationException"
>>>>> > if user enabled watermark alignment just as well and I agree that's a
>>>>> > better option compared to logging a warning.
>>>>> >
>>>>> > Best,
>>>>> > Piotrek
>>>>> >
>>>>> >
>>>>> > śr., 4 maj 2022 o 15:40 Becket Qin <becket....@gmail.com>
>>>>> napisał(a):
>>>>> >
>>>>> > > Thanks for the reply and patient discussion, Piotr and Dawid.
>>>>> > >
>>>>> > > Is there any reason for pausing reading from a split an optional
>>>>> feature,
>>>>> > > other than that this was not included in the original interface?
>>>>> > >
>>>>> > > To be honest I am really worried about the complexity of the user
>>>>> story
>>>>> > > here. Optional features like this have a high overhead. Imagine
>>>>> this
>>>>> > > feature is optional, now a user enabled watermark alignment and
>>>>> defined a
>>>>> > > few watermark groups. Would it work? Hmm, that depends on whether
>>>>> the
>>>>> > > involved Source has implmemented this feature. If the Sources are
>>>>> well
>>>>> > > documented, good luck. Otherwise end users may have to look into
>>>>> the code
>>>>> > > of the Source to see whether the feature is supported. Which is
>>>>> something
>>>>> > > they shouldn't have to do.
>>>>> > >
>>>>> > > I think it would be way simpler and clearer to just let end users
>>>>> and
>>>>> > Flink
>>>>> > > assume all the connectors will implement this feature. After all
>>>>> the
>>>>> > > watermark group is not optinoal to the end users. If in some rare
>>>>> cases,
>>>>> > > the feature cannot be supported, a clear
>>>>> UnsupportedOperationException
>>>>> > will
>>>>> > > be thrown to tell users to explicitly remove this Source from the
>>>>> > watermark
>>>>> > > group. I don't think we should have a warning message here, as
>>>>> they tend
>>>>> > to
>>>>> > > be ignored in many cases. If we do this, we don't even need the
>>>>> > supportXXX
>>>>> > > method in the Source for this feature. In fact this is exactly how
>>>>> many
>>>>> > > interfaces works today. For example,
>>>>> SplitEnumerator#addSplitsBack() is
>>>>> > not
>>>>> > > supported by Pravega source because it does not support partial
>>>>> failover.
>>>>> > > In that case, it simply throws an exception to trigger a global
>>>>> recovery.
>>>>> > >
>>>>> > > The reason we add a default implementation in this case would just
>>>>> for
>>>>> > the
>>>>> > > sake of backwards compatibility so the old source can still
>>>>> compile.
>>>>> > Sure,
>>>>> > > in short term, this feature might not be supported by many existing
>>>>> > > sources. That is OK, and it is quite visible to the source
>>>>> developers
>>>>> > that
>>>>> > > they did not override the default impl which throws an
>>>>> > > UnsupportedOperationException.
>>>>> > >
>>>>> > > @Dawid,
>>>>> > >
>>>>> > > the Java doc of the SupportXXX() method in the Source would be the
>>>>> single
>>>>> > > >> source of truth regarding how to implement this feature.
>>>>> > > >
>>>>> > > >
>>>>> > >
>>>>> > > I also don't find it entirely true. Half of the classes are
>>>>> theoretically
>>>>> > > > optional and are utility classes from the point of view how the
>>>>> > > interfaces
>>>>> > > > are organized. Theoretically users do not need to use any of
>>>>> > > > SourceReaderBase & SplitReader. Would be weird to list their
>>>>> methods in
>>>>> > > the
>>>>> > > > Source interface.
>>>>> > >
>>>>> > > I think the ultimate goal of java docs is to guide users to
>>>>> implement the
>>>>> > > Source. If SourceReaderBase is the preferred way to implement a
>>>>> > > SourceReader, it seems worth mentioning that. Even the Java
>>>>> language
>>>>> > > documentation interfaces lists the konwn implementations [1] so
>>>>> people
>>>>> > can
>>>>> > > leverage them. But for this particular case, if we make the feature
>>>>> > > non-optional, we don't even need the supportXXX() method for now.
>>>>> > >
>>>>> > > Thanks,
>>>>> > >
>>>>> > > Jiangjie (Becket) Qin
>>>>> > >
>>>>> > >
>>>>> > >
>>>>> > > On Wed, May 4, 2022 at 4:37 PM Dawid Wysakowicz <
>>>>> dwysakow...@apache.org>
>>>>> > > wrote:
>>>>> > >
>>>>> > > > Hey Piotr and Becket,
>>>>> > > >
>>>>> > > > First of all, let me say I am happy with whichever option is
>>>>> agreed in
>>>>> > > the
>>>>> > > > discussion.
>>>>> > > >
>>>>> > > > I wanted to clarify a few points from the discussion though:
>>>>> > > >
>>>>> > > > @Becket:
>>>>> > > >
>>>>> > > > The main argument for adding the methods to the SourceReader is
>>>>> that
>>>>> > > these
>>>>> > > > methods are effectively NON-OPTIONAL to SourceReader impl, i.e.
>>>>> > starting
>>>>> > > > from this FLIP, all the SourceReaders impl are expected to
>>>>> support this
>>>>> > > > method, although some old implementations may not have
>>>>> implemented this
>>>>> > > > feature. I think we should distinguish the new features from the
>>>>> > optional
>>>>> > > > features. While the public decorative interface is a solution to
>>>>> the
>>>>> > > > optional features. We should not use it for the features that are
>>>>> > > > non-optional.
>>>>> > > >
>>>>> > > > I don't think that this feature is NON-OPTIONAL. Even though
>>>>> > preferred, I
>>>>> > > > still think it can be simply optional.
>>>>> > > >
>>>>> > > > the Java doc of the SupportXXX() method in the Source would be
>>>>> the
>>>>> > single
>>>>> > > > source of truth regarding how to implement this feature.
>>>>> > > >
>>>>> > > > I also don't find it entirely true. Half of the classes are
>>>>> > theoretically
>>>>> > > > optional and are utility classes from the point of view how the
>>>>> > > interfaces
>>>>> > > > are organized. Theoretically users do not need to use any of
>>>>> > > > SourceReaderBase & SplitReader. Would be weird to list their
>>>>> methods in
>>>>> > > the
>>>>> > > > Source interface.
>>>>> > > >
>>>>> > > > @Piotr
>>>>> > > >
>>>>> > > > If we have all of the methods with default implementation in the
>>>>> base
>>>>> > > > interface, the API doesn't give any clue to the user which set of
>>>>> > methods
>>>>> > > > are required to be implemented at the same time.
>>>>> > > >
>>>>> > > > I feel that no matter which option we choose this can not be
>>>>> solved
>>>>> > > > entirely in either of the options, because of the point above and
>>>>> > because
>>>>> > > > the signature of SplitReader#pauseOrResumeSplits and
>>>>> > > > SourceReader#pauseOrResumeSplits are slightly different (one
>>>>> identifies
>>>>> > > > splits with splitId the other one passes the splits directly).
>>>>> > > >
>>>>> > > > Best,
>>>>> > > >
>>>>> > > > Dawid
>>>>> > > > On 03/05/2022 14:30, Becket Qin wrote:
>>>>> > > >
>>>>> > > > Hi Piotr,
>>>>> > > >
>>>>> > > > Thanks for the comment.
>>>>> > > >
>>>>> > > > Just to clarify, I am not against the decorative interfaces, but
>>>>> I do
>>>>> > > > think we should use them with caution. The main argument for
>>>>> adding the
>>>>> > > > methods to the SourceReader is that these methods are
>>>>> > > > effectively NON-OPTIONAL to SourceReader impl, i.e. starting
>>>>> from this
>>>>> > > > FLIP, all the SourceReaders impl are expected to support this
>>>>> > > > method, although some old implementations may not have
>>>>> implemented this
>>>>> > > > feature. I think we should distinguish the new features from the
>>>>> > optional
>>>>> > > > features. While the public decorative interface is a solution to
>>>>> the
>>>>> > > > optional features. We should not use it for the features that are
>>>>> > > > non-optional.
>>>>> > > >
>>>>> > > > That said, this feature is optional for SplitReaders. Arguably
>>>>> we can
>>>>> > > have
>>>>> > > > a decorative interface for that, but for simplicity and symmetry
>>>>> of the
>>>>> > > > interface, personally I prefer just adding a new method.
>>>>> > > >
>>>>> > > > Regarding the advantages you mentioned about the decorative
>>>>> interfaces,
>>>>> > > > they would make sense if:
>>>>> > > > 1. The feature is optional.
>>>>> > > > 2. There is only one decorative interface involved for a feature.
>>>>> > > > Otherwise the argument that all the methods are grouped together
>>>>> will
>>>>> > not
>>>>> > > > stand.
>>>>> > > >
>>>>> > > > Compared with that, I think the current solution works fine in
>>>>> all
>>>>> > cases,
>>>>> > > > i.e. "having supportXXX() method in Source, and default methods /
>>>>> > > > decorative interfaces in base interfaces.".
>>>>> > > >
>>>>> > > > The advantages are:
>>>>> > > >> - clean and easy to implement base interface
>>>>> > > >
>>>>> > > > In the current approach, the Java doc of the SupportXXX() method
>>>>> in the
>>>>> > > > Source would be the single source of truth regarding how to
>>>>> implement
>>>>> > > this
>>>>> > > > feature. It lists the method that has to be implemented to
>>>>> support this
>>>>> > > > feature, regardless of how many classes / interfaces are
>>>>> involved.
>>>>> > > >
>>>>> > > > When implementing the base interface, users do not need to
>>>>> implement a
>>>>> > > > method with default implementation. If they are curious what the
>>>>> method
>>>>> > > is
>>>>> > > > for, the java doc of that method simply points users to the
>>>>> > SupportXXX()
>>>>> > > > method in the Source. It seems not adding work to the users
>>>>> compared
>>>>> > with
>>>>> > > > decorative interfaces, but gives much better discoverability.
>>>>> > > >
>>>>> > > > - all of the methods from a single feature are grouped in a
>>>>> single
>>>>> > > >> decorator interface, together with their dedicated java doc.
>>>>> It's also
>>>>> > > >> easier to google search for help using the decorator name
>>>>> > > >
>>>>> > > > - if an optional feature requires two methods to be implemented
>>>>> at
>>>>> > once,
>>>>> > > >> decorator can guarantee that
>>>>> > > >
>>>>> > > > These two points are not true when multiple components and
>>>>> classes are
>>>>> > > > involved collaboratively to provide a feature. In our case, we
>>>>> have
>>>>> > both
>>>>> > > > SourceReader and SplitReader involved. And there might be other
>>>>> > > interfaces
>>>>> > > > on the JM side involved for some future features. So the relevant
>>>>> > methods
>>>>> > > > can actually be scattered over the places. That said, we may
>>>>> still use
>>>>> > > > decorative interfaces for each component, if the feature is
>>>>> optional,
>>>>> > > given
>>>>> > > > there is a single source of truth for the feature.
>>>>> > > >
>>>>> > > > Here I would strongly lean towards making life easier for new
>>>>> users,
>>>>> > > >> lowering the entry barrier, at the (imo) slight expense for the
>>>>> power
>>>>> > > >> users.
>>>>> > > >
>>>>> > > > I actually think the current approach is simpler, more
>>>>> extensible and
>>>>> > > more
>>>>> > > > general for all the users. Can you articulate a bit more on
>>>>> which part
>>>>> > > you
>>>>> > > > think makes users harder to understand?
>>>>> > > >
>>>>> > > >
>>>>> > > > There is another benefit of the decorative interfaces which is
>>>>> not
>>>>> > > > mentioned, but might be worth considering here. Usually the
>>>>> decorative
>>>>> > > > interfaces give slightly better backwards compatibility than the
>>>>> new
>>>>> > > > default method in the interfaces. That is when users are using a
>>>>> jar
>>>>> > that
>>>>> > > > was compiled with an older version of Flink which does not have
>>>>> the
>>>>> > > default
>>>>> > > > method in the interfaces in question. A decorative interface may
>>>>> still
>>>>> > > > provide backwards compatibility in that case, while default
>>>>> method impl
>>>>> > > > cannot.
>>>>> > > >
>>>>> > > > I think in Flink we in general do not guarantee custom components
>>>>> > > compiled
>>>>> > > > with an older version can run with a newer version of Flink. A
>>>>> > recompile
>>>>> > > > with a newer version would be required. That said, if we do care
>>>>> about
>>>>> > > > this, we can just change the "supportXXX()" method in the Source
>>>>> > > interface
>>>>> > > > to use decorative interfaces, and leave the other parts
>>>>> unchanged.
>>>>> > > >
>>>>> > > > Thanks,
>>>>> > > >
>>>>> > > > Jiangjie (Becket) Qin
>>>>> > > >
>>>>> > > >
>>>>> > > >
>>>>> > > >
>>>>> > > > On Tue, May 3, 2022 at 6:25 PM Piotr Nowojski <
>>>>> pnowoj...@apache.org>
>>>>> > > > wrote:
>>>>> > > >
>>>>> > > >> Hi,
>>>>> > > >>
>>>>> > > >> Sorry for chipping in so late, but I was OoO for the last two
>>>>> weeks.
>>>>> > > >> Regarding the interfaces, I would be actually against adding
>>>>> those
>>>>> > > methods
>>>>> > > >> to the base interfaces for the reasons mentioned above.
>>>>> Clogging the
>>>>> > > base
>>>>> > > >> interface for new users with tons of methods that they do not
>>>>> need, do
>>>>> > > not
>>>>> > > >> understand and do not know what to do with them. Moreover, such
>>>>> > > decorative
>>>>> > > >> interfaces are solving a problem if a feature requires two or
>>>>> more
>>>>> > > methods
>>>>> > > >> to be implemented at the same time. If we have all of the
>>>>> methods with
>>>>> > > >> default implementation in the base interface, the API doesn't
>>>>> give any
>>>>> > > >> clue
>>>>> > > >> to the user which set of methods are required to be implemented
>>>>> at the
>>>>> > > >> same
>>>>> > > >> time.
>>>>> > > >>
>>>>> > > >> > a) I feel the biggest drawback of decorative interfaces is
>>>>> which
>>>>> > > >> interface
>>>>> > > >> > they can decorate and which combinations of multiple
>>>>> decorative
>>>>> > > >> interfaces
>>>>> > > >> > are valid. (...)
>>>>> > > >> > In the future, if there is a new feature added
>>>>> > > >> > (e.g. sorted or pre-partitioned data aware), are we going to
>>>>> create
>>>>> > > >> another
>>>>> > > >> > interface of SplitReader such as SortedSplitReader or
>>>>> > > >> PrePartitionedAware?
>>>>> > > >> > Can they be combined? So I think the additional decorative
>>>>> interface
>>>>> > > >> like
>>>>> > > >> > withSplitsAlignment actually increases the understanding cost
>>>>> of
>>>>> > users
>>>>> > > >> > because they have to know what decorative interfaces are
>>>>> there,
>>>>> > which
>>>>> > > >> > interface they can decorate and which combinations of the
>>>>> decorative
>>>>> > > >> > interfaces are valid and which are not. Ideally we want to
>>>>> avoid
>>>>> > that.
>>>>> > > >>
>>>>> > > >> I'm not sure if I understand how embedding default methods in
>>>>> the base
>>>>> > > >> interface is solving the problem: what can be combined or not?
>>>>> If
>>>>> > there
>>>>> > > >> are
>>>>> > > >> two conflicting features, having decorative interfaces that can
>>>>> not be
>>>>> > > >> mixed together actually makes much more sense to me rather than
>>>>> having
>>>>> > > >> them
>>>>> > > >> all in one base class. How would you allow users to implement
>>>>> only one
>>>>> > > of
>>>>> > > >> those two features?
>>>>> > > >>
>>>>> > > >> To reiterate on the issue. Yes, there are drawbacks:
>>>>> > > >> - how can a user discover what decorators are there?
>>>>> > > >> - how can a user know where the decorator can be applied to?
>>>>> > > >>
>>>>> > > >> However those are drawbacks for more power users, that can be
>>>>> > mitigated
>>>>> > > by
>>>>> > > >> the documentation. For example listing all of the decorators
>>>>> with
>>>>> > > >> detailed explanation both in the docs and in the java docs. More
>>>>> > > >> experienced users will be able to deal with those issues
>>>>> easier, as
>>>>> > they
>>>>> > > >> will already have some basic understanding of Flink. Also if
>>>>> user has
>>>>> > a
>>>>> > > >> problem that he wants to solve, he will google search a
>>>>> potential
>>>>> > > solution
>>>>> > > >> to his problem anyway, and while doing that he is very likely to
>>>>> > > discover
>>>>> > > >> the decorator that he needs anyway in the docs.
>>>>> > > >>
>>>>> > > >> The advantages are:
>>>>> > > >> - clean and easy to implement base interface
>>>>> > > >> - all of the methods from a single feature are grouped in a
>>>>> single
>>>>> > > >> decorator interface, together with their dedicated java doc.
>>>>> It's also
>>>>> > > >> easier to google search for help using the decorator name
>>>>> > > >> - if an optional feature requires two methods to be implemented
>>>>> at
>>>>> > once,
>>>>> > > >> decorator can guarantee that
>>>>> > > >>
>>>>> > > >> Here I would strongly lean towards making life easier for new
>>>>> users,
>>>>> > > >> lowering the entry barrier, at the (imo) slight expense for the
>>>>> power
>>>>> > > >> users.
>>>>> > > >>
>>>>> > > >> Best,
>>>>> > > >> Piotrek
>>>>> > > >>
>>>>> > > >>
>>>>> > > >> wt., 26 kwi 2022 o 15:32 Becket Qin <becket....@gmail.com>
>>>>> > napisał(a):
>>>>> > > >>
>>>>> > > >> > Thanks for the reply Sebastian and Dawid,
>>>>> > > >> >
>>>>> > > >> > I think Sebastion has a good summary. This is a really helpful
>>>>> > > >> discussion.
>>>>> > > >> >
>>>>> > > >> > Thinking a bit more, I feel that it might still be better to
>>>>> add the
>>>>> > > >> > supportsXXX() method in the Source rather than SourceReader.
>>>>> > > >> >
>>>>> > > >> > Generally speaking, what we are trying to do here is to let
>>>>> the
>>>>> > Flink
>>>>> > > >> > framework know what the Source is capable of. In this FLIP, it
>>>>> > happens
>>>>> > > >> to
>>>>> > > >> > be the capability that only involves SourceReader. But in the
>>>>> > future,
>>>>> > > >> it is
>>>>> > > >> > possible that another functionality involves both the
>>>>> > SplitEnumerator
>>>>> > > >> and
>>>>> > > >> > SourceReader. In that case, following the current approach, we
>>>>> > should
>>>>> > > >> put
>>>>> > > >> > the "supportsXXX()" method in both SplitEnumerator and
>>>>> SourceReader.
>>>>> > > >> > Because if we only put this in the SourceReader, then the JM
>>>>> would
>>>>> > > have
>>>>> > > >> to
>>>>> > > >> > create a SourceReader in order to know whether this feature is
>>>>> > > >> supported,
>>>>> > > >> > which is a little ugly. But if we put the "supportsXXX()"
>>>>> method in
>>>>> > > the
>>>>> > > >> > Source, we will break the "symmetric" design because this FLIP
>>>>> > chose a
>>>>> > > >> > different way.
>>>>> > > >> >
>>>>> > > >> > This is also why I think supportsXXX() method seems a good
>>>>> thing to
>>>>> > > >> have,
>>>>> > > >> > because when there are a few interfaces / methods that are
>>>>> expected
>>>>> > to
>>>>> > > >> be
>>>>> > > >> > implemented at the same time in order to deliver a feature,
>>>>> it is
>>>>> > > always
>>>>> > > >> > good to have a single source of truth to tell the framework
>>>>> what to
>>>>> > > do,
>>>>> > > >> so
>>>>> > > >> > the framework can do consistent things in different parts.
>>>>> > > >> >
>>>>> > > >> > @Sebastian Mattheis <sebast...@ververica.com>
>>>>> > > >> >
>>>>> > > >> > Regarding interface flavor b), i.e. AlignedSourceReader +
>>>>> > > >> > AlignedSplitReader, what I feel awkward about is that we are
>>>>> > > essentially
>>>>> > > >> > expecting almost all the SourceReader implementations to
>>>>> extend
>>>>> > > >> > SourceReaderBase, which effectively makes the SourceReader
>>>>> interface
>>>>> > > >> > without the pausing support useless. So this indicates that
>>>>> public
>>>>> > > >> > decorative interfaces (or sub-interfaces for the same
>>>>> purpose) only
>>>>> > > >> > make sense if the original interface is also expected to be
>>>>> used.
>>>>> > > >> > Otherwise, it seems makes more sense to add the method to the
>>>>> > original
>>>>> > > >> > interface itself.
>>>>> > > >> >
>>>>> > > >> > Cheers,
>>>>> > > >> >
>>>>> > > >> > Jiangjie (Becket) Qin
>>>>> > > >> >
>>>>> > > >> >
>>>>> > > >> >
>>>>> > > >> >
>>>>> > > >> > On Tue, Apr 26, 2022 at 6:05 PM Dawid Wysakowicz <
>>>>> > > >> dwysakow...@apache.org>
>>>>> > > >> > wrote:
>>>>> > > >> >
>>>>> > > >> > > Thanks @Sebastian for the nice summary.
>>>>> > > >> > >
>>>>> > > >> > > I think most of your points aligned with the suggestions I
>>>>> made to
>>>>> > > the
>>>>> > > >> > > FLIP, while you were writing your reply (I believe we hit
>>>>> enter
>>>>> > > >> nearly at
>>>>> > > >> > > the same time ;) )
>>>>> > > >> > >
>>>>> > > >> > > Two points after we synced offline
>>>>> > > >> > >
>>>>> > > >> > > 1. I changed also the supportsWatermarksSplitAlignment to
>>>>> > > >> > > supportsPausingSplits to express the general capability of
>>>>> > pausing.
>>>>> > > >> > >
>>>>> > > >> > > 2. As for if we should
>>>>> PausingSourceReader/PausingSplitReader
>>>>> > > (option
>>>>> > > >> b)
>>>>> > > >> > > or if we should just add the methods (option c), I suggest
>>>>> to
>>>>> > simply
>>>>> > > >> add
>>>>> > > >> > > the two methods as I felt this is much preferred approach
>>>>> Becket,
>>>>> > > >> which
>>>>> > > >> > > others do not object. Unless there is an opposition let's
>>>>> go with
>>>>> > > this
>>>>> > > >> > > option c.
>>>>> > > >> > >
>>>>> > > >> > > Best,
>>>>> > > >> > >
>>>>> > > >> > > Dawid
>>>>> > > >> > > On 26/04/2022 10:06, Sebastian Mattheis wrote:
>>>>> > > >> > >
>>>>> > > >> > > Hi folks,
>>>>> > > >> > >
>>>>> > > >> > > Sorry for being a bit silent. Many thanks for all the input
>>>>> and
>>>>> > > >> > > suggestions. As I'm a bit new, I needed some time to catch
>>>>> up and
>>>>> > > >> > structure
>>>>> > > >> > > (for myself) the discussion and I wanted to find a way to
>>>>> > structure
>>>>> > > >> the
>>>>> > > >> > > conclusions. (Also because I had the feeling that some
>>>>> concerns
>>>>> > got
>>>>> > > >> lost
>>>>> > > >> > in
>>>>> > > >> > > the discussion.) This is my attempt and please correct me if
>>>>> > > >> something is
>>>>> > > >> > > wrong or misunderstood. I tried to collect and assemble the
>>>>> > > opinions,
>>>>> > > >> > > suggestions, and conclusions (to the best of my knowledge):
>>>>> > > >> > >
>>>>> > > >> > > # Top A: Should split alignment (pause/resume behavior) be a
>>>>> > general
>>>>> > > >> > > capability?
>>>>> > > >> > >
>>>>> > > >> > > I personally don't see any reason no to have it a general
>>>>> > capability
>>>>> > > >> > > because for the alignSplit method it is actually
>>>>> independent of
>>>>> > the
>>>>> > > >> > > watermarks. If we agree here to have it a general
>>>>> capability, we
>>>>> > > >> should
>>>>> > > >> > > also agree on the right wording. Does
>>>>> "alignSplits(splitsToResume,
>>>>> > > >> > > splitsToPause)" refer to what is then actually meant? (I
>>>>> see it as
>>>>> > > >> okay.
>>>>> > > >> > I
>>>>> > > >> > > don't have any better idea whilst Arvid suggested
>>>>> > > >> "pauseOrResumeSplits".)
>>>>> > > >> > >
>>>>> > > >> > > # Top B: Should it be possible do enable/disable split
>>>>> alignment?
>>>>> > > >> > >
>>>>> > > >> > > I would personally not disable the split alignment on the
>>>>> source
>>>>> > > >> reader
>>>>> > > >> > > side because if split alignment is used for some other use
>>>>> case
>>>>> > (see
>>>>> > > >> A)
>>>>> > > >> > it
>>>>> > > >> > > could have nasty side effects on other/future use cases.
>>>>> Instead,
>>>>> > I
>>>>> > > >> would
>>>>> > > >> > > disable "watermark split alignment" where I think it should
>>>>> > disable
>>>>> > > >> the
>>>>> > > >> > > watermark-dependent trigger for split alignment.
>>>>> > > >> > >
>>>>> > > >> > > # Top C: Should we add a supportsX method?
>>>>> > > >> > >
>>>>> > > >> > > I find it difficult to define the scope of a supportsX
>>>>> method
>>>>> > w.r.t.
>>>>> > > >> to
>>>>> > > >> > > the following questions: a) Where is it used? and b) What
>>>>> is the
>>>>> > > >> expected
>>>>> > > >> > > output? To b), it's not straight-forward to provide a
>>>>> meaningful
>>>>> > > >> output,
>>>>> > > >> > > e.g., if SourceReader supports split alignment but
>>>>> SplitReader
>>>>> > not.
>>>>> > > >> This
>>>>> > > >> > is
>>>>> > > >> > > because with the current implementation, we can determine
>>>>> whether
>>>>> > > >> split
>>>>> > > >> > > alignment is fully supported only during runtime and
>>>>> specifically
>>>>> > > >> > actually
>>>>> > > >> > > only when calling alignSplits down the call hierarchy up to
>>>>> the
>>>>> > > actual
>>>>> > > >> > > SplitReaders.
>>>>> > > >> > >
>>>>> > > >> > > Therefore, I would suggest to either raise an error or
>>>>> warning if
>>>>> > > the
>>>>> > > >> > > alignment is called but not supported at some point. I know
>>>>> we
>>>>> > > should
>>>>> > > >> > > carefully think about when this could be the case because
>>>>> we don't
>>>>> > > >> want
>>>>> > > >> > to
>>>>> > > >> > > flood anybody with such warnings. However, warnings could
>>>>> be an
>>>>> > > >> indicator
>>>>> > > >> > > for the user that for watermark split alignment use case
>>>>> split
>>>>> > > >> reading is
>>>>> > > >> > > imbalanced with the conclusion to either disable the
>>>>> trigger for
>>>>> > > >> > watermark
>>>>> > > >> > > split alignment (see Top B) or to use/implement a source and
>>>>> > reader
>>>>> > > >> that
>>>>> > > >> > > fully supports split alignment.
>>>>> > > >> > >
>>>>> > > >> > > # Top D: How to design interfaces?
>>>>> > > >> > >
>>>>> > > >> > > Thanks for structuring the discussion with the the various
>>>>> > > >> possibilities
>>>>> > > >> > > (a-d). From the discussion and emails, I would like to
>>>>> summarize
>>>>> > the
>>>>> > > >> > > following requirements:
>>>>> > > >> > > - Interfaces should be consistent ("symmetric"), i.e.,
>>>>> similar
>>>>> > > >> semantics
>>>>> > > >> > > should have similar interfaces with similar usage.
>>>>> > > >> > > - Make explicit which implementations implement
>>>>> interfaces/support
>>>>> > > >> > > behavior.
>>>>> > > >> > > - Make clear what are default implementations and how to
>>>>> implement
>>>>> > > >> > > interfaces with desired behavior.
>>>>> > > >> > >
>>>>> > > >> > > This is a simplified view of the relations between relevant
>>>>> > classes
>>>>> > > of
>>>>> > > >> > the
>>>>> > > >> > > PoC implementation:
>>>>> > > >> > >
>>>>> > > >> > > SourceReader (Public) <|-- SourceReaderBase (Internal) <|--
>>>>> ..
>>>>> > <|--
>>>>> > > >> > > MySourceReader
>>>>> > > >> > >
>>>>> > > >> > > MySourceReader <>-- SplitFetcherManager (Internal) <>--
>>>>> > SplitFetcher
>>>>> > > >> > > (Internal) <>-- SplitReader (Public) <|-- MySplitReader
>>>>> > > >> > >
>>>>> > > >> > > (A <|-- B: B inherits from A; A <>-- B: A "has a" B)
>>>>> > > >> > >
>>>>> > > >> > > Note that SourceReaderBase and SplitFetcherManager
>>>>> implement most
>>>>> > of
>>>>> > > >> the
>>>>> > > >> > > "logic" for split alignment just because we wanted to
>>>>> implement
>>>>> > > split
>>>>> > > >> > > alignment and wanted it to be available as kind of a
>>>>> default. As a
>>>>> > > >> > > consequence, we have a "default implementation" for
>>>>> SourceReader
>>>>> > > that
>>>>> > > >> > > implements the actual logic for split alignment. For that
>>>>> reason,
>>>>> > I
>>>>> > > >> find
>>>>> > > >> > it
>>>>> > > >> > > very confusing to have a NOOP default implementation in the
>>>>> > > interface
>>>>> > > >> for
>>>>> > > >> > > the SourceReader. As a consequence, interface strategy c) is
>>>>> > > difficult
>>>>> > > >> > > because this would require NOOP default implementations in
>>>>> the
>>>>> > > public
>>>>> > > >> > > interfaces of SourceReader and SplitReader. This is the
>>>>> same for
>>>>> > > >> strategy
>>>>> > > >> > > d) because it would require NOOP default implementation in
>>>>> the
>>>>> > > >> > > SourceReader. Further, as Dawid described method signatures
>>>>> of
>>>>> > > >> alignSplit
>>>>> > > >> > > for SourceReader and SplitReader differ and it would be
>>>>> extremely
>>>>> > > >> > difficult
>>>>> > > >> > > to make the signatures the same (with even potential
>>>>> performance
>>>>> > > >> impact
>>>>> > > >> > > because of additional loop-ups of split ids). Therefore,
>>>>> having a
>>>>> > > >> > symmetric
>>>>> > > >> > > decorative interface as of strategy a) is actually not
>>>>> possible
>>>>> > and
>>>>> > > >> > having
>>>>> > > >> > > two decorative interfaces with different method signatures
>>>>> is
>>>>> > > >> confusing.
>>>>> > > >> > My
>>>>> > > >> > > conclusion is that we are best with strategy b) which means
>>>>> to
>>>>> > have
>>>>> > > >> > > specializing sub-interfaces that inherit from the parent
>>>>> > interface:
>>>>> > > >> > > SourceReader <|-- AlignedSourceReader, SplitReader <|--
>>>>> > > >> > AlignedSplitReader
>>>>> > > >> > > With this option, I'm not 100% sure what the implications
>>>>> are and
>>>>> > if
>>>>> > > >> this
>>>>> > > >> > > could get nasty. I would suggest that Dawid and I just try
>>>>> to
>>>>> > > >> implement
>>>>> > > >> > and
>>>>> > > >> > > see if we like it. :)
>>>>> > > >> > >
>>>>> > > >> > > # Summary
>>>>> > > >> > >
>>>>> > > >> > > In conclusion, please let me know your perspectives. Please
>>>>> > correct
>>>>> > > >> me,
>>>>> > > >> > if
>>>>> > > >> > > something is wrong or if I misunderstood something. My
>>>>> perspective
>>>>> > > >> would
>>>>> > > >> > be:
>>>>> > > >> > >
>>>>> > > >> > > Top A: Yes
>>>>> > > >> > > Top B: Yes (but disable watermark trigger for split
>>>>> alignment)
>>>>> > > >> > > Top C: No
>>>>> > > >> > > Top D: b)
>>>>> > > >> > >
>>>>> > > >> > > Best,
>>>>> > > >> > > Sebastian
>>>>> > > >> > >
>>>>> > > >> > > On Tue, Apr 26, 2022 at 9:55 AM Dawid Wysakowicz <
>>>>> > > >> dwysakow...@apache.org
>>>>> > > >> > >
>>>>> > > >> > > wrote:
>>>>> > > >> > >
>>>>> > > >> > >> @Arvid:
>>>>> > > >> > >>
>>>>> > > >> > >> While I also like Becket's capability approach, I fear
>>>>> that it
>>>>> > > >> doesn't
>>>>> > > >> > work
>>>>> > > >> > >> for this particular use case: Sources can always be aligned
>>>>> > > >> cross-task
>>>>> > > >> > and
>>>>> > > >> > >> this is just about intra-task alignment. So it's plausible
>>>>> to put
>>>>> > > >> > sources
>>>>> > > >> > >> into an alignment group even though they do not use any of
>>>>> the
>>>>> > > >> presented
>>>>> > > >> > >> API of FLIP-217. They should just issue a warning, if they
>>>>> handle
>>>>> > > >> > multiple
>>>>> > > >> > >> splits (see motivation section).
>>>>> > > >> > >>
>>>>> > > >> > >> Yes, but the "supportXXX" method would be for telling if it
>>>>> > > supports
>>>>> > > >> > that
>>>>> > > >> > >> intra-task alignment. Cross-task alignment would always be
>>>>> > > supported.
>>>>> > > >> > >>
>>>>> > > >> > >> I updated interfaces to what I believe to be closest to a
>>>>> > consensus
>>>>> > > >> > >> between all participants. Do you mind taking a look?
>>>>> > > >> > >>
>>>>> > > >> > >> @Sebastian Do you mind addressing the nits?
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >>
>>>>> > > >> > >> Dawid
>>>>> > > >> > >>
>>>>> > > >> > >> On 25/04/2022 13:39, Arvid Heise wrote:
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks for pushing this effort.
>>>>> > > >> > >>
>>>>> > > >> > >> I'd actually be in favor of 1b). I fully agree that
>>>>> decorator
>>>>> > > >> interfaces
>>>>> > > >> > >> should be avoided but I'm also not a big fan of
>>>>> overloading the
>>>>> > > base
>>>>> > > >> > >> interfaces (they are hard to implement as is). The usual
>>>>> feedback
>>>>> > > to
>>>>> > > >> > >> Source-related interfaces are always that they are
>>>>> overwhelming
>>>>> > and
>>>>> > > >> too
>>>>> > > >> > >> hard to implement. However, I'd also not oppose 1c) as
>>>>> scattered
>>>>> > > >> > interfaces
>>>>> > > >> > >> also have drawbacks. I'd just dislike 1a) and 1d).
>>>>> > > >> > >> While I also like Becket's capability approach, I fear
>>>>> that it
>>>>> > > >> doesn't
>>>>> > > >> > work
>>>>> > > >> > >> for this particular use case: Sources can always be aligned
>>>>> > > >> cross-task
>>>>> > > >> > and
>>>>> > > >> > >> this is just about intra-task alignment. So it's plausible
>>>>> to put
>>>>> > > >> > sources
>>>>> > > >> > >> into an alignment group even though they do not use any of
>>>>> the
>>>>> > > >> presented
>>>>> > > >> > >> API of FLIP-217. They should just issue a warning, if they
>>>>> handle
>>>>> > > >> > multiple
>>>>> > > >> > >> splits (see motivation section).
>>>>> > > >> > >>
>>>>> > > >> > >> I think renaming alignSplits to facilitate future use
>>>>> cases makes
>>>>> > > >> sense
>>>>> > > >> > but
>>>>> > > >> > >> then all interfaces (if 1c) is chosen) should be adjusted
>>>>> > > >> accordingly.
>>>>> > > >> > >> AlignedSourceReader could be PausingSourceReader and I'd
>>>>> go for
>>>>> > > >> > >> pauseOrResumeSplits (Becket's proposal afaik). We could
>>>>> also
>>>>> > split
>>>>> > > it
>>>>> > > >> > into
>>>>> > > >> > >> pauseSplit and resumeSplit. While pauseOrResumeSplits may
>>>>> allow
>>>>> > > >> Sources
>>>>> > > >> > to
>>>>> > > >> > >> just use 1 instead of 2 library calls (as written in the
>>>>> > Javadoc),
>>>>> > > >> both
>>>>> > > >> > >> Kafka and Pulsar can't use it and I'm not sure if there is
>>>>> a
>>>>> > system
>>>>> > > >> that
>>>>> > > >> > >> can.
>>>>> > > >> > >>
>>>>> > > >> > >> Some nit for the FLIP:
>>>>> > > >> > >> - Please replace "stop" with "pause".
>>>>> > > >> > >> - Not sure if it's worth it in the capability section:
>>>>> Sources
>>>>> > that
>>>>> > > >> > adopt
>>>>> > > >> > >> this interface cannot be used in earlier versions. So it
>>>>> feels
>>>>> > like
>>>>> > > >> we
>>>>> > > >> > are
>>>>> > > >> > >> only forward compatible (old sources can be used after the
>>>>> > change);
>>>>> > > >> but
>>>>> > > >> > I
>>>>> > > >> > >> guess this holds for any API addition.
>>>>> > > >> > >> - You might want to add what happens when all splits are
>>>>> paused.
>>>>> > > >> > >> - You may want to describe how the 3 flavors of
>>>>> SourceReaderBase
>>>>> > > >> > interact
>>>>> > > >> > >> with the interface.
>>>>> > > >> > >> - I'm not sure if it makes sense to include Kafka and
>>>>> Pulsar in
>>>>> > the
>>>>> > > >> > FLIP.
>>>>> > > >> > >> For me, this is rather immediate follow-up work. (could be
>>>>> in the
>>>>> > > >> same
>>>>> > > >> > >> umbrella ticket)
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >>
>>>>> > > >> > >> Arvid
>>>>> > > >> > >>
>>>>> > > >> > >> On Mon, Apr 25, 2022 at 12:52 PM Dawid Wysakowicz <
>>>>> > > >> > dwysakow...@apache.org> <dwysakow...@apache.org>
>>>>> > > >> > >> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> a) "MySourceReader implements SourceReader,
>>>>> WithSplitsAlignment",
>>>>> > > >> along
>>>>> > > >> > >> with "MySplitReader implements SplitReader,
>>>>> WithSplitsAlignment",
>>>>> > > or
>>>>> > > >> > >> b) "MySourceReader implements AlignedSourceReader" and
>>>>> > > "MySplitReader
>>>>> > > >> > >> implements AlignedSplitReader", or
>>>>> > > >> > >> c) "MySourceReader implements SourceReader" and
>>>>> "MySplitReader
>>>>> > > >> > implements
>>>>> > > >> > >> SplitReader".
>>>>> > > >> > >>
>>>>> > > >> > >> I think the latest proposal according to Dawid would be:
>>>>> > > >> > >> d) "MySourceReader implements SourceReader" and
>>>>> "MySplitReader
>>>>> > > >> > implements
>>>>> > > >> > >> AlignedSplitReader".
>>>>> > > >> > >> I am fine with this API, although personally speaking I
>>>>> think it
>>>>> > is
>>>>> > > >> > simpler
>>>>> > > >> > >> to just add a new method to the split reader with default
>>>>> impl.
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> I think that is a good idea to have it aligned as much as
>>>>> > possible.
>>>>> > > >> I'd
>>>>> > > >> > be
>>>>> > > >> > >> +1 for your option c). We can merge AlignedSplitReader with
>>>>> > > >> > SplitReader. We
>>>>> > > >> > >> will update the FLIP shortly.
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >>
>>>>> > > >> > >> Dawid
>>>>> > > >> > >>
>>>>> > > >> > >> On 25/04/2022 12:43, Becket Qin wrote:
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks for the comment, Jark.
>>>>> > > >> > >>
>>>>> > > >> > >> 3. Interface/Method Name.
>>>>> > > >> > >>
>>>>> > > >> > >> Can the interface be used to align other things in the
>>>>> future?
>>>>> > For
>>>>> > > >> > example,
>>>>> > > >> > >> align read speed, I have
>>>>> > > >> > >> seen users requesting global rate limits. This feature may
>>>>> also
>>>>> > > need
>>>>> > > >> an
>>>>> > > >> > >> interface like this.
>>>>> > > >> > >> If we don't plan to extend this interface to support align
>>>>> other
>>>>> > > >> > things, I
>>>>> > > >> > >> suggest explicitly declaring
>>>>> > > >> > >> the purpose of the methods, such as
>>>>> `alignWatermarksForSplits`
>>>>> > > >> instead
>>>>> > > >> > of
>>>>> > > >> > >> `alignSplits`.
>>>>> > > >> > >>
>>>>> > > >> > >> This is a good point. Naming wise, it would usually be more
>>>>> > > >> extensible
>>>>> > > >> > to
>>>>> > > >> > >> just describe what the method actually does, instead of
>>>>> assuming
>>>>> > > the
>>>>> > > >> > >> purpose of doing this. For example, in this case,
>>>>> > > >> pauseOrResumeSplits()
>>>>> > > >> > >> would be more extensible because this can be used for any
>>>>> kind of
>>>>> > > >> flow
>>>>> > > >> > >> control, be it watermark alignment or simple rate limiting.
>>>>> > > >> > >>
>>>>> > > >> > >> 4. Interface or Method.
>>>>> > > >> > >>
>>>>> > > >> > >> I don't have a strong opinion on this. I think they have
>>>>> their
>>>>> > own
>>>>> > > >> > >> advantages.
>>>>> > > >> > >> In Flink SQL, we heavily use Interfaces for extending
>>>>> abilities
>>>>> > > >> > >> (SupportsXxxx) for TableSource/TableSink,
>>>>> > > >> > >> and I prefer Interfaces rather than methods in this case.
>>>>> When
>>>>> > you
>>>>> > > >> have
>>>>> > > >> > a
>>>>> > > >> > >> bunch of abilities and each ability
>>>>> > > >> > >> has more than one method, Interfaces can help to organize
>>>>> them
>>>>> > and
>>>>> > > >> make
>>>>> > > >> > >> users clear which methods
>>>>> > > >> > >> need to implement when you want to have an ability.
>>>>> > > >> > >>
>>>>> > > >> > >> I am OK with decorative interfaces if this is a general
>>>>> design
>>>>> > > >> pattern
>>>>> > > >> > in
>>>>> > > >> > >> the other components in Flink. But it looks like the
>>>>> current API
>>>>> > > >> > proposal
>>>>> > > >> > >> is not symmetric.
>>>>> > > >> > >>
>>>>> > > >> > >> The current proposal is essentially "MySourceReader
>>>>> implements
>>>>> > > >> > >> SourceReader, WithSplitsAlignment", along with
>>>>> "MySplitReader
>>>>> > > >> implements
>>>>> > > >> > >> AlignedSplitsReader".
>>>>> > > >> > >>
>>>>> > > >> > >> Should we make the API symmetric? I'd consider any one of
>>>>> the
>>>>> > > >> following
>>>>> > > >> > as
>>>>> > > >> > >> symmetric.
>>>>> > > >> > >>
>>>>> > > >> > >> a) "MySourceReader implements SourceReader,
>>>>> WithSplitsAlignment",
>>>>> > > >> along
>>>>> > > >> > >> with "MySplitReader implements SplitReader,
>>>>> WithSplitsAlignment",
>>>>> > > or
>>>>> > > >> > >> b) "MySourceReader implements AlignedSourceReader" and
>>>>> > > "MySplitReader
>>>>> > > >> > >> implements AlignedSplitReader", or
>>>>> > > >> > >> c) "MySourceReader implements SourceReader" and
>>>>> "MySplitReader
>>>>> > > >> > implements
>>>>> > > >> > >> SplitReader".
>>>>> > > >> > >>
>>>>> > > >> > >> I think the latest proposal according to Dawid would be:
>>>>> > > >> > >> d) "MySourceReader implements SourceReader" and
>>>>> "MySplitReader
>>>>> > > >> > implements
>>>>> > > >> > >> AlignedSplitReader".
>>>>> > > >> > >> I am fine with this API, although personally speaking I
>>>>> think it
>>>>> > is
>>>>> > > >> > simpler
>>>>> > > >> > >> to just add a new method to the split reader with default
>>>>> impl.
>>>>> > > >> > >>
>>>>> > > >> > >> @Dawid Wysakowicz <dwysakow...@apache.org> <
>>>>> > dwysakow...@apache.org
>>>>> > > >
>>>>> > > >> <
>>>>> > > >> > dwysakow...@apache.org> <dwysakow...@apache.org>, thanks for
>>>>> the
>>>>> > > reply.
>>>>> > > >> > >>
>>>>> > > >> > >> Having said that, as I don't have a preference and I agree
>>>>> most
>>>>> > of
>>>>> > > >> the
>>>>> > > >> > >>
>>>>> > > >> > >> sources will support the alignment I am fine following your
>>>>> > > >> suggestion
>>>>> > > >> > to
>>>>> > > >> > >> have the SourceReader extending from
>>>>> > WithWatermarksSplitsAlignment,
>>>>> > > >> but
>>>>> > > >> > >> would put the "supportsXXX" there, not in the Source to
>>>>> keep the
>>>>> > > two
>>>>> > > >> > >> methods together.
>>>>> > > >> > >>
>>>>> > > >> > >> One benefit of having the "supportsXXX" in Source is that
>>>>> this
>>>>> > > allows
>>>>> > > >> > some
>>>>> > > >> > >> compile time check. For example, if a user enabled
>>>>> watermark
>>>>> > > >> alignment
>>>>> > > >> > >> while it is not supported by the Source, an exception can
>>>>> be
>>>>> > thrown
>>>>> > > >> at
>>>>> > > >> > >> compile time. It seems in general useful. That said, I
>>>>> agree that
>>>>> > > API
>>>>> > > >> > >> cleanliness wise it is better to put the two methods
>>>>> together.
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks,
>>>>> > > >> > >>
>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>> > > >> > >>
>>>>> > > >> > >> On Mon, Apr 25, 2022 at 5:56 PM Jark Wu <imj...@gmail.com>
>>>>> <
>>>>> > > >> > imj...@gmail.com> <imj...@gmail.com> <imj...@gmail.com>
>>>>> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> Thank Dawid for the reminder on FLIP-182. Sorry I did miss
>>>>> it.
>>>>> > > >> > >> I don't have other concerns then.
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >> Jark
>>>>> > > >> > >>
>>>>> > > >> > >> On Mon, 25 Apr 2022 at 15:40, Dawid Wysakowicz <
>>>>> > > >> dwysakow...@apache.org>
>>>>> > > >> > <dwysakow...@apache.org> <dwysakow...@apache.org> <
>>>>> > > >> dwysakow...@apache.org>
>>>>> > > >> > >> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> @Jark:
>>>>> > > >> > >>
>>>>> > > >> > >> 1. Will the framework always align with watermarks when the
>>>>> > source
>>>>> > > >> > >> implements the interface?
>>>>> > > >> > >> I'm afraid not every case needs watermark alignment even
>>>>> if Kafka
>>>>> > > >> > >> implements the interface,
>>>>> > > >> > >> and this will affect the throughput somehow. I agree with
>>>>> Becket
>>>>> > > >> > >> we may need a
>>>>> > > >> > >> `supportSplitsAlignment()` method for users to configure
>>>>> the
>>>>> > source
>>>>> > > >> to
>>>>> > > >> > >> enable/disable the alignment.
>>>>> > > >> > >>
>>>>> > > >> > >> 2. How does the framework calculate maxDesiredWatermark?
>>>>> > > >> > >> I think the algorithm of maxDesiredWatermark will greatly
>>>>> affect
>>>>> > > >> > >>
>>>>> > > >> > >> throughput
>>>>> > > >> > >>
>>>>> > > >> > >> if the reader is constantly
>>>>> > > >> > >>  switching between pause and resume. Can users configure
>>>>> the
>>>>> > > >> alignment
>>>>> > > >> > >> offset?
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> This is covered in the previous FLIP[1] which has been
>>>>> already
>>>>> > > >> > >>
>>>>> > > >> > >> implemented
>>>>> > > >> > >>
>>>>> > > >> > >> in 1.15. In short, it must be enabled with the watermark
>>>>> strategy
>>>>> > > >> which
>>>>> > > >> > >> also configures drift and update interval.
>>>>> > > >> > >>
>>>>> > > >> > >> If we don't plan to extend this interface to support align
>>>>> other
>>>>> > > >> things,
>>>>> > > >> > >>
>>>>> > > >> > >> I
>>>>> > > >> > >>
>>>>> > > >> > >> suggest explicitly declaring
>>>>> > > >> > >> the purpose of the methods, such as
>>>>> `alignWatermarksForSplits`
>>>>> > > >> instead
>>>>> > > >> > of
>>>>> > > >> > >> `alignSplits`.
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> Sure let's rename it.
>>>>> > > >> > >>
>>>>> > > >> > >> @Becket:
>>>>> > > >> > >>
>>>>> > > >> > >> I understand your point. On the other hand putting all
>>>>> methods,
>>>>> > > even
>>>>> > > >> > with
>>>>> > > >> > >> "supportsXXX" methods for enabling certain features, makes
>>>>> the
>>>>> > > entry
>>>>> > > >> > >> threshold for writing a new source higher. Instead of
>>>>> focusing on
>>>>> > > the
>>>>> > > >> > >>
>>>>> > > >> > >> basic
>>>>> > > >> > >>
>>>>> > > >> > >> and required properties of the Source, the person
>>>>> implementing a
>>>>> > > >> source
>>>>> > > >> > >> must bother with and need to figure out what all of the
>>>>> extra
>>>>> > > >> features
>>>>> > > >> > >>
>>>>> > > >> > >> are
>>>>> > > >> > >>
>>>>> > > >> > >> about and how to deal with them. It makes it also harder to
>>>>> > > organize
>>>>> > > >> > >> methods in coupled groups as Jark said.
>>>>> > > >> > >>
>>>>> > > >> > >> Having said that, as I don't have a preference and I agree
>>>>> most
>>>>> > of
>>>>> > > >> the
>>>>> > > >> > >> sources will support the alignment I am fine following your
>>>>> > > >> suggestion
>>>>> > > >> > to
>>>>> > > >> > >> have the SourceReader extending from
>>>>> > WithWatermarksSplitsAlignment,
>>>>> > > >> but
>>>>> > > >> > >> would put the "supportsXXX" there, not in the Source to
>>>>> keep the
>>>>> > > two
>>>>> > > >> > >> methods together.
>>>>> > > >> > >>
>>>>> > > >> > >> Lastly, I agree it is really unfortunate the "alignSplits"
>>>>> > methods
>>>>> > > >> > differ
>>>>> > > >> > >> slightly for SourceReader and SpitReader. The reason for
>>>>> that is
>>>>> > > >> > >> SourceReaderBase deals only with SplitIds, whereas
>>>>> SplitReader
>>>>> > > needs
>>>>> > > >> the
>>>>> > > >> > >> actual splits to pause them. I found the discrepancy
>>>>> acceptable
>>>>> > for
>>>>> > > >> the
>>>>> > > >> > >> sake of simplifying changes significantly, especially as
>>>>> they
>>>>> > would
>>>>> > > >> > >>
>>>>> > > >> > >> highly
>>>>> > > >> > >>
>>>>> > > >> > >> likely impact performance as we would have to perform
>>>>> additional
>>>>> > > >> > lookups.
>>>>> > > >> > >> Moreover the SplitReader is a secondary interface.
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >>
>>>>> > > >> > >> Dawid
>>>>> > > >> > >>
>>>>> > > >> > >> [1] https://cwiki.apache.org/confluence/x/hQYBCw
>>>>> > > >> > >>
>>>>> > > >> > >> On 24/04/2022 17:15, Jark Wu wrote:
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks for the effort, Dawid and Sebastian!
>>>>> > > >> > >>
>>>>> > > >> > >> I just have some minor questions (maybe I missed
>>>>> something).
>>>>> > > >> > >>
>>>>> > > >> > >> 1. Will the framework always align with watermarks when the
>>>>> > source
>>>>> > > >> > >> implements the interface?
>>>>> > > >> > >> I'm afraid not every case needs watermark alignment even
>>>>> if Kafka
>>>>> > > >> > >> implements the interface,
>>>>> > > >> > >> and this will affect the throughput somehow. I agree with
>>>>> Becket
>>>>> > > >> > >> we may need a
>>>>> > > >> > >> `supportSplitsAlignment()` method for users to configure
>>>>> the
>>>>> > source
>>>>> > > >> to
>>>>> > > >> > >> enable/disable the alignment.
>>>>> > > >> > >>
>>>>> > > >> > >> 2. How does the framework calculate maxDesiredWatermark?
>>>>> > > >> > >> I think the algorithm of maxDesiredWatermark will greatly
>>>>> affect
>>>>> > > >> > >>
>>>>> > > >> > >> throughput
>>>>> > > >> > >>
>>>>> > > >> > >> if the reader is constantly
>>>>> > > >> > >>  switching between pause and resume. Can users configure
>>>>> the
>>>>> > > >> alignment
>>>>> > > >> > >> offset?
>>>>> > > >> > >>
>>>>> > > >> > >> 3. Interface/Method Name.
>>>>> > > >> > >> Can the interface be used to align other things in the
>>>>> future?
>>>>> > For
>>>>> > > >> > >>
>>>>> > > >> > >> example,
>>>>> > > >> > >>
>>>>> > > >> > >> align read speed, I have
>>>>> > > >> > >> seen users requesting global rate limits. This feature may
>>>>> also
>>>>> > > need
>>>>> > > >> an
>>>>> > > >> > >> interface like this.
>>>>> > > >> > >> If we don't plan to extend this interface to support align
>>>>> other
>>>>> > > >> things,
>>>>> > > >> > >>
>>>>> > > >> > >> I
>>>>> > > >> > >>
>>>>> > > >> > >> suggest explicitly declaring
>>>>> > > >> > >> the purpose of the methods, such as
>>>>> `alignWatermarksForSplits`
>>>>> > > >> instead
>>>>> > > >> > of
>>>>> > > >> > >> `alignSplits`.
>>>>> > > >> > >>
>>>>> > > >> > >> 4. Interface or Method.
>>>>> > > >> > >> I don't have a strong opinion on this. I think they have
>>>>> their
>>>>> > own
>>>>> > > >> > >> advantages.
>>>>> > > >> > >> In Flink SQL, we heavily use Interfaces for extending
>>>>> abilities
>>>>> > > >> > >> (SupportsXxxx) for TableSource/TableSink,
>>>>> > > >> > >> and I prefer Interfaces rather than methods in this case.
>>>>> When
>>>>> > you
>>>>> > > >> have
>>>>> > > >> > a
>>>>> > > >> > >> bunch of abilities and each ability
>>>>> > > >> > >> has more than one method, Interfaces can help to organize
>>>>> them
>>>>> > and
>>>>> > > >> make
>>>>> > > >> > >> users clear which methods
>>>>> > > >> > >> need to implement when you want to have an ability.
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >> Jark
>>>>> > > >> > >>
>>>>> > > >> > >> On Sun, 24 Apr 2022 at 18:13, Becket Qin <
>>>>> becket....@gmail.com>
>>>>> > <
>>>>> > > >> > becket....@gmail.com> <becket....@gmail.com> <
>>>>> becket....@gmail.com>
>>>>> > <
>>>>> > > >> > >> becket....@gmail.com> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >> Hi Dawid,
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks for the explanation. Apologies that I somehow
>>>>> misread a
>>>>> > > bunch
>>>>> > > >> of
>>>>> > > >> > >> "align" and thought they were "assign".
>>>>> > > >> > >>
>>>>> > > >> > >> Regarding 1, by default implementation, I was thinking of
>>>>> the
>>>>> > > default
>>>>> > > >> > >>
>>>>> > > >> > >> no-op
>>>>> > > >> > >>
>>>>> > > >> > >> implementation. I am a little worried about the
>>>>> proliferation of
>>>>> > > >> > >>
>>>>> > > >> > >> decorative
>>>>> > > >> > >>
>>>>> > > >> > >> interfaces. I think the most important thing about
>>>>> interfaces is
>>>>> > > that
>>>>> > > >> > >>
>>>>> > > >> > >> they
>>>>> > > >> > >>
>>>>> > > >> > >> are easy to understand. In this case, I prefer adding new
>>>>> method
>>>>> > to
>>>>> > > >> the
>>>>> > > >> > >> existing interface for the following reasons:
>>>>> > > >> > >>
>>>>> > > >> > >> a) I feel the biggest drawback of decorative interfaces is
>>>>> which
>>>>> > > >> > >>
>>>>> > > >> > >> interface
>>>>> > > >> > >>
>>>>> > > >> > >> they can decorate and which combinations of multiple
>>>>> decorative
>>>>> > > >> > >>
>>>>> > > >> > >> interfaces
>>>>> > > >> > >>
>>>>> > > >> > >> are valid. In the current FLIP, the withSplitsAlignment
>>>>> interface
>>>>> > > is
>>>>> > > >> > only
>>>>> > > >> > >> applicable to the SourceReader which means it can't
>>>>> decorate any
>>>>> > > >> other
>>>>> > > >> > >> interface. From an interface design perspective, a natural
>>>>> > question
>>>>> > > >> is
>>>>> > > >> > >>
>>>>> > > >> > >> why
>>>>> > > >> > >>
>>>>> > > >> > >> not let "AlignedSplitReader" extend "withSplitsAlignment"?
>>>>> And it
>>>>> > > is
>>>>> > > >> > also
>>>>> > > >> > >> natural to assume that a split reader implementing both
>>>>> > SplitReader
>>>>> > > >> and
>>>>> > > >> > >> WithSplitAlignment would work, because a source reader
>>>>> > implementing
>>>>> > > >> > >> SourceReader and withSplitsAlignment works. So why isn't
>>>>> there an
>>>>> > > >> > >>
>>>>> > > >> > >> interface
>>>>> > > >> > >>
>>>>> > > >> > >> of AlignedSourceReader? In the future, if there is a new
>>>>> feature
>>>>> > > >> added
>>>>> > > >> > >> (e.g. sorted or pre-partitioned data aware), are we going
>>>>> to
>>>>> > create
>>>>> > > >> > >>
>>>>> > > >> > >> another
>>>>> > > >> > >>
>>>>> > > >> > >> interface of SplitReader such as SortedSplitReader or
>>>>> > > >> > >>
>>>>> > > >> > >> PrePartitionedAware?
>>>>> > > >> > >>
>>>>> > > >> > >> Can they be combined? So I think the additional decorative
>>>>> > > interface
>>>>> > > >> > like
>>>>> > > >> > >> withSplitsAlignment actually increases the understanding
>>>>> cost of
>>>>> > > >> users
>>>>> > > >> > >> because they have to know what decorative interfaces are
>>>>> there,
>>>>> > > which
>>>>> > > >> > >> interface they can decorate and which combinations of the
>>>>> > > decorative
>>>>> > > >> > >> interfaces are valid and which are not. Ideally we want to
>>>>> avoid
>>>>> > > >> that.
>>>>> > > >> > To
>>>>> > > >> > >> be clear, I am not opposing having an interface of
>>>>> > > >> withSplitsAlignment,
>>>>> > > >> > >>
>>>>> > > >> > >> it
>>>>> > > >> > >>
>>>>> > > >> > >> is completely OK to have it as an internal interface and
>>>>> let
>>>>> > > >> > SourceReader
>>>>> > > >> > >> and SplitReader both extend it.
>>>>> > > >> > >>
>>>>> > > >> > >> b) Adding a new method to the SourceReader with a default
>>>>> > > >> implementation
>>>>> > > >> > >>
>>>>> > > >> > >> of
>>>>> > > >> > >>
>>>>> > > >> > >> no-op would help avoid logic branching in the source logic,
>>>>> > > >> especially
>>>>> > > >> > >> given that we agree that the vast majority of the
>>>>> SourceReader
>>>>> > > >> > >> implementations, if not all, would just extend from the
>>>>> > > >> > SourceReaderBase.
>>>>> > > >> > >> That means adding a new method to the interface would
>>>>> effectively
>>>>> > > >> give
>>>>> > > >> > >>
>>>>> > > >> > >> the
>>>>> > > >> > >>
>>>>> > > >> > >> same user experience, but simpler.
>>>>> > > >> > >>
>>>>> > > >> > >> c) A related design principle that may be worth discussing
>>>>> is how
>>>>> > > do
>>>>> > > >> we
>>>>> > > >> > >>
>>>>> > > >> > >> let
>>>>> > > >> > >>
>>>>> > > >> > >> the Source implementations tell Flink what capability is
>>>>> > supported
>>>>> > > >> and
>>>>> > > >> > >>
>>>>> > > >> > >> what
>>>>> > > >> > >>
>>>>> > > >> > >> is not. Personally speaking I feel the most intuitive
>>>>> place to me
>>>>> > > is
>>>>> > > >> in
>>>>> > > >> > >>
>>>>> > > >> > >> the
>>>>> > > >> > >>
>>>>> > > >> > >> Source itself, because that is the entrance of the entire
>>>>> Source
>>>>> > > >> > >>
>>>>> > > >> > >> connector
>>>>> > > >> > >>
>>>>> > > >> > >> logic.
>>>>> > > >> > >>
>>>>> > > >> > >> Based on the above thoughts, I am wondering if the
>>>>> following
>>>>> > > >> interface
>>>>> > > >> > >> would be easier to understand by the users.
>>>>> > > >> > >>
>>>>> > > >> > >> - Change "withSplitsAlignment" to internal interface, let
>>>>> both
>>>>> > > >> > >>
>>>>> > > >> > >> SourceReader
>>>>> > > >> > >>
>>>>> > > >> > >> and SplitReader extend from it, with a default no-op
>>>>> > > implementation.
>>>>> > > >> > >> - Add a new method "boolean supportSplitsAlignment()" to
>>>>> the
>>>>> > Source
>>>>> > > >> > >> interface, with a default implementation returning false.
>>>>> Sources
>>>>> > > >> that
>>>>> > > >> > >>
>>>>> > > >> > >> have
>>>>> > > >> > >>
>>>>> > > >> > >> implemented the alignment logic can change this to return
>>>>> true,
>>>>> > and
>>>>> > > >> > >> override the alignSplits() methods in the SourceReader /
>>>>> > > SplitReader
>>>>> > > >> if
>>>>> > > >> > >> needed.
>>>>> > > >> > >> - In the future, if a new optional feature is going to be
>>>>> added
>>>>> > to
>>>>> > > >> the
>>>>> > > >> > >> Source, and that feature requires the awareness from
>>>>> Flink, we
>>>>> > can
>>>>> > > >> add
>>>>> > > >> > >>
>>>>> > > >> > >> more
>>>>> > > >> > >>
>>>>> > > >> > >> such methods to the Source.
>>>>> > > >> > >>
>>>>> > > >> > >> What do you think?
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks,
>>>>> > > >> > >>
>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <
>>>>> > > >> > dwysakow...@apache.org> <dwysakow...@apache.org> <
>>>>> > > >> dwysakow...@apache.org>
>>>>> > > >> > <dwysakow...@apache.org>
>>>>> > > >> > >> <dwysakow...@apache.org> <dwysakow...@apache.org> <
>>>>> > > >> > dwysakow...@apache.org> <dwysakow...@apache.org>
>>>>> > > >> > >>
>>>>> > > >> > >> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> @Konstantin:
>>>>> > > >> > >>
>>>>> > > >> > >> As part of this FLIP, the `AlignedSplitReader` interface
>>>>> (aka the
>>>>> > > >> stop &
>>>>> > > >> > >> resume behavior) will be implemented for Kafka and Pulsar
>>>>> only,
>>>>> > > >> correct?
>>>>> > > >> > >>
>>>>> > > >> > >> Correct, as far as I know though, those are the only
>>>>> sources
>>>>> > which
>>>>> > > >> > >>
>>>>> > > >> > >> consume
>>>>> > > >> > >>
>>>>> > > >> > >> concurrently from multiple splits and thus alignment
>>>>> applies.
>>>>> > > >> > >>
>>>>> > > >> > >> @Thomas:
>>>>> > > >> > >>
>>>>> > > >> > >> I wonder if "supporting" split alignment in
>>>>> SourceReaderBase and
>>>>> > > then
>>>>> > > >> > >>
>>>>> > > >> > >> doing
>>>>> > > >> > >>
>>>>> > > >> > >> nothing if the split reader does not implement
>>>>> AlignedSplitReader
>>>>> > > >> could
>>>>> > > >> > >>
>>>>> > > >> > >> be
>>>>> > > >> > >>
>>>>> > > >> > >> misleading? Perhaps WithSplitsAlignment can instead be
>>>>> added to
>>>>> > the
>>>>> > > >> > >> specific source reader (i.e. KafkaSourceReader) to make it
>>>>> > explicit
>>>>> > > >> that
>>>>> > > >> > >> the source actually supports it.
>>>>> > > >> > >>
>>>>> > > >> > >> I understand your concern. Hmm, I think we could actually
>>>>> do
>>>>> > that.
>>>>> > > >> Given
>>>>> > > >> > >> the actual implementation of the
>>>>> SourceReaderBase#alignSplits is
>>>>> > > >> rather
>>>>> > > >> > >> short (just a forward to the corresponding method of
>>>>> > SplitFetcher),
>>>>> > > >> we
>>>>> > > >> > >> could reimplement it in the actual source implementations.
>>>>> This
>>>>> > > >> solution
>>>>> > > >> > >> has the downside though. Authors of new sources would have
>>>>> to do
>>>>> > > two
>>>>> > > >> > >> things: extend from AlignedSplitReader and implement
>>>>> > > >> > >>
>>>>> > > >> > >> WithSplitsAssignment,
>>>>> > > >> > >>
>>>>> > > >> > >> instead of just extending AlignedSplitReader. I would be
>>>>> fine
>>>>> > with
>>>>> > > >> such
>>>>> > > >> > a
>>>>> > > >> > >> tradeoff though. What others think?
>>>>> > > >> > >>
>>>>> > > >> > >> @Steven:
>>>>> > > >> > >>
>>>>> > > >> > >> For this part from the motivation section, is it accurate?
>>>>> Let's
>>>>> > > >> assume
>>>>> > > >> > >>
>>>>> > > >> > >> one
>>>>> > > >> > >>
>>>>> > > >> > >> source task consumes from 3 partitions and one of the
>>>>> partition
>>>>> > is
>>>>> > > >> > >> significantly slower. In this situation, watermark for this
>>>>> > source
>>>>> > > >> task
>>>>> > > >> > >> won't hold back as it is reading recent data from other
>>>>> two Kafka
>>>>> > > >> > >> partitions. As a result, it won't hold back the overall
>>>>> > watermark.
>>>>> > > I
>>>>> > > >> > >> thought the problem is that we may have late data for this
>>>>> slow
>>>>> > > >> > >>
>>>>> > > >> > >> partition.
>>>>> > > >> > >>
>>>>> > > >> > >> It will hold back the watermark. Watermark of an operator
>>>>> is the
>>>>> > > >> minimum
>>>>> > > >> > >> of watermarks of all splits[1]
>>>>> > > >> > >>
>>>>> > > >> > >> I have another question about the restart. Say split
>>>>> alignment is
>>>>> > > >> > >> triggered. checkpoint is completed. job failed and
>>>>> restored from
>>>>> > > the
>>>>> > > >> > last
>>>>> > > >> > >> checkpoint. because alignment decision is not checkpointed,
>>>>> > > initially
>>>>> > > >> > >> alignment won't be enforced until we get a cycle of
>>>>> watermark
>>>>> > > >> > aggregation
>>>>> > > >> > >> and propagation, right? Not saying this corner is a
>>>>> problem. Just
>>>>> > > >> want
>>>>> > > >> > to
>>>>> > > >> > >> understand it more.
>>>>> > > >> > >>
>>>>> > > >> > >> Your understanding is correct.
>>>>> > > >> > >>
>>>>> > > >> > >> @Becket:
>>>>> > > >> > >>
>>>>> > > >> > >> 1. I think watermark alignment is sort of a general use
>>>>> case, so
>>>>> > > >> should
>>>>> > > >> > >>
>>>>> > > >> > >> we
>>>>> > > >> > >>
>>>>> > > >> > >> just add the related methods to SourceReader directly
>>>>> instead of
>>>>> > > >> > >> introducing the new interface of WithSplitAssignment? We
>>>>> can
>>>>> > > provide
>>>>> > > >> > >> default implementations, so backwards compatibility won't
>>>>> be an
>>>>> > > >> issue.
>>>>> > > >> > >>
>>>>> > > >> > >> I don't think we can provide a default implementation. How
>>>>> would
>>>>> > we
>>>>> > > >> do
>>>>> > > >> > >> that? Would it be just a no-op? Is it better than having an
>>>>> > opt-in
>>>>> > > >> > >> interface? The default implementation would have to be
>>>>> added
>>>>> > > >> exclusively
>>>>> > > >> > >>
>>>>> > > >> > >> in
>>>>> > > >> > >>
>>>>> > > >> > >> a *Public* SourceReader interface. By the way notice
>>>>> > > SourceReaderBase
>>>>> > > >> > >> does extend from WithSplitsAlignment, so effectively all
>>>>> > > >> implementations
>>>>> > > >> > >>
>>>>> > > >> > >> do
>>>>> > > >> > >>
>>>>> > > >> > >> handle the alignment case. To be honest I think it is
>>>>> impossible
>>>>> > to
>>>>> > > >> > >> implement the SourceReader interface directly by end users.
>>>>> > > >> > >>
>>>>> > > >> > >> 2. As you mentioned, the SplitReader interface probably
>>>>> also
>>>>> > needs
>>>>> > > >> some
>>>>> > > >> > >> change to support throttling at the split granularity. Can
>>>>> you
>>>>> > add
>>>>> > > >> that
>>>>> > > >> > >> interface change into the public interface section as well?
>>>>> > > >> > >>
>>>>> > > >> > >> It has been added from the beginning. See
>>>>> *AlignedSplitReader.*
>>>>> > > >> > >>
>>>>> > > >> > >> 3. Nit, can we avoid using the method name assignSplits
>>>>> here,
>>>>> > given
>>>>> > > >> that
>>>>> > > >> > >>
>>>>> > > >> > >> it
>>>>> > > >> > >>
>>>>> > > >> > >> is not actually changing the split assignments? It seems
>>>>> > something
>>>>> > > >> like
>>>>> > > >> > >> pauseOrResumeSplits(), or adjustSplitsThrottling() is more
>>>>> > > accurate.
>>>>> > > >> > >>
>>>>> > > >> > >> The method's called *alignSplits*, not assign. Do you still
>>>>> > prefer
>>>>> > > a
>>>>> > > >> > >> different name for that? Personally, I am open for
>>>>> suggestions
>>>>> > > here.
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >>
>>>>> > > >> > >> Dawid
>>>>> > > >> > >>
>>>>> > > >> > >> [1]
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> >
>>>>> > > >>
>>>>> > >
>>>>> >
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation
>>>>> > > >> > >>
>>>>> > > >> > >> On 22/04/2022 05:59, Becket Qin wrote:
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks for driving the effort, Sebastion. I think the
>>>>> motivation
>>>>> > > >> makes a
>>>>> > > >> > >> lot of sense. Just a few suggestions / questions.
>>>>> > > >> > >>
>>>>> > > >> > >> 1. I think watermark alignment is sort of a general use
>>>>> case, so
>>>>> > > >> should
>>>>> > > >> > >>
>>>>> > > >> > >> we
>>>>> > > >> > >>
>>>>> > > >> > >> just add the related methods to SourceReader directly
>>>>> instead of
>>>>> > > >> > >> introducing the new interface of WithSplitAssignment? We
>>>>> can
>>>>> > > provide
>>>>> > > >> > >> default implementations, so backwards compatibility won't
>>>>> be an
>>>>> > > >> issue.
>>>>> > > >> > >>
>>>>> > > >> > >> 2. As you mentioned, the SplitReader interface probably
>>>>> also
>>>>> > needs
>>>>> > > >> some
>>>>> > > >> > >> change to support throttling at the split granularity. Can
>>>>> you
>>>>> > add
>>>>> > > >> that
>>>>> > > >> > >> interface change into the public interface section as well?
>>>>> > > >> > >>
>>>>> > > >> > >> 3. Nit, can we avoid using the method name assignSplits
>>>>> here,
>>>>> > given
>>>>> > > >> that
>>>>> > > >> > >>
>>>>> > > >> > >> it
>>>>> > > >> > >>
>>>>> > > >> > >> is not actually changing the split assignments? It seems
>>>>> > something
>>>>> > > >> like
>>>>> > > >> > >> pauseOrResumeSplits(), or adjustSplitsThrottling() is more
>>>>> > > accurate.
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks,
>>>>> > > >> > >>
>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>> > > >> > >>
>>>>> > > >> > >> On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <
>>>>> stevenz...@gmail.com
>>>>> > >
>>>>> > > <
>>>>> > > >> > stevenz...@gmail.com> <stevenz...@gmail.com> <
>>>>> stevenz...@gmail.com>
>>>>> > <
>>>>> > > >> > >> stevenz...@gmail.com> <
>>>>> > > >> > >> stevenz...@gmail.com> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >> However, a single source operator may read data from
>>>>> multiple
>>>>> > > >> > >>
>>>>> > > >> > >> splits/partitions, e.g., multiple Kafka partitions, such
>>>>> that
>>>>> > even
>>>>> > > >> with
>>>>> > > >> > >> watermark alignment the source operator may need to buffer
>>>>> > > excessive
>>>>> > > >> > >>
>>>>> > > >> > >> amount
>>>>> > > >> > >>
>>>>> > > >> > >> of data if one split emits data faster than another.
>>>>> > > >> > >>
>>>>> > > >> > >> For this part from the motivation section, is it accurate?
>>>>> Let's
>>>>> > > >> assume
>>>>> > > >> > >>
>>>>> > > >> > >> one
>>>>> > > >> > >>
>>>>> > > >> > >> source task consumes from 3 partitions and one of the
>>>>> partition
>>>>> > is
>>>>> > > >> > >> significantly slower. In this situation, watermark for this
>>>>> > source
>>>>> > > >> task
>>>>> > > >> > >> won't hold back as it is reading recent data from other
>>>>> two Kafka
>>>>> > > >> > >> partitions. As a result, it won't hold back the overall
>>>>> > watermark.
>>>>> > > I
>>>>> > > >> > >> thought the problem is that we may have late data for this
>>>>> slow
>>>>> > > >> > >>
>>>>> > > >> > >> partition.
>>>>> > > >> > >>
>>>>> > > >> > >> I have another question about the restart. Say split
>>>>> alignment is
>>>>> > > >> > >> triggered. checkpoint is completed. job failed and
>>>>> restored from
>>>>> > > the
>>>>> > > >> > last
>>>>> > > >> > >> checkpoint. because alignment decision is not checkpointed,
>>>>> > > initially
>>>>> > > >> > >> alignment won't be enforced until we get a cycle of
>>>>> watermark
>>>>> > > >> > aggregation
>>>>> > > >> > >> and propagation, right? Not saying this corner is a
>>>>> problem. Just
>>>>> > > >> want
>>>>> > > >> > to
>>>>> > > >> > >> understand it more.
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <
>>>>> t...@apache.org> <
>>>>> > > >> > t...@apache.org> <t...@apache.org> <t...@apache.org> <
>>>>> > > >> > >> t...@apache.org> <
>>>>> > > >> > >> t...@apache.org> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks for working on this!
>>>>> > > >> > >>
>>>>> > > >> > >> I wonder if "supporting" split alignment in
>>>>> SourceReaderBase and
>>>>> > > then
>>>>> > > >> > >>
>>>>> > > >> > >> doing
>>>>> > > >> > >>
>>>>> > > >> > >> nothing if the split reader does not implement
>>>>> AlignedSplitReader
>>>>> > > >> could
>>>>> > > >> > >>
>>>>> > > >> > >> be
>>>>> > > >> > >>
>>>>> > > >> > >> misleading? Perhaps WithSplitsAlignment can instead be
>>>>> added to
>>>>> > the
>>>>> > > >> > >> specific source reader (i.e. KafkaSourceReader) to make it
>>>>> > explicit
>>>>> > > >> that
>>>>> > > >> > >> the source actually supports it.
>>>>> > > >> > >>
>>>>> > > >> > >> Thanks,
>>>>> > > >> > >> Thomas
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <
>>>>> > > kna...@apache.org>
>>>>> > > >> <
>>>>> > > >> > kna...@apache.org> <kna...@apache.org> <kna...@apache.org> <
>>>>> > > >> > >> kna...@apache.org> <
>>>>> > > >> > >> kna...@apache.org>
>>>>> > > >> > >>
>>>>> > > >> > >> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> Hi Sebastian, Hi Dawid,
>>>>> > > >> > >>
>>>>> > > >> > >> As part of this FLIP, the `AlignedSplitReader` interface
>>>>> (aka the
>>>>> > > >> stop
>>>>> > > >> > >>
>>>>> > > >> > >> &
>>>>> > > >> > >>
>>>>> > > >> > >> resume behavior) will be implemented for Kafka and Pulsar
>>>>> only,
>>>>> > > >> > >>
>>>>> > > >> > >> correct?
>>>>> > > >> > >>
>>>>> > > >> > >> +1 in general. I believe it is valuable to complete the
>>>>> watermark
>>>>> > > >> > >>
>>>>> > > >> > >> aligned
>>>>> > > >> > >>
>>>>> > > >> > >> story with this FLIP.
>>>>> > > >> > >>
>>>>> > > >> > >> Cheers,
>>>>> > > >> > >>
>>>>> > > >> > >> Konstantin
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <
>>>>> > > >> > dwysakow...@apache.org> <dwysakow...@apache.org>
>>>>> > > >> > >>
>>>>> > > >> > >> wrote:
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >> To be explicit, having worked on it, I support it ;) I
>>>>> think we
>>>>> > can
>>>>> > > >> > >> start a vote thread soonish, as there are no concerns so
>>>>> far.
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >>
>>>>> > > >> > >> Dawid
>>>>> > > >> > >>
>>>>> > > >> > >> On 13/04/2022 11:27, Sebastian Mattheis wrote:
>>>>> > > >> > >>
>>>>> > > >> > >> Dear Flink developers,
>>>>> > > >> > >>
>>>>> > > >> > >> I would like to open a discussion on FLIP 217 [1] for an
>>>>> > extension
>>>>> > > >> > >>
>>>>> > > >> > >> of
>>>>> > > >> > >>
>>>>> > > >> > >> Watermark Alignment to perform alignment also in
>>>>> SplitReaders. To
>>>>> > > >> > >>
>>>>> > > >> > >> do
>>>>> > > >> > >>
>>>>> > > >> > >> so,
>>>>> > > >> > >>
>>>>> > > >> > >> SplitReaders must be able to suspend and resume reading
>>>>> from
>>>>> > split
>>>>> > > >> > >>
>>>>> > > >> > >> sources
>>>>> > > >> > >>
>>>>> > > >> > >> where the SourceOperator coordinates and controlls suspend
>>>>> and
>>>>> > > >> > >>
>>>>> > > >> > >> resume.
>>>>> > > >> > >>
>>>>> > > >> > >> To
>>>>> > > >> > >>
>>>>> > > >> > >> gather information about current watermarks of the
>>>>> SplitReaders,
>>>>> > we
>>>>> > > >> > >>
>>>>> > > >> > >> extend
>>>>> > > >> > >>
>>>>> > > >> > >> the internal WatermarkOutputMulitplexer and report
>>>>> watermarks to
>>>>> > > >> > >>
>>>>> > > >> > >> the
>>>>> > > >> > >>
>>>>> > > >> > >> SourceOperator.
>>>>> > > >> > >>
>>>>> > > >> > >> There is a PoC for this FLIP [2], prototyped by Arvid
>>>>> Heise and
>>>>> > > >> > >>
>>>>> > > >> > >> revised
>>>>> > > >> > >>
>>>>> > > >> > >> and
>>>>> > > >> > >>
>>>>> > > >> > >> reworked by Dawid Wysakowicz (He did most of the work.)
>>>>> and me.
>>>>> > The
>>>>> > > >> > >>
>>>>> > > >> > >> changes
>>>>> > > >> > >>
>>>>> > > >> > >> are backwards compatible in a way that if affected
>>>>> components do
>>>>> > > >> > >>
>>>>> > > >> > >> not
>>>>> > > >> > >>
>>>>> > > >> > >> support split alignment the behavior is as before.
>>>>> > > >> > >>
>>>>> > > >> > >> Best,
>>>>> > > >> > >> Sebastian
>>>>> > > >> > >>
>>>>> > > >> > >> [1]
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> >
>>>>> > > >>
>>>>> > >
>>>>> >
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>>>>> > > >> > >>
>>>>> > > >> > >> [2] https://github.com/dawidwys/flink/tree/aligned-splits
>>>>> > > >> > >>
>>>>> > > >> > >> --
>>>>> > > >> > >>
>>>>> > > >> > >> Konstantin Knaufhttps://
>>>>> > > >> twitter.com/snntrablehttps://github.com/knaufk
>>>>> > > >> > >>
>>>>> > > >> > >>
>>>>> > > >> >
>>>>> > > >>
>>>>> > > >
>>>>> > >
>>>>> >
>>>>>
>>>>

Reply via email to