+1

Thanks, Peter. Based on the consensus in the recent thread on FLIP-371 [1], I agree that this is the right approach. I made some minor edits to the FLIP, which looks good to me now.
[1] https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57

On Wed, Dec 13, 2023 at 5:30 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Thank you Peter Vary for updating the FLIP based on the discussions.
> I really like the improvements introduced by the mixin interfaces, which now align much better with the source and table connectors.
>
> While this introduces some breaking changes to the existing connectors, this is technical debt that we need to resolve as soon as possible, and fully before 2.0.
>
> +1 from my side.
>
> I am cc'ing some folks participating in the other threads, sorry about that :)
>
> Cheers,
> Gyula
>
> On Wed, Dec 13, 2023 at 4:14 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
> > I have updated FLIP-372 [1] based on the comments from multiple sources. I moved to the mixin approach, as this seems to be the consensus in this thread [2].
> > I also created a draft implementation PR [3], so I can test the changes and the default implementations (no new tests yet).
> > Please provide your feedback, so I can address your questions and comments, and then we can move forward to voting.
> >
> > Thanks,
> > Peter
> >
> > [1] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
> > [2] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
> > [3] - https://github.com/apache/flink/pull/23912
> >
> > On Mon, Dec 11, 2023 at 2:28 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >
> > > We identified another issue with the current Sink API in a thread [1] related to FLIP-371 [2]. Currently it is not possible to evolve the Sink.createWriter method through deprecation, because StatefulSink and TwoPhaseCommittingSink have methods with the same name and parameters but a narrowed-down return type (StatefulSinkWriter, PrecommittingSinkWriter).
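The createWriter problem can be illustrated with a minimal sketch. It uses hypothetical, heavily simplified stand-ins for the Sink V2 interfaces; the real ones are generic and take an init context. It only shows the covariant-return pattern described in the message. Each subinterface narrows createWriter, so a sink that implements both needs a writer type satisfying both narrowed return types. In practice that means one combined interface per combination.

```java
// Hypothetical, simplified stand-ins for the Sink V2 interfaces (not the real
// Flink signatures, which are generic and take an init context).
final class CovariantWriterDemo {

    interface SinkWriter { String name(); }
    interface StatefulSinkWriter extends SinkWriter { String snapshotState(); }
    interface PrecommittingSinkWriter extends SinkWriter { String prepareCommit(); }

    interface Sink { SinkWriter createWriter(); }

    // Each subinterface narrows the return type of the same method.
    interface StatefulSink extends Sink {
        @Override StatefulSinkWriter createWriter();
    }

    interface TwoPhaseCommittingSink extends Sink {
        @Override PrecommittingSinkWriter createWriter();
    }

    // Implementing both forces a writer type that satisfies both narrowed
    // return types, so each combination needs its own combined interface.
    interface StatefulPrecommittingSinkWriter
            extends StatefulSinkWriter, PrecommittingSinkWriter {}

    static final class BothSink implements StatefulSink, TwoPhaseCommittingSink {
        @Override
        public StatefulPrecommittingSinkWriter createWriter() {
            return new StatefulPrecommittingSinkWriter() {
                @Override public String name() { return "both"; }
                @Override public String snapshotState() { return "state"; }
                @Override public String prepareCommit() { return "committable"; }
            };
        }
    }

    // Reports which writer capabilities the created writer actually has.
    static String describe(Sink sink) {
        SinkWriter writer = sink.createWriter();
        StringBuilder sb = new StringBuilder(writer.name());
        if (writer instanceof StatefulSinkWriter) {
            sb.append("+stateful");
        }
        if (writer instanceof PrecommittingSinkWriter) {
            sb.append("+precommitting");
        }
        return sb.toString();
    }
}
```

For example, `describe(new BothSink())` returns "both+stateful+precommitting".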
> > >
> > > To make the Sink API evolvable, we have to remove these at a minimum.
> > >
> > > The participants there also pointed out that the Source API also uses mixin interfaces (SupportsHandleExecutionAttemptSourceEvent, SupportsIntermediateNoMoreSplits) in some cases. My observation is that it uses inheritance as well (ReaderOutput, ExternallyInducedSourceReader).
> > >
> > > I have created a draft API along these lines in a branch [3], where only the last commit is relevant [4]. This implementation would follow the same patterns as the current Source API.
> > >
> > > I see two different general approaches here, and I would like to hear your preferences:
> > > - Keep the changes minimal and stick to the current Sink API design. We introduce the required new combinations of interfaces (TwoPhaseCommittingSinkWithPreCommitTopology, WithPostCommitTopologyWithPreCommitTopology) and do not change the API structure.
> > >   - Pros:
> > >     - Minimal change, so a smaller rewrite on the connector side
> > >     - Type checks happen at compile time
> > >   - Cons:
> > >     - Harder to evolve
> > >     - The number of interfaces increases with the possible combinations
> > >     - API patterns inconsistent with the Source API, so harder for developers to understand
> > > - Migrate to a model similar to the Source API. We create mixin interfaces for SupportsCommitter, SupportsWriterState, SupportsPreCommitTopology, SupportsPostCommitTopology, and SupportsPreWriteTopology.
> > >   - Pros:
> > >     - Better evolvability
> > >     - Consistent with the Source API
> > >   - Cons:
> > >     - The connectors need to change their inheritance patterns (after the deprecation period) if they use any of the more complicated Sinks.
> > >     - Type checks need to use `instanceof`, which could happen at DAG generation time.
> > >       Also, if the developer fails to correctly match the generic types on the mixin interfaces, the error will only surface at execution time, when the job tries to process the first record.
> > >
> > > I personally prefer the mixin approach for easier evolvability and better consistency, but I would like to hear your thoughts, so I can flesh out the chosen approach in FLIP-372.
> > >
> > > Thanks,
> > > Peter
> > >
> > > [1] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
> > > [2] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> > > [3] - https://github.com/pvary/flink/tree/mixin_demo
> > > [4] - https://github.com/pvary/flink/commit/acfc09f4c846f983f633bbf0c902aea49aa6b156
> > >
> > >
> > > On Fri, Nov 24, 2023, 11:38 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > >> Hi Peter!
> > >>
> > >> Thank you for the analysis of the options.
> > >>
> > >> I don't really have a strong opinion, but in general I am in favor of the mixin-style interfaces.
> > >> We follow the same approach for table sources / sinks as well.
> > >>
> > >> Some other benefits I see in addition to what you mentioned:
> > >> - Easier to introduce new experimental / public-evolving interfaces in the future
> > >> - Easier to declare parts of the API stable going forward, as it's not all or nothing
> > >>
> > >> The lack of proper compile-time validation is definitely a downside, but I believe this should mostly make initial development a little harder.
> > >>
> > >> Cheers,
> > >> Gyula
> > >>
> > >> On Thu, Nov 23, 2023 at 1:25 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> > >>
> > >> > We had a longer discussion with Gordon yesterday.
> > >> > The main conclusion was that moving to a completely new interface is not justified, and that we should try to improve the current one.
> > >> >
> > >> > Another ask from Gordon was to check when the user will be notified if the parameter types are incorrect when using the mixin approach.
> > >> > Imagine the type definition below:
> > >> >
> > >> > private static class TestTwoPhaseCommittingSinkWithPreCommitTopologyWrongMixin
> > >> >         implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
> > >> >                 WithPreCommitTopology<Boolean, Void> {
> > >> >     // ...
> > >> > }
> > >> >
> > >> > The type parameters of the above interfaces contradict each other:
> > >> >
> > >> > - TwoPhaseCommittingSinkWithPreCommitTopology
> > >> >   - Input - Integer
> > >> >   - WriterResult - Long
> > >> >   - Committable - String
> > >> > - WithPreCommitTopology
> > >> >   - WriterResult - Boolean
> > >> >   - Committable - Void
> > >> >
> > >> > Sadly, I was not able to find a solution where we could notify the user at job startup time. The first error the user gets is when the first record is processed/committed. I talked with Gyula Fora, and we discussed the possibility of using the TypeExtractor to get the types. We decided that it could work in some cases, but it would not be a generic solution. See the "NOTES FOR USERS OF THIS CLASS" [1].
> > >> >
> > >> > This missing feature would justify abandoning the mixin solution and sticking to creating individual interfaces, like:
> > >> >
> > >> > - *TwoPhaseCommittingSink* - When no pre-commit topology is needed. Kept, because it is enough for most of the use cases.
> > >> > - *TwoPhaseCommittingSinkWithPreCommitTopology* - When a pre-commit topology is needed with a transformation in the pre-commit stage. The new generic interface (could be internal).
> > >> > - *WithPreWriteTopology* - Kept as it is.
> > >> > - *WithPreCommitTopology* - Extends TwoPhaseCommittingSinkWithPreCommitTopology with the transformation method (classes from the streaming package are needed, so it cannot be merged with TwoPhaseCommittingSinkWithPreCommitTopology).
> > >> > - *WithPostCommitTopology* - Kept as it is (extends only TwoPhaseCommittingSink, so no pre-commit topology is allowed).
> > >> > - *WithPostCommitTopologyWithPreCommitTopology* - Extends WithPreCommitTopology with the same method as WithPostCommitTopology.
> > >> >
> > >> > I don't really like the complex `WithPostCommitTopologyWithPreCommitTopology` interface, and if we start adding new features, the number of interfaces could grow exponentially. Still, I agree that type checking is important. I don't have a strong opinion, but I am inclined to vote for moving in the direction of the individual interfaces.
> > >> >
> > >> > What do you prefer?
> > >> >
> > >> > 1. Go with the mixin approach
> > >> >    1. Better extensibility
> > >> >    2. Fewer interfaces (only one fewer now, but later this could be more)
> > >> >    3. Easier to understand (IMHO)
> > >> > 2. Stick with the combined interfaces approach (some mixin, like WithPreWriteTopology, some combined, like WithPostCommitTopologyWithPreCommitTopology)
> > >> >    1. Better error messages
> > >> >    2. Less disruptive change (still breaking for implementations of WithPreCommitTopology)
> > >> > 3. Do you have a better idea?
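The WrongMixin declaration compiles because the two interfaces share no type variables. Below is a minimal sketch of a DAG-generation-time check. It assumes simplified versions of the two interfaces with the type-parameter order from the example, and it shows both sides of the TypeExtractor discussion. Plain `java.lang.reflect` inspection catches the mismatch when the sink class binds concrete types. It can only report "unknown" when the arguments are type variables. This is an illustration, not Flink's TypeExtractor.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

final class MixinTypeCheckDemo {

    // Hypothetical simplified shapes of the two interfaces from the example.
    interface TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriterResultT, CommT> {}
    interface WithPreCommitTopology<WriterResultT, CommT> {}

    // Compiles even though the type arguments contradict each other.
    static class WrongMixin
            implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
                    WithPreCommitTopology<Boolean, Void> {}

    static class GoodMixin
            implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
                    WithPreCommitTopology<Long, String> {}

    // The arguments are type variables here, so reflection cannot decide.
    static class GenericMixin<W, C>
            implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, W, C>,
                    WithPreCommitTopology<W, C> {}

    /** Type arguments bound for a directly implemented interface, or null. */
    static Type[] typeArguments(Class<?> clazz, Class<?> iface) {
        for (Type t : clazz.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == iface) {
                return ((ParameterizedType) t).getActualTypeArguments();
            }
        }
        return null; // e.g. inherited via a superclass: not inspected here
    }

    /** Returns "ok", "mismatch", or "unknown" if the check cannot decide. */
    static String check(Class<?> sinkClass) {
        Type[] sink = typeArguments(sinkClass, TwoPhaseCommittingSinkWithPreCommitTopology.class);
        Type[] pre = typeArguments(sinkClass, WithPreCommitTopology.class);
        if (sink == null || pre == null) {
            return "unknown";
        }
        // WriterResult and Committable of the sink vs. the pre-commit mixin.
        Type[] expected = {sink[1], sink[2]};
        for (int i = 0; i < expected.length; i++) {
            if (!(expected[i] instanceof Class) || !(pre[i] instanceof Class)) {
                return "unknown";
            }
            if (!expected[i].equals(pre[i])) {
                return "mismatch";
            }
        }
        return "ok";
    }
}
```

The "unknown" case is the reason such a check is not a generic solution. Sinks that are themselves generic, or that inherit the interfaces through a superclass, slip through this simple version.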
> > >> >
> > >> > Thanks,
> > >> > Peter
> > >> >
> > >> > CC: Jiabao Sun, as he might be interested in this discussion
> > >> >
> > >> > [1] - https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/TypeExtractor.html
> > >> >
> > >> > On Wed, Oct 25, 2023 at 4:02 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> > >> >
> > >> > > Hi Gordon,
> > >> > >
> > >> > > Thanks for the review, here are my thoughts:
> > >> > >
> > >> > > > In terms of the abstraction layering, I was wondering if you've also considered this approach which I've quickly sketched in my local fork:
> > >> > > > https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> > >> > >
> > >> > > I think we have a few design issues here:
> > >> > > - How to handle the old interface, where no transformation is needed in the pre-commit phase? As you have proposed, a default method implementation is a nice solution here, as we do not really have to change everything in the transformation process.
> > >> > > - How to handle the WithPostCommitTopology interface? Currently the parent interface for a sink with a post-commit topology is strictly a single interface (TwoPhaseCommittingSink), and we want to add this to both types of sinks (new, with transformation / old, without transformation). In this case we could get away with creating OldTwoPhaseCommittingSinkWithPostCommitTopology and NewTwoPhaseCommittingSinkWithPostCommitTopology, but this is not a good approach for future extensibility. I tend to prefer a real mixin approach over creating multiple interfaces for this.
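For the first design issue, the default-method idea can be sketched as below. Gordon's no-op passthrough in SimpleTwoPhaseCommittingSinkBase amounts to the same thing. The interface names and the use of List<T> in place of DataStream<T> are assumptions for illustration. The point is that a sink whose writer results already are committables inherits an identity pre-commit step. In this sketch, such a sink needs no extra code.

```java
import java.util.List;

final class PassthroughPreCommitDemo {

    // Hypothetical shape of a pre-commit step; List<T> stands in for DataStream<T>.
    interface PreCommitTopology<WriterResultT, CommT> {
        List<CommT> addPreCommitTopology(List<WriterResultT> writerResults);
    }

    // When writer results already are committables, a default method
    // supplies the no-op passthrough.
    interface SimplePreCommitTopology<CommT> extends PreCommitTopology<CommT, CommT> {
        @Override
        default List<CommT> addPreCommitTopology(List<CommT> writerResults) {
            return writerResults; // identity: nothing to transform
        }
    }

    // An "old-style" sink: nothing to implement for the pre-commit step.
    static final class LegacySink implements SimplePreCommitTopology<String> {}
}
```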
> > >> > >
> > >> > > > Quick thought: regarding the awkwardness you mention at the end with sinks that have post-commit topologies but no pre-commit topologies: as an alternative to the mixin approach in the FLIP, it might make sense to consider a builder approach for constructing 2PC sinks
> > >> > >
> > >> > > TBH, after providing the possibility to transform in the pre-commit phase, I started to think about other possible generalizations:
> > >> > > - Why not allow a different return type for the pre-write phase? While we can transform the data in a map phase preceding the Sink, some Sinks might want to encapsulate these transformations before the writes.
> > >> > > - Why not explicitly allow changing the return type of the committer? We might not want to emit the incoming Committable, but use the commit hash (or any other data generated by the committer) in the post-commit topology instead. So in some cases it might make sense for the committer to emit elements of a different type than its input.
> > >> > > - Why not have everything as a mixin interface and define a Sink that way (very, very similar to your builder approach)?
> > >> > >
> > >> > > But I currently do not see explicit requirements for these features. They would also result in another full rewrite of the Sink API, which has had a really troubled history with several rewrites in recent releases. So I decided against these big changes and kept the changes minimal.
> > >> > >
> > >> > > So, while I personally would love to see the builder solution, I am afraid that the Flink community needs some stability around the Sink API for now, so that the different Sinks can start to use this new feature.
> > >> > >
> > >> > > What do you think?
> > >> > >
> > >> > > On Wed, Oct 25, 2023 at 2:01 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > >> > >
> > >> > >> Hi Peter,
> > >> > >>
> > >> > >> Thanks a lot for starting this FLIP!
> > >> > >>
> > >> > >> I agree that the current TwoPhaseCommittingSink interface is limiting in that it assumes 1) committers have the same parallelism as writers, and 2) writers immediately produce finalized committables. This FLIP captures the problem pretty well, and I do think there are use cases for a more general, flexible interface outside of the Iceberg connector as well.
> > >> > >>
> > >> > >> In terms of the abstraction layering, I was wondering if you've also considered this approach which I've quickly sketched in my local fork:
> > >> > >>
> > >> > >> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> > >> > >>
> > >> > >> With this approach, the sink translator always expects 2PC sink implementations to extend `TwoPhaseCommittingSinkBase`, and therefore assumes that a pre-commit topology always exists. For simple 2PC sinks that do not require transforming committables, we would ship (for convenience) an additional `SimpleTwoPhaseCommittingSinkBase`, where the pre-commit topology is a no-op passthrough.
> > >> > >> With that we avoid some of the "boilerplate" where 2PC sinks with a pre-commit topology require implementing two interfaces, as proposed in the FLIP.
> > >> > >>
> > >> > >> Quick thought: regarding the awkwardness you mention at the end with sinks that have post-commit topologies but no pre-commit topologies: as an alternative to the mixin approach in the FLIP, it might make sense to consider a builder approach for constructing 2PC sinks. That should also give users type safety at compile time, without the awkwardness of all the types involved. Something along the lines of:
> > >> > >>
> > >> > >> ```
> > >> > >> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
> > >> > >>     .withPreCommitTopology(writerResultsStream -> ...)  // Function<DataStream<WriterResultT>, DataStream<CommT>>
> > >> > >>     .withPostCommitTopology(committablesStream -> ...)  // Consumer<DataStream<CommT>>
> > >> > >>     .withPreWriteTopology(inputStream -> ...)           // Function<DataStream<InputT>, DataStream<InputT>>
> > >> > >>     .build();
> > >> > >> ```
> > >> > >>
> > >> > >> We could probably do some validation in the build() method. For example, if the writer and committer have different types, then clearly a pre-commit topology should have been defined to transform the intermediate writer results.
> > >> > >>
> > >> > >> Obviously, this would take the generalization of the TwoPhaseCommittingSink interface to the extreme. We would have just one interface with all of the pre-commit / pre-write / post-commit methods built in, and users would use the builder as the entry point to opt in / opt out as needed.
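Gordon's builder sketch can be turned into a small in-memory model to check the type-safety argument. Everything here is hypothetical: List<T> stands in for DataStream<T>, the writer and committer are plain functions, and the `simple()` factory is one assumed way to handle sinks that need no pre-commit transformation. WriterResultT and CommT are type parameters of the builder, so the compiler rejects a pre-commit function with the wrong types. `build()` can also reject a missing pre-commit step before any record is processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical 2PC sink builder; List<T> stands in for DataStream<T>.
final class SinkBuilderDemo {

    /** The built sink: runs one in-memory "checkpoint" and returns the committables. */
    interface TwoPhaseSink<InputT, CommT> {
        List<CommT> run(List<InputT> input);
    }

    static final class TwoPhaseCommittingSinkBuilder<InputT, WriterResultT, CommT> {
        private final Function<InputT, WriterResultT> writer;
        private final Consumer<CommT> committer;
        private UnaryOperator<List<InputT>> preWrite = UnaryOperator.identity();
        private Function<List<WriterResultT>, List<CommT>> preCommit; // null until defined
        private Consumer<List<CommT>> postCommit = committables -> {};

        TwoPhaseCommittingSinkBuilder(
                Function<InputT, WriterResultT> writer, Consumer<CommT> committer) {
            this.writer = writer;
            this.committer = committer;
        }

        /** For sinks whose writer results already are committables: passthrough pre-commit. */
        static <InputT, CommT> TwoPhaseCommittingSinkBuilder<InputT, CommT, CommT> simple(
                Function<InputT, CommT> writer, Consumer<CommT> committer) {
            TwoPhaseCommittingSinkBuilder<InputT, CommT, CommT> builder =
                    new TwoPhaseCommittingSinkBuilder<>(writer, committer);
            builder.preCommit = results -> results;
            return builder;
        }

        // The compiler checks that the function maps writer results to committables.
        TwoPhaseCommittingSinkBuilder<InputT, WriterResultT, CommT> withPreCommitTopology(
                Function<List<WriterResultT>, List<CommT>> topology) {
            this.preCommit = topology;
            return this;
        }

        TwoPhaseCommittingSinkBuilder<InputT, WriterResultT, CommT> withPostCommitTopology(
                Consumer<List<CommT>> topology) {
            this.postCommit = topology;
            return this;
        }

        TwoPhaseCommittingSinkBuilder<InputT, WriterResultT, CommT> withPreWriteTopology(
                UnaryOperator<List<InputT>> topology) {
            this.preWrite = topology;
            return this;
        }

        TwoPhaseSink<InputT, CommT> build() {
            if (preCommit == null) {
                // Fails when the job is built, not when the first record arrives.
                throw new IllegalStateException(
                        "No pre-commit topology defined; use simple() if writer results are committables");
            }
            // Copy the configuration so later builder calls do not affect this sink.
            UnaryOperator<List<InputT>> pre = preWrite;
            Function<List<WriterResultT>, List<CommT>> pc = preCommit;
            Consumer<List<CommT>> post = postCommit;
            return input -> {
                List<WriterResultT> results = new ArrayList<>();
                for (InputT element : pre.apply(input)) {
                    results.add(writer.apply(element));
                }
                List<CommT> committables = pc.apply(results);
                committables.forEach(committer);
                post.accept(committables);
                return committables;
            };
        }
    }
}
```

A design note: because of erasure, `build()` cannot compare WriterResultT and CommT at runtime. This sketch therefore requires an explicit pre-commit step unless the sink is created through `simple()`. Passing Class tokens to the builder would be another option.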
The > > >> upside > > >> > is > > >> > >> that the SinkTransformationTranslator logic will become much less > > >> > >> cluttered. > > >> > >> > > >> > >> I'll need to experiment the builder approach a bit more to see if > > it > > >> > makes > > >> > >> sense at all, but wanted to throw out the idea earlier to see > what > > >> you > > >> > >> think. > > >> > >> > > >> > >> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry < > > >> peter.vary.apa...@gmail.com> > > >> > >> wrote: > > >> > >> > > >> > >> > Hi Team, > > >> > >> > > > >> > >> > Did some experimenting and found the originally proposed > solution > > >> to > > >> > be > > >> > >> a > > >> > >> > bit awkward for cases where WithPostCommitTopology was needed > but > > >> we > > >> > do > > >> > >> not > > >> > >> > need the WithPreCommitTopology transformation. > > >> > >> > The flexibility of the new API would be better if we would use > a > > >> mixin > > >> > >> like > > >> > >> > approach. The interfaces would only be used to define the > > specific > > >> > >> required > > >> > >> > methods, and they would not need to extend the original > > >> > >> > TwoPhaseCommittingSink interface too. > > >> > >> > > > >> > >> > Since the interfaces WithPreCommitTopology and the > > >> > >> WithPostCommitTopology > > >> > >> > interfaces are still Experimental, after talking to Gyula > > offline, > > >> I > > >> > >> have > > >> > >> > updated the FLIP to use this new approach. > > >> > >> > > > >> > >> > Any comments, thoughts are welcome. > > >> > >> > > > >> > >> > Thanks, > > >> > >> > Peter > > >> > >> > > > >> > >> > Péter Váry <peter.vary.apa...@gmail.com> ezt írta (időpont: > > 2023. > > >> > okt. 
> > >> > >> 5., > > >> > >> > Cs, 16:04): > > >> > >> > > > >> > >> > > Hi Team, > > >> > >> > > > > >> > >> > > In my previous email[1] I have described our challenges > > migrating > > >> > the > > >> > >> > > existing Iceberg SinkFunction based implementation, to the > new > > >> > SinkV2 > > >> > >> > based > > >> > >> > > implementation. > > >> > >> > > > > >> > >> > > As a result of the discussion around that topic, I have > created > > >> the > > >> > >> > > FLIP-371 [2] to address the Committer related changes, and > now > > I > > >> > have > > >> > >> > > created a companion FLIP-372 [3] to address the > > >> > WithPreCommitTopology > > >> > >> > > related issues. > > >> > >> > > > > >> > >> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology > to > > >> > alter > > >> > >> the > > >> > >> > > type of the Committable > > >> > >> > > > > >> > >> > > The main goal of the FLIP-372 is to extend the currently > > existing > > >> > >> > > TwoPhaseCommittingSink API by adding the possibility to have > a > > >> > >> > > PreCommitTopology where the input of and the output types of > > the > > >> pre > > >> > >> > commit > > >> > >> > > transformation are different. > > >> > >> > > > > >> > >> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink > > >> > >> > > WithPreCommitTopology to alter the type of the Committable > > >> > >> > > < > > >> > >> > > > >> > >> > > >> > > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable > > >> > >> > > > > >> > >> > > > > >> > >> > > Please use this thread to discuss the FLIP related questions, > > >> > >> proposals, > > >> > >> > > and feedback. 
> > >> > >> > >
> > >> > >> > > Thanks,
> > >> > >> > > Peter
> > >> > >> > >
> > >> > >> > > [1] - https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> > >> > >> > > [2] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> > >> > >> > > [3] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
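FLIP-372 asks for a pre-commit topology whose output type differs from its input type. This can be illustrated with a hypothetical example loosely matching the limitations Gordon lists. Parallel writers emit per-subtask results that are not yet final committables. The pre-commit step then merges them into a single committable for one commit. WriteResult, CommitRequest, MergingPreCommit, and the List-for-DataStream substitution are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class TypeChangingPreCommitDemo {

    /** Hypothetical per-writer result: files written by one parallel writer subtask. */
    static final class WriteResult {
        final int subtask;
        final List<String> dataFiles;

        WriteResult(int subtask, List<String> dataFiles) {
            this.subtask = subtask;
            this.dataFiles = dataFiles;
        }
    }

    /** Hypothetical committable: everything a single commit should make visible. */
    static final class CommitRequest {
        final List<String> dataFiles;

        CommitRequest(List<String> dataFiles) {
            this.dataFiles = dataFiles;
        }
    }

    // Hypothetical shape of the pre-commit step; List<T> stands in for DataStream<T>.
    interface PreCommitTopology<WriterResultT, CommT> {
        List<CommT> addPreCommitTopology(List<WriterResultT> writerResults);
    }

    /** Input type WriteResult, output type CommitRequest: the types differ. */
    static final class MergingPreCommit implements PreCommitTopology<WriteResult, CommitRequest> {
        @Override
        public List<CommitRequest> addPreCommitTopology(List<WriteResult> writerResults) {
            List<String> files = new ArrayList<>();
            for (WriteResult result : writerResults) {
                files.addAll(result.dataFiles);
            }
            // Nothing written: nothing to commit.
            if (files.isEmpty()) {
                return Collections.emptyList();
            }
            // All writers' files are combined into one committable.
            return Collections.singletonList(new CommitRequest(files));
        }
    }
}
```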