Thanks @Sebastian for the nice summary.

I think most of your points align with the suggestions I made to the FLIP while you were writing your reply (I believe we hit enter at nearly the same time ;) )

Two points after we synced offline:

1. I also renamed supportsWatermarksSplitAlignment to supportsPausingSplits to express the general capability of pausing.

2. As for whether we should introduce PausingSourceReader/PausingSplitReader (option b) or just add the methods (option c), I suggest simply adding the two methods, as I felt this is the approach Becket much prefers and nobody else objects to. Unless there is opposition, let's go with option c.
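To make option c concrete, here is a minimal sketch. The interfaces are simplified stand-ins rather than the real Flink ones, and the method name, parameter order, and no-op default are assumptions for illustration only:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-ins; not the real Flink interfaces.
interface SketchSourceReader {
    // Assumed default: a no-op, so existing readers keep compiling unchanged.
    default void pauseOrResumeSplits(
            Collection<String> splitIdsToPause, Collection<String> splitIdsToResume) {}
}

interface SketchSplitReader<SplitT> {
    // Same idea on the split reader, but working on split objects.
    default void pauseOrResumeSplits(
            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume) {}
}

// A reader written before the change: no override needed.
class LegacySourceReader implements SketchSourceReader {}

// A reader that opts in by overriding the new method.
class PausingSourceReaderSketch implements SketchSourceReader {
    final List<String> paused = new ArrayList<>();

    @Override
    public void pauseOrResumeSplits(
            Collection<String> splitIdsToPause, Collection<String> splitIdsToResume) {
        paused.removeAll(splitIdsToResume);
        for (String id : splitIdsToPause) {
            if (!paused.contains(id)) {
                paused.add(id);
            }
        }
    }
}
```

With this shape a source author implements only the base interface and overrides the method when pausing is actually supported.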

Best,

Dawid

On 26/04/2022 10:06, Sebastian Mattheis wrote:
Hi folks,

Sorry for being a bit silent. Many thanks for all the input and suggestions. As I'm a bit new, I needed some time to catch up and structure (for myself) the discussion and I wanted to find a way to structure the conclusions. (Also because I had the feeling that some concerns got lost in the discussion.) This is my attempt and please correct me if something is wrong or misunderstood. I tried to collect and assemble the opinions, suggestions, and conclusions (to the best of my knowledge):

# Top A: Should split alignment (pause/resume behavior) be a general capability?

I personally don't see any reason not to make it a general capability, because the alignSplits method is actually independent of the watermarks. If we agree to make it a general capability, we should also agree on the right wording. Does "alignSplits(splitsToResume, splitsToPause)" describe what is then actually meant? (I see it as okay. I don't have a better idea, whilst Arvid suggested "pauseOrResumeSplits".)

# Top B: Should it be possible to enable/disable split alignment?

I would personally not disable the split alignment on the source reader side because if split alignment is used for some other use case (see A) it could have nasty side effects on other/future use cases. Instead, I would disable "watermark split alignment" where I think it should disable the watermark-dependent trigger for split alignment.

# Top C: Should we add a supportsX method?

I find it difficult to define the scope of a supportsX method w.r.t. the following questions: a) Where is it used? and b) What is the expected output? Regarding b), it's not straightforward to provide a meaningful output, e.g., if the SourceReader supports split alignment but the SplitReader does not. This is because, with the current implementation, we can determine whether split alignment is fully supported only at runtime, and specifically only when calling alignSplits down the call hierarchy to the actual SplitReaders.

Therefore, I would suggest raising either an error or a warning if alignment is called but not supported at some point. I know we should think carefully about when this could be the case because we don't want to flood anybody with such warnings. However, a warning could indicate to the user that, for the watermark split alignment use case, split reading is imbalanced, with the conclusion to either disable the trigger for watermark split alignment (see Top B) or to use/implement a source and reader that fully support split alignment.
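A rough sketch of the "warn, but don't flood" idea, assuming a hypothetical reader that reports whether it could honor the request. All names are made up for illustration, and a list stands in for a logger:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a reader; not a Flink interface.
interface ReaderSketch {
    /** Returns false if this reader cannot pause splits (assumed default). */
    default boolean pauseOrResume(Collection<String> toPause, Collection<String> toResume) {
        return false;
    }
}

class AlignmentGuard {
    final List<String> warnings = new ArrayList<>(); // stands in for a logger
    private boolean warned;

    void align(ReaderSketch reader, Collection<String> toPause, Collection<String> toResume) {
        boolean supported = reader.pauseOrResume(toPause, toResume);
        // Warn only once so that repeated alignment rounds do not flood the log.
        if (!supported && !warned) {
            warned = true;
            warnings.add("Split alignment requested, but the reader cannot pause splits");
        }
    }
}
```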

# Top D: How to design interfaces?

Thanks for structuring the discussion with the various possibilities (a-d). From the discussion and emails, I would like to summarize the following requirements:
- Interfaces should be consistent ("symmetric"), i.e., similar semantics should have similar interfaces with similar usage.
- Make explicit which implementations implement interfaces/support behavior.
- Make clear what the default implementations are and how to implement interfaces with the desired behavior.

This is a simplified view of the relations between relevant classes of the PoC implementation:

SourceReader (Public) <|-- SourceReaderBase (Internal) <|-- .. <|-- MySourceReader

MySourceReader <>-- SplitFetcherManager (Internal) <>-- SplitFetcher (Internal) <>-- SplitReader (Public) <|-- MySplitReader

(A <|-- B: B inherits from A; A <>-- B: A "has a" B)

Note that SourceReaderBase and SplitFetcherManager implement most of the "logic" for split alignment just because we wanted to implement split alignment and wanted it to be available as a kind of default. As a consequence, we have a "default implementation" for SourceReader that implements the actual logic for split alignment. For that reason, I find it very confusing to have a NOOP default implementation in the interface for the SourceReader.

As a consequence, interface strategy c) is difficult because it would require NOOP default implementations in the public interfaces of SourceReader and SplitReader. The same holds for strategy d) because it would require a NOOP default implementation in the SourceReader. Further, as Dawid described, the method signatures of alignSplits for SourceReader and SplitReader differ, and it would be extremely difficult to make the signatures the same (with a potential performance impact because of additional lookups of split ids). Therefore, having a symmetric decorative interface as in strategy a) is actually not possible, and having two decorative interfaces with different method signatures is confusing.

My conclusion is that we are best off with strategy b), which means having specializing sub-interfaces that inherit from the parent interface:

SourceReader <|-- AlignedSourceReader, SplitReader <|-- AlignedSplitReader

With this option, I'm not 100% sure what the implications are and if this could get nasty. I would suggest that Dawid and I just try to implement it and see if we like it. :)
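For illustration, strategy b) might look roughly like the sketch below. The types are simplified stand-ins (not the PoC code), and the instanceof dispatch in the operator is an assumption about how callers would detect support:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Stand-in for the public parent interface (SourceReader in the discussion).
interface BaseReader {}

// Specializing sub-interface, in the spirit of AlignedSourceReader.
interface AlignedReader extends BaseReader {
    void alignSplits(Collection<String> toResume, Collection<String> toPause);
}

// A reader that does not support alignment implements only the parent.
class PlainReader implements BaseReader {}

// A reader that supports alignment implements the sub-interface.
class KafkaLikeReader implements AlignedReader {
    final Set<String> paused = new HashSet<>();

    @Override
    public void alignSplits(Collection<String> toResume, Collection<String> toPause) {
        paused.removeAll(toResume);
        paused.addAll(toPause);
    }
}

class OperatorSketch {
    /** Returns true if the reader took part in alignment. */
    static boolean tryAlign(
            BaseReader reader, Collection<String> toResume, Collection<String> toPause) {
        if (reader instanceof AlignedReader) {
            ((AlignedReader) reader).alignSplits(toResume, toPause);
            return true;
        }
        return false;
    }
}
```

The upside is that support is visible in the type; the cost is a type check at the call site instead of a plain method call.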

# Summary

In conclusion, please let me know your perspectives. Please correct me, if something is wrong or if I misunderstood something. My perspective would be:

Top A: Yes
Top B: Yes (but disable watermark trigger for split alignment)
Top C: No
Top D: b)

Best,
Sebastian

On Tue, Apr 26, 2022 at 9:55 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

    @Arvid:

        While I also like Becket's capability approach, I fear that it doesn't work
        for this particular use case: Sources can always be aligned cross-task and
        this is just about intra-task alignment. So it's plausible to put sources
        into an alignment group even though they do not use any of the presented
        API of FLIP-217. They should just issue a warning, if they handle multiple
        splits (see motivation section).

    Yes, but the "supportXXX" method would be for telling if it
    supports that intra-task alignment. Cross-task alignment would
    always be supported.

    I updated interfaces to what I believe to be closest to a
    consensus between all participants. Do you mind taking a look?

    @Sebastian Do you mind addressing the nits?

    Best,

    Dawid

    On 25/04/2022 13:39, Arvid Heise wrote:
    Thanks for pushing this effort.

    I'd actually be in favor of 1b). I fully agree that decorator interfaces
    should be avoided but I'm also not a big fan of overloading the base
    interfaces (they are hard to implement as is). The usual feedback on
    Source-related interfaces is always that they are overwhelming and too
    hard to implement. However, I'd also not oppose 1c) as scattered interfaces
    also have drawbacks. I'd just dislike 1a) and 1d).
    While I also like Becket's capability approach, I fear that it doesn't work
    for this particular use case: Sources can always be aligned cross-task and
    this is just about intra-task alignment. So it's plausible to put sources
    into an alignment group even though they do not use any of the presented
    API of FLIP-217. They should just issue a warning, if they handle multiple
    splits (see motivation section).

    I think renaming alignSplits to facilitate future use cases makes sense but
    then all interfaces (if 1c) is chosen) should be adjusted accordingly.
    AlignedSourceReader could be PausingSourceReader and I'd go for
    pauseOrResumeSplits (Becket's proposal afaik). We could also split it into
    pauseSplit and resumeSplit. While pauseOrResumeSplits may allow Sources to
    just use 1 instead of 2 library calls (as written in the Javadoc), both
    Kafka and Pulsar can't use it and I'm not sure if there is a system that
    can.

    Some nits for the FLIP:
    - Please replace "stop" with "pause".
    - Not sure if it's worth it in the capability section: Sources that adopt
    this interface cannot be used in earlier versions. So it feels like we are
    only forward compatible (old sources can be used after the change); but I
    guess this holds for any API addition.
    - You might want to add what happens when all splits are paused.
    - You may want to describe how the 3 flavors of SourceReaderBase interact
    with the interface.
    - I'm not sure if it makes sense to include Kafka and Pulsar in the FLIP.
    For me, this is rather immediate follow-up work. (could be in the same
    umbrella ticket)

    Best,

    Arvid

    On Mon, Apr 25, 2022 at 12:52 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

    a) "MySourceReader implements SourceReader, WithSplitsAlignment", along
    with "MySplitReader implements SplitReader, WithSplitsAlignment", or
    b) "MySourceReader implements AlignedSourceReader" and "MySplitReader
    implements AlignedSplitReader", or
    c) "MySourceReader implements SourceReader" and "MySplitReader implements
    SplitReader".

    I think the latest proposal according to Dawid would be:
    d) "MySourceReader implements SourceReader" and "MySplitReader implements
    AlignedSplitReader".
    I am fine with this API, although personally speaking I think it is simpler
    to just add a new method to the split reader with default impl.


    I think that is a good idea to have it aligned as much as possible. I'd be
    +1 for your option c). We can merge AlignedSplitReader with SplitReader. We
    will update the FLIP shortly.

    Best,

    Dawid

    On 25/04/2022 12:43, Becket Qin wrote:

    Thanks for the comment, Jark.

    3. Interface/Method Name.

    Can the interface be used to align other things in the future? For example,
    align read speed, I have
    seen users requesting global rate limits. This feature may also need an
    interface like this.
    If we don't plan to extend this interface to support align other things, I
    suggest explicitly declaring
    the purpose of the methods, such as `alignWatermarksForSplits` instead of
    `alignSplits`.

    This is a good point. Naming wise, it would usually be more extensible to
    just describe what the method actually does, instead of assuming the
    purpose of doing this. For example, in this case, pauseOrResumeSplits()
    would be more extensible because this can be used for any kind of flow
    control, be it watermark alignment or simple rate limiting.

    4. Interface or Method.

    I don't have a strong opinion on this. I think they have their own
    advantages.
    In Flink SQL, we heavily use Interfaces for extending abilities
    (SupportsXxxx) for TableSource/TableSink,
    and I prefer Interfaces rather than methods in this case. When you have a
    bunch of abilities and each ability
    has more than one method, Interfaces can help to organize them and make
    users clear which methods
    need to implement when you want to have an ability.

    I am OK with decorative interfaces if this is a general design pattern in
    the other components in Flink. But it looks like the current API proposal
    is not symmetric.

    The current proposal is essentially "MySourceReader implements
    SourceReader, WithSplitsAlignment", along with "MySplitReader implements
    AlignedSplitsReader".

    Should we make the API symmetric? I'd consider any one of the following as
    symmetric.

    a) "MySourceReader implements SourceReader, WithSplitsAlignment", along
    with "MySplitReader implements SplitReader, WithSplitsAlignment", or
    b) "MySourceReader implements AlignedSourceReader" and "MySplitReader
    implements AlignedSplitReader", or
    c) "MySourceReader implements SourceReader" and "MySplitReader implements
    SplitReader".

    I think the latest proposal according to Dawid would be:
    d) "MySourceReader implements SourceReader" and "MySplitReader implements
    AlignedSplitReader".
    I am fine with this API, although personally speaking I think it is simpler
    to just add a new method to the split reader with default impl.

    @Dawid Wysakowicz <dwysakow...@apache.org>, thanks for the reply.

    Having said that, as I don't have a preference and I agree most of the
    sources will support the alignment I am fine following your suggestion to
    have the SourceReader extending from WithWatermarksSplitsAlignment, but
    would put the "supportsXXX" there, not in the Source to keep the two
    methods together.

    One benefit of having the "supportsXXX" in Source is that this allows some
    compile time check. For example, if a user enabled watermark alignment
    while it is not supported by the Source, an exception can be thrown at
    compile time. It seems in general useful. That said, I agree that API
    cleanliness wise it is better to put the two methods together.
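As a sketch of that benefit, the check could fail fast while the job is being built, before anything runs. This reads "compile time" as job build time, and the interface and method names are hypothetical stand-ins rather than Flink's API:

```java
// Hypothetical stand-in for the Source interface with a capability flag.
interface SourceSketch {
    // Assumed default: alignment is not supported unless a source opts in.
    default boolean supportsSplitsAlignment() {
        return false;
    }
}

class JobValidationSketch {
    // Called while building the job, so a misconfiguration surfaces early.
    static void validate(SourceSketch source, boolean alignmentEnabled) {
        if (alignmentEnabled && !source.supportsSplitsAlignment()) {
            throw new IllegalStateException(
                    "Watermark alignment is enabled, but the source does not support it");
        }
    }
}
```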

    Thanks,

    Jiangjie (Becket) Qin

    On Mon, Apr 25, 2022 at 5:56 PM Jark Wu <imj...@gmail.com> wrote:


    Thanks, Dawid, for the reminder on FLIP-182. Sorry, I did miss it.
    I don't have other concerns then.

    Best,
    Jark

    On Mon, 25 Apr 2022 at 15:40, Dawid Wysakowicz <dwysakow...@apache.org> wrote:


    @Jark:

    1. Will the framework always align with watermarks when the source
    implements the interface?
    I'm afraid not every case needs watermark alignment even if Kafka
    implements the interface,
    and this will affect the throughput somehow. I agree with Becket
    we may need a
    `supportSplitsAlignment()` method for users to configure the source to
    enable/disable the alignment.

    2. How does the framework calculate maxDesiredWatermark?
    I think the algorithm of maxDesiredWatermark will greatly affect throughput
    if the reader is constantly switching between pause and resume. Can users
    configure the alignment offset?


    This is covered in the previous FLIP[1] which has been already implemented
    in 1.15. In short, it must be enabled with the watermark strategy which
    also configures drift and update interval.

    If we don't plan to extend this interface to support align other things, I
    suggest explicitly declaring the purpose of the methods, such as
    `alignWatermarksForSplits` instead of `alignSplits`.


    Sure let's rename it.

    @Becket:

    I understand your point. On the other hand putting all methods, even with
    "supportsXXX" methods for enabling certain features, makes the entry
    threshold for writing a new source higher. Instead of focusing on the basic
    and required properties of the Source, the person implementing a source
    must bother with and need to figure out what all of the extra features are
    about and how to deal with them. It makes it also harder to organize
    methods in coupled groups as Jark said.

    Having said that, as I don't have a preference and I agree most of the
    sources will support the alignment I am fine following your suggestion to
    have the SourceReader extending from WithWatermarksSplitsAlignment, but
    would put the "supportsXXX" there, not in the Source to keep the two
    methods together.

    Lastly, I agree it is really unfortunate the "alignSplits" methods differ
    slightly for SourceReader and SplitReader. The reason for that is
    SourceReaderBase deals only with SplitIds, whereas SplitReader needs the
    actual splits to pause them. I found the discrepancy acceptable for the
    sake of simplifying changes significantly, especially as unifying the
    signatures would very likely impact performance, because we would have to
    perform additional lookups. Moreover the SplitReader is a secondary
    interface.
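One way the two signatures could fit together is sketched below, assuming an intermediate fetcher that already keeps a map of its assigned splits. All types and names here are hypothetical stand-ins, not the PoC code:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; names and shapes are assumptions, not Flink's API.
class SplitSketch {
    final String id;

    SplitSketch(String id) {
        this.id = id;
    }
}

interface SplitReaderSketch {
    // The split reader works on split objects, not on IDs.
    void alignSplits(Collection<SplitSketch> toResume, Collection<SplitSketch> toPause);
}

class FetcherSketch {
    private final Map<String, SplitSketch> assigned = new HashMap<>();
    private final SplitReaderSketch reader;

    FetcherSketch(SplitReaderSketch reader) {
        this.reader = reader;
    }

    void addSplit(SplitSketch split) {
        assigned.put(split.id, split);
    }

    // The source-reader side only knows split IDs; resolve them once here.
    void alignSplits(Collection<String> idsToResume, Collection<String> idsToPause) {
        reader.alignSplits(resolve(idsToResume), resolve(idsToPause));
    }

    private List<SplitSketch> resolve(Collection<String> ids) {
        List<SplitSketch> splits = new ArrayList<>();
        for (String id : ids) {
            SplitSketch split = assigned.get(id);
            if (split != null) {
                splits.add(split); // IDs not assigned to this fetcher are skipped
            }
        }
        return splits;
    }
}
```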

    Best,

    Dawid

    [1]https://cwiki.apache.org/confluence/x/hQYBCw

    On 24/04/2022 17:15, Jark Wu wrote:

    Thanks for the effort, Dawid and Sebastian!

    I just have some minor questions (maybe I missed something).

    1. Will the framework always align with watermarks when the source
    implements the interface?
    I'm afraid not every case needs watermark alignment even if Kafka
    implements the interface,
    and this will affect the throughput somehow. I agree with Becket
    we may need a
    `supportSplitsAlignment()` method for users to configure the source to
    enable/disable the alignment.

    2. How does the framework calculate maxDesiredWatermark?
    I think the algorithm of maxDesiredWatermark will greatly affect throughput
    if the reader is constantly switching between pause and resume. Can users
    configure the alignment offset?

    3. Interface/Method Name.
    Can the interface be used to align other things in the future? For example,
    align read speed, I have seen users requesting global rate limits. This
    feature may also need an interface like this.
    If we don't plan to extend this interface to support align other things, I
    suggest explicitly declaring the purpose of the methods, such as
    `alignWatermarksForSplits` instead of `alignSplits`.

    4. Interface or Method.
    I don't have a strong opinion on this. I think they have their own
    advantages.
    In Flink SQL, we heavily use Interfaces for extending abilities
    (SupportsXxxx) for TableSource/TableSink,
    and I prefer Interfaces rather than methods in this case. When you have a
    bunch of abilities and each ability
    has more than one method, Interfaces can help to organize them and make
    users clear which methods
    need to implement when you want to have an ability.


    Best,
    Jark

    On Sun, 24 Apr 2022 at 18:13, Becket Qin <becket....@gmail.com> wrote:

    Hi Dawid,

    Thanks for the explanation. Apologies that I somehow misread a bunch of
    "align" and thought they were "assign".

    Regarding 1, by default implementation, I was thinking of the default no-op
    implementation. I am a little worried about the proliferation of decorative
    interfaces. I think the most important thing about interfaces is that they
    are easy to understand. In this case, I prefer adding a new method to the
    existing interface for the following reasons:

    a) I feel the biggest drawback of decorative interfaces is which interface
    they can decorate and which combinations of multiple decorative interfaces
    are valid. In the current FLIP, the withSplitsAlignment interface is only
    applicable to the SourceReader which means it can't decorate any other
    interface. From an interface design perspective, a natural question is why
    not let "AlignedSplitReader" extend "withSplitsAlignment"? And it is also
    natural to assume that a split reader implementing both SplitReader and
    WithSplitAlignment would work, because a source reader implementing
    SourceReader and withSplitsAlignment works. So why isn't there an interface
    of AlignedSourceReader? In the future, if there is a new feature added
    (e.g. sorted or pre-partitioned data aware), are we going to create another
    interface of SplitReader such as SortedSplitReader or PrePartitionedAware?
    Can they be combined? So I think the additional decorative interface like
    withSplitsAlignment actually increases the understanding cost of users
    because they have to know what decorative interfaces are there, which
    interface they can decorate and which combinations of the decorative
    interfaces are valid and which are not. Ideally we want to avoid that. To
    be clear, I am not opposing having an interface of withSplitsAlignment; it
    is completely OK to have it as an internal interface and let SourceReader
    and SplitReader both extend it.

    b) Adding a new method to the SourceReader with a default implementation of
    no-op would help avoid logic branching in the source logic, especially
    given that we agree that the vast majority of the SourceReader
    implementations, if not all, would just extend from the SourceReaderBase.
    That means adding a new method to the interface would effectively give the
    same user experience, but simpler.

    c) A related design principle that may be worth discussing is how we let
    the Source implementations tell Flink what capability is supported and what
    is not. Personally speaking I feel the most intuitive place to me is in the
    Source itself, because that is the entrance of the entire Source connector
    logic.

    Based on the above thoughts, I am wondering if the following interface
    would be easier to understand by the users.

    - Change "withSplitsAlignment" to internal interface, let both SourceReader
    and SplitReader extend from it, with a default no-op implementation.
    - Add a new method "boolean supportSplitsAlignment()" to the Source
    interface, with a default implementation returning false. Sources that have
    implemented the alignment logic can change this to return true, and
    override the alignSplits() methods in the SourceReader / SplitReader if
    needed.
    - In the future, if a new optional feature is going to be added to the
    Source, and that feature requires the awareness from Flink, we can add more
    such methods to the Source.

    What do you think?

    Thanks,

    Jiangjie (Becket) Qin





    On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:


    @Konstantin:

    As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
    resume behavior) will be implemented for Kafka and Pulsar only, correct?

    Correct, as far as I know though, those are the only sources which consume
    concurrently from multiple splits and thus alignment applies.

    @Thomas:

    I wonder if "supporting" split alignment in SourceReaderBase and then doing
    nothing if the split reader does not implement AlignedSplitReader could be
    misleading? Perhaps WithSplitsAlignment can instead be added to the
    specific source reader (i.e. KafkaSourceReader) to make it explicit that
    the source actually supports it.

    I understand your concern. Hmm, I think we could actually do that. Given
    the actual implementation of the SourceReaderBase#alignSplits is rather
    short (just a forward to the corresponding method of SplitFetcher), we
    could reimplement it in the actual source implementations. This solution
    has a downside though. Authors of new sources would have to do two
    things: extend from AlignedSplitReader and implement WithSplitsAlignment,
    instead of just extending AlignedSplitReader. I would be fine with such a
    tradeoff though. What do others think?

    @Steven:

    For this part from the motivation section, is it accurate? Let's assume one
    source task consumes from 3 partitions and one of the partitions is
    significantly slower. In this situation, watermark for this source task
    won't hold back as it is reading recent data from other two Kafka
    partitions. As a result, it won't hold back the overall watermark. I
    thought the problem is that we may have late data for this slow partition.

    It will hold back the watermark. Watermark of an operator is the minimum
    of watermarks of all splits [1].
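Applied to the three-partition example, a small sketch of the minimum rule (the class and method names are made up for illustration, and the watermark values are arbitrary):

```java
import java.util.Map;

class WatermarkMinSketch {
    /**
     * The operator watermark is the minimum over all split watermarks.
     * Returning Long.MAX_VALUE for an empty map is an arbitrary choice of this sketch.
     */
    static long operatorWatermark(Map<String, Long> splitWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : splitWatermarks.values()) {
            min = Math.min(min, watermark);
        }
        return min;
    }
}
```

With two partitions at watermarks 100 and 105 and a slow one at 40, the operator watermark is 40, so the slow partition does hold it back.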

    I have another question about the restart. Say split alignment is
    triggered. checkpoint is completed. job failed and restored from the last
    checkpoint. because alignment decision is not checkpointed, initially
    alignment won't be enforced until we get a cycle of watermark aggregation
    and propagation, right? Not saying this corner is a problem. Just want to
    understand it more.

    Your understanding is correct.

    @Becket:

    1. I think watermark alignment is sort of a general use case, so should we
    just add the related methods to SourceReader directly instead of
    introducing the new interface of WithSplitAssignment? We can provide
    default implementations, so backwards compatibility won't be an issue.

    I don't think we can provide a default implementation. How would we do
    that? Would it be just a no-op? Is it better than having an opt-in
    interface? The default implementation would have to be added exclusively in
    a *Public* SourceReader interface. By the way notice SourceReaderBase
    does extend from WithSplitsAlignment, so effectively all implementations do
    handle the alignment case. To be honest I think it is impossible to
    implement the SourceReader interface directly by end users.

    2. As you mentioned, the SplitReader interface probably also needs some
    change to support throttling at the split granularity. Can you add that
    interface change into the public interface section as well?

    It has been added from the beginning. See *AlignedSplitReader.*

    3. Nit, can we avoid using the method name assignSplits here, given that it
    is not actually changing the split assignments? It seems something like
    pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.

    The method's called *alignSplits*, not assign. Do you still prefer a
    different name for that? Personally, I am open for suggestions here.

    Best,

    Dawid

    [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation

    On 22/04/2022 05:59, Becket Qin wrote:

    Thanks for driving the effort, Sebastian. I think the motivation makes a
    lot of sense. Just a few suggestions / questions.

    1. I think watermark alignment is sort of a general use case, so should we
    just add the related methods to SourceReader directly instead of
    introducing the new interface of WithSplitAssignment? We can provide
    default implementations, so backwards compatibility won't be an issue.

    2. As you mentioned, the SplitReader interface probably also needs some
    change to support throttling at the split granularity. Can you add that
    interface change into the public interface section as well?

    3. Nit, can we avoid using the method name assignSplits here, given that it
    is not actually changing the split assignments? It seems something like
    pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.

    Thanks,

    Jiangjie (Becket) Qin

    On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> wrote:

    However, a single source operator may read data from multiple
    splits/partitions, e.g., multiple Kafka partitions, such that even with
    watermark alignment the source operator may need to buffer excessive amount
    of data if one split emits data faster than another.

    For this part from the motivation section, is it accurate? Let's assume one
    source task consumes from 3 partitions and one of the partitions is
    significantly slower. In this situation, watermark for this source task
    won't hold back as it is reading recent data from other two Kafka
    partitions. As a result, it won't hold back the overall watermark. I
    thought the problem is that we may have late data for this slow partition.

    I have another question about the restart. Say split alignment is
    triggered. checkpoint is completed. job failed and restored from the last
    checkpoint. because alignment decision is not checkpointed, initially
    alignment won't be enforced until we get a cycle of watermark aggregation
    and propagation, right? Not saying this corner is a problem. Just want to
    understand it more.



    On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> wrote:

    Thanks for working on this!

    I wonder if "supporting" split alignment in SourceReaderBase and then doing
    nothing if the split reader does not implement AlignedSplitReader could be
    misleading? Perhaps WithSplitsAlignment can instead be added to the
    specific source reader (i.e. KafkaSourceReader) to make it explicit that
    the source actually supports it.

    Thanks,
    Thomas


    On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> wrote:

    Hi Sebastian, Hi Dawid,

    As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
    resume behavior) will be implemented for Kafka and Pulsar only, correct?

    +1 in general. I believe it is valuable to complete the watermark aligned
    story with this FLIP.

    Cheers,

    Konstantin







    On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:


    To be explicit, having worked on it, I support it ;) I think we can
    start a vote thread soonish, as there are no concerns so far.

    Best,

    Dawid

    On 13/04/2022 11:27, Sebastian Mattheis wrote:

    Dear Flink developers,

    I would like to open a discussion on FLIP 217 [1] for an extension of
    Watermark Alignment to perform alignment also in SplitReaders. To do so,
    SplitReaders must be able to suspend and resume reading from split sources
    where the SourceOperator coordinates and controls suspend and resume. To
    gather information about current watermarks of the SplitReaders, we extend
    the internal WatermarkOutputMultiplexer and report watermarks to the
    SourceOperator.

    There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised and
    reworked by Dawid Wysakowicz (He did most of the work.) and me. The changes
    are backwards compatible in a way that if affected components do not
    support split alignment the behavior is as before.
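As a rough illustration of the coordination described above, the sketch below decides which splits to pause or resume from the reported per-split watermarks. The rule (pause every split whose watermark exceeds a maximum allowed value, resume paused splits that are no longer ahead) and all names are assumptions for illustration, not the PoC logic:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PauseDecision {
    final Set<String> toPause = new TreeSet<>();
    final Set<String> toResume = new TreeSet<>();
}

class AlignmentSketch {
    // Assumed rule: a split is "ahead" when its watermark exceeds maxAllowedWatermark.
    static PauseDecision decide(
            Map<String, Long> splitWatermarks,
            Set<String> currentlyPaused,
            long maxAllowedWatermark) {
        PauseDecision decision = new PauseDecision();
        for (Map.Entry<String, Long> entry : splitWatermarks.entrySet()) {
            String splitId = entry.getKey();
            boolean ahead = entry.getValue() > maxAllowedWatermark;
            boolean paused = currentlyPaused.contains(splitId);
            if (ahead && !paused) {
                decision.toPause.add(splitId); // running too far ahead: suspend
            } else if (!ahead && paused) {
                decision.toResume.add(splitId); // others caught up: resume
            }
        }
        return decision;
    }
}
```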

    Best,
    Sebastian

    [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits

    [2] https://github.com/dawidwys/flink/tree/aligned-splits

    --

    Konstantin Knauf
    https://twitter.com/snntrable
    https://github.com/knaufk

