Hi Becket,

Thanks a lot for your fast and detailed response. For me, the discussion is converging, and dropping the supportsX method sounds very reasonable. (Side note: I think we misunderstood each other about "pausable splits" being enabled as "default". As you describe it now, I understand "default" to mean the new recommended way of implementing a source, and I think that is fully valid. Before, I understood "default" as the default implementation, i.e., throwing UnsupportedOperationException, which is the exact opposite. :) )
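To make the "default implementation" reading concrete, here is a minimal Java sketch. It assumes a simplified stand-in interface: SketchReader, LegacyReader, PausableReader and tryPause are hypothetical names for illustration, not Flink classes. Only the method name pauseOrResumeSplits and the exception type are taken from this thread, and the real signatures in SourceReader and SplitReader may differ.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PausableSplitsSketch {

    /** Simplified stand-in for a reader interface (hypothetical, not the Flink API). */
    interface SketchReader {
        // The default keeps existing implementations compiling,
        // but fails loudly as soon as pausing is actually requested.
        default void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            throw new UnsupportedOperationException(
                    "This reader does not support pausing or resuming splits.");
        }
    }

    /** "Old flavor" reader: inherits the throwing default. */
    static class LegacyReader implements SketchReader {}

    /** Reader that overrides the default and therefore supports pausable splits. */
    static class PausableReader implements SketchReader {
        final Set<String> paused = new HashSet<>();

        @Override
        public void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            paused.addAll(splitsToPause);
            paused.removeAll(splitsToResume);
        }
    }

    /** Returns true if the pause request was accepted, false if the reader is unpausable. */
    static boolean tryPause(SketchReader reader, String splitId) {
        try {
            reader.pauseOrResumeSplits(List.of(splitId), List.of());
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryPause(new LegacyReader(), "split-0"));   // false
        System.out.println(tryPause(new PausableReader(), "split-0")); // true
    }
}
```

In this sketch the caller learns about missing support only when the call fails, which is the error-handling path discussed in the thread rather than an up-front supportsX check.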
Nevertheless: As mentioned, an open question for me is whether watermark alignment should enforce pausable splits. For clarification, the current documentation [1] says:

> *Note:* As of 1.15, Flink supports aligning across tasks of the same source
> and/or different sources. It does not support aligning
> splits/partitions/shards in the same task.
>
> In a case where there are e.g. two Kafka partitions that produce
> watermarks at different pace, that get assigned to the same task watermark
> might not behave as expected. Fortunately, worst case it should not perform
> worse than without alignment.
>
> Given the limitation above, we suggest applying watermark alignment in two
> situations:
>
> 1. You have two different sources (e.g. Kafka and File) that produce
> watermarks at different speeds
> 2. You run your source with parallelism equal to the number of
> splits/shards/partitions, which results in every subtask being assigned a
> single unit of work.

I personally see no reason against introducing this dependency between watermark alignment and pausable splits. (I think this would even be a good path towards shaping watermark alignment in 1.16.) However, "I don't see" means that I would be happy to hear Dawid's and Piotrek's opinions, as they implemented watermark alignment based on FLIP-182 [2] and I don't want to miss relevant rationale/background info from their side.

*@Piotrek* *@Dawid* What do you think?

Regards,
Sebastian

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources?src=contextnavpagetreemode

On Wed, May 11, 2022 at 1:30 PM Becket Qin <becket....@gmail.com> wrote:

> +dev
>
> Hi Sebastian,
>
> Thank you for the summary. Please see the detailed replies inline. As a
> recap of my suggestions.
>
> 1. Pausable splits API.
> a) Add default implementations to methods "pauseOrResumeSplits" in both > SourceReader and SplitReader where both default implementations throw > UnsupportedOperationException. > > 2. User story. > a) We tell users to enable the watermark alignment as they like. This > is exactly what the current Flink API is. > b) We tell the source developers, please implement pausable splits, > otherwise bad things may happen. Think of it like you are expected to > implement SourceReader#snapshotState() properly, otherwise exceptions will > be thrown when users enable checkpointing. > > Thanks, > > Jiangjie (Becket) Qin > > On Wed, May 11, 2022 at 4:45 PM Sebastian Mattheis < > sebast...@ververica.com> wrote: > >> Hi Becket, Hi everybody, >> >> I'm sorry if I misread the messages but I could not derive an agreement >> from the mailing list. Nevertheless, if I understand you right the >> suggestion is: >> >> * Add default implementations to methods "pauseOrResumeSplits" in both >> SourceReader and SplitReader where both default implementations throw >> UnsupportedOperationException. >> > Yes. > > * Add "supportsPauseOrResumeSplits" to the Source interface. (In the >> following, I refer to supporting this as "pausable splits".) >> > We may no longer need this if pausable splits are expected to be > implemented by the source developers, i.e. non-optional. Having this method > would then be somewhat misleading as it looks like the sources that do not > support pausable splits are also acceptable in the long term. So API wise, > I'd say maybe we should remove this for this FLIP, although I believe this > supportXXX pattern itself is still attractive for optional features. > > >> >> To make the conclusions explicit: >> >> 1. The implementation of pauseOrResumeSplits in both interfaces >> SourceReader and SplitReader are optional where the default is that it >> doesn't support it. (--> This means that the implementation is still >> optional for the source developer.) 
>> > It is optional for backwards compatibility with existing sources, as they > may still compile without code change. But starting from this FLIP, Flink > will always optimistically assume that all the sources support pausable > splits. If a source does not support pausable splits, it goes to an error > handling path when watermark alignment is enabled on it. This is different > from a usual optional feature, where no error is expected. > > >> 2. If watermark alignment is enabled in the application code by adding >> withWatermarkAlignment to the WatermarkStrategy while SourceReader or >> SplitReader do not support pausableSplits, we throw an >> UnsupportedOperationException. >> > Yes. > > >> 3. With regard to your statement: >> >>> [...] basically means watermark alignment is an non-optional feature to >>> the end users. >> >> You actually mean that "pausable splits" are non-optional for the app >> developer if watermark alignment is enabled. However, watermark alignment >> is optional and can be enabled/disabled. >> > Yes, watermark alignment can be enabled/disabled in individual sources in > Flink jobs, which basically means the code supporting watermark alignment > has to already be there. That again means the Source developers are also > expected to support pausable splits by default. So this way we essentially > tell the end users that you may enable / disable this feature as you wish, > and tell the source developers that you SHOULD implement this because the > end users may turn it on/off at will. And if the source does not support > pausable splits, that goes to an error handling path when watermark > alignment is enabled on it. So users know they have to explicitly exclude > this source. > > >> >> So far it's totally clear to me and I hope this is what you mean. I also >> agree with both statements: >> >> So making that expectation aligned with the source developers seems >>> reasonable. 
>>> >> >> I think this is a simple and clean solution from both the end user and >>> source developers' standpoint. >>> >> >> However, a last conclusion derives from 3. and is an open question for me: >> >> 4. The feature of "pausable splits" is now tightly bound to watermark >> alignment, i.e., if sources do not support "pausable splits" one can not >> enable watermark alignment for these sources. This dependency is not the >> current status of watermark alignment implementation because it is/was >> implemented without pausable splits. Do we want to introduce this >> dependency? (This is an open question. I cannot judge that.) >> > The watermark alignment basically relies on the pausable splits, right? So > personally I found it quite reasonable that if the source does not support > pausable splits, end users cannot enable watermark alignment on it. > > >> If something is wrong, please correct me. >> >> Regards, >> Sebastian >> >> On Wed, May 11, 2022 at 9:05 AM Becket Qin <becket....@gmail.com> wrote: >> >>> Hi Sebastian, >>> >>> Thanks for the reply and patient discussion. I agree this is a tricky >>> decision. >>> >>> >>>> Nevertheless, Piotr has valid concerns about Option c) which I see as >>>> follows: >>>> (1) An interface with default NOOP implementation makes the >>>> implementation optional. And in my opinion, a default implementation is and >>>> will remain a way of making implementation optional because even in future >>>> a developer can decide to implement the "old flavor" without support for >>>> pausable splits. >>>> (2) It may not be too critical but I also find it suboptimal that with >>>> a NOOP default implementation there is no way to check at runtime if >>>> SourceReader or SplitReader actually support pausing. (To do so, one would >>>> need a supportsX method which makes it again more complicated.)\ >>> >>> >>> Based on the last few messages in the mailing list. 
Piotr and I agreed >>> that the default implementation should just throw an >>> UnsupportedOperationException if the source is unpausable. So this >>> basically tells the Source developers that this feature is expected to be >>> supported. Because we cannot prevent end users from putting an unpausable >>> source into the watermark alignment group, that basically means watermark >>> alignment is an non-optional feature to the end users. So making that >>> expectation aligned with the source developers seems reasonable. And if a >>> source does not support this feature, the end users should explicitly >>> remove that source from the watermark alignment group. >>> >>> Personally speaking I think this is a simple and clean solution from >>> both the end user and source developers' standpoint. >>> >>> Does this address your concerns? >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Wed, May 11, 2022 at 2:52 PM Sebastian Mattheis < >>> sebast...@ververica.com> wrote: >>> >>>> Hi Piotr, Hi Becket, Hi everybody, >>>> >>>> we, Dawid and I, discussed the various suggestions/options and we would >>>> be okay either way because we find neither solution is perfect just because >>>> of the already present complexity. >>>> >>>> Option c) Adding methods to the interfaces of SourceReader and >>>> SplitReader >>>> Option a) Adding decorative interfaces to be used by SourceReader and >>>> SplitReader >>>> >>>> As of the current status (v. 12) of the FLIP [1], it is based on Option >>>> c) which we find acceptable because the complexity added is only a single >>>> method. >>>> >>>> Nevertheless, Piotr has valid concerns about Option c) which I see as >>>> follows: >>>> (1) An interface with default NOOP implementation makes the >>>> implementation optional. 
And in my opinion, a default implementation is and >>>> will remain a way of making implementation optional because even in future >>>> a developer can decide to implement the "old flavor" without support for >>>> pausable splits. >>>> (2) It may not be too critical but I also find it suboptimal that with >>>> a NOOP default implementation there is no way to check at runtime if >>>> SourceReader or SplitReader actually support pausing. (To do so, one would >>>> need a supportsX method which makes it again more complicated.) >>>> >>>> However, we haven't changed it because Option a) is also not optimal or >>>> straight-forward: >>>> (1) We need to add two distinct yet similar decorative interfaces >>>> since, as mentioned, the signatures of the methods are different. For >>>> example, we would need decorative interfaces like >>>> `SplitReaderWithPausableSplits` and `SourceReaderWithPausableSplits`. >>>> (2) As a consequence, we would need to somehow document how/where to >>>> implement both interfaces and how this relates to each other. This we could >>>> solve by adding a note in the interface of SourceReader and SplitReader and >>>> reference to the decorative interfaces but it still increases complexity >>>> too. >>>> >>>> In summary, we see both as acceptable and preferred over other options. >>>> The question is if we can find a solution or compromise that is acceptable >>>> for everybody to reach consensus. >>>> >>>> Please let us know what you think because we would be happy if we can >>>> conclude the discussion to avoid dropping the initiative on this FLIP. >>>> >>>> Regards, >>>> Sebastian >>>> >>>> [1] >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199540438 >>>> (v. 
12)
>>>>
>>>> On Thu, May 5, 2022 at 10:13 AM Piotr Nowojski <pnowoj...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Guowei,
>>>>>
>>>>> as Dawid wrote a couple of messages back:
>>>>>
>>>>> > This is covered in the previous FLIP[1] which has been already
>>>>> > implemented in 1.15. In short, it must be enabled with the watermark
>>>>> > strategy which also configures drift and update interval
>>>>>
>>>>> So by default watermark alignment is disabled, regardless of whether a
>>>>> source supports it or not.
>>>>>
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> On Thu, May 5, 2022 at 9:56 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We know that in the case of Bounded input Flink supports the Batch
>>>>>> execution mode. Currently in Batch execution mode, Flink is executed on a
>>>>>> stage-by-stage basis. In this way, perhaps watermark alignment might not
>>>>>> gain much.
>>>>>>
>>>>>> So my question is: Is watermark alignment the default behavior (for
>>>>>> implemented source only)? If so, have you considered evaluating the impact
>>>>>> of this behavior on the Batch execution mode? Or do you think it is not
>>>>>> necessary?
>>>>>>
>>>>>> Correct me if I miss something.
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> On Thu, May 5, 2022 at 1:01 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Becket and Dawid,
>>>>>> >
>>>>>> > > I feel that no matter which option we choose this can not be solved
>>>>>> > > entirely in either of the options, because of the point above and because
>>>>>> > > the signature of SplitReader#pauseOrResumeSplits and
>>>>>> > > SourceReader#pauseOrResumeSplits are slightly different (one identifies
>>>>>> > > splits with splitId the other one passes the splits directly).
>>>>>> >
>>>>>> > Yes, that's a good point in this case and for features that need to be
>>>>>> > implemented in more than one place.
>>>>>> >
>>>>>> > > Is there any reason for pausing reading from a split an optional feature,
>>>>>> > > other than that this was not included in the original interface?
>>>>>> >
>>>>>> > An additional argument in favor of making it optional is to simplify source
>>>>>> > implementation. But on its own I'm not sure if that would be enough to
>>>>>> > justify making this feature optional. Maybe.
>>>>>> >
>>>>>> > > I think it would be way simpler and clearer to just let end users and
>>>>>> > > Flink assume all the connectors will implement this feature.
>>>>>> >
>>>>>> > As I wrote above that would be an interesting choice to make (ease of
>>>>>> > implementation for new users, vs system consistency). Regardless of that,
>>>>>> > yes, for me the main argument is the API backward compatibility. But let's
>>>>>> > clear a couple of points:
>>>>>> > - The current proposal adding methods to the base interface with default
>>>>>> > implementations is an OPTIONAL feature. Same as the decorative version
>>>>>> > would be.
>>>>>> > - Decorative version could implement "throw UnsupportedOperationException"
>>>>>> > if user enabled watermark alignment just as well and I agree that's a
>>>>>> > better option compared to logging a warning.
>>>>>> >
>>>>>> > Best,
>>>>>> > Piotrek
>>>>>> >
>>>>>> >
>>>>>> > On Wed, May 4, 2022 at 3:40 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>> >
>>>>>> > > Thanks for the reply and patient discussion, Piotr and Dawid.
>>>>>> > >
>>>>>> > > Is there any reason for pausing reading from a split an optional feature,
>>>>>> > > other than that this was not included in the original interface?
>>>>>> > >
>>>>>> > > To be honest I am really worried about the complexity of the user story
>>>>>> > > here. Optional features like this have a high overhead.
>>>>>> > > Imagine this feature is optional, now a user enabled watermark alignment
>>>>>> > > and defined a few watermark groups. Would it work? Hmm, that depends on
>>>>>> > > whether the involved Source has implemented this feature. If the Sources
>>>>>> > > are well documented, good luck. Otherwise end users may have to look into
>>>>>> > > the code of the Source to see whether the feature is supported. Which is
>>>>>> > > something they shouldn't have to do.
>>>>>> > >
>>>>>> > > I think it would be way simpler and clearer to just let end users and
>>>>>> > > Flink assume all the connectors will implement this feature. After all the
>>>>>> > > watermark group is not optional to the end users. If in some rare cases,
>>>>>> > > the feature cannot be supported, a clear UnsupportedOperationException
>>>>>> > > will be thrown to tell users to explicitly remove this Source from the
>>>>>> > > watermark group. I don't think we should have a warning message here, as
>>>>>> > > they tend to be ignored in many cases. If we do this, we don't even need
>>>>>> > > the supportXXX method in the Source for this feature. In fact this is
>>>>>> > > exactly how many interfaces work today. For example,
>>>>>> > > SplitEnumerator#addSplitsBack() is not supported by Pravega source because
>>>>>> > > it does not support partial failover. In that case, it simply throws an
>>>>>> > > exception to trigger a global recovery.
>>>>>> > >
>>>>>> > > The reason we add a default implementation in this case would just be for
>>>>>> > > the sake of backwards compatibility so the old source can still compile.
>>>>>> > > Sure, in short term, this feature might not be supported by many existing
>>>>>> > > sources.
>>>>>> > > That is OK, and it is quite visible to the source developers that
>>>>>> > > they did not override the default impl which throws an
>>>>>> > > UnsupportedOperationException.
>>>>>> > >
>>>>>> > > @Dawid,
>>>>>> > >
>>>>>> > > >> the Java doc of the SupportXXX() method in the Source would be the
>>>>>> > > >> single source of truth regarding how to implement this feature.
>>>>>> > > >
>>>>>> > > > I also don't find it entirely true. Half of the classes are
>>>>>> > > > theoretically optional and are utility classes from the point of view
>>>>>> > > > how the interfaces are organized. Theoretically users do not need to
>>>>>> > > > use any of SourceReaderBase & SplitReader. Would be weird to list their
>>>>>> > > > methods in the Source interface.
>>>>>> > >
>>>>>> > > I think the ultimate goal of java docs is to guide users to implement the
>>>>>> > > Source. If SourceReaderBase is the preferred way to implement a
>>>>>> > > SourceReader, it seems worth mentioning that. Even the Java language
>>>>>> > > documentation for interfaces lists the known implementations [1] so people
>>>>>> > > can leverage them. But for this particular case, if we make the feature
>>>>>> > > non-optional, we don't even need the supportXXX() method for now.
>>>>>> > >
>>>>>> > > Thanks,
>>>>>> > >
>>>>>> > > Jiangjie (Becket) Qin
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > > On Wed, May 4, 2022 at 4:37 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>>>>> > > wrote:
>>>>>> > >
>>>>>> > > > Hey Piotr and Becket,
>>>>>> > > >
>>>>>> > > > First of all, let me say I am happy with whichever option is agreed in
>>>>>> > > > the discussion.
>>>>>> > > > >>>>>> > > > I wanted to clarify a few points from the discussion though: >>>>>> > > > >>>>>> > > > @Becket: >>>>>> > > > >>>>>> > > > The main argument for adding the methods to the SourceReader is >>>>>> that >>>>>> > > these >>>>>> > > > methods are effectively NON-OPTIONAL to SourceReader impl, i.e. >>>>>> > starting >>>>>> > > > from this FLIP, all the SourceReaders impl are expected to >>>>>> support this >>>>>> > > > method, although some old implementations may not have >>>>>> implemented this >>>>>> > > > feature. I think we should distinguish the new features from the >>>>>> > optional >>>>>> > > > features. While the public decorative interface is a solution >>>>>> to the >>>>>> > > > optional features. We should not use it for the features that >>>>>> are >>>>>> > > > non-optional. >>>>>> > > > >>>>>> > > > I don't think that this feature is NON-OPTIONAL. Even though >>>>>> > preferred, I >>>>>> > > > still think it can be simply optional. >>>>>> > > > >>>>>> > > > the Java doc of the SupportXXX() method in the Source would be >>>>>> the >>>>>> > single >>>>>> > > > source of truth regarding how to implement this feature. >>>>>> > > > >>>>>> > > > I also don't find it entirely true. Half of the classes are >>>>>> > theoretically >>>>>> > > > optional and are utility classes from the point of view how the >>>>>> > > interfaces >>>>>> > > > are organized. Theoretically users do not need to use any of >>>>>> > > > SourceReaderBase & SplitReader. Would be weird to list their >>>>>> methods in >>>>>> > > the >>>>>> > > > Source interface. >>>>>> > > > >>>>>> > > > @Piotr >>>>>> > > > >>>>>> > > > If we have all of the methods with default implementation in >>>>>> the base >>>>>> > > > interface, the API doesn't give any clue to the user which set >>>>>> of >>>>>> > methods >>>>>> > > > are required to be implemented at the same time. 
>>>>>> > > > >>>>>> > > > I feel that no matter which option we choose this can not be >>>>>> solved >>>>>> > > > entirely in either of the options, because of the point above >>>>>> and >>>>>> > because >>>>>> > > > the signature of SplitReader#pauseOrResumeSplits and >>>>>> > > > SourceReader#pauseOrResumeSplits are slightly different (one >>>>>> identifies >>>>>> > > > splits with splitId the other one passes the splits directly). >>>>>> > > > >>>>>> > > > Best, >>>>>> > > > >>>>>> > > > Dawid >>>>>> > > > On 03/05/2022 14:30, Becket Qin wrote: >>>>>> > > > >>>>>> > > > Hi Piotr, >>>>>> > > > >>>>>> > > > Thanks for the comment. >>>>>> > > > >>>>>> > > > Just to clarify, I am not against the decorative interfaces, >>>>>> but I do >>>>>> > > > think we should use them with caution. The main argument for >>>>>> adding the >>>>>> > > > methods to the SourceReader is that these methods are >>>>>> > > > effectively NON-OPTIONAL to SourceReader impl, i.e. starting >>>>>> from this >>>>>> > > > FLIP, all the SourceReaders impl are expected to support this >>>>>> > > > method, although some old implementations may not have >>>>>> implemented this >>>>>> > > > feature. I think we should distinguish the new features from the >>>>>> > optional >>>>>> > > > features. While the public decorative interface is a solution >>>>>> to the >>>>>> > > > optional features. We should not use it for the features that >>>>>> are >>>>>> > > > non-optional. >>>>>> > > > >>>>>> > > > That said, this feature is optional for SplitReaders. Arguably >>>>>> we can >>>>>> > > have >>>>>> > > > a decorative interface for that, but for simplicity and >>>>>> symmetry of the >>>>>> > > > interface, personally I prefer just adding a new method. >>>>>> > > > >>>>>> > > > Regarding the advantages you mentioned about the decorative >>>>>> interfaces, >>>>>> > > > they would make sense if: >>>>>> > > > 1. The feature is optional. >>>>>> > > > 2. 
There is only one decorative interface involved for a >>>>>> feature. >>>>>> > > > Otherwise the argument that all the methods are grouped >>>>>> together will >>>>>> > not >>>>>> > > > stand. >>>>>> > > > >>>>>> > > > Compared with that, I think the current solution works fine in >>>>>> all >>>>>> > cases, >>>>>> > > > i.e. "having supportXXX() method in Source, and default methods >>>>>> / >>>>>> > > > decorative interfaces in base interfaces.". >>>>>> > > > >>>>>> > > > The advantages are: >>>>>> > > >> - clean and easy to implement base interface >>>>>> > > > >>>>>> > > > In the current approach, the Java doc of the SupportXXX() >>>>>> method in the >>>>>> > > > Source would be the single source of truth regarding how to >>>>>> implement >>>>>> > > this >>>>>> > > > feature. It lists the method that has to be implemented to >>>>>> support this >>>>>> > > > feature, regardless of how many classes / interfaces are >>>>>> involved. >>>>>> > > > >>>>>> > > > When implementing the base interface, users do not need to >>>>>> implement a >>>>>> > > > method with default implementation. If they are curious what >>>>>> the method >>>>>> > > is >>>>>> > > > for, the java doc of that method simply points users to the >>>>>> > SupportXXX() >>>>>> > > > method in the Source. It seems not adding work to the users >>>>>> compared >>>>>> > with >>>>>> > > > decorative interfaces, but gives much better discoverability. >>>>>> > > > >>>>>> > > > - all of the methods from a single feature are grouped in a >>>>>> single >>>>>> > > >> decorator interface, together with their dedicated java doc. 
>>>>>> It's also >>>>>> > > >> easier to google search for help using the decorator name >>>>>> > > > >>>>>> > > > - if an optional feature requires two methods to be implemented >>>>>> at >>>>>> > once, >>>>>> > > >> decorator can guarantee that >>>>>> > > > >>>>>> > > > These two points are not true when multiple components and >>>>>> classes are >>>>>> > > > involved collaboratively to provide a feature. In our case, we >>>>>> have >>>>>> > both >>>>>> > > > SourceReader and SplitReader involved. And there might be other >>>>>> > > interfaces >>>>>> > > > on the JM side involved for some future features. So the >>>>>> relevant >>>>>> > methods >>>>>> > > > can actually be scattered over the places. That said, we may >>>>>> still use >>>>>> > > > decorative interfaces for each component, if the feature is >>>>>> optional, >>>>>> > > given >>>>>> > > > there is a single source of truth for the feature. >>>>>> > > > >>>>>> > > > Here I would strongly lean towards making life easier for new >>>>>> users, >>>>>> > > >> lowering the entry barrier, at the (imo) slight expense for >>>>>> the power >>>>>> > > >> users. >>>>>> > > > >>>>>> > > > I actually think the current approach is simpler, more >>>>>> extensible and >>>>>> > > more >>>>>> > > > general for all the users. Can you articulate a bit more on >>>>>> which part >>>>>> > > you >>>>>> > > > think makes users harder to understand? >>>>>> > > > >>>>>> > > > >>>>>> > > > There is another benefit of the decorative interfaces which is >>>>>> not >>>>>> > > > mentioned, but might be worth considering here. Usually the >>>>>> decorative >>>>>> > > > interfaces give slightly better backwards compatibility than >>>>>> the new >>>>>> > > > default method in the interfaces. That is when users are using >>>>>> a jar >>>>>> > that >>>>>> > > > was compiled with an older version of Flink which does not have >>>>>> the >>>>>> > > default >>>>>> > > > method in the interfaces in question. 
A decorative interface >>>>>> may still >>>>>> > > > provide backwards compatibility in that case, while default >>>>>> method impl >>>>>> > > > cannot. >>>>>> > > > >>>>>> > > > I think in Flink we in general do not guarantee custom >>>>>> components >>>>>> > > compiled >>>>>> > > > with an older version can run with a newer version of Flink. A >>>>>> > recompile >>>>>> > > > with a newer version would be required. That said, if we do >>>>>> care about >>>>>> > > > this, we can just change the "supportXXX()" method in the Source >>>>>> > > interface >>>>>> > > > to use decorative interfaces, and leave the other parts >>>>>> unchanged. >>>>>> > > > >>>>>> > > > Thanks, >>>>>> > > > >>>>>> > > > Jiangjie (Becket) Qin >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > On Tue, May 3, 2022 at 6:25 PM Piotr Nowojski < >>>>>> pnowoj...@apache.org> >>>>>> > > > wrote: >>>>>> > > > >>>>>> > > >> Hi, >>>>>> > > >> >>>>>> > > >> Sorry for chipping in so late, but I was OoO for the last two >>>>>> weeks. >>>>>> > > >> Regarding the interfaces, I would be actually against adding >>>>>> those >>>>>> > > methods >>>>>> > > >> to the base interfaces for the reasons mentioned above. >>>>>> Clogging the >>>>>> > > base >>>>>> > > >> interface for new users with tons of methods that they do not >>>>>> need, do >>>>>> > > not >>>>>> > > >> understand and do not know what to do with them. Moreover, such >>>>>> > > decorative >>>>>> > > >> interfaces are solving a problem if a feature requires two or >>>>>> more >>>>>> > > methods >>>>>> > > >> to be implemented at the same time. If we have all of the >>>>>> methods with >>>>>> > > >> default implementation in the base interface, the API doesn't >>>>>> give any >>>>>> > > >> clue >>>>>> > > >> to the user which set of methods are required to be >>>>>> implemented at the >>>>>> > > >> same >>>>>> > > >> time. 
>>>>>> > > >> >>>>>> > > >> > a) I feel the biggest drawback of decorative interfaces is >>>>>> which >>>>>> > > >> interface >>>>>> > > >> > they can decorate and which combinations of multiple >>>>>> decorative >>>>>> > > >> interfaces >>>>>> > > >> > are valid. (...) >>>>>> > > >> > In the future, if there is a new feature added >>>>>> > > >> > (e.g. sorted or pre-partitioned data aware), are we going to >>>>>> create >>>>>> > > >> another >>>>>> > > >> > interface of SplitReader such as SortedSplitReader or >>>>>> > > >> PrePartitionedAware? >>>>>> > > >> > Can they be combined? So I think the additional decorative >>>>>> interface >>>>>> > > >> like >>>>>> > > >> > withSplitsAlignment actually increases the understanding >>>>>> cost of >>>>>> > users >>>>>> > > >> > because they have to know what decorative interfaces are >>>>>> there, >>>>>> > which >>>>>> > > >> > interface they can decorate and which combinations of the >>>>>> decorative >>>>>> > > >> > interfaces are valid and which are not. Ideally we want to >>>>>> avoid >>>>>> > that. >>>>>> > > >> >>>>>> > > >> I'm not sure if I understand how embedding default methods in >>>>>> the base >>>>>> > > >> interface is solving the problem: what can be combined or not? >>>>>> If >>>>>> > there >>>>>> > > >> are >>>>>> > > >> two conflicting features, having decorative interfaces that >>>>>> can not be >>>>>> > > >> mixed together actually makes much more sense to me rather >>>>>> than having >>>>>> > > >> them >>>>>> > > >> all in one base class. How would you allow users to implement >>>>>> only one >>>>>> > > of >>>>>> > > >> those two features? >>>>>> > > >> >>>>>> > > >> To reiterate on the issue. Yes, there are drawbacks: >>>>>> > > >> - how can a user discover what decorators are there? >>>>>> > > >> - how can a user know where the decorator can be applied to? 
>>>>>> > > >>
>>>>>> > > >> However those are drawbacks for more power users, that can be mitigated
>>>>>> > > >> by the documentation. For example listing all of the decorators with
>>>>>> > > >> detailed explanation both in the docs and in the java docs. More
>>>>>> > > >> experienced users will be able to deal with those issues easier, as they
>>>>>> > > >> will already have some basic understanding of Flink. Also if a user has a
>>>>>> > > >> problem that he wants to solve, he will google search a potential
>>>>>> > > >> solution to his problem anyway, and while doing that he is very likely to
>>>>>> > > >> discover the decorator that he needs anyway in the docs.
>>>>>> > > >>
>>>>>> > > >> The advantages are:
>>>>>> > > >> - clean and easy to implement base interface
>>>>>> > > >> - all of the methods from a single feature are grouped in a single
>>>>>> > > >> decorator interface, together with their dedicated java doc. It's also
>>>>>> > > >> easier to google search for help using the decorator name
>>>>>> > > >> - if an optional feature requires two methods to be implemented at once,
>>>>>> > > >> decorator can guarantee that
>>>>>> > > >>
>>>>>> > > >> Here I would strongly lean towards making life easier for new users,
>>>>>> > > >> lowering the entry barrier, at the (imo) slight expense for the power
>>>>>> > > >> users.
>>>>>> > > >>
>>>>>> > > >> Best,
>>>>>> > > >> Piotrek
>>>>>> > > >>
>>>>>> > > >>
>>>>>> > > >> On Tue, Apr 26, 2022 at 3:32 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >>
>>>>>> > > >> > Thanks for the reply Sebastian and Dawid,
>>>>>> > > >> >
>>>>>> > > >> > I think Sebastian has a good summary. This is a really helpful
>>>>>> > > >> > discussion.
>>>>>> > > >> >
>>>>>> > > >> > Thinking a bit more, I feel that it might still be better to add the supportsXXX() method in the Source rather than SourceReader.
>>>>>> > > >> >
>>>>>> > > >> > Generally speaking, what we are trying to do here is to let the Flink framework know what the Source is capable of. In this FLIP, it happens to be the capability that only involves SourceReader. But in the future, it is possible that another functionality involves both the SplitEnumerator and SourceReader. In that case, following the current approach, we should put the "supportsXXX()" method in both SplitEnumerator and SourceReader. Because if we only put this in the SourceReader, then the JM would have to create a SourceReader in order to know whether this feature is supported, which is a little ugly. But if we put the "supportsXXX()" method in the Source, we will break the "symmetric" design because this FLIP chose a different way.
>>>>>> > > >> >
>>>>>> > > >> > This is also why I think supportsXXX() method seems a good thing to have, because when there are a few interfaces / methods that are expected to be implemented at the same time in order to deliver a feature, it is always good to have a single source of truth to tell the framework what to do, so the framework can do consistent things in different parts.
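The "single source of truth" argument above can be illustrated with a minimal sketch. Everything in it is an assumption made for illustration: `SketchSource`, `supportsPausingSplits()` as a default method on it, and `CapabilityCheck` are hypothetical stand-ins, not the actual Flink `Source` interface or framework code. The point is only that a flag on the source object can be inspected before any reader exists.

```java
/** Hypothetical stand-in for a source; NOT the real Flink Source interface. */
interface SketchSource {
    String name();

    /** Capability flag kept in one place; a source opts in by overriding it. */
    default boolean supportsPausingSplits() {
        return false;
    }
}

/** Hypothetical framework-side check that needs no SourceReader instance. */
final class CapabilityCheck {
    private CapabilityCheck() {}

    static void validate(SketchSource source, boolean alignmentEnabled) {
        // Reject the configuration up front instead of failing later in a reader.
        if (alignmentEnabled && !source.supportsPausingSplits()) {
            throw new IllegalStateException(
                    "Source '" + source.name() + "' does not support pausing splits");
        }
    }
}

/** A source that keeps the default (no pausing support). */
final class PlainSource implements SketchSource {
    @Override
    public String name() {
        return "plain";
    }
}

/** A source that declares the capability. */
final class PausableSource implements SketchSource {
    @Override
    public String name() {
        return "pausable";
    }

    @Override
    public boolean supportsPausingSplits() {
        return true;
    }
}
```

With this shape, `CapabilityCheck.validate(new PlainSource(), true)` fails immediately, while the same call with a `PausableSource` passes. Whether such a flag belongs on the source or on the reader is exactly the open question in this thread.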
>>>>>> > > >> >
>>>>>> > > >> > @Sebastian Mattheis <sebast...@ververica.com>
>>>>>> > > >> >
>>>>>> > > >> > Regarding interface flavor b), i.e. AlignedSourceReader + AlignedSplitReader, what I feel awkward about is that we are essentially expecting almost all the SourceReader implementations to extend SourceReaderBase, which effectively makes the SourceReader interface without the pausing support useless. So this indicates that public decorative interfaces (or sub-interfaces for the same purpose) only make sense if the original interface is also expected to be used. Otherwise, it seems to make more sense to add the method to the original interface itself.
>>>>>> > > >> >
>>>>>> > > >> > Cheers,
>>>>>> > > >> >
>>>>>> > > >> > Jiangjie (Becket) Qin
>>>>>> > > >> >
>>>>>> > > >> >
>>>>>> > > >> >
>>>>>> > > >> >
>>>>>> > > >> > On Tue, Apr 26, 2022 at 6:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> >
>>>>>> > > >> > > Thanks @Sebastian for the nice summary.
>>>>>> > > >> > >
>>>>>> > > >> > > I think most of your points aligned with the suggestions I made to the FLIP, while you were writing your reply (I believe we hit enter nearly at the same time ;) )
>>>>>> > > >> > >
>>>>>> > > >> > > Two points after we synced offline
>>>>>> > > >> > >
>>>>>> > > >> > > 1. I also changed supportsWatermarksSplitAlignment to supportsPausingSplits to express the general capability of pausing.
>>>>>> > > >> > >
>>>>>> > > >> > > 2.
As for whether we should have PausingSourceReader/PausingSplitReader (option b) or just add the methods (option c), I suggest simply adding the two methods, as I felt this is the approach Becket much prefers and the others do not object to. Unless there is opposition, let's go with option c.
>>>>>> > > >> > >
>>>>>> > > >> > > Best,
>>>>>> > > >> > >
>>>>>> > > >> > > Dawid
>>>>>> > > >> > > On 26/04/2022 10:06, Sebastian Mattheis wrote:
>>>>>> > > >> > >
>>>>>> > > >> > > Hi folks,
>>>>>> > > >> > >
>>>>>> > > >> > > Sorry for being a bit silent. Many thanks for all the input and suggestions. As I'm a bit new, I needed some time to catch up and structure (for myself) the discussion and I wanted to find a way to structure the conclusions. (Also because I had the feeling that some concerns got lost in the discussion.) This is my attempt and please correct me if something is wrong or misunderstood. I tried to collect and assemble the opinions, suggestions, and conclusions (to the best of my knowledge):
>>>>>> > > >> > >
>>>>>> > > >> > > # Top A: Should split alignment (pause/resume behavior) be a general capability?
>>>>>> > > >> > >
>>>>>> > > >> > > I personally don't see any reason not to have it as a general capability because for the alignSplit method it is actually independent of the watermarks. If we agree here to have it as a general capability, we should also agree on the right wording.
Does "alignSplits(splitsToResume, splitsToPause)" refer to what is then actually meant? (I see it as okay. I don't have any better idea whilst Arvid suggested "pauseOrResumeSplits".)
>>>>>> > > >> > >
>>>>>> > > >> > > # Top B: Should it be possible to enable/disable split alignment?
>>>>>> > > >> > >
>>>>>> > > >> > > I would personally not disable the split alignment on the source reader side because if split alignment is used for some other use case (see A) it could have nasty side effects on other/future use cases. Instead, I would disable "watermark split alignment" where I think it should disable the watermark-dependent trigger for split alignment.
>>>>>> > > >> > >
>>>>>> > > >> > > # Top C: Should we add a supportsX method?
>>>>>> > > >> > >
>>>>>> > > >> > > I find it difficult to define the scope of a supportsX method w.r.t. the following questions: a) Where is it used? and b) What is the expected output? To b), it's not straightforward to provide a meaningful output, e.g., if SourceReader supports split alignment but SplitReader does not. This is because with the current implementation, we can determine whether split alignment is fully supported only during runtime, and specifically only when calling alignSplits down the call hierarchy to the actual SplitReaders.
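To make the runtime-only detection concrete, here is a minimal sketch under stated assumptions. `SketchSplitReader`, `SketchFetcher`, and the method name `pauseOrResumeSplits` on them are hypothetical stand-ins, not the PoC classes; the default method throwing `UnsupportedOperationException` follows the "default implementation" reading mentioned at the top of this thread. The sketch shows that the caller only learns about missing support when the call actually reaches the split reader, and it reports that at most once.

```java
import java.util.Collection;

/** Hypothetical stand-in for a split reader; NOT the real Flink SplitReader. */
interface SketchSplitReader {
    /** Default: the reader does not support pausing, which is only visible when called. */
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException("pausing splits is not supported");
    }
}

/** Hypothetical caller that sits between the source reader and the split reader. */
final class SketchFetcher {
    private final SketchSplitReader reader;
    private int warnings;

    SketchFetcher(SketchSplitReader reader) {
        this.reader = reader;
    }

    void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        try {
            reader.pauseOrResumeSplits(toPause, toResume);
        } catch (UnsupportedOperationException e) {
            // Warn only on the first failure so repeated alignment rounds do not flood the log.
            if (warnings == 0) {
                warnings++;
                System.err.println("WARN: split reader cannot pause splits: " + e.getMessage());
            }
        }
    }

    int warningCount() {
        return warnings;
    }
}

/** A reader that does support pausing and records the last request. */
final class PausableSplitReader implements SketchSplitReader {
    Collection<String> lastPaused;

    @Override
    public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        lastPaused = toPause;
    }
}
```

Raising an error instead of a warning would be a one-line change in the `catch` block; which of the two is appropriate is part of the discussion here.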
>>>>>> > > >> > >
>>>>>> > > >> > > Therefore, I would suggest either raising an error or a warning if the alignment is called but not supported at some point. I know we should carefully think about when this could be the case because we don't want to flood anybody with such warnings. However, warnings could be an indicator for the user that for the watermark split alignment use case split reading is imbalanced, with the conclusion to either disable the trigger for watermark split alignment (see Top B) or to use/implement a source and reader that fully supports split alignment.
>>>>>> > > >> > >
>>>>>> > > >> > > # Top D: How to design interfaces?
>>>>>> > > >> > >
>>>>>> > > >> > > Thanks for structuring the discussion with the various possibilities (a-d). From the discussion and emails, I would like to summarize the following requirements:
>>>>>> > > >> > > - Interfaces should be consistent ("symmetric"), i.e., similar semantics should have similar interfaces with similar usage.
>>>>>> > > >> > > - Make explicit which implementations implement interfaces/support behavior.
>>>>>> > > >> > > - Make clear what are default implementations and how to implement interfaces with desired behavior.
>>>>>> > > >> > >
>>>>>> > > >> > > This is a simplified view of the relations between relevant classes of the PoC implementation:
>>>>>> > > >> > >
>>>>>> > > >> > > SourceReader (Public) <|-- SourceReaderBase (Internal) <|-- ..
<|-- MySourceReader
>>>>>> > > >> > >
>>>>>> > > >> > > MySourceReader <>-- SplitFetcherManager (Internal) <>-- SplitFetcher (Internal) <>-- SplitReader (Public) <|-- MySplitReader
>>>>>> > > >> > >
>>>>>> > > >> > > (A <|-- B: B inherits from A; A <>-- B: A "has a" B)
>>>>>> > > >> > >
>>>>>> > > >> > > Note that SourceReaderBase and SplitFetcherManager implement most of the "logic" for split alignment just because we wanted to implement split alignment and wanted it to be available as kind of a default. As a consequence, we have a "default implementation" for SourceReader that implements the actual logic for split alignment. For that reason, I find it very confusing to have a NOOP default implementation in the interface for the SourceReader. As a consequence, interface strategy c) is difficult because this would require NOOP default implementations in the public interfaces of SourceReader and SplitReader. This is the same for strategy d) because it would require a NOOP default implementation in the SourceReader. Further, as Dawid described, the method signatures of alignSplit for SourceReader and SplitReader differ and it would be extremely difficult to make the signatures the same (with even potential performance impact because of additional look-ups of split ids).
Therefore, having a symmetric decorative interface as of strategy a) is actually not possible and having two decorative interfaces with different method signatures is confusing. My conclusion is that we are best with strategy b) which means to have specializing sub-interfaces that inherit from the parent interface:
>>>>>> > > >> > > SourceReader <|-- AlignedSourceReader, SplitReader <|-- AlignedSplitReader
>>>>>> > > >> > > With this option, I'm not 100% sure what the implications are and if this could get nasty. I would suggest that Dawid and I just try to implement and see if we like it. :)
>>>>>> > > >> > >
>>>>>> > > >> > > # Summary
>>>>>> > > >> > >
>>>>>> > > >> > > In conclusion, please let me know your perspectives. Please correct me, if something is wrong or if I misunderstood something.
My perspective would be:
>>>>>> > > >> > >
>>>>>> > > >> > > Top A: Yes
>>>>>> > > >> > > Top B: Yes (but disable watermark trigger for split alignment)
>>>>>> > > >> > > Top C: No
>>>>>> > > >> > > Top D: b)
>>>>>> > > >> > >
>>>>>> > > >> > > Best,
>>>>>> > > >> > > Sebastian
>>>>>> > > >> > >
>>>>>> > > >> > > On Tue, Apr 26, 2022 at 9:55 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> > >
>>>>>> > > >> > >> @Arvid:
>>>>>> > > >> > >>
>>>>>> > > >> > >> While I also like Becket's capability approach, I fear that it doesn't work for this particular use case: Sources can always be aligned cross-task and this is just about intra-task alignment. So it's plausible to put sources into an alignment group even though they do not use any of the presented API of FLIP-217. They should just issue a warning, if they handle multiple splits (see motivation section).
>>>>>> > > >> > >>
>>>>>> > > >> > >> Yes, but the "supportXXX" method would be for telling if it supports that intra-task alignment. Cross-task alignment would always be supported.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I updated interfaces to what I believe to be closest to a consensus between all participants. Do you mind taking a look?
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Sebastian Do you mind addressing the nits?
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 25/04/2022 13:39, Arvid Heise wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for pushing this effort.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I'd actually be in favor of 1b). I fully agree that decorator interfaces should be avoided but I'm also not a big fan of overloading the base interfaces (they are hard to implement as is). The usual feedback on Source-related interfaces is that they are overwhelming and too hard to implement. However, I'd also not oppose 1c) as scattered interfaces also have drawbacks. I'd just dislike 1a) and 1d).
>>>>>> > > >> > >> While I also like Becket's capability approach, I fear that it doesn't work for this particular use case: Sources can always be aligned cross-task and this is just about intra-task alignment. So it's plausible to put sources into an alignment group even though they do not use any of the presented API of FLIP-217. They should just issue a warning, if they handle multiple splits (see motivation section).
>>>>>> > > >> > >>
>>>>>> > > >> > >> I think renaming alignSplits to facilitate future use cases makes sense but then all interfaces (if 1c) is chosen) should be adjusted accordingly. AlignedSourceReader could be PausingSourceReader and I'd go for pauseOrResumeSplits (Becket's proposal afaik). We could also split it into pauseSplit and resumeSplit.
While pauseOrResumeSplits may allow Sources to just use 1 instead of 2 library calls (as written in the Javadoc), both Kafka and Pulsar can't use it and I'm not sure if there is a system that can.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Some nits for the FLIP:
>>>>>> > > >> > >> - Please replace "stop" with "pause".
>>>>>> > > >> > >> - Not sure if it's worth it in the capability section: Sources that adopt this interface cannot be used in earlier versions. So it feels like we are only forward compatible (old sources can be used after the change); but I guess this holds for any API addition.
>>>>>> > > >> > >> - You might want to add what happens when all splits are paused.
>>>>>> > > >> > >> - You may want to describe how the 3 flavors of SourceReaderBase interact with the interface.
>>>>>> > > >> > >> - I'm not sure if it makes sense to include Kafka and Pulsar in the FLIP. For me, this is rather immediate follow-up work.
(could be in the same umbrella ticket)
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Arvid
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Mon, Apr 25, 2022 at 12:52 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> a) "MySourceReader implements SourceReader, WithSplitsAlignment", along with "MySplitReader implements SplitReader, WithSplitsAlignment", or
>>>>>> > > >> > >> b) "MySourceReader implements AlignedSourceReader" and "MySplitReader implements AlignedSplitReader", or
>>>>>> > > >> > >> c) "MySourceReader implements SourceReader" and "MySplitReader implements SplitReader".
>>>>>> > > >> > >>
>>>>>> > > >> > >> I think the latest proposal according to Dawid would be:
>>>>>> > > >> > >> d) "MySourceReader implements SourceReader" and "MySplitReader implements AlignedSplitReader".
>>>>>> > > >> > >> I am fine with this API, although personally speaking I think it is simpler to just add a new method to the split reader with default impl.
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> I think that is a good idea to have it aligned as much as possible. I'd be +1 for your option c). We can merge AlignedSplitReader with SplitReader. We will update the FLIP shortly.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 25/04/2022 12:43, Becket Qin wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for the comment, Jark.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 3.
Interface/Method Name.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Can the interface be used to align other things in the future? For example, align read speed, I have seen users requesting global rate limits. This feature may also need an interface like this. If we don't plan to extend this interface to support aligning other things, I suggest explicitly declaring the purpose of the methods, such as `alignWatermarksForSplits` instead of `alignSplits`.
>>>>>> > > >> > >>
>>>>>> > > >> > >> This is a good point. Naming wise, it would usually be more extensible to just describe what the method actually does, instead of assuming the purpose of doing this. For example, in this case, pauseOrResumeSplits() would be more extensible because this can be used for any kind of flow control, be it watermark alignment or simple rate limiting.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 4. Interface or Method.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I don't have a strong opinion on this. I think they have their own advantages. In Flink SQL, we heavily use Interfaces for extending abilities (SupportsXxxx) for TableSource/TableSink, and I prefer Interfaces rather than methods in this case.
When you have a bunch of abilities and each ability has more than one method, Interfaces can help to organize them and make clear to users which methods they need to implement when they want to have an ability.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I am OK with decorative interfaces if this is a general design pattern in the other components in Flink. But it looks like the current API proposal is not symmetric.
>>>>>> > > >> > >>
>>>>>> > > >> > >> The current proposal is essentially "MySourceReader implements SourceReader, WithSplitsAlignment", along with "MySplitReader implements AlignedSplitReader".
>>>>>> > > >> > >>
>>>>>> > > >> > >> Should we make the API symmetric? I'd consider any one of the following as symmetric.
>>>>>> > > >> > >>
>>>>>> > > >> > >> a) "MySourceReader implements SourceReader, WithSplitsAlignment", along with "MySplitReader implements SplitReader, WithSplitsAlignment", or
>>>>>> > > >> > >> b) "MySourceReader implements AlignedSourceReader" and "MySplitReader implements AlignedSplitReader", or
>>>>>> > > >> > >> c) "MySourceReader implements SourceReader" and "MySplitReader implements SplitReader".
>>>>>> > > >> > >>
>>>>>> > > >> > >> I think the latest proposal according to Dawid would be:
>>>>>> > > >> > >> d) "MySourceReader implements SourceReader" and "MySplitReader implements AlignedSplitReader".
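For readers following the options, the declaration shapes a) to c) can be sketched for the source-reader side as below; d) would combine shape c) on the source reader with shape b) on the split reader. All types here are hypothetical, heavily reduced stand-ins that only borrow names from the thread, and the `alignSplits` parameter types are assumed for illustration.

```java
import java.util.Collection;

// Hypothetical, reduced stand-in; the real Flink SourceReader has many more methods.
interface SourceReader {}

// a) a decorative interface that a reader opts into next to its base interface
interface WithSplitsAlignment {
    void alignSplits(Collection<String> splitsToResume, Collection<String> splitsToPause);
}

final class ReaderA implements SourceReader, WithSplitsAlignment {
    @Override
    public void alignSplits(Collection<String> splitsToResume, Collection<String> splitsToPause) {}
}

// b) a specialised sub-interface that inherits from the base interface
interface AlignedSourceReader extends SourceReader {
    void alignSplits(Collection<String> splitsToResume, Collection<String> splitsToPause);
}

final class ReaderB implements AlignedSourceReader {
    @Override
    public void alignSplits(Collection<String> splitsToResume, Collection<String> splitsToPause) {}
}

// c) the base interface itself carries the method with a default implementation
//    (declared under a separate name only so that all variants fit in one file)
interface SourceReaderWithDefault {
    default void alignSplits(Collection<String> splitsToResume, Collection<String> splitsToPause) {
        // no-op unless overridden
    }
}

final class ReaderC implements SourceReaderWithDefault {}

/** Shows how calling code would have to recognise each variant. */
final class OptionProbe {
    private OptionProbe() {}

    static String describe(Object reader) {
        if (reader instanceof WithSplitsAlignment) {
            return "a";
        }
        if (reader instanceof AlignedSourceReader) {
            return "b";
        }
        if (reader instanceof SourceReaderWithDefault) {
            return "c";
        }
        return "none";
    }
}
```

In shapes a) and b) the framework needs a type check before calling `alignSplits`; in shape c) the call is always legal, but a no-op default cannot be told apart from a real implementation without something like a supportsXXX flag.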
>>>>>> > > >> > >> I am fine with this API, although personally speaking I think it is simpler to just add a new method to the split reader with default impl.
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Dawid Wysakowicz <dwysakow...@apache.org>, thanks for the reply.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Having said that, as I don't have a preference and I agree most of the sources will support the alignment I am fine following your suggestion to have the SourceReader extending from WithWatermarksSplitsAlignment, but would put the "supportsXXX" there, not in the Source to keep the two methods together.
>>>>>> > > >> > >>
>>>>>> > > >> > >> One benefit of having the "supportsXXX" in Source is that this allows some compile time check. For example, if a user enabled watermark alignment while it is not supported by the Source, an exception can be thrown at compile time. It seems in general useful. That said, I agree that API cleanliness wise it is better to put the two methods together.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Mon, Apr 25, 2022 at 5:56 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks, Dawid, for the reminder on FLIP-182.
Sorry I did miss it.
>>>>>> > > >> > >> I don't have other concerns then.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >> Jark
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Mon, 25 Apr 2022 at 15:40, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Jark:
>>>>>> > > >> > >>
>>>>>> > > >> > >> 1. Will the framework always align with watermarks when the source implements the interface? I'm afraid not every case needs watermark alignment even if Kafka implements the interface, and this will affect the throughput somehow. I agree with Becket we may need a `supportSplitsAlignment()` method for users to configure the source to enable/disable the alignment.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 2. How does the framework calculate maxDesiredWatermark? I think the algorithm of maxDesiredWatermark will greatly affect throughput if the reader is constantly switching between pause and resume. Can users configure the alignment offset?
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> This is covered in the previous FLIP[1] which has been already implemented in 1.15. In short, it must be enabled with the watermark strategy which also configures drift and update interval.
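As a rough illustration of the question about maxDesiredWatermark, the sketch below assumes the rule as I read it from FLIP-182: the allowed maximum is the minimum watermark reported in the alignment group plus the configured maximum drift, and anything ahead of that is paused. The class and method names are hypothetical and this is plain Java, not Flink code; in Flink 1.15 the drift and update interval themselves are set on the watermark strategy via `WatermarkStrategy#withWatermarkAlignment`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; the alignment rule it encodes is an assumption based on FLIP-182. */
final class AlignmentSketch {
    private AlignmentSketch() {}

    /** Minimum reported watermark plus the allowed drift; no limit if nothing was reported. */
    static long maxDesiredWatermark(Collection<Long> reportedWatermarks, long maxAllowedDriftMillis) {
        if (reportedWatermarks.isEmpty()) {
            return Long.MAX_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long watermark : reportedWatermarks) {
            min = Math.min(min, watermark);
        }
        return min + maxAllowedDriftMillis;
    }

    /** Splits whose current watermark is ahead of the allowed maximum should be paused. */
    static List<String> splitsToPause(Map<String, Long> splitWatermarks, long maxDesiredWatermark) {
        List<String> toPause = new ArrayList<>();
        for (Map.Entry<String, Long> entry : splitWatermarks.entrySet()) {
            if (entry.getValue() > maxDesiredWatermark) {
                toPause.add(entry.getKey());
            }
        }
        return toPause;
    }
}
```

A larger drift means fewer pause/resume switches at the cost of more buffered state downstream, which is the throughput trade-off raised in question 2.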
>>>>>> > > >> > >>
>>>>>> > > >> > >> If we don't plan to extend this interface to support aligning other things, I suggest explicitly declaring the purpose of the methods, such as `alignWatermarksForSplits` instead of `alignSplits`.
>>>>>> > > >> > >>
>>>>>> > > >> > >>
>>>>>> > > >> > >> Sure, let's rename it.
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Becket:
>>>>>> > > >> > >>
>>>>>> > > >> > >> I understand your point. On the other hand, putting all methods, even with "supportsXXX" methods for enabling certain features, makes the entry threshold for writing a new source higher. Instead of focusing on the basic and required properties of the Source, the person implementing a source must bother with and figure out what all of the extra features are about and how to deal with them. It also makes it harder to organize methods in coupled groups, as Jark said.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Having said that, as I don't have a preference and I agree most of the sources will support the alignment I am fine following your suggestion to have the SourceReader extending from WithWatermarksSplitsAlignment, but would put the "supportsXXX" there, not in the Source to keep the two methods together.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Lastly, I agree it is really unfortunate that the "alignSplits" methods differ slightly for SourceReader and SplitReader. The reason for that is SourceReaderBase deals only with SplitIds, whereas SplitReader needs the actual splits to pause them. I found the discrepancy acceptable for the sake of simplifying changes significantly, especially as making the signatures identical would very likely impact performance, since we would have to perform additional lookups. Moreover, the SplitReader is a secondary interface.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> [1] https://cwiki.apache.org/confluence/x/hQYBCw
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 24/04/2022 17:15, Jark Wu wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for the effort, Dawid and Sebastian!
>>>>>> > > >> > >>
>>>>>> > > >> > >> I just have some minor questions (maybe I missed something).
>>>>>> > > >> > >>
>>>>>> > > >> > >> 1. Will the framework always align with watermarks when the source implements the interface? I'm afraid not every case needs watermark alignment even if Kafka implements the interface, and this will affect the throughput somehow. I agree with Becket we may need a `supportSplitsAlignment()` method for users to configure the source to enable/disable the alignment.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 2. How does the framework calculate maxDesiredWatermark?
>>>>>> > > >> > >> I think the algorithm of maxDesiredWatermark will greatly affect throughput if the reader is constantly switching between pause and resume. Can users configure the alignment offset?
>>>>>> > > >> > >>
>>>>>> > > >> > >> 3. Interface/Method Name.
>>>>>> > > >> > >> Can the interface be used to align other things in the future? For example, align read speed, I have seen users requesting global rate limits. This feature may also need an interface like this. If we don't plan to extend this interface to support aligning other things, I suggest explicitly declaring the purpose of the methods, such as `alignWatermarksForSplits` instead of `alignSplits`.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 4. Interface or Method.
>>>>>> > > >> > >> I don't have a strong opinion on this. I think they have their own advantages. In Flink SQL, we heavily use Interfaces for extending abilities (SupportsXxxx) for TableSource/TableSink, and I prefer Interfaces rather than methods in this case. When you have a bunch of abilities and each ability has more than one method, Interfaces can help to organize them and make clear to users which methods they need to implement when they want to have an ability.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >> Jark
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Sun, 24 Apr 2022 at 18:13, Becket Qin <becket....@gmail.com> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Hi Dawid,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for the explanation. Apologies that I somehow misread a bunch of
>>>>>> > > >> > >> "align" and thought they were "assign".
>>>>>> > > >> > >>
>>>>>> > > >> > >> Regarding 1, by default implementation, I was thinking of the default
>>>>>> > > >> > >> no-op implementation. I am a little worried about the proliferation of
>>>>>> > > >> > >> decorative interfaces. I think the most important thing about interfaces
>>>>>> > > >> > >> is that they are easy to understand. In this case, I prefer adding a new
>>>>>> > > >> > >> method to the existing interface for the following reasons:
>>>>>> > > >> > >>
>>>>>> > > >> > >> a) I feel the biggest drawback of decorative interfaces is which
>>>>>> > > >> > >> interface they can decorate and which combinations of multiple
>>>>>> > > >> > >> decorative interfaces are valid. In the current FLIP, the
>>>>>> > > >> > >> withSplitsAlignment interface is only applicable to the SourceReader
>>>>>> > > >> > >> which means it can't decorate any other interface.
>>>>>> > > >> > >> From an interface design perspective, a natural question is why not let
>>>>>> > > >> > >> "AlignedSplitReader" extend "withSplitsAlignment"? And it is also
>>>>>> > > >> > >> natural to assume that a split reader implementing both SplitReader and
>>>>>> > > >> > >> WithSplitAlignment would work, because a source reader implementing
>>>>>> > > >> > >> SourceReader and withSplitsAlignment works. So why isn't there an
>>>>>> > > >> > >> interface of AlignedSourceReader? In the future, if there is a new
>>>>>> > > >> > >> feature added (e.g. sorted or pre-partitioned data aware), are we going
>>>>>> > > >> > >> to create another interface of SplitReader such as SortedSplitReader or
>>>>>> > > >> > >> PrePartitionedAware? Can they be combined? So I think the additional
>>>>>> > > >> > >> decorative interface like withSplitsAlignment actually increases the
>>>>>> > > >> > >> understanding cost of users because they have to know what decorative
>>>>>> > > >> > >> interfaces are there, which interface they can decorate and which
>>>>>> > > >> > >> combinations of the decorative interfaces are valid and which are not.
>>>>>> > > >> > >> Ideally we want to avoid that. To be clear, I am not opposing having an
>>>>>> > > >> > >> interface of withSplitsAlignment, it is completely OK to have it as an
>>>>>> > > >> > >> internal interface and let SourceReader and SplitReader both extend it.
>>>>>> > > >> > >>
>>>>>> > > >> > >> b) Adding a new method to the SourceReader with a default no-op
>>>>>> > > >> > >> implementation would help avoid logic branching in the source logic,
>>>>>> > > >> > >> especially given that we agree that the vast majority of the
>>>>>> > > >> > >> SourceReader implementations, if not all, would just extend from the
>>>>>> > > >> > >> SourceReaderBase. That means adding a new method to the interface would
>>>>>> > > >> > >> effectively give the same user experience, but simpler.
>>>>>> > > >> > >>
>>>>>> > > >> > >> c) A related design principle that may be worth discussing is how we let
>>>>>> > > >> > >> the Source implementations tell Flink what capability is supported and
>>>>>> > > >> > >> what is not. Personally speaking, I feel the most intuitive place is in
>>>>>> > > >> > >> the Source itself, because that is the entrance of the entire Source
>>>>>> > > >> > >> connector logic.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Based on the above thoughts, I am wondering if the following interface
>>>>>> > > >> > >> would be easier for users to understand.
>>>>>> > > >> > >>
>>>>>> > > >> > >> - Change "withSplitsAlignment" to an internal interface, let both
>>>>>> > > >> > >> SourceReader and SplitReader extend from it, with a default no-op
>>>>>> > > >> > >> implementation.
>>>>>> > > >> > >> - Add a new method "boolean supportSplitsAlignment()" to the Source
>>>>>> > > >> > >> interface, with a default implementation returning false. Sources that
>>>>>> > > >> > >> have implemented the alignment logic can change this to return true, and
>>>>>> > > >> > >> override the alignSplits() methods in the SourceReader / SplitReader if
>>>>>> > > >> > >> needed.
>>>>>> > > >> > >> - In the future, if a new optional feature is going to be added to the
>>>>>> > > >> > >> Source, and that feature requires the awareness from Flink, we can add
>>>>>> > > >> > >> more such methods to the Source.
>>>>>> > > >> > >>
>>>>>> > > >> > >> What do you think?
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Konstantin:
>>>>>> > > >> > >>
>>>>>> > > >> > >> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
>>>>>> > > >> > >> resume behavior) will be implemented for Kafka and Pulsar only, correct?
>>>>>> > > >> > >>
>>>>>> > > >> > >> Correct, as far as I know though, those are the only sources which
>>>>>> > > >> > >> consume concurrently from multiple splits and thus alignment applies.
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Thomas:
>>>>>> > > >> > >>
>>>>>> > > >> > >> I wonder if "supporting" split alignment in SourceReaderBase and then
>>>>>> > > >> > >> doing nothing if the split reader does not implement AlignedSplitReader
>>>>>> > > >> > >> could be misleading? Perhaps WithSplitsAlignment can instead be added to
>>>>>> > > >> > >> the specific source reader (i.e. KafkaSourceReader) to make it explicit
>>>>>> > > >> > >> that the source actually supports it.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I understand your concern. Hmm, I think we could actually do that. Given
>>>>>> > > >> > >> the actual implementation of the SourceReaderBase#alignSplits is rather
>>>>>> > > >> > >> short (just a forward to the corresponding method of SplitFetcher), we
>>>>>> > > >> > >> could reimplement it in the actual source implementations. This solution
>>>>>> > > >> > >> has a downside though. Authors of new sources would have to do two
>>>>>> > > >> > >> things: extend from AlignedSplitReader and implement
>>>>>> > > >> > >> WithSplitsAlignment, instead of just extending AlignedSplitReader. I
>>>>>> > > >> > >> would be fine with such a tradeoff though. What do others think?
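The "just a forward" shape described above, together with the earlier remark that the reader level only knows split ids while the split reader needs the actual splits, can be sketched as follows. This is a minimal illustration under stated assumptions: ReaderSketch and FetcherSketch are hypothetical stand-ins, not SourceReaderBase or SplitFetcher, and splits are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// All types here are hypothetical stand-ins, not Flink classes.
final class FetcherSketch {
    private final Map<String, String> splitsById = new HashMap<>(); // split id -> split
    final List<String> paused = new ArrayList<>();

    void addSplit(String id, String split) {
        splitsById.put(id, split);
    }

    // Resolves ids to the actual splits, because pausing needs the split itself.
    void pauseOrResumeSplits(Collection<String> idsToPause, Collection<String> idsToResume) {
        for (String id : idsToResume) {
            paused.remove(splitsById.get(id));
        }
        for (String id : idsToPause) {
            String split = splitsById.get(id);
            if (split != null && !paused.contains(split)) {
                paused.add(split);
            }
        }
    }
}

final class ReaderSketch {
    final FetcherSketch fetcher = new FetcherSketch();

    // The reader-level method only knows split ids and forwards them unchanged.
    void alignSplits(Collection<String> idsToPause, Collection<String> idsToResume) {
        fetcher.pauseOrResumeSplits(idsToPause, idsToResume);
    }
}
```

Because the forwarding method is this small in the sketch, duplicating it in each concrete source reader costs little code; the cost discussed above is the extra step that source authors must remember.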
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Steven:
>>>>>> > > >> > >>
>>>>>> > > >> > >> For this part from the motivation section, is it accurate? Let's assume
>>>>>> > > >> > >> one source task consumes from 3 partitions and one of the partitions is
>>>>>> > > >> > >> significantly slower. In this situation, the watermark for this source
>>>>>> > > >> > >> task won't be held back, as it is reading recent data from the other two
>>>>>> > > >> > >> Kafka partitions. As a result, it won't hold back the overall watermark.
>>>>>> > > >> > >> I thought the problem is that we may have late data for this slow
>>>>>> > > >> > >> partition.
>>>>>> > > >> > >>
>>>>>> > > >> > >> It will hold back the watermark. The watermark of an operator is the
>>>>>> > > >> > >> minimum of the watermarks of all splits [1].
>>>>>> > > >> > >>
>>>>>> > > >> > >> I have another question about the restart. Say split alignment is
>>>>>> > > >> > >> triggered, a checkpoint is completed, and the job fails and is restored
>>>>>> > > >> > >> from the last checkpoint. Because the alignment decision is not
>>>>>> > > >> > >> checkpointed, alignment initially won't be enforced until we get a cycle
>>>>>> > > >> > >> of watermark aggregation and propagation, right? I'm not saying this
>>>>>> > > >> > >> corner case is a problem; I just want to understand it better.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Your understanding is correct.
>>>>>> > > >> > >>
>>>>>> > > >> > >> @Becket:
>>>>>> > > >> > >>
>>>>>> > > >> > >> 1.
>>>>>> > > >> > >> I think watermark alignment is sort of a general use case, so should we
>>>>>> > > >> > >> just add the related methods to SourceReader directly instead of
>>>>>> > > >> > >> introducing the new interface of WithSplitAssignment? We can provide
>>>>>> > > >> > >> default implementations, so backwards compatibility won't be an issue.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I don't think we can provide a default implementation. How would we do
>>>>>> > > >> > >> that? Would it be just a no-op? Is it better than having an opt-in
>>>>>> > > >> > >> interface? The default implementation would have to be added exclusively
>>>>>> > > >> > >> in a *Public* SourceReader interface. By the way notice SourceReaderBase
>>>>>> > > >> > >> does extend from WithSplitsAlignment, so effectively all implementations
>>>>>> > > >> > >> do handle the alignment case. To be honest I think it is impossible to
>>>>>> > > >> > >> implement the SourceReader interface directly by end users.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 2. As you mentioned, the SplitReader interface probably also needs some
>>>>>> > > >> > >> change to support throttling at the split granularity. Can you add that
>>>>>> > > >> > >> interface change into the public interface section as well?
>>>>>> > > >> > >>
>>>>>> > > >> > >> It has been added from the beginning. See *AlignedSplitReader.*
>>>>>> > > >> > >>
>>>>>> > > >> > >> 3.
>>>>>> > > >> > >> Nit, can we avoid using the method name assignSplits here, given that it
>>>>>> > > >> > >> is not actually changing the split assignments? It seems something like
>>>>>> > > >> > >> pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.
>>>>>> > > >> > >>
>>>>>> > > >> > >> The method's called *alignSplits*, not assign. Do you still prefer a
>>>>>> > > >> > >> different name for that? Personally, I am open to suggestions here.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 22/04/2022 05:59, Becket Qin wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for driving the effort, Sebastian. I think the motivation makes a
>>>>>> > > >> > >> lot of sense. Just a few suggestions / questions.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 1. I think watermark alignment is sort of a general use case, so should
>>>>>> > > >> > >> we just add the related methods to SourceReader directly instead of
>>>>>> > > >> > >> introducing the new interface of WithSplitAssignment? We can provide
>>>>>> > > >> > >> default implementations, so backwards compatibility won't be an issue.
>>>>>> > > >> > >>
>>>>>> > > >> > >> 2. As you mentioned, the SplitReader interface probably also needs some
>>>>>> > > >> > >> change to support throttling at the split granularity.
>>>>>> > > >> > >> Can you add that interface change into the public interface section as
>>>>>> > > >> > >> well?
>>>>>> > > >> > >>
>>>>>> > > >> > >> 3. Nit, can we avoid using the method name assignSplits here, given that
>>>>>> > > >> > >> it is not actually changing the split assignments? It seems something
>>>>>> > > >> > >> like pauseOrResumeSplits(), or adjustSplitsThrottling() is more
>>>>>> > > >> > >> accurate.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Jiangjie (Becket) Qin
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> However, a single source operator may read data from multiple
>>>>>> > > >> > >> splits/partitions, e.g., multiple Kafka partitions, such that even with
>>>>>> > > >> > >> watermark alignment the source operator may need to buffer an excessive
>>>>>> > > >> > >> amount of data if one split emits data faster than another.
>>>>>> > > >> > >>
>>>>>> > > >> > >> For this part from the motivation section, is it accurate? Let's assume
>>>>>> > > >> > >> one source task consumes from 3 partitions and one of the partitions is
>>>>>> > > >> > >> significantly slower.
>>>>>> > > >> > >> In this situation, the watermark for this source task won't be held
>>>>>> > > >> > >> back, as it is reading recent data from the other two Kafka partitions.
>>>>>> > > >> > >> As a result, it won't hold back the overall watermark. I thought the
>>>>>> > > >> > >> problem is that we may have late data for this slow partition.
>>>>>> > > >> > >>
>>>>>> > > >> > >> I have another question about the restart. Say split alignment is
>>>>>> > > >> > >> triggered, a checkpoint is completed, and the job fails and is restored
>>>>>> > > >> > >> from the last checkpoint. Because the alignment decision is not
>>>>>> > > >> > >> checkpointed, alignment initially won't be enforced until we get a cycle
>>>>>> > > >> > >> of watermark aggregation and propagation, right? I'm not saying this
>>>>>> > > >> > >> corner case is a problem; I just want to understand it better.
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks for working on this!
>>>>>> > > >> > >>
>>>>>> > > >> > >> I wonder if "supporting" split alignment in SourceReaderBase and then
>>>>>> > > >> > >> doing nothing if the split reader does not implement AlignedSplitReader
>>>>>> > > >> > >> could be misleading? Perhaps WithSplitsAlignment can instead be added to
>>>>>> > > >> > >> the specific source reader (i.e.
>>>>>> > > >> > >> KafkaSourceReader) to make it explicit that the source actually supports
>>>>>> > > >> > >> it.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Thanks,
>>>>>> > > >> > >> Thomas
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Hi Sebastian, Hi Dawid,
>>>>>> > > >> > >>
>>>>>> > > >> > >> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
>>>>>> > > >> > >> resume behavior) will be implemented for Kafka and Pulsar only, correct?
>>>>>> > > >> > >>
>>>>>> > > >> > >> +1 in general. I believe it is valuable to complete the watermark
>>>>>> > > >> > >> aligned story with this FLIP.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Cheers,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Konstantin
>>>>>> > > >> > >>
>>>>>> > > >> > >> On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> To be explicit, having worked on it, I support it ;) I think we can
>>>>>> > > >> > >> start a vote thread soonish, as there are no concerns so far.
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dawid
>>>>>> > > >> > >>
>>>>>> > > >> > >> On 13/04/2022 11:27, Sebastian Mattheis wrote:
>>>>>> > > >> > >>
>>>>>> > > >> > >> Dear Flink developers,
>>>>>> > > >> > >>
>>>>>> > > >> > >> I would like to open a discussion on FLIP-217 [1] for an extension of
>>>>>> > > >> > >> Watermark Alignment to perform alignment also in SplitReaders. To do so,
>>>>>> > > >> > >> SplitReaders must be able to suspend and resume reading from split
>>>>>> > > >> > >> sources, where the SourceOperator coordinates and controls suspend and
>>>>>> > > >> > >> resume. To gather information about current watermarks of the
>>>>>> > > >> > >> SplitReaders, we extend the internal WatermarkOutputMultiplexer and
>>>>>> > > >> > >> report watermarks to the SourceOperator.
>>>>>> > > >> > >>
>>>>>> > > >> > >> There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised
>>>>>> > > >> > >> and reworked by Dawid Wysakowicz (he did most of the work) and me. The
>>>>>> > > >> > >> changes are backwards compatible in the sense that, if affected
>>>>>> > > >> > >> components do not support split alignment, the behavior is as before.
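The suspend/resume idea can be illustrated with a small Java sketch. It is not the FLIP-217 implementation: SplitAlignmentSketch and its methods are hypothetical, watermarks are plain long timestamps keyed by a split id string, and the only facts taken from this thread are that a combined watermark is the minimum over the splits and that a maxDesiredWatermark bounds how far a split may run ahead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Flink.
final class SplitAlignmentSketch {

    // Splits whose watermark is ahead of the allowed maximum should be paused.
    // The result is sorted by split id so that it is deterministic.
    static List<String> splitsToPause(Map<String, Long> watermarkBySplit, long maxDesiredWatermark) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : new TreeMap<>(watermarkBySplit).entrySet()) {
            if (e.getValue() > maxDesiredWatermark) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // The combined watermark of a reader is the minimum over its splits.
    static long combinedWatermark(Map<String, Long> watermarkBySplit) {
        long min = Long.MAX_VALUE;
        for (long w : watermarkBySplit.values()) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

With three splits at watermarks 100, 250 and 90 and a maximum desired watermark of 200, only the split at 250 would be paused in this sketch, while the combined watermark stays at 90 until the slowest split catches up.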
>>>>>> > > >> > >>
>>>>>> > > >> > >> Best,
>>>>>> > > >> > >> Sebastian
>>>>>> > > >> > >>
>>>>>> > > >> > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>>>>>> > > >> > >>
>>>>>> > > >> > >> [2] https://github.com/dawidwys/flink/tree/aligned-splits
>>>>>> > > >> > >>
>>>>>> > > >> > >> --
>>>>>> > > >> > >>
>>>>>> > > >> > >> Konstantin Knauf
>>>>>> > > >> > >> https://twitter.com/snntrable
>>>>>> > > >> > >> https://github.com/knaufk