Hi Becket,

Thanks a lot for your fast and detailed response. For me, the discussion is
converging, and dropping the supportsX method sounds very reasonable. (Side
note: I think we misunderstood each other about "pausable splits" being
enabled by "default". As you describe it now, I understand "default" to mean
the new recommended way of implementing a source, and I think that is fully
valid. Before, I understood "default" as the default implementation, i.e.,
throwing UnsupportedOperationException, which is the exact opposite. :) )
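
To make the two readings of "default" concrete, here is a minimal Java
sketch. It does not use the real Flink interfaces: SketchReader,
LegacyReader, PausableReader and tryPause are made-up names for
illustration, and the method signature is only an assumption based on the
FLIP draft.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DefaultImplSketch {

    /** Hypothetical stand-in for SourceReader/SplitReader; not the real Flink interface. */
    interface SketchReader {
        // Reading 1 of "default": the default implementation, which rejects the call.
        default void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            throw new UnsupportedOperationException(
                    "This source does not support pausing or resuming splits.");
        }
    }

    /** An existing source that still compiles unchanged and inherits the default. */
    static class LegacyReader implements SketchReader {}

    /** Reading 2 of "default": the recommended way, i.e. overriding the method. */
    static class PausableReader implements SketchReader {
        final Set<String> paused = new HashSet<>();

        @Override
        public void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            paused.addAll(splitsToPause);
            paused.removeAll(splitsToResume);
        }
    }

    /** Returns true if the reader accepted the pause request, false if it is unsupported. */
    static boolean tryPause(SketchReader reader, String splitId) {
        try {
            reader.pauseOrResumeSplits(List.of(splitId), List.of());
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("legacy:   " + tryPause(new LegacyReader(), "split-0"));
        System.out.println("pausable: " + tryPause(new PausableReader(), "split-0"));
    }
}
```

The legacy reader keeps compiling but fails at the moment something asks it
to pause, which is the backwards-compatibility trade-off discussed in this
thread.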

Nevertheless: As mentioned, an open question for me is whether watermark
alignment should enforce pausable splits. For clarification, the current
documentation [1] says:

> *Note:* As of 1.15, Flink supports aligning across tasks of the same source
> and/or different sources. It does not support aligning
> splits/partitions/shards in the same task.
>
> In a case where there are e.g. two Kafka partitions that produce
> watermarks at different pace, that get assigned to the same task watermark
> might not behave as expected. Fortunately, worst case it should not perform
> worse than without alignment.
>
> Given the limitation above, we suggest applying watermark alignment in two
> situations:
>
>    1. You have two different sources (e.g. Kafka and File) that produce
>    watermarks at different speeds
>    2. You run your source with parallelism equal to the number of
>    splits/shards/partitions, which results in every subtask being assigned a
>    single unit of work.
>
I personally see no issue with, and no reason against, making watermark
alignment depend on pausable splits. (I think this would even be a good path
towards shaping watermark alignment in 1.16.) However, "I don't see" also
means that I would be happy to hear Dawid's and Piotrek's opinions, as they
implemented watermark alignment based on FLIP-182 [2], and I don't want to
miss relevant rationale/background info from their side.
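
To illustrate what such a dependency could look like at runtime, here is a
rough Java sketch, with made-up names (AlignmentSketch, SketchReader,
splitsAhead, align) and not Flink's actual implementation. It assumes that
per-split alignment pauses every split whose watermark is ahead of the
minimum watermark plus the allowed drift, and it simplifies by taking the
minimum over the splits of a single reader rather than over the whole
alignment group.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class AlignmentSketch {

    /** Hypothetical reader abstraction; the default mirrors the proposal in this thread. */
    interface SketchReader {
        default void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            throw new UnsupportedOperationException("pausable splits not supported");
        }
    }

    /** Splits whose watermark is ahead of the allowed maximum, in split-id order. */
    static List<String> splitsAhead(Map<String, Long> splitWatermarks, long maxAllowedWatermark) {
        return new TreeMap<>(splitWatermarks).entrySet().stream()
                .filter(e -> e.getValue() > maxAllowedWatermark)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /**
     * One alignment round: pause the splits that are ahead and resume all others.
     * Assumption: max allowed watermark = minimum split watermark + max drift.
     */
    static List<String> align(
            SketchReader reader, Map<String, Long> splitWatermarks, long maxDrift) {
        if (splitWatermarks.isEmpty()) {
            return List.of();
        }
        long minWatermark = Collections.min(splitWatermarks.values());
        List<String> toPause = splitsAhead(splitWatermarks, minWatermark + maxDrift);
        List<String> toResume = splitWatermarks.keySet().stream()
                .filter(id -> !toPause.contains(id))
                .sorted()
                .collect(Collectors.toList());
        // Throws UnsupportedOperationException for readers that keep the default.
        reader.pauseOrResumeSplits(toPause, toResume);
        return toPause;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = Map.of("p0", 1_000L, "p1", 9_000L);
        SketchReader pausable = new SketchReader() {
            @Override
            public void pauseOrResumeSplits(
                    Collection<String> splitsToPause, Collection<String> splitsToResume) {
                // A real reader would stop polling the paused splits here.
            }
        };
        System.out.println("paused: " + align(pausable, watermarks, 5_000L));
        try {
            align(new SketchReader() {}, watermarks, 5_000L);
        } catch (UnsupportedOperationException e) {
            System.out.println("alignment rejected: " + e.getMessage());
        }
    }
}
```

In this sketch, enabling alignment on a reader without pausable splits fails
on the first alignment round, which corresponds to point 2 further down in
the quoted discussion.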

*@Piotrek* *@Dawid* What do you think?

Regards,
Sebastian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources?src=contextnavpagetreemode

On Wed, May 11, 2022 at 1:30 PM Becket Qin <becket....@gmail.com> wrote:

> +dev
>
> Hi Sebastian,
>
> Thank you for the summary. Please see the detailed replies inline. To
> recap my suggestions:
>
> 1. Pausable splits API.
>     a) Add default implementations of the "pauseOrResumeSplits" methods to
> both SourceReader and SplitReader, where both default implementations
> throw UnsupportedOperationException.
>
> 2. User story.
>     a) We tell users to enable the watermark alignment as they like. This
> is exactly what the current Flink API is.
>     b) We tell the source developers, please implement pausable splits,
> otherwise bad things may happen. Think of it like you are expected to
> implement SourceReader#snapshotState() properly, otherwise exceptions will
> be thrown when users enable checkpointing.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, May 11, 2022 at 4:45 PM Sebastian Mattheis <
> sebast...@ververica.com> wrote:
>
>> Hi Becket, Hi everybody,
>>
>> I'm sorry if I misread the messages but I could not derive an agreement
>> from the mailing list. Nevertheless, if I understand you right the
>> suggestion is:
>>
>> * Add default implementations to methods "pauseOrResumeSplits" in both
>> SourceReader and SplitReader where both default implementations throw
>> UnsupportedOperationException.
>>
> Yes.
>
>> * Add "supportsPauseOrResumeSplits" to the Source interface. (In the
>> following, I refer to supporting this as "pausable splits".)
>>
> We may no longer need this if pausable splits are expected to be
> implemented by the source developers, i.e. non-optional. Having this method
> would then be somewhat misleading as it looks like the sources that do not
> support pausable splits are also acceptable in the long term. So API wise,
> I'd say maybe we should remove this for this FLIP, although I believe this
> supportXXX pattern itself is still attractive for optional features.
>
>
>>
>> To make the conclusions explicit:
>>
>> 1. The implementation of pauseOrResumeSplits in both interfaces,
>> SourceReader and SplitReader, is optional, and the default is that it is
>> not supported. (--> This means that the implementation is still
>> optional for the source developer.)
>>
> It is optional for backwards compatibility with existing sources, as they
> may still compile without code change. But starting from this FLIP, Flink
> will always optimistically assume that all the sources support pausable
> splits. If a source does not support pausable splits, it goes to an error
> handling path when watermark alignment is enabled on it. This is different
> from a usual optional feature, where no error is expected.
>
>
>> 2. If watermark alignment is enabled in the application code by adding
>> withWatermarkAlignment to the WatermarkStrategy while SourceReader or
>> SplitReader do not support pausableSplits, we throw an
>> UnsupportedOperationException.
>>
> Yes.
>
>
>> 3. With regard to your statement:
>>
>>> [...] basically means watermark alignment is a non-optional feature to
>>> the end users.
>>
>> You actually mean that "pausable splits" are non-optional for the app
>> developer if watermark alignment is enabled. However, watermark alignment
>> is optional and can be enabled/disabled.
>>
> Yes, watermark alignment can be enabled/disabled in individual sources in
> Flink jobs, which basically means the code supporting watermark alignment
> has to already be there. That again means the Source developers are also
> expected to support pausable splits by default. So this way we essentially
> tell the end users that you may enable / disable this feature as you wish,
> and tell the source developers that you SHOULD implement this because the
> end users may turn it on/off at will. And if the source does not support
> pausable splits, that goes to an error handling path when watermark
> alignment is enabled on it. So users know they have to explicitly exclude
> this source.
>
>
>>
>> So far it's totally clear to me and I hope this is what you mean. I also
>> agree with both statements:
>>
>>> So making that expectation aligned with the source developers seems
>>> reasonable.
>>>
>>
>>> I think this is a simple and clean solution from both the end user and
>>> source developers' standpoint.
>>>
>>
>> However, a last conclusion derives from 3. and is an open question for me:
>>
>> 4. The feature of "pausable splits" is now tightly bound to watermark
>> alignment, i.e., if sources do not support "pausable splits" one can not
>> enable watermark alignment for these sources. This dependency is not the
>> current status of watermark alignment implementation because it is/was
>> implemented without pausable splits. Do we want to introduce this
>> dependency? (This is an open question. I cannot judge that.)
>>
> The watermark alignment basically relies on the pausable splits, right? So
> personally I found it quite reasonable that if the source does not support
> pausable splits, end users cannot enable watermark alignment on it.
>
>
>> If something is wrong, please correct me.
>>
>> Regards,
>> Sebastian
>>
>> On Wed, May 11, 2022 at 9:05 AM Becket Qin <becket....@gmail.com> wrote:
>>
>>> Hi Sebastian,
>>>
>>> Thanks for the reply and patient discussion. I agree this is a tricky
>>> decision.
>>>
>>>
>>>> Nevertheless, Piotr has valid concerns about Option c) which I see as
>>>> follows:
>>>> (1) An interface with default NOOP implementation makes the
>>>> implementation optional. And in my opinion, a default implementation is and
>>>> will remain a way of making implementation optional because even in future
>>>> a developer can decide to implement the "old flavor" without support for
>>>> pausable splits.
>>>> (2) It may not be too critical but I also find it suboptimal that with
>>>> a NOOP default implementation there is no way to check at runtime if
>>>> SourceReader or SplitReader actually support pausing. (To do so, one would
>>>> need a supportsX method which makes it again more complicated.)
>>>
>>>
>>> Based on the last few messages in the mailing list, Piotr and I agreed
>>> that the default implementation should just throw an
>>> UnsupportedOperationException if the source is unpausable. So this
>>> basically tells the Source developers that this feature is expected to be
>>> supported. Because we cannot prevent end users from putting an unpausable
>>> source into the watermark alignment group, that basically means watermark
>>> alignment is a non-optional feature to the end users. So making that
>>> expectation aligned with the source developers seems reasonable. And if a
>>> source does not support this feature, the end users should explicitly
>>> remove that source from the watermark alignment group.
>>>
>>> Personally speaking I think this is a simple and clean solution from
>>> both the end user and source developers' standpoint.
>>>
>>> Does this address your concerns?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Wed, May 11, 2022 at 2:52 PM Sebastian Mattheis <
>>> sebast...@ververica.com> wrote:
>>>
>>>> Hi Piotr, Hi Becket, Hi everybody,
>>>>
>>>> we, Dawid and I, discussed the various suggestions/options and we would
>>>> be okay either way because we find neither solution is perfect just because
>>>> of the already present complexity.
>>>>
>>>> Option c) Adding methods to the interfaces of SourceReader and
>>>> SplitReader
>>>> Option a) Adding decorative interfaces to be used by SourceReader and
>>>> SplitReader
>>>>
>>>> As of the current status (v. 12) of the FLIP [1], it is based on Option
>>>> c) which we find acceptable because the complexity added is only a single
>>>> method.
>>>>
>>>> Nevertheless, Piotr has valid concerns about Option c) which I see as
>>>> follows:
>>>> (1) An interface with default NOOP implementation makes the
>>>> implementation optional. And in my opinion, a default implementation is and
>>>> will remain a way of making implementation optional because even in future
>>>> a developer can decide to implement the "old flavor" without support for
>>>> pausable splits.
>>>> (2) It may not be too critical but I also find it suboptimal that with
>>>> a NOOP default implementation there is no way to check at runtime if
>>>> SourceReader or SplitReader actually support pausing. (To do so, one would
>>>> need a supportsX method which makes it again more complicated.)
>>>>
>>>> However, we haven't changed it because Option a) is also not optimal or
>>>> straight-forward:
>>>> (1) We need to add two distinct yet similar decorative interfaces
>>>> since, as mentioned, the signatures of the methods are different. For
>>>> example, we would need decorative interfaces like
>>>> `SplitReaderWithPausableSplits` and `SourceReaderWithPausableSplits`.
>>>> (2) As a consequence, we would need to somehow document how/where to
>>>> implement both interfaces and how this relates to each other. This we could
>>>> solve by adding a note in the interface of SourceReader and SplitReader and
>>>> reference to the decorative interfaces but it still increases complexity
>>>> too.
>>>>
>>>> In summary, we see both as acceptable and preferred over other options.
>>>> The question is if we can find a solution or compromise that is acceptable
>>>> for everybody to reach consensus.
>>>>
>>>> Please let us know what you think because we would be happy if we can
>>>> conclude the discussion to avoid dropping the initiative on this FLIP.
>>>>
>>>> Regards,
>>>> Sebastian
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199540438
>>>> (v. 12)
>>>>
>>>> On Thu, May 5, 2022 at 10:13 AM Piotr Nowojski <pnowoj...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Guowei,
>>>>>
>>>>> as Dawid wrote a couple of messages back:
>>>>>
>>>>> > This is covered in the previous FLIP[1] which has been already
>>>>> implemented in 1.15. In short, it must be enabled with the watermark
>>>>> strategy which also configures drift and update interval
>>>>>
>>>>> So by default watermark alignment is disabled, regardless if a source
>>>>> supports it or not.
>>>>>
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> czw., 5 maj 2022 o 09:56 Guowei Ma <guowei....@gmail.com> napisał(a):
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We know that in the case of bounded input Flink supports the Batch
>>>>>> execution mode. Currently, in Batch execution mode, Flink is executed
>>>>>> on a stage-by-stage basis. In this way, perhaps watermark alignment
>>>>>> might not gain much.
>>>>>>
>>>>>> So my question is: Is watermark alignment the default behavior (only
>>>>>> for sources that implement it)? If so, have you considered evaluating
>>>>>> the impact of this behavior on the Batch execution mode? Or do you
>>>>>> think it is not necessary?
>>>>>>
>>>>>> Correct me if I miss something.
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> On Thu, May 5, 2022 at 1:01 PM Piotr Nowojski <
>>>>>> piotr.nowoj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Becket and Dawid,
>>>>>> >
>>>>>> > > I feel that no matter which option we choose this can not be
>>>>>> solved
>>>>>> > entirely in either of the options, because of the point above and
>>>>>> because
>>>>>> > the signature of SplitReader#pauseOrResumeSplits and
>>>>>> > SourceReader#pauseOrResumeSplits are slightly different (one
>>>>>> identifies
>>>>>> > splits with splitId the other one passes the splits directly).
>>>>>> >
>>>>>> > Yes, that's a good point in this case and for features that need to
>>>>>> be
>>>>>> > implemented in more than one place.
>>>>>> >
>>>>>> > > Is there any reason for pausing reading from a split to be an
>>>>>> > > optional feature, other than that this was not included in the
>>>>>> > > original interface?
>>>>>> >
>>>>>> > An additional argument in favor of making it optional is to
>>>>>> simplify source
>>>>>> > implementation. But on its own I'm not sure if that would be enough
>>>>>> to
>>>>>> > justify making this feature optional. Maybe.
>>>>>> >
>>>>>> > > I think it would be way simpler and clearer to just let end users
>>>>>> and
>>>>>> > Flink
>>>>>> > > assume all the connectors will implement this feature.
>>>>>> >
>>>>>> > As I wrote above that would be an interesting choice to make (ease
>>>>>> of
>>>>>> > implementation for new users, vs system consistency). Regardless of
>>>>>> that,
>>>>>> > yes, for me the main argument is the API backward compatibility.
>>>>>> But let's
>>>>>> > clear a couple of points:
>>>>>> > - The current proposal adding methods to the base interface with
>>>>>> default
>>>>>> > implementations is an OPTIONAL feature. Same as the decorative
>>>>>> version
>>>>>> > would be.
>>>>>> > - Decorative version could implement "throw
>>>>>> UnsupportedOperationException"
>>>>>> > if user enabled watermark alignment just as well and I agree that's
>>>>>> a
>>>>>> > better option compared to logging a warning.
>>>>>> >
>>>>>> > Best,
>>>>>> > Piotrek
>>>>>> >
>>>>>> >
>>>>>> > śr., 4 maj 2022 o 15:40 Becket Qin <becket....@gmail.com>
>>>>>> napisał(a):
>>>>>> >
>>>>>> > > Thanks for the reply and patient discussion, Piotr and Dawid.
>>>>>> > >
>>>>>> > > Is there any reason for pausing reading from a split to be an
>>>>>> > > optional feature, other than that this was not included in the
>>>>>> > > original interface?
>>>>>> > >
>>>>>> > > To be honest I am really worried about the complexity of the user
>>>>>> > > story here. Optional features like this have a high overhead.
>>>>>> > > Imagine this feature is optional, and a user enables watermark
>>>>>> > > alignment and defines a few watermark groups. Would it work? Hmm,
>>>>>> > > that depends on whether the involved Source has implemented this
>>>>>> > > feature. If the Sources are well documented, good luck. Otherwise
>>>>>> > > end users may have to look into the code of the Source to see
>>>>>> > > whether the feature is supported, which is something they
>>>>>> > > shouldn't have to do.
>>>>>> > >
>>>>>> > > I think it would be way simpler and clearer to just let end users
>>>>>> > > and Flink assume all the connectors will implement this feature.
>>>>>> > > After all, the watermark group is not optional to the end users.
>>>>>> > > If in some rare cases the feature cannot be supported, a clear
>>>>>> > > UnsupportedOperationException will be thrown to tell users to
>>>>>> > > explicitly remove this Source from the watermark group. I don't
>>>>>> > > think we should have a warning message here, as warnings tend to
>>>>>> > > be ignored in many cases. If we do this, we don't even need the
>>>>>> > > supportXXX method in the Source for this feature. In fact this is
>>>>>> > > exactly how many interfaces work today. For example,
>>>>>> > > SplitEnumerator#addSplitsBack() is not supported by the Pravega
>>>>>> > > source because it does not support partial failover. In that
>>>>>> > > case, it simply throws an exception to trigger a global recovery.
>>>>>> > >
>>>>>> > > The reason we add a default implementation in this case would
>>>>>> > > just be for the sake of backwards compatibility, so the old
>>>>>> > > sources can still compile. Sure, in the short term, this feature
>>>>>> > > might not be supported by many existing sources. That is OK, and
>>>>>> > > it is quite visible to the source developers that they did not
>>>>>> > > override the default impl which throws an
>>>>>> > > UnsupportedOperationException.
>>>>>> > >
>>>>>> > > @Dawid,
>>>>>> > >
>>>>>> > > the Java doc of the SupportXXX() method in the Source would be
>>>>>> the single
>>>>>> > > >> source of truth regarding how to implement this feature.
>>>>>> > > >
>>>>>> > > >
>>>>>> > >
>>>>>> > > I also don't find it entirely true. Half of the classes are
>>>>>> theoretically
>>>>>> > > > optional and are utility classes from the point of view how the
>>>>>> > > interfaces
>>>>>> > > > are organized. Theoretically users do not need to use any of
>>>>>> > > > SourceReaderBase & SplitReader. Would be weird to list their
>>>>>> methods in
>>>>>> > > the
>>>>>> > > > Source interface.
>>>>>> > >
>>>>>> > > I think the ultimate goal of java docs is to guide users to
>>>>>> > > implement the Source. If SourceReaderBase is the preferred way to
>>>>>> > > implement a SourceReader, it seems worth mentioning that. Even the
>>>>>> > > Java language documentation of interfaces lists the known
>>>>>> > > implementations [1] so people can leverage them. But for this
>>>>>> > > particular case, if we make the feature non-optional, we don't
>>>>>> > > even need the supportXXX() method for now.
>>>>>> > >
>>>>>> > > Thanks,
>>>>>> > >
>>>>>> > > Jiangjie (Becket) Qin
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > > On Wed, May 4, 2022 at 4:37 PM Dawid Wysakowicz <
>>>>>> dwysakow...@apache.org>
>>>>>> > > wrote:
>>>>>> > >
>>>>>> > > > Hey Piotr and Becket,
>>>>>> > > >
>>>>>> > > > First of all, let me say I am happy with whichever option is
>>>>>> agreed in
>>>>>> > > the
>>>>>> > > > discussion.
>>>>>> > > >
>>>>>> > > > I wanted to clarify a few points from the discussion though:
>>>>>> > > >
>>>>>> > > > @Becket:
>>>>>> > > >
>>>>>> > > > The main argument for adding the methods to the SourceReader is
>>>>>> that
>>>>>> > > these
>>>>>> > > > methods are effectively NON-OPTIONAL to SourceReader impl, i.e.
>>>>>> > starting
>>>>>> > > > from this FLIP, all the SourceReaders impl are expected to
>>>>>> support this
>>>>>> > > > method, although some old implementations may not have
>>>>>> implemented this
>>>>>> > > > feature. I think we should distinguish the new features from the
>>>>>> > optional
>>>>>> > > > features. While the public decorative interface is a solution
>>>>>> to the
>>>>>> > > > optional features. We should not use it for the features that
>>>>>> are
>>>>>> > > > non-optional.
>>>>>> > > >
>>>>>> > > > I don't think that this feature is NON-OPTIONAL. Even though
>>>>>> > preferred, I
>>>>>> > > > still think it can be simply optional.
>>>>>> > > >
>>>>>> > > > the Java doc of the SupportXXX() method in the Source would be
>>>>>> the
>>>>>> > single
>>>>>> > > > source of truth regarding how to implement this feature.
>>>>>> > > >
>>>>>> > > > I also don't find it entirely true. Half of the classes are
>>>>>> > theoretically
>>>>>> > > > optional and are utility classes from the point of view how the
>>>>>> > > interfaces
>>>>>> > > > are organized. Theoretically users do not need to use any of
>>>>>> > > > SourceReaderBase & SplitReader. Would be weird to list their
>>>>>> methods in
>>>>>> > > the
>>>>>> > > > Source interface.
>>>>>> > > >
>>>>>> > > > @Piotr
>>>>>> > > >
>>>>>> > > > If we have all of the methods with default implementation in
>>>>>> the base
>>>>>> > > > interface, the API doesn't give any clue to the user which set
>>>>>> of
>>>>>> > methods
>>>>>> > > > are required to be implemented at the same time.
>>>>>> > > >
>>>>>> > > > I feel that no matter which option we choose this can not be
>>>>>> solved
>>>>>> > > > entirely in either of the options, because of the point above
>>>>>> and
>>>>>> > because
>>>>>> > > > the signature of SplitReader#pauseOrResumeSplits and
>>>>>> > > > SourceReader#pauseOrResumeSplits are slightly different (one
>>>>>> identifies
>>>>>> > > > splits with splitId the other one passes the splits directly).
>>>>>> > > >
>>>>>> > > > Best,
>>>>>> > > >
>>>>>> > > > Dawid
>>>>>> > > > On 03/05/2022 14:30, Becket Qin wrote:
>>>>>> > > >
>>>>>> > > > Hi Piotr,
>>>>>> > > >
>>>>>> > > > Thanks for the comment.
>>>>>> > > >
>>>>>> > > > Just to clarify, I am not against the decorative interfaces,
>>>>>> but I do
>>>>>> > > > think we should use them with caution. The main argument for
>>>>>> adding the
>>>>>> > > > methods to the SourceReader is that these methods are
>>>>>> > > > effectively NON-OPTIONAL to SourceReader impl, i.e. starting
>>>>>> from this
>>>>>> > > > FLIP, all the SourceReaders impl are expected to support this
>>>>>> > > > method, although some old implementations may not have
>>>>>> implemented this
>>>>>> > > > feature. I think we should distinguish the new features from the
>>>>>> > optional
>>>>>> > > > features. While the public decorative interface is a solution
>>>>>> to the
>>>>>> > > > optional features. We should not use it for the features that
>>>>>> are
>>>>>> > > > non-optional.
>>>>>> > > >
>>>>>> > > > That said, this feature is optional for SplitReaders. Arguably
>>>>>> we can
>>>>>> > > have
>>>>>> > > > a decorative interface for that, but for simplicity and
>>>>>> symmetry of the
>>>>>> > > > interface, personally I prefer just adding a new method.
>>>>>> > > >
>>>>>> > > > Regarding the advantages you mentioned about the decorative
>>>>>> interfaces,
>>>>>> > > > they would make sense if:
>>>>>> > > > 1. The feature is optional.
>>>>>> > > > 2. There is only one decorative interface involved for a
>>>>>> feature.
>>>>>> > > > Otherwise the argument that all the methods are grouped
>>>>>> together will
>>>>>> > not
>>>>>> > > > stand.
>>>>>> > > >
>>>>>> > > > Compared with that, I think the current solution works fine in
>>>>>> all
>>>>>> > cases,
>>>>>> > > > i.e. "having supportXXX() method in Source, and default methods
>>>>>> /
>>>>>> > > > decorative interfaces in base interfaces.".
>>>>>> > > >
>>>>>> > > > The advantages are:
>>>>>> > > >> - clean and easy to implement base interface
>>>>>> > > >
>>>>>> > > > In the current approach, the Java doc of the SupportXXX()
>>>>>> method in the
>>>>>> > > > Source would be the single source of truth regarding how to
>>>>>> implement
>>>>>> > > this
>>>>>> > > > feature. It lists the method that has to be implemented to
>>>>>> support this
>>>>>> > > > feature, regardless of how many classes / interfaces are
>>>>>> involved.
>>>>>> > > >
>>>>>> > > > When implementing the base interface, users do not need to
>>>>>> implement a
>>>>>> > > > method with default implementation. If they are curious what
>>>>>> the method
>>>>>> > > is
>>>>>> > > > for, the java doc of that method simply points users to the
>>>>>> > SupportXXX()
>>>>>> > > > method in the Source. It seems not adding work to the users
>>>>>> compared
>>>>>> > with
>>>>>> > > > decorative interfaces, but gives much better discoverability.
>>>>>> > > >
>>>>>> > > > - all of the methods from a single feature are grouped in a
>>>>>> single
>>>>>> > > >> decorator interface, together with their dedicated java doc.
>>>>>> It's also
>>>>>> > > >> easier to google search for help using the decorator name
>>>>>> > > >
>>>>>> > > > - if an optional feature requires two methods to be implemented
>>>>>> at
>>>>>> > once,
>>>>>> > > >> decorator can guarantee that
>>>>>> > > >
>>>>>> > > > These two points are not true when multiple components and
>>>>>> classes are
>>>>>> > > > involved collaboratively to provide a feature. In our case, we
>>>>>> have
>>>>>> > both
>>>>>> > > > SourceReader and SplitReader involved. And there might be other
>>>>>> > > interfaces
>>>>>> > > > on the JM side involved for some future features. So the
>>>>>> relevant
>>>>>> > methods
>>>>>> > > > can actually be scattered over the places. That said, we may
>>>>>> still use
>>>>>> > > > decorative interfaces for each component, if the feature is
>>>>>> optional,
>>>>>> > > given
>>>>>> > > > there is a single source of truth for the feature.
>>>>>> > > >
>>>>>> > > > Here I would strongly lean towards making life easier for new
>>>>>> users,
>>>>>> > > >> lowering the entry barrier, at the (imo) slight expense for
>>>>>> the power
>>>>>> > > >> users.
>>>>>> > > >
>>>>>> > > > I actually think the current approach is simpler, more
>>>>>> > > > extensible and more general for all the users. Can you elaborate
>>>>>> > > > a bit on which part you think makes it harder for users to
>>>>>> > > > understand?
>>>>>> > > >
>>>>>> > > >
>>>>>> > > > There is another benefit of the decorative interfaces which is
>>>>>> not
>>>>>> > > > mentioned, but might be worth considering here. Usually the
>>>>>> decorative
>>>>>> > > > interfaces give slightly better backwards compatibility than
>>>>>> the new
>>>>>> > > > default method in the interfaces. That is when users are using
>>>>>> a jar
>>>>>> > that
>>>>>> > > > was compiled with an older version of Flink which does not have
>>>>>> the
>>>>>> > > default
>>>>>> > > > method in the interfaces in question. A decorative interface
>>>>>> may still
>>>>>> > > > provide backwards compatibility in that case, while default
>>>>>> method impl
>>>>>> > > > cannot.
>>>>>> > > >
>>>>>> > > > I think in Flink we in general do not guarantee custom
>>>>>> components
>>>>>> > > compiled
>>>>>> > > > with an older version can run with a newer version of Flink. A
>>>>>> > recompile
>>>>>> > > > with a newer version would be required. That said, if we do
>>>>>> care about
>>>>>> > > > this, we can just change the "supportXXX()" method in the Source
>>>>>> > > interface
>>>>>> > > > to use decorative interfaces, and leave the other parts
>>>>>> unchanged.
>>>>>> > > >
>>>>>> > > > Thanks,
>>>>>> > > >
>>>>>> > > > Jiangjie (Becket) Qin
>>>>>> > > >
>>>>>> > > >
>>>>>> > > >
>>>>>> > > >
>>>>>> > > > On Tue, May 3, 2022 at 6:25 PM Piotr Nowojski <
>>>>>> pnowoj...@apache.org>
>>>>>> > > > wrote:
>>>>>> > > >
>>>>>> > > >> Hi,
>>>>>> > > >>
>>>>>> > > >> Sorry for chipping in so late, but I was OoO for the last two
>>>>>> > > >> weeks. Regarding the interfaces, I would actually be against
>>>>>> > > >> adding those methods to the base interfaces for the reasons
>>>>>> > > >> mentioned above: it clogs the base interface for new users with
>>>>>> > > >> tons of methods that they do not need, do not understand and do
>>>>>> > > >> not know what to do with. Moreover, such decorative interfaces
>>>>>> > > >> solve the problem of a feature requiring two or more methods to
>>>>>> > > >> be implemented at the same time. If we have all of the methods
>>>>>> > > >> with default implementations in the base interface, the API
>>>>>> > > >> doesn't give the user any clue which set of methods is required
>>>>>> > > >> to be implemented at the same time.
>>>>>> > > >>
>>>>>> > > >> > a) I feel the biggest drawback of decorative interfaces is
>>>>>> which
>>>>>> > > >> interface
>>>>>> > > >> > they can decorate and which combinations of multiple
>>>>>> decorative
>>>>>> > > >> interfaces
>>>>>> > > >> > are valid. (...)
>>>>>> > > >> > In the future, if there is a new feature added
>>>>>> > > >> > (e.g. sorted or pre-partitioned data aware), are we going to
>>>>>> create
>>>>>> > > >> another
>>>>>> > > >> > interface of SplitReader such as SortedSplitReader or
>>>>>> > > >> PrePartitionedAware?
>>>>>> > > >> > Can they be combined? So I think the additional decorative
>>>>>> interface
>>>>>> > > >> like
>>>>>> > > >> > withSplitsAlignment actually increases the understanding
>>>>>> cost of
>>>>>> > users
>>>>>> > > >> > because they have to know what decorative interfaces are
>>>>>> there,
>>>>>> > which
>>>>>> > > >> > interface they can decorate and which combinations of the
>>>>>> decorative
>>>>>> > > >> > interfaces are valid and which are not. Ideally we want to
>>>>>> avoid
>>>>>> > that.
>>>>>> > > >>
>>>>>> > > >> I'm not sure if I understand how embedding default methods in
>>>>>> the base
>>>>>> > > >> interface is solving the problem: what can be combined or not?
>>>>>> If
>>>>>> > there
>>>>>> > > >> are
>>>>>> > > >> two conflicting features, having decorative interfaces that
>>>>>> can not be
>>>>>> > > >> mixed together actually makes much more sense to me rather
>>>>>> than having
>>>>>> > > >> them
>>>>>> > > >> all in one base class. How would you allow users to implement
>>>>>> only one
>>>>>> > > of
>>>>>> > > >> those two features?
>>>>>> > > >>
>>>>>> > > >> To reiterate on the issue. Yes, there are drawbacks:
>>>>>> > > >> - how can a user discover what decorators are there?
>>>>>> > > >> - how can a user know where the decorator can be applied to?
>>>>>> > > >>
>>>>>> > > >> However those are drawbacks for more power users, that can be
>>>>>> > mitigated
>>>>>> > > by
>>>>>> > > >> the documentation. For example listing all of the decorators
>>>>>> with
>>>>>> > > >> detailed explanation both in the docs and in the java docs.
>>>>>> More
>>>>>> > > >> experienced users will be able to deal with those issues
>>>>>> easier, as
>>>>>> > they
>>>>>> > > >> will already have some basic understanding of Flink. Also if
>>>>>> user has
>>>>>> > a
>>>>>> > > >> problem that he wants to solve, he will google search a
>>>>>> potential
>>>>>> > > solution
>>>>>> > > >> to his problem anyway, and while doing that he is very likely
>>>>>> to
>>>>>> > > discover
>>>>>> > > >> the decorator that he needs anyway in the docs.
>>>>>> > > >>
>>>>>> > > >> The advantages are:
>>>>>> > > >> - clean and easy to implement base interface
>>>>>> > > >> - all of the methods from a single feature are grouped in a
>>>>>> single
>>>>>> > > >> decorator interface, together with their dedicated java doc.
>>>>>> It's also
>>>>>> > > >> easier to google search for help using the decorator name
>>>>>> > > >> - if an optional feature requires two methods to be
>>>>>> implemented at
>>>>>> > once,
>>>>>> > > >> decorator can guarantee that
>>>>>> > > >>
>>>>>> > > >> Here I would strongly lean towards making life easier for new
>>>>>> users,
>>>>>> > > >> lowering the entry barrier, at the (imo) slight expense for
>>>>>> the power
>>>>>> > > >> users.
>>>>>> > > >>
>>>>>> > > >> Best,
>>>>>> > > >> Piotrek
>>>>>> > > >>
>>>>>> > > >>
>>>>>> > > >> wt., 26 kwi 2022 o 15:32 Becket Qin <becket....@gmail.com>
>>>>>> > napisał(a):
>>>>>> > > >>
>>>>>> > > >> > Thanks for the reply Sebastian and Dawid,
>>>>>> > > >> >
>>>>>> > > >> > I think Sebastian has a good summary. This is a really
>>>>>> helpful
>>>>>> > > >> discussion.
>>>>>> > > >> >
>>>>>> > > >> > Thinking a bit more, I feel that it might still be better to
>>>>>> add the
>>>>>> > > >> > supportsXXX() method in the Source rather than SourceReader.
>>>>>> > > >> >
>>>>>> > > >> > Generally speaking, what we are trying to do here is to let
>>>>>> the
>>>>>> > Flink
>>>>>> > > >> > framework know what the Source is capable of. In this FLIP,
>>>>>> it
>>>>>> > happens
>>>>>> > > >> to
>>>>>> > > >> > be the capability that only involves SourceReader. But in the
>>>>>> > future,
>>>>>> > > >> it is
>>>>>> > > >> > possible that another functionality involves both the
>>>>>> > SplitEnumerator
>>>>>> > > >> and
>>>>>> > > >> > SourceReader. In that case, following the current approach,
>>>>>> we
>>>>>> > should
>>>>>> > > >> put
>>>>>> > > >> > the "supportsXXX()" method in both SplitEnumerator and
>>>>>> SourceReader.
>>>>>> > > >> > Because if we only put this in the SourceReader, then the JM
>>>>>> would
>>>>>> > > have
>>>>>> > > >> to
>>>>>> > > >> > create a SourceReader in order to know whether this feature
>>>>>> is
>>>>>> > > >> supported,
>>>>>> > > >> > which is a little ugly. But if we put the "supportsXXX()"
>>>>>> method in
>>>>>> > > the
>>>>>> > > >> > Source, we will break the "symmetric" design because this
>>>>>> FLIP
>>>>>> > chose a
>>>>>> > > >> > different way.
>>>>>> > > >> >
>>>>>> > > >> > This is also why I think supportsXXX() method seems a good
>>>>>> thing to
>>>>>> > > >> have,
>>>>>> > > >> > because when there are a few interfaces / methods that are
>>>>>> expected
>>>>>> > to
>>>>>> > > >> be
>>>>>> > > >> > implemented at the same time in order to deliver a feature,
>>>>>> it is
>>>>>> > > always
>>>>>> > > >> > good to have a single source of truth to tell the framework
>>>>>> what to
>>>>>> > > do,
>>>>>> > > >> so
>>>>>> > > >> > the framework can do consistent things in different parts.
>>>>>> > > >> >
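
[Editorial sketch of the "single source of truth" argument above. `SketchSource` and `JobValidator` are hypothetical stand-ins, not the real Flink `Source` interface; the method name `supportsPausingSplits` is borrowed from the rename mentioned later in this thread. The assumption illustrated is that a capability flag on the source object can be checked up front, without instantiating a reader.]

```java
// Hypothetical stand-in for a source definition that advertises its capabilities.
interface SketchSource {
    // Defaults to false so that existing sources keep compiling unchanged.
    default boolean supportsPausingSplits() {
        return false;
    }
}

final class JobValidator {
    // Fails fast if split-level alignment is requested but the source cannot pause splits.
    static void validate(SketchSource source, boolean splitAlignmentEnabled) {
        if (splitAlignmentEnabled && !source.supportsPausingSplits()) {
            throw new IllegalStateException(
                    "Split alignment was enabled, but the source does not support pausing splits");
        }
    }
}
```

[With this shape, the check needs only the source object, so no reader has to be created just to answer the capability question.]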
>>>>>> > > >> > @Sebastian Mattheis <sebast...@ververica.com>
>>>>>> > > >> >
>>>>>> > > >> > Regarding interface flavor b), i.e. AlignedSourceReader +
>>>>>> > > >> > AlignedSplitReader, what I feel awkward about is that we are
>>>>>> > > essentially
>>>>>> > > >> > expecting almost all the SourceReader implementations to
>>>>>> extend
>>>>>> > > >> > SourceReaderBase, which effectively makes the SourceReader
>>>>>> interface
>>>>>> > > >> > without the pausing support useless. So this indicates that
>>>>>> public
>>>>>> > > >> > decorative interfaces (or sub-interfaces for the same
>>>>>> purpose) only
>>>>>> > > >> > make sense if the original interface is also expected to be
>>>>>> used.
>>>>>> > > >> > Otherwise, it seems to make more sense to add the method to the
>>>>>> > original
>>>>>> > > >> > interface itself.
>>>>>> > > >> >
>>>>>> > > >> > Cheers,
>>>>>> > > >> >
>>>>>> > > >> > Jiangjie (Becket) Qin
>>>>>> > > >> >
>>>>>> > > >> >
>>>>>> > > >> >
>>>>>> > > >> >
>>>>>> > > >> > On Tue, Apr 26, 2022 at 6:05 PM Dawid Wysakowicz <
>>>>>> > > >> dwysakow...@apache.org>
>>>>>> > > >> > wrote:
>>>>>> > > >> >
>>>>>> > > >> > > Thanks @Sebastian for the nice summary.
>>>>>> > > >> > >
>>>>>> > > >> > > I think most of your points aligned with the suggestions I
>>>>>> made to
>>>>>> > > the
>>>>>> > > >> > > FLIP, while you were writing your reply (I believe we hit
>>>>>> enter
>>>>>> > > >> nearly at
>>>>>> > > >> > > the same time ;) )
>>>>>> > > >> > >
>>>>>> > > >> > > Two points after we synced offline
>>>>>> > > >> > >
>>>>>> > > >> > > 1. I changed also the supportsWatermarksSplitAlignment to
>>>>>> > > >> > > supportsPausingSplits to express the general capability of
>>>>>> > pausing.
>>>>>> > > >> > >
>>>>>> > > >> > > 2. As for whether we should add
>>>>>> PausingSourceReader/PausingSplitReader
>>>>>> > > (option
>>>>>> > > >> b)
>>>>>> > > >> > > or if we should just add the methods (option c), I suggest
>>>>>> to
>>>>>> > simply
>>>>>> > > >> add
>>>>>> > > >> > > the two methods, as I felt this is the approach much preferred by
>>>>>> Becket,
>>>>>> > > >> to which
>>>>>> > > >> > > others do not object. Unless there is opposition, let's
>>>>>> go with
>>>>>> > > this
>>>>>> > > >> > > option c.
>>>>>> > > >> > >
>>>>>> > > >> > > Best,
>>>>>> > > >> > >
>>>>>> > > >> > > Dawid
>>>>>> > > >> > > On 26/04/2022 10:06, Sebastian Mattheis wrote:
>>>>>> > > >> > >
>>>>>> > > >> > > Hi folks,
>>>>>> > > >> > >
>>>>>> > > >> > > Sorry for being a bit silent. Many thanks for all the
>>>>>> input and
>>>>>> > > >> > > suggestions. As I'm a bit new, I needed some time to catch
>>>>>> up and
>>>>>> > > >> > structure
>>>>>> > > >> > > (for myself) the discussion and I wanted to find a way to
>>>>>> > structure
>>>>>> > > >> the
>>>>>> > > >> > > conclusions. (Also because I had the feeling that some
>>>>>> concerns
>>>>>> > got
>>>>>> > > >> lost
>>>>>> > > >> > in
>>>>>> > > >> > > the discussion.) This is my attempt and please correct me
>>>>>> if
>>>>>> > > >> something is
>>>>>> > > >> > > wrong or misunderstood. I tried to collect and assemble the
>>>>>> > > opinions,
>>>>>> > > >> > > suggestions, and conclusions (to the best of my knowledge):
>>>>>> > > >> > >
>>>>>> > > >> > > # Top A: Should split alignment (pause/resume behavior) be
>>>>>> a
>>>>>> > general
>>>>>> > > >> > > capability?
>>>>>> > > >> > >
>>>>>> > > >> > > I personally don't see any reason not to make it a general
>>>>>> > capability
>>>>>> > > >> > > because for the alignSplit method it is actually
>>>>>> independent of
>>>>>> > the
>>>>>> > > >> > > watermarks. If we agree here to have it a general
>>>>>> capability, we
>>>>>> > > >> should
>>>>>> > > >> > > also agree on the right wording. Does
>>>>>> "alignSplits(splitsToResume,
>>>>>> > > >> > > splitsToPause)" refer to what is then actually meant? (I
>>>>>> see it as
>>>>>> > > >> okay.
>>>>>> > > >> > I
>>>>>> > > >> > > don't have any better idea whilst Arvid suggested
>>>>>> > > >> "pauseOrResumeSplits".)
>>>>>> > > >> > >
>>>>>> > > >> > > # Top B: Should it be possible to enable/disable split
>>>>>> alignment?
>>>>>> > > >> > >
>>>>>> > > >> > > I would personally not disable the split alignment on the
>>>>>> source
>>>>>> > > >> reader
>>>>>> > > >> > > side because if split alignment is used for some other use
>>>>>> case
>>>>>> > (see
>>>>>> > > >> A)
>>>>>> > > >> > it
>>>>>> > > >> > > could have nasty side effects on other/future use cases.
>>>>>> Instead,
>>>>>> > I
>>>>>> > > >> would
>>>>>> > > >> > > disable "watermark split alignment" where I think it should
>>>>>> > disable
>>>>>> > > >> the
>>>>>> > > >> > > watermark-dependent trigger for split alignment.
>>>>>> > > >> > >
>>>>>> > > >> > > # Top C: Should we add a supportsX method?
>>>>>> > > >> > >
>>>>>> > > >> > > I find it difficult to define the scope of a supportsX
>>>>>> method
>>>>>> > w.r.t.
>>>>>> > > >> > > the following questions: a) Where is it used? and b) What
>>>>>> is the
>>>>>> > > >> expected
>>>>>> > > >> > > output? To b), it's not straight-forward to provide a
>>>>>> meaningful
>>>>>> > > >> output,
>>>>>> > > >> > > e.g., if SourceReader supports split alignment but
>>>>>> SplitReader
>>>>>> > does not.
>>>>>> > > >> This
>>>>>> > > >> > is
>>>>>> > > >> > > because with the current implementation, we can determine
>>>>>> whether
>>>>>> > > >> split
>>>>>> > > >> > > alignment is fully supported only during runtime and
>>>>>> specifically
>>>>>> > > >> > actually
>>>>>> > > >> > > only when calling alignSplits down the call hierarchy up
>>>>>> to the
>>>>>> > > actual
>>>>>> > > >> > > SplitReaders.
>>>>>> > > >> > >
>>>>>> > > >> > > Therefore, I would suggest to either raise an error or
>>>>>> warning if
>>>>>> > > the
>>>>>> > > >> > > alignment is called but not supported at some point. I
>>>>>> know we
>>>>>> > > should
>>>>>> > > >> > > carefully think about when this could be the case because
>>>>>> we don't
>>>>>> > > >> want
>>>>>> > > >> > to
>>>>>> > > >> > > flood anybody with such warnings. However, warnings could
>>>>>> be an
>>>>>> > > >> indicator
>>>>>> > > >> > > for the user that for watermark split alignment use case
>>>>>> split
>>>>>> > > >> reading is
>>>>>> > > >> > > imbalanced with the conclusion to either disable the
>>>>>> trigger for
>>>>>> > > >> > watermark
>>>>>> > > >> > > split alignment (see Top B) or to use/implement a source
>>>>>> and
>>>>>> > reader
>>>>>> > > >> that
>>>>>> > > >> > > fully supports split alignment.
>>>>>> > > >> > >
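
[Editorial sketch of the "raise an error or warning" idea in Top C. The names `SketchSplitReader` and `AlignmentCaller` are hypothetical; the method name `pauseOrResumeSplits` follows the naming suggested elsewhere in this thread, but its signature here is an assumption. The sketch shows one way to avoid flooding the user: the unsupported case is reported once and then silently skipped.]

```java
import java.util.Collection;

// Hypothetical split reader whose default implementation does not support pausing.
interface SketchSplitReader {
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException("pausing splits is not supported");
    }
}

final class AlignmentCaller {
    private boolean warned = false;
    private int warnings = 0;

    // Returns true if the reader accepted the request, false if it is unsupported.
    boolean align(SketchSplitReader reader,
                  Collection<String> toPause,
                  Collection<String> toResume) {
        try {
            reader.pauseOrResumeSplits(toPause, toResume);
            return true;
        } catch (UnsupportedOperationException e) {
            // Warn only on the first failure so repeated alignment rounds do not spam the log.
            if (!warned) {
                warned = true;
                warnings++;
                System.err.println("WARN: split alignment requested but not supported: "
                        + e.getMessage());
            }
            return false;
        }
    }

    int warningCount() {
        return warnings;
    }
}
```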
>>>>>> > > >> > > # Top D: How to design interfaces?
>>>>>> > > >> > >
>>>>>> > > >> > > Thanks for structuring the discussion with the various
>>>>>> > > >> possibilities
>>>>>> > > >> > > (a-d). From the discussion and emails, I would like to
>>>>>> summarize
>>>>>> > the
>>>>>> > > >> > > following requirements:
>>>>>> > > >> > > - Interfaces should be consistent ("symmetric"), i.e.,
>>>>>> similar
>>>>>> > > >> semantics
>>>>>> > > >> > > should have similar interfaces with similar usage.
>>>>>> > > >> > > - Make explicit which implementations implement
>>>>>> interfaces/support
>>>>>> > > >> > > behavior.
>>>>>> > > >> > > - Make clear what are default implementations and how to
>>>>>> implement
>>>>>> > > >> > > interfaces with desired behavior.
>>>>>> > > >> > >
>>>>>> > > >> > > This is a simplified view of the relations between relevant
>>>>>> > classes
>>>>>> > > of
>>>>>> > > >> > the
>>>>>> > > >> > > PoC implementation:
>>>>>> > > >> > >
>>>>>> > > >> > > SourceReader (Public) <|-- SourceReaderBase (Internal)
>>>>>> <|-- ..
>>>>>> > <|--
>>>>>> > > >> > > MySourceReader
>>>>>> > > >> > >
>>>>>> > > >> > > MySourceReader <>-- SplitFetcherManager (Internal) <>--
>>>>>> > SplitFetcher
>>>>>> > > >> > > (Internal) <>-- SplitReader (Public) <|-- MySplitReader
>>>>>> > > >> > >
>>>>>> > > >> > > (A <|-- B: B inherits from A; A <>-- B: A "has a" B)
>>>>>> > > >> > >
>>>>>> > > >> > > Note that SourceReaderBase and SplitFetcherManager
>>>>>> implement most
>>>>>> > of
>>>>>> > > >> the
>>>>>> > > >> > > "logic" for split alignment just because we wanted to
>>>>>> implement
>>>>>> > > split
>>>>>> > > >> > > alignment and wanted it to be available as kind of a
>>>>>> default. As a
>>>>>> > > >> > > consequence, we have a "default implementation" for
>>>>>> SourceReader
>>>>>> > > that
>>>>>> > > >> > > implements the actual logic for split alignment. For that
>>>>>> reason,
>>>>>> > I
>>>>>> > > >> find
>>>>>> > > >> > it
>>>>>> > > >> > > very confusing to have a NOOP default implementation in the
>>>>>> > > interface
>>>>>> > > >> for
>>>>>> > > >> > > the SourceReader. As a consequence, interface strategy c)
>>>>>> is
>>>>>> > > difficult
>>>>>> > > >> > > because this would require NOOP default implementations in
>>>>>> the
>>>>>> > > public
>>>>>> > > >> > > interfaces of SourceReader and SplitReader. This is the
>>>>>> same for
>>>>>> > > >> strategy
>>>>>> > > >> > > d) because it would require NOOP default implementation in
>>>>>> the
>>>>>> > > >> > > SourceReader. Further, as Dawid described method
>>>>>> signatures of
>>>>>> > > >> alignSplit
>>>>>> > > >> > > for SourceReader and SplitReader differ and it would be
>>>>>> extremely
>>>>>> > > >> > difficult
>>>>>> > > >> > > to make the signatures the same (with even potential
>>>>>> performance
>>>>>> > > >> impact
>>>>>> > > >> > > because of additional look-ups of split ids). Therefore,
>>>>>> having a
>>>>>> > > >> > symmetric
>>>>>> > > >> > > decorative interface as of strategy a) is actually not
>>>>>> possible
>>>>>> > and
>>>>>> > > >> > having
>>>>>> > > >> > > two decorative interfaces with different method signatures
>>>>>> is
>>>>>> > > >> confusing.
>>>>>> > > >> > My
>>>>>> > > >> > > conclusion is that we are best with strategy b) which
>>>>>> means to
>>>>>> > have
>>>>>> > > >> > > specializing sub-interfaces that inherit from the parent
>>>>>> > interface:
>>>>>> > > >> > > SourceReader <|-- AlignedSourceReader, SplitReader <|--
>>>>>> > > >> > AlignedSplitReader
>>>>>> > > >> > > With this option, I'm not 100% sure what the implications
>>>>>> are and
>>>>>> > if
>>>>>> > > >> this
>>>>>> > > >> > > could get nasty. I would suggest that Dawid and I just try
>>>>>> to
>>>>>> > > >> implement
>>>>>> > > >> > and
>>>>>> > > >> > > see if we like it. :)
>>>>>> > > >> > >
>>>>>> > > >> > > # Summary
>>>>>> > > >> > >
>>>>>> > > >> > > In conclusion, please let me know your perspectives. Please
>>>>>> > correct
>>>>>> > > >> me,
>>>>>> > > >> > if
>>>>>> > > >> > > something is wrong or if I misunderstood something. My
>>>>>> perspective
>>>>>> > > >> would
>>>>>> > > >> > be:
>>>>>> > > >> > >
>>>>>> > > >> > > Top A: Yes
>>>>>> > > >> > > Top B: Yes (but disable watermark trigger for split
>>>>>> alignment)
>>>>>> > > >> > > Top C: No
>>>>>> > > >> > > Top D: b)
>>>>>> > > >> > >
>>>>>> > > >> > > Best,
>>>>>> > > >> > > Sebastian
>>>>>> > > >> > >
>>>>>> > > >> > > On Tue, Apr 26, 2022 at 9:55 AM Dawid Wysakowicz <
>>>>>> > > >> dwysakow...@apache.org
>>>>>> > > >> > >
>>>>>> > > >> > > wrote:
>>>>>> > > >> > >
>>>>>> > > >> > >> @Arvid:
>>>>>> > > >> > >>
>>>>>> > > >> > >> While I also like Becket's capability approach, I fear
>>>>>> that it
>>>>>> > > >> doesn't
>>>>>> > > >> > work
>>>>>> > > >> > >> for this particular use case: Sources can always be
>>>>>> aligned
>>>>>> > > >> cross-task
>>>>>> > > >> > and
>>>>>> > > >> > >> this is just about intra-task alignment. So it's
>>>>>> plausible to put
>>>>>> > > >> > sources
>>>>>> > > >> > >> into an alignment group even though they do not use any
>>>>>> of the
>>>>>> > > >> presented
>>>>>> > > >> > >> API of FLIP-217. They should just issue a warning, if
>>>>>> they handle
>>>>>> > > >> > multiple
>>>>>> > > >> > >> splits (see motivation section).
>>>>>> > > >> > >>
>>>>>> > > >> > >> Yes, but the "supportXXX" method would be for telling if
>>>>>> it
>>>>>> > > supports
>>>>>> > > >> > that
>>>>>> > > >> > >> intra-task alignment. Cross-task alignment would always be
>>>>>> > > supported.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I updated interfaces to what I believe to be closest to a
>>>>>> > consensus
>>>>>> > > >> > >> between all participants. Do you mind taking a look?
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Sebastian Do you mind addressing the nits?
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 25/04/2022 13:39, Arvid Heise wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for pushing this effort.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I'd actually be in favor of 1b). I fully agree that
>>>>>> decorator
>>>>>> > > >> interfaces
>>>>>> > > >> > >> should be avoided but I'm also not a big fan of
>>>>>> overloading the
>>>>>> > > base
>>>>>> > > >> > >> interfaces (they are hard to implement as is). The usual
>>>>>> feedback
>>>>>> > > to
>>>>>> > > >> > >> Source-related interfaces is always that they are
>>>>>> overwhelming
>>>>>> > and
>>>>>> > > >> too
>>>>>> > > >> > >> hard to implement. However, I'd also not oppose 1c) as
>>>>>> scattered
>>>>>> > > >> > interfaces
>>>>>> > > >> > >> also have drawbacks. I'd just dislike 1a) and 1d).
>>>>>> > > >> > >> While I also like Becket's capability approach, I fear
>>>>>> that it
>>>>>> > > >> doesn't
>>>>>> > > >> > work
>>>>>> > > >> > >> for this particular use case: Sources can always be
>>>>>> aligned
>>>>>> > > >> cross-task
>>>>>> > > >> > and
>>>>>> > > >> > >> this is just about intra-task alignment. So it's
>>>>>> plausible to put
>>>>>> > > >> > sources
>>>>>> > > >> > >> into an alignment group even though they do not use any
>>>>>> of the
>>>>>> > > >> presented
>>>>>> > > >> > >> API of FLIP-217. They should just issue a warning, if
>>>>>> they handle
>>>>>> > > >> > multiple
>>>>>> > > >> > >> splits (see motivation section).
>>>>>> > > >> > >>
>>>>>> > > >> > >> I think renaming alignSplits to facilitate future use
>>>>>> cases makes
>>>>>> > > >> sense
>>>>>> > > >> > but
>>>>>> > > >> > >> then all interfaces (if 1c) is chosen) should be adjusted
>>>>>> > > >> accordingly.
>>>>>> > > >> > >> AlignedSourceReader could be PausingSourceReader and I'd
>>>>>> go for
>>>>>> > > >> > >> pauseOrResumeSplits (Becket's proposal afaik). We could
>>>>>> also
>>>>>> > split
>>>>>> > > it
>>>>>> > > >> > into
>>>>>> > > >> > >> pauseSplit and resumeSplit. While pauseOrResumeSplits may
>>>>>> allow
>>>>>> > > >> Sources
>>>>>> > > >> > to
>>>>>> > > >> > >> just use 1 instead of 2 library calls (as written in the
>>>>>> > Javadoc),
>>>>>> > > >> both
>>>>>> > > >> > >> Kafka and Pulsar can't use it and I'm not sure if there
>>>>>> is a
>>>>>> > system
>>>>>> > > >> that
>>>>>> > > >> > >> can.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Some nits for the FLIP:
>>>>>> > > >> > >> - Please replace "stop" with "pause".
>>>>>> > > >> > >> - Not sure if it's worth it in the capability section:
>>>>>> Sources
>>>>>> > that
>>>>>> > > >> > adopt
>>>>>> > > >> > >> this interface cannot be used in earlier versions. So it
>>>>>> feels
>>>>>> > like
>>>>>> > > >> we
>>>>>> > > >> > are
>>>>>> > > >> > >> only forward compatible (old sources can be used after the
>>>>>> > change);
>>>>>> > > >> but
>>>>>> > > >> > I
>>>>>> > > >> > >> guess this holds for any API addition.
>>>>>> > > >> > >> - You might want to add what happens when all splits are
>>>>>> paused.
>>>>>> > > >> > >> - You may want to describe how the 3 flavors of
>>>>>> SourceReaderBase
>>>>>> > > >> > interact
>>>>>> > > >> > >> with the interface.
>>>>>> > > >> > >> - I'm not sure if it makes sense to include Kafka and
>>>>>> Pulsar in
>>>>>> > the
>>>>>> > > >> > FLIP.
>>>>>> > > >> > >> For me, this is rather immediate follow-up work. (could
>>>>>> be in the
>>>>>> > > >> same
>>>>>> > > >> > >> umbrella ticket)
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Arvid
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Mon, Apr 25, 2022 at 12:52 PM Dawid Wysakowicz <
>>>>>> > > >> > dwysakow...@apache.org>
>>>>>> > > >> > >> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> a) "MySourceReader implements SourceReader,
>>>>>> WithSplitsAlignment",
>>>>>> > > >> along
>>>>>> > > >> > >> with "MySplitReader implements SplitReader,
>>>>>> WithSplitsAlignment",
>>>>>> > > or
>>>>>> > > >> > >> b) "MySourceReader implements AlignedSourceReader" and
>>>>>> > > "MySplitReader
>>>>>> > > >> > >> implements AlignedSplitReader", or
>>>>>> > > >> > >> c) "MySourceReader implements SourceReader" and
>>>>>> "MySplitReader
>>>>>> > > >> > implements
>>>>>> > > >> > >> SplitReader".
>>>>>> > > >> > >>
>>>>>> > > >> > >> I think the latest proposal according to Dawid would be:
>>>>>> > > >> > >> d) "MySourceReader implements SourceReader" and
>>>>>> "MySplitReader
>>>>>> > > >> > implements
>>>>>> > > >> > >> AlignedSplitReader".
>>>>>> > > >> > >> I am fine with this API, although personally speaking I
>>>>>> think it
>>>>>> > is
>>>>>> > > >> > simpler
>>>>>> > > >> > >> to just add a new method to the split reader with default
>>>>>> impl.
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> I think that is a good idea to have it aligned as much as
>>>>>> > possible.
>>>>>> > > >> I'd
>>>>>> > > >> > be
>>>>>> > > >> > >> +1 for your option c). We can merge AlignedSplitReader
>>>>>> with
>>>>>> > > >> > SplitReader. We
>>>>>> > > >> > >> will update the FLIP shortly.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 25/04/2022 12:43, Becket Qin wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for the comment, Jark.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 3. Interface/Method Name.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Can the interface be used to align other things in the
>>>>>> future?
>>>>>> > For
>>>>>> > > >> > example,
>>>>>> > > >> > >> align read speed, I have
>>>>>> > > >> > >> seen users requesting global rate limits. This feature
>>>>>> may also
>>>>>> > > need
>>>>>> > > >> an
>>>>>> > > >> > >> interface like this.
>>>>>> > > >> > >> If we don't plan to extend this interface to support
>>>>>> aligning other
>>>>>> > > >> > things, I
>>>>>> > > >> > >> suggest explicitly declaring
>>>>>> > > >> > >> the purpose of the methods, such as
>>>>>> `alignWatermarksForSplits`
>>>>>> > > >> instead
>>>>>> > > >> > of
>>>>>> > > >> > >> `alignSplits`.
>>>>>> > > >> > >>
>>>>>> > > >> > >> This is a good point. Naming wise, it would usually be
>>>>>> more
>>>>>> > > >> extensible
>>>>>> > > >> > to
>>>>>> > > >> > >> just describe what the method actually does, instead of
>>>>>> assuming
>>>>>> > > the
>>>>>> > > >> > >> purpose of doing this. For example, in this case,
>>>>>> > > >> pauseOrResumeSplits()
>>>>>> > > >> > >> would be more extensible because this can be used for any
>>>>>> kind of
>>>>>> > > >> flow
>>>>>> > > >> > >> control, be it watermark alignment or simple rate
>>>>>> limiting.
>>>>>> > > >> > >>
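
[Editorial sketch of why a name that describes the action ("pause or resume") is more extensible than one that names the purpose. `PauseDecision` and `FlowControl` are hypothetical helpers, not Flink classes, and the comparison rule (strictly greater than the limit means pause) is an assumption made for illustration. Both triggers produce the same pair of collections, which is all a `pauseOrResumeSplits`-style method would need.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical result type: the two arguments a pause/resume call would take.
final class PauseDecision {
    final List<String> toPause = new ArrayList<>();
    final List<String> toResume = new ArrayList<>();
}

final class FlowControl {
    // Shared core: pause every split whose metric exceeds the limit, resume the rest.
    private static PauseDecision decide(Map<String, Long> metricPerSplit, long limit) {
        PauseDecision decision = new PauseDecision();
        // TreeMap only makes the iteration order deterministic for this example.
        for (Map.Entry<String, Long> e : new TreeMap<>(metricPerSplit).entrySet()) {
            if (e.getValue() > limit) {
                decision.toPause.add(e.getKey());
            } else {
                decision.toResume.add(e.getKey());
            }
        }
        return decision;
    }

    // Trigger 1: watermark alignment, pausing splits that ran ahead in event time.
    static PauseDecision byWatermark(Map<String, Long> watermarkPerSplit,
                                     long maxAllowedWatermark) {
        return decide(watermarkPerSplit, maxAllowedWatermark);
    }

    // Trigger 2: simple rate limiting, pausing splits that used up their record budget.
    static PauseDecision byRate(Map<String, Long> recordsInWindowPerSplit, long recordBudget) {
        return decide(recordsInWindowPerSplit, recordBudget);
    }
}
```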
>>>>>> > > >> > >> 4. Interface or Method.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I don't have a strong opinion on this. I think they have
>>>>>> their
>>>>>> > own
>>>>>> > > >> > >> advantages.
>>>>>> > > >> > >> In Flink SQL, we heavily use Interfaces for extending
>>>>>> abilities
>>>>>> > > >> > >> (SupportsXxxx) for TableSource/TableSink,
>>>>>> > > >> > >> and I prefer Interfaces rather than methods in this case.
>>>>>> When
>>>>>> > you
>>>>>> > > >> have
>>>>>> > > >> > a
>>>>>> > > >> > >> bunch of abilities and each ability
>>>>>> > > >> > >> has more than one method, Interfaces can help to organize
>>>>>> them
>>>>>> > and
>>>>>> > > >> make
>>>>>> > > >> > >> it clear to users which methods
>>>>>> > > >> > >> they need to implement when you want to have an ability.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I am OK with decorative interfaces if this is a general
>>>>>> design
>>>>>> > > >> pattern
>>>>>> > > >> > in
>>>>>> > > >> > >> the other components in Flink. But it looks like the
>>>>>> current API
>>>>>> > > >> > proposal
>>>>>> > > >> > >> is not symmetric.
>>>>>> > > >> > >>
>>>>>> > > >> > >> The current proposal is essentially "MySourceReader
>>>>>> implements
>>>>>> > > >> > >> SourceReader, WithSplitsAlignment", along with
>>>>>> "MySplitReader
>>>>>> > > >> implements
>>>>>> > > >> > >> AlignedSplitsReader".
>>>>>> > > >> > >>
>>>>>> > > >> > >> Should we make the API symmetric? I'd consider any one of
>>>>>> the
>>>>>> > > >> following
>>>>>> > > >> > as
>>>>>> > > >> > >> symmetric.
>>>>>> > > >> > >>
>>>>>> > > >> > >> a) "MySourceReader implements SourceReader,
>>>>>> WithSplitsAlignment",
>>>>>> > > >> along
>>>>>> > > >> > >> with "MySplitReader implements SplitReader,
>>>>>> WithSplitsAlignment",
>>>>>> > > or
>>>>>> > > >> > >> b) "MySourceReader implements AlignedSourceReader" and
>>>>>> > > "MySplitReader
>>>>>> > > >> > >> implements AlignedSplitReader", or
>>>>>> > > >> > >> c) "MySourceReader implements SourceReader" and
>>>>>> "MySplitReader
>>>>>> > > >> > implements
>>>>>> > > >> > >> SplitReader".
>>>>>> > > >> > >>
>>>>>> > > >> > >> I think the latest proposal according to Dawid would be:
>>>>>> > > >> > >> d) "MySourceReader implements SourceReader" and
>>>>>> "MySplitReader
>>>>>> > > >> > implements
>>>>>> > > >> > >> AlignedSplitReader".
>>>>>> > > >> > >> I am fine with this API, although personally speaking I
>>>>>> think it
>>>>>> > is
>>>>>> > > >> > simpler
>>>>>> > > >> > >> to just add a new method to the split reader with default
>>>>>> impl.
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Dawid Wysakowicz <dwysakow...@apache.org>, thanks for the reply.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Having said that, as I don't have a preference and I
>>>>>> agree most
>>>>>> > of
>>>>>> > > >> the
>>>>>> > > >> > >>
>>>>>> > > >> > >> sources will support the alignment I am fine following
>>>>>> your
>>>>>> > > >> suggestion
>>>>>> > > >> > to
>>>>>> > > >> > >> have the SourceReader extending from
>>>>>> > WithWatermarksSplitsAlignment,
>>>>>> > > >> but
>>>>>> > > >> > >> would put the "supportsXXX" there, not in the Source to
>>>>>> keep the
>>>>>> > > two
>>>>>> > > >> > >> methods together.
>>>>>> > > >> > >>
>>>>>> > > >> > >> One benefit of having the "supportsXXX" in Source is that
>>>>>> this
>>>>>> > > allows
>>>>>> > > >> > some
>>>>>> > > >> > >> compile time check. For example, if a user enabled
>>>>>> watermark
>>>>>> > > >> alignment
>>>>>> > > >> > >> while it is not supported by the Source, an exception can
>>>>>> be
>>>>>> > thrown
>>>>>> > > >> at
>>>>>> > > >> > >> compile time. It seems in general useful. That said, I
>>>>>> agree that
>>>>>> > > API
>>>>>> > > >> > >> cleanliness wise it is better to put the two methods
>>>>>> together.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Mon, Apr 25, 2022 at 5:56 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks, Dawid, for the reminder on FLIP-182. Sorry, I did
>>>>>> miss it.
>>>>>> > > >> > >> I don't have other concerns then.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >> Jark
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Mon, 25 Apr 2022 at 15:40, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Jark:
>>>>>> > > >> > >>
>>>>>> > > >> > >> 1. Will the framework always align with watermarks when
>>>>>> the
>>>>>> > source
>>>>>> > > >> > >> implements the interface?
>>>>>> > > >> > >> I'm afraid not every case needs watermark alignment even
>>>>>> if Kafka
>>>>>> > > >> > >> implements the interface,
>>>>>> > > >> > >> and this will affect the throughput somehow. I agree with
>>>>>> Becket
>>>>>> > > >> > >> we may need a
>>>>>> > > >> > >> `supportSplitsAlignment()` method for users to configure
>>>>>> the
>>>>>> > source
>>>>>> > > >> to
>>>>>> > > >> > >> enable/disable the alignment.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 2. How does the framework calculate maxDesiredWatermark?
>>>>>> > > >> > >> I think the algorithm of maxDesiredWatermark will greatly
>>>>>> affect
>>>>>> > > >> > >>
>>>>>> > > >> > >> throughput
>>>>>> > > >> > >>
>>>>>> > > >> > >> if the reader is constantly
>>>>>> > > >> > >>  switching between pause and resume. Can users configure
>>>>>> the
>>>>>> > > >> alignment
>>>>>> > > >> > >> offset?
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> This is covered in the previous FLIP[1] which has been
>>>>>> already
>>>>>> > > >> > >>
>>>>>> > > >> > >> implemented
>>>>>> > > >> > >>
>>>>>> > > >> > >> in 1.15. In short, it must be enabled with the watermark
>>>>>> strategy
>>>>>> > > >> which
>>>>>> > > >> > >> also configures drift and update interval.
>>>>>> > > >> > >>
>>>>>> > > >> > >> If we don't plan to extend this interface to support
>>>>>> aligning other
>>>>>> > > >> things,
>>>>>> > > >> > >>
>>>>>> > > >> > >> I
>>>>>> > > >> > >>
>>>>>> > > >> > >> suggest explicitly declaring
>>>>>> > > >> > >> the purpose of the methods, such as
>>>>>> `alignWatermarksForSplits`
>>>>>> > > >> instead
>>>>>> > > >> > of
>>>>>> > > >> > >> `alignSplits`.
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> Sure let's rename it.
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Becket:
>>>>>> > > >> > >>
>>>>>> > > >> > >> I understand your point. On the other hand putting all
>>>>>> methods,
>>>>>> > > even
>>>>>> > > >> > with
>>>>>> > > >> > >> "supportsXXX" methods for enabling certain features,
>>>>>> makes the
>>>>>> > > entry
>>>>>> > > >> > >> threshold for writing a new source higher. Instead of
>>>>>> focusing on
>>>>>> > > the
>>>>>> > > >> > >>
>>>>>> > > >> > >> basic
>>>>>> > > >> > >>
>>>>>> > > >> > >> and required properties of the Source, the person
>>>>>> implementing a
>>>>>> > > >> source
>>>>>> > > >> > >> must bother with and need to figure out what all of the
>>>>>> extra
>>>>>> > > >> features
>>>>>> > > >> > >>
>>>>>> > > >> > >> are
>>>>>> > > >> > >>
>>>>>> > > >> > >> about and how to deal with them. It makes it also harder
>>>>>> to
>>>>>> > > organize
>>>>>> > > >> > >> methods in coupled groups as Jark said.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Having said that, as I don't have a preference and I
>>>>>> agree most
>>>>>> > of
>>>>>> > > >> the
>>>>>> > > >> > >> sources will support the alignment I am fine following
>>>>>> your
>>>>>> > > >> suggestion
>>>>>> > > >> > to
>>>>>> > > >> > >> have the SourceReader extending from
>>>>>> > WithWatermarksSplitsAlignment,
>>>>>> > > >> but
>>>>>> > > >> > >> would put the "supportsXXX" there, not in the Source to
>>>>>> keep the
>>>>>> > > two
>>>>>> > > >> > >> methods together.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Lastly, I agree it is really unfortunate the "alignSplits"
>>>>>> > methods
>>>>>> > > >> > differ
>>>>>> > > >> > >> slightly for SourceReader and SplitReader. The reason for
>>>>>> that is
>>>>>> > > >> > >> SourceReaderBase deals only with SplitIds, whereas
>>>>>> SplitReader
>>>>>> > > needs
>>>>>> > > >> the
>>>>>> > > >> > >> actual splits to pause them. I found the discrepancy
>>>>>> acceptable
>>>>>> > for
>>>>>> > > >> the
>>>>>> > > >> > >> sake of simplifying changes significantly, especially as
>>>>>> they
>>>>>> > would
>>>>>> > > >> > >>
>>>>>> > > >> > >> highly
>>>>>> > > >> > >>
>>>>>> > > >> > >> likely impact performance as we would have to perform
>>>>>> additional
>>>>>> > > >> > lookups.
>>>>>> > > >> > >> Moreover the SplitReader is a secondary interface.
>>>>>> > > >> > >>
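
[Editorial sketch of the signature discrepancy described above: the reader-level call works with split IDs, while the split-reader-level call needs the split objects. All types here (`DemoSplit`, `IdBasedPausing`, `SplitBasedPausing`, `DemoSourceReader`) are hypothetical and much simpler than the real Flink classes; in particular, where the ID-to-split mapping actually lives in Flink is not shown in this thread, so the map below is an assumption.]

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical split type; real splits carry connector-specific state.
final class DemoSplit {
    final String id;

    DemoSplit(String id) {
        this.id = id;
    }
}

// Reader-level flavor: the caller only knows split IDs.
interface IdBasedPausing {
    void alignSplits(Collection<String> idsToPause, Collection<String> idsToResume);
}

// Split-reader-level flavor: needs the actual split objects to pause them.
interface SplitBasedPausing {
    void alignSplits(Collection<DemoSplit> toPause, Collection<DemoSplit> toResume);
}

final class DemoSourceReader implements IdBasedPausing {
    private final Map<String, DemoSplit> assigned = new HashMap<>();
    private final SplitBasedPausing splitReader;

    DemoSourceReader(SplitBasedPausing splitReader) {
        this.splitReader = splitReader;
    }

    void addSplit(DemoSplit split) {
        assigned.put(split.id, split);
    }

    @Override
    public void alignSplits(Collection<String> idsToPause, Collection<String> idsToResume) {
        // The ID-to-split lookup happens once here, at the boundary between the two layers.
        splitReader.alignSplits(resolve(idsToPause), resolve(idsToResume));
    }

    private List<DemoSplit> resolve(Collection<String> ids) {
        List<DemoSplit> out = new ArrayList<>();
        for (String id : ids) {
            DemoSplit split = assigned.get(id);
            if (split != null) {
                out.add(split); // unknown IDs are skipped in this sketch
            }
        }
        return out;
    }
}
```

[Forcing both layers onto one identical signature would move this lookup into every split reader, which is the extra cost the message above refers to.]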
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> [1] https://cwiki.apache.org/confluence/x/hQYBCw
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 24/04/2022 17:15, Jark Wu wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for the effort, Dawid and Sebastian!
>>>>>> > > >> > >>
>>>>>> > > >> > >> I just have some minor questions (maybe I missed
>>>>>> something).
>>>>>> > > >> > >>
>>>>>> > > >> > >> 1. Will the framework always align with watermarks when
>>>>>> the
>>>>>> > source
>>>>>> > > >> > >> implements the interface?
>>>>>> > > >> > >> I'm afraid not every case needs watermark alignment even
>>>>>> if Kafka
>>>>>> > > >> > >> implements the interface,
>>>>>> > > >> > >> and this will affect the throughput somehow. I agree with
>>>>>> Becket
>>>>>> > > >> > >> we may need a
>>>>>> > > >> > >> `supportSplitsAlignment()` method for users to configure
>>>>>> the
>>>>>> > source
>>>>>> > > >> to
>>>>>> > > >> > >> enable/disable the alignment.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 2. How does the framework calculate maxDesiredWatermark?
>>>>>> > > >> > >> I think the algorithm of maxDesiredWatermark will greatly
>>>>>> affect
>>>>>> > > >> > >>
>>>>>> > > >> > >> throughput
>>>>>> > > >> > >>
>>>>>> > > >> > >> if the reader is constantly
>>>>>> > > >> > >>  switching between pause and resume. Can users configure
>>>>>> the
>>>>>> > > >> alignment
>>>>>> > > >> > >> offset?
>>>>>> > > >> > >>
>>>>>> > > >> > >> 3. Interface/Method Name.
>>>>>> > > >> > >> Can the interface be used to align other things in the future? For
>>>>>> > > >> > >> example, to align read speed: I have seen users requesting global rate
>>>>>> > > >> > >> limits. This feature may also need an interface like this.
>>>>>> > > >> > >> If we don't plan to extend this interface to support aligning other
>>>>>> > > >> > >> things, I suggest explicitly declaring the purpose of the methods, such
>>>>>> > > >> > >> as `alignWatermarksForSplits` instead of `alignSplits`.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 4. Interface or Method.
>>>>>> > > >> > >> I don't have a strong opinion on this. I think they have their own
>>>>>> > > >> > >> advantages.
>>>>>> > > >> > >> In Flink SQL, we heavily use Interfaces for extending abilities
>>>>>> > > >> > >> (SupportsXxxx) for TableSource/TableSink, and I prefer Interfaces rather
>>>>>> > > >> > >> than methods in this case. When you have a bunch of abilities and each
>>>>>> > > >> > >> ability has more than one method, Interfaces can help to organize them
>>>>>> > > >> > >> and make it clear to users which methods they need to implement when
>>>>>> > > >> > >> they want to have an ability.
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >> Jark
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Sun, 24 Apr 2022 at 18:13, Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Hi Dawid,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for the explanation. Apologies that I somehow
>>>>>> misread a
>>>>>> > > bunch
>>>>>> > > >> of
>>>>>> > > >> > >> "align" and thought they were "assign".
>>>>>> > > >> > >>
>>>>>> > > >> > >> Regarding 1, by default implementation, I was thinking of the default
>>>>>> > > >> > >> no-op implementation. I am a little worried about the proliferation of
>>>>>> > > >> > >> decorative interfaces. I think the most important thing about interfaces
>>>>>> > > >> > >> is that they are easy to understand. In this case, I prefer adding a new
>>>>>> > > >> > >> method to the existing interface for the following reasons:
>>>>>> > > >> > >>
>>>>>> > > >> > >> a) I feel the biggest drawback of decorative interfaces is the question
>>>>>> > > >> > >> of which interface they can decorate and which combinations of multiple
>>>>>> > > >> > >> decorative interfaces are valid. In the current FLIP, the
>>>>>> > > >> > >> withSplitsAlignment interface is only applicable to the SourceReader,
>>>>>> > > >> > >> which means it can't decorate any other interface. From an interface
>>>>>> > > >> > >> design perspective, a natural question is why not let
>>>>>> > > >> > >> "AlignedSplitReader" extend "withSplitsAlignment"? And it is also
>>>>>> > > >> > >> natural to assume that a split reader implementing both SplitReader and
>>>>>> > > >> > >> WithSplitAlignment would work, because a source reader implementing
>>>>>> > > >> > >> SourceReader and withSplitsAlignment works. So why isn't there an
>>>>>> > > >> > >> interface of AlignedSourceReader? In the future, if there is a new
>>>>>> > > >> > >> feature added (e.g. sorted or pre-partitioned data aware), are we going
>>>>>> > > >> > >> to create another interface of SplitReader such as SortedSplitReader or
>>>>>> > > >> > >> PrePartitionedAware? Can they be combined? So I think the additional
>>>>>> > > >> > >> decorative interface like withSplitsAlignment actually increases the
>>>>>> > > >> > >> understanding cost of users because they have to know what decorative
>>>>>> > > >> > >> interfaces are there, which interface they can decorate and which
>>>>>> > > >> > >> combinations of the decorative interfaces are valid and which are not.
>>>>>> > > >> > >> Ideally we want to avoid that. To be clear, I am not opposing having an
>>>>>> > > >> > >> interface of withSplitsAlignment; it is completely OK to have it as an
>>>>>> > > >> > >> internal interface and let SourceReader and SplitReader both extend it.
>>>>>> > > >> > >>
>>>>>> > > >> > >> b) Adding a new method to the SourceReader with a default implementation
>>>>>> > > >> > >> of no-op would help avoid logic branching in the source logic,
>>>>>> > > >> > >> especially given that we agree that the vast majority of the
>>>>>> > > >> > >> SourceReader implementations, if not all, would just extend from the
>>>>>> > > >> > >> SourceReaderBase. That means adding a new method to the interface would
>>>>>> > > >> > >> effectively give the same user experience, but simpler.
>>>>>> > > >> > >>
>>>>>> > > >> > >> c) A related design principle that may be worth discussing is how we let
>>>>>> > > >> > >> the Source implementations tell Flink which capabilities are supported
>>>>>> > > >> > >> and which are not. Personally speaking, I feel the most intuitive place
>>>>>> > > >> > >> is in the Source itself, because that is the entry point of the entire
>>>>>> > > >> > >> Source connector logic.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Based on the above thoughts, I am wondering if the following interface
>>>>>> > > >> > >> would be easier for users to understand.
>>>>>> > > >> > >>
>>>>>> > > >> > >> - Change "withSplitsAlignment" to an internal interface, let both
>>>>>> > > >> > >> SourceReader and SplitReader extend from it, with a default no-op
>>>>>> > > >> > >> implementation.
>>>>>> > > >> > >> - Add a new method "boolean supportSplitsAlignment()" to the Source
>>>>>> > > >> > >> interface, with a default implementation returning false. Sources that
>>>>>> > > >> > >> have implemented the alignment logic can change this to return true, and
>>>>>> > > >> > >> override the alignSplits() methods in the SourceReader / SplitReader if
>>>>>> > > >> > >> needed.
>>>>>> > > >> > >> - In the future, if a new optional feature is going to be added to the
>>>>>> > > >> > >> Source, and that feature requires the awareness from Flink, we can add
>>>>>> > > >> > >> more such methods to the Source.
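
The three bullets above can be sketched roughly as follows. This is only an illustration of the shape of the proposal, not the real Flink API: the interfaces are simplified stand-ins, and the `alignSplits(splitsToPause, splitsToResume)` signature, `PlainSource`, `AligningSource` and `AligningReader` are assumptions made for the example.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SplitsAlignmentSketch {

    /** Hypothetical internal interface; the default implementation is a no-op. */
    interface WithSplitsAlignment {
        default void alignSplits(Collection<String> splitsToPause,
                                 Collection<String> splitsToResume) {
            // Intentionally empty: readers that cannot pause splits ignore the call.
        }
    }

    /** Simplified stand-in for the reader interface (not the real Flink type). */
    interface SourceReader extends WithSplitsAlignment {}

    /** Simplified stand-in for Source, carrying the proposed capability flag. */
    interface Source {
        default boolean supportSplitsAlignment() {
            return false;
        }

        SourceReader createReader();
    }

    /** A source that does not opt in: it inherits both defaults. */
    static class PlainSource implements Source {
        @Override
        public SourceReader createReader() {
            return new SourceReader() {};
        }
    }

    /** A reader that tracks which splits are currently paused. */
    static class AligningReader implements SourceReader {
        final Set<String> paused = new HashSet<>();

        @Override
        public void alignSplits(Collection<String> splitsToPause,
                                Collection<String> splitsToResume) {
            paused.addAll(splitsToPause);
            paused.removeAll(splitsToResume);
        }
    }

    /** A source that opts in by overriding the flag. */
    static class AligningSource implements Source {
        @Override
        public boolean supportSplitsAlignment() {
            return true;
        }

        @Override
        public SourceReader createReader() {
            return new AligningReader();
        }
    }

    public static void main(String[] args) {
        for (Source source : List.of(new PlainSource(), new AligningSource())) {
            SourceReader reader = source.createReader();
            // The caller can invoke alignSplits unconditionally; no instanceof branching.
            reader.alignSplits(List.of("split-0"), List.of());
            System.out.println(source.getClass().getSimpleName()
                    + " supports alignment: " + source.supportSplitsAlignment());
        }
    }
}
```

With this shape the caller never needs to check which decorative interfaces a reader implements; the flag on the source is the single place where the capability is declared.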
>>>>>> > > >> > >>
>>>>>> > > >> > >> What do you think?
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Konstantin:
>>>>>> > > >> > >>
>>>>>> > > >> > >> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
>>>>>> > > >> > >> resume behavior) will be implemented for Kafka and Pulsar only, correct?
>>>>>> > > >> > >>
>>>>>> > > >> > >> Correct, as far as I know though, those are the only sources which
>>>>>> > > >> > >> consume concurrently from multiple splits and thus alignment applies.
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Thomas:
>>>>>> > > >> > >>
>>>>>> > > >> > >> I wonder if "supporting" split alignment in
>>>>>> SourceReaderBase and
>>>>>> > > then
>>>>>> > > >> > >>
>>>>>> > > >> > >> doing
>>>>>> > > >> > >>
>>>>>> > > >> > >> nothing if the split reader does not implement
>>>>>> AlignedSplitReader
>>>>>> > > >> could
>>>>>> > > >> > >>
>>>>>> > > >> > >> be
>>>>>> > > >> > >>
>>>>>> > > >> > >> misleading? Perhaps WithSplitsAlignment can instead be
>>>>>> added to
>>>>>> > the
>>>>>> > > >> > >> specific source reader (i.e. KafkaSourceReader) to make it
>>>>>> > explicit
>>>>>> > > >> that
>>>>>> > > >> > >> the source actually supports it.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I understand your concern. Hmm, I think we could actually do that. Given
>>>>>> > > >> > >> that the actual implementation of the SourceReaderBase#alignSplits is
>>>>>> > > >> > >> rather short (just a forward to the corresponding method of
>>>>>> > > >> > >> SplitFetcher), we could reimplement it in the actual source
>>>>>> > > >> > >> implementations. This solution has a downside, though. Authors of new
>>>>>> > > >> > >> sources would have to do two things: extend from AlignedSplitReader and
>>>>>> > > >> > >> implement WithSplitsAlignment, instead of just extending
>>>>>> > > >> > >> AlignedSplitReader. I would be fine with such a tradeoff though. What do
>>>>>> > > >> > >> others think?
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Steven:
>>>>>> > > >> > >>
>>>>>> > > >> > >> For this part from the motivation section, is it accurate? Let's assume
>>>>>> > > >> > >> one source task consumes from 3 partitions and one of the partitions is
>>>>>> > > >> > >> significantly slower. In this situation, watermark for this source task
>>>>>> > > >> > >> won't hold back as it is reading recent data from the other two Kafka
>>>>>> > > >> > >> partitions. As a result, it won't hold back the overall watermark. I
>>>>>> > > >> > >> thought the problem is that we may have late data for this slow
>>>>>> > > >> > >> partition.
>>>>>> > > >> > >>
>>>>>> > > >> > >> It will hold back the watermark. The watermark of an operator is the
>>>>>> > > >> > >> minimum of the watermarks of all splits [1].
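
To make the "minimum over all splits" point concrete, here is a minimal sketch using the three-partition scenario from the question. It is not Flink code: the class name, the method `operatorWatermark`, the partition names and the timestamps are all made up for illustration, and returning `Long.MIN_VALUE` when there are no splits is an assumption of this sketch.

```java
import java.util.Map;

public class OperatorWatermarkSketch {

    /** Combines per-split watermarks by taking their minimum. */
    static long operatorWatermark(Map<String, Long> splitWatermarks) {
        return splitWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                // Assumption of this sketch: no splits means no progress yet.
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Two partitions are reading recent data, one is far behind.
        Map<String, Long> splitWatermarks = Map.of(
                "partition-0", 1_000L,
                "partition-1", 990L,
                "partition-2", 100L);

        // The slow partition determines the combined watermark.
        System.out.println(operatorWatermark(splitWatermarks)); // prints 100
    }
}
```

So the slow partition does hold the task's watermark back, while the fast partitions keep emitting records ahead of it, which is the buffering problem the motivation section describes.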
>>>>>> > > >> > >>
>>>>>> > > >> > >> I have another question about the restart. Say split alignment is
>>>>>> > > >> > >> triggered. Checkpoint is completed. Job failed and restored from the
>>>>>> > > >> > >> last checkpoint. Because the alignment decision is not checkpointed,
>>>>>> > > >> > >> initially alignment won't be enforced until we get a cycle of watermark
>>>>>> > > >> > >> aggregation and propagation, right? Not saying this corner is a problem.
>>>>>> > > >> > >> Just want to understand it more.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Your understanding is correct.
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Becket:
>>>>>> > > >> > >>
>>>>>> > > >> > >> 1. I think watermark alignment is sort of a general use
>>>>>> case, so
>>>>>> > > >> should
>>>>>> > > >> > >>
>>>>>> > > >> > >> we
>>>>>> > > >> > >>
>>>>>> > > >> > >> just add the related methods to SourceReader directly
>>>>>> instead of
>>>>>> > > >> > >> introducing the new interface of WithSplitAssignment? We
>>>>>> can
>>>>>> > > provide
>>>>>> > > >> > >> default implementations, so backwards compatibility won't
>>>>>> be an
>>>>>> > > >> issue.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I don't think we can provide a default implementation. How would we do
>>>>>> > > >> > >> that? Would it be just a no-op? Is it better than having an opt-in
>>>>>> > > >> > >> interface? The default implementation would have to be added exclusively
>>>>>> > > >> > >> in a *Public* SourceReader interface. By the way, notice that
>>>>>> > > >> > >> SourceReaderBase does extend from WithSplitsAlignment, so effectively
>>>>>> > > >> > >> all implementations do handle the alignment case. To be honest I think
>>>>>> > > >> > >> it is impossible for end users to implement the SourceReader interface
>>>>>> > > >> > >> directly.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 2. As you mentioned, the SplitReader interface probably
>>>>>> also
>>>>>> > needs
>>>>>> > > >> some
>>>>>> > > >> > >> change to support throttling at the split granularity.
>>>>>> Can you
>>>>>> > add
>>>>>> > > >> that
>>>>>> > > >> > >> interface change into the public interface section as
>>>>>> well?
>>>>>> > > >> > >>
>>>>>> > > >> > >> It has been added from the beginning. See
>>>>>> *AlignedSplitReader.*
>>>>>> > > >> > >>
>>>>>> > > >> > >> 3. Nit, can we avoid using the method name assignSplits
>>>>>> here,
>>>>>> > given
>>>>>> > > >> that
>>>>>> > > >> > >>
>>>>>> > > >> > >> it
>>>>>> > > >> > >>
>>>>>> > > >> > >> is not actually changing the split assignments? It seems
>>>>>> > something
>>>>>> > > >> like
>>>>>> > > >> > >> pauseOrResumeSplits(), or adjustSplitsThrottling() is more
>>>>>> > > accurate.
>>>>>> > > >> > >>
>>>>>> > > >> > >> The method's called *alignSplits*, not assign. Do you still prefer a
>>>>>> > > >> > >> different name for that? Personally, I am open to suggestions here.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 22/04/2022 05:59, Becket Qin wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for driving the effort, Sebastian. I think the motivation makes a
>>>>>> > > >> > >> lot of sense. Just a few suggestions / questions.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 1. I think watermark alignment is sort of a general use
>>>>>> case, so
>>>>>> > > >> should
>>>>>> > > >> > >>
>>>>>> > > >> > >> we
>>>>>> > > >> > >>
>>>>>> > > >> > >> just add the related methods to SourceReader directly
>>>>>> instead of
>>>>>> > > >> > >> introducing the new interface of WithSplitAssignment? We
>>>>>> can
>>>>>> > > provide
>>>>>> > > >> > >> default implementations, so backwards compatibility won't
>>>>>> be an
>>>>>> > > >> issue.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 2. As you mentioned, the SplitReader interface probably
>>>>>> also
>>>>>> > needs
>>>>>> > > >> some
>>>>>> > > >> > >> change to support throttling at the split granularity.
>>>>>> Can you
>>>>>> > add
>>>>>> > > >> that
>>>>>> > > >> > >> interface change into the public interface section as
>>>>>> well?
>>>>>> > > >> > >>
>>>>>> > > >> > >> 3. Nit, can we avoid using the method name assignSplits
>>>>>> here,
>>>>>> > given
>>>>>> > > >> that
>>>>>> > > >> > >>
>>>>>> > > >> > >> it
>>>>>> > > >> > >>
>>>>>> > > >> > >> is not actually changing the split assignments? It seems
>>>>>> > something
>>>>>> > > >> like
>>>>>> > > >> > >> pauseOrResumeSplits(), or adjustSplitsThrottling() is more
>>>>>> > > accurate.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> However, a single source operator may read data from
>>>>>> multiple
>>>>>> > > >> > >>
>>>>>> > > >> > >> splits/partitions, e.g., multiple Kafka partitions, such
>>>>>> that
>>>>>> > even
>>>>>> > > >> with
>>>>>> > > >> > >> watermark alignment the source operator may need to buffer
>>>>>> > > excessive
>>>>>> > > >> > >>
>>>>>> > > >> > >> amount
>>>>>> > > >> > >>
>>>>>> > > >> > >> of data if one split emits data faster than another.
>>>>>> > > >> > >>
>>>>>> > > >> > >> For this part from the motivation section, is it accurate? Let's assume
>>>>>> > > >> > >> one source task consumes from 3 partitions and one of the partitions is
>>>>>> > > >> > >> significantly slower. In this situation, watermark for this source task
>>>>>> > > >> > >> won't hold back as it is reading recent data from the other two Kafka
>>>>>> > > >> > >> partitions. As a result, it won't hold back the overall watermark. I
>>>>>> > > >> > >> thought the problem is that we may have late data for this slow
>>>>>> > > >> > >> partition.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I have another question about the restart. Say split alignment is
>>>>>> > > >> > >> triggered. Checkpoint is completed. Job failed and restored from the
>>>>>> > > >> > >> last checkpoint. Because the alignment decision is not checkpointed,
>>>>>> > > >> > >> initially alignment won't be enforced until we get a cycle of watermark
>>>>>> > > >> > >> aggregation and propagation, right? Not saying this corner is a problem.
>>>>>> > > >> > >> Just want to understand it more.
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for working on this!
>>>>>> > > >> > >>
>>>>>> > > >> > >> I wonder if "supporting" split alignment in
>>>>>> SourceReaderBase and
>>>>>> > > then
>>>>>> > > >> > >>
>>>>>> > > >> > >> doing
>>>>>> > > >> > >>
>>>>>> > > >> > >> nothing if the split reader does not implement
>>>>>> AlignedSplitReader
>>>>>> > > >> could
>>>>>> > > >> > >>
>>>>>> > > >> > >> be
>>>>>> > > >> > >>
>>>>>> > > >> > >> misleading? Perhaps WithSplitsAlignment can instead be
>>>>>> added to
>>>>>> > the
>>>>>> > > >> > >> specific source reader (i.e. KafkaSourceReader) to make it
>>>>>> > explicit
>>>>>> > > >> that
>>>>>> > > >> > >> the source actually supports it.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks,
>>>>>> > > >> > >> Thomas
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> Hi Sebastian, Hi Dawid,
>>>>>> > > >> > >>
>>>>>> > > >> > >> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
>>>>>> > > >> > >> resume behavior) will be implemented for Kafka and Pulsar only, correct?
>>>>>> > > >> > >>
>>>>>> > > >> > >> +1 in general. I believe it is valuable to complete the watermark
>>>>>> > > >> > >> aligned story with this FLIP.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Cheers,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Konstantin
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> To be explicit, having worked on it, I support it ;) I
>>>>>> think we
>>>>>> > can
>>>>>> > > >> > >> start a vote thread soonish, as there are no concerns so
>>>>>> far.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 13/04/2022 11:27, Sebastian Mattheis wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dear Flink developers,
>>>>>> > > >> > >>
>>>>>> > > >> > >> I would like to open a discussion on FLIP 217 [1] for an extension of
>>>>>> > > >> > >> Watermark Alignment to perform alignment also in SplitReaders. To do so,
>>>>>> > > >> > >> SplitReaders must be able to suspend and resume reading from split
>>>>>> > > >> > >> sources where the SourceOperator coordinates and controls suspend and
>>>>>> > > >> > >> resume. To gather information about current watermarks of the
>>>>>> > > >> > >> SplitReaders, we extend the internal WatermarkOutputMultiplexer and
>>>>>> > > >> > >> report watermarks to the SourceOperator.
>>>>>> > > >> > >>
>>>>>> > > >> > >> There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised
>>>>>> > > >> > >> and reworked by Dawid Wysakowicz (He did most of the work.) and me. The
>>>>>> > > >> > >> changes are backwards compatible in a way that if affected components do
>>>>>> > > >> > >> not support split alignment the behavior is as before.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >> Sebastian
>>>>>> > > >> > >>
>>>>>> > > >> > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>>>>>> > > >> > >>
>>>>>> > > >> > >> [2] https://github.com/dawidwys/flink/tree/aligned-splits
>>>>>> > > >> > >>
>>>>>> > > >> > >> --
>>>>>> > > >> > >>
>>>>>> > > >> > >> Konstantin Knauf
>>>>>> > > >> > >> https://twitter.com/snntrable
>>>>>> > > >> > >> https://github.com/knaufk
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> >
>>>>>> > > >>
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>>
>>>>>
