I have updated FLIP-372 [1] based on the comments from multiple sources.
I moved to the mixin approach, as this seems to be the consensus in this thread [2].
I also created a draft implementation PR [3], so I can test the changes and the default implementations (no new tests yet).

Please provide your feedback so I can address your questions and comments, and then we can move forward to voting.
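To make the trade-off of the mixin approach concrete (the capability is detected with `instanceof`, while mismatched generic parameters only surface when the first record is processed), here is a minimal sketch. The interfaces below are simplified, hypothetical stand-ins: only the name SupportsPreCommitTopology is borrowed from the discussion, and the method signatures are assumptions rather than the actual Flink API or the code in the draft PR.

```java
/** Hypothetical, simplified stand-ins for the Sink API; not the real Flink interfaces. */
class MixinSketch {

    /** Assumed base sink: writes one element and returns a writer result. */
    interface Sink<InputT, WriterResultT> {
        WriterResultT write(InputT element);
    }

    /** Assumed mixin: turns a writer result into a committable. */
    interface SupportsPreCommitTopology<WriterResultT, CommT> {
        CommT preCommit(WriterResultT writerResult);
    }

    /** Consistent generics: the writer emits Long and the mixin consumes Long. */
    static class MatchingSink
            implements Sink<Integer, Long>, SupportsPreCommitTopology<Long, String> {
        @Override public Long write(Integer element) { return element.longValue(); }
        @Override public String preCommit(Long writerResult) { return "committable-" + writerResult; }
    }

    /** Contradicting generics: the writer emits Long, the mixin expects Boolean. It still compiles. */
    static class MismatchedSink
            implements Sink<Integer, Long>, SupportsPreCommitTopology<Boolean, String> {
        @Override public Long write(Integer element) { return element.longValue(); }
        @Override public String preCommit(Boolean writerResult) { return "committable-" + writerResult; }
    }

    /** "DAG generation time" check: only the presence of the mixin is visible, not its type arguments. */
    static boolean hasPreCommitTopology(Sink<?, ?> sink) {
        return sink instanceof SupportsPreCommitTopology;
    }

    /** Processes one record the way a translator working on erased (raw) types would. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Object processRecord(Sink sink, Object element) {
        Object writerResult = sink.write(element);
        if (sink instanceof SupportsPreCommitTopology) {
            // Raw call: a generic mismatch shows up here as a ClassCastException.
            return ((SupportsPreCommitTopology) sink).preCommit(writerResult);
        }
        return writerResult;
    }

    public static void main(String[] args) {
        System.out.println(hasPreCommitTopology(new MismatchedSink())); // the mixin is detected
        System.out.println(processRecord(new MatchingSink(), 42));
        try {
            processRecord(new MismatchedSink(), 42);
        } catch (ClassCastException e) {
            System.out.println("type mismatch detected only on the first record");
        }
    }
}
```

In this sketch the `instanceof` check passes for both sinks, so nothing fails while the pipeline is assembled; the mismatched sink fails only when a Long writer result is handed to a method expecting a Boolean.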
Thanks,
Peter

[1] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
[2] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
[3] - https://github.com/apache/flink/pull/23912

Péter Váry <peter.vary.apa...@gmail.com> wrote (on Mon, Dec 11, 2023, 14:28):

> We identified another issue with the current Sink API in a thread [1]
> related to FLIP-371 [2]. Currently it is not possible to evolve the
> Sink.createWriter method with deprecation, because StatefulSink and
> TwoPhaseCommittingSink have methods with the same name and parameters, but
> a narrowed-down return type (StatefulSinkWriter, PrecommittingSinkWriter).
>
> To make the Sink API evolvable, we minimally have to remove these.
>
> The participants there also pointed out that the Source API also uses
> mixin interfaces (SupportsHandleExecutionAttemptSourceEvent,
> SupportsIntermediateNoMoreSplits) in some cases. My observation is that it
> has inheritance as well (ReaderOutput, ExternallyInducedSourceReader).
>
> I have created a draft API along these lines in a branch [3] where only
> the last commit is relevant [4]. This implementation would follow the same
> patterns as the current Source API.
>
> I see two different general approaches here, and I would like to hear your
> preferences:
> - Keep the changes minimal, stick to the current Sink API design. We
>   introduce the required new combinations of interfaces
>   (TwoPhaseCommittingSinkWithPreCommitTopology,
>   WithPostCommitTopologyWithPreCommitTopology), and do not change the API
>   structure.
>   - Pros:
>     - Minimal change - smaller rewrite on the connector side
>     - Type checks happen at compile time
>   - Cons:
>     - Harder to evolve
>     - The number of interfaces increases with the possible combinations
>     - Inconsistent API patterns wrt the Source API - harder for
>       developers to understand
> - Migrate to a model similar to the Source API.
>   We create mixin interfaces for SupportsCommitter, SupportsWriterState,
>   SupportsPreCommitTopology, SupportsPostCommitTopology,
>   SupportsPreWriteTopology.
>   - Pros:
>     - Better evolvability
>     - Consistent with the Source API
>   - Cons:
>     - The connectors need to change their inheritance patterns (after
>       the deprecation period) if they are using any of the more
>       complicated Sinks.
>     - Type checks need to use `instanceof`, which could happen at DAG
>       generation time. Also, if the developer fails to correctly match the
>       generic types on the mixin interfaces, the error will only surface at
>       execution time - when the job tries to process the first record.
>
> I personally prefer the mixin approach for easier evolvability and better
> consistency, but I would like to hear your thoughts, so I can flesh out the
> chosen approach in FLIP-372.
>
> Thanks,
> Peter
>
> [1] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
> [2] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> [3] - https://github.com/pvary/flink/tree/mixin_demo
> [4] - https://github.com/pvary/flink/commit/acfc09f4c846f983f633bbf0c902aea49aa6b156
>
> On Fri, Nov 24, 2023, 11:38 Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi Peter!
>>
>> Thank you for the analysis of the options.
>>
>> I don't really have a strong opinion, but in general I am in favor of the
>> mixin-style interfaces.
>> We follow the same approach for table sources / sinks as well.
>>
>> Some other benefits I see in addition to what you mentioned:
>> - Easier to introduce new experimental / public-evolving interfaces in
>>   the future
>> - Easier to declare parts of the API stable going forward, as it's not
>>   all or nothing
>>
>> Losing the ability to do proper compile-time validation is definitely a
>> downside, but this should mostly make initial development a little harder,
>> I believe.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Nov 23, 2023 at 1:25 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>> > We had a longer discussion with Gordon yesterday.
>> > The main conclusion was that moving to a completely new interface is not
>> > justified, and we try to improve the current one.
>> >
>> > Another ask from Gordon was to check when the user will be notified if
>> > the parameter types are incorrect when using the mixin approach.
>> > Imagine the type definition below:
>> >
>> >     private static class
>> >             TestTwoPhaseCommittingSinkWithPreCommitTopologyWrongMixin
>> >             implements
>> >                 TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
>> >                 WithPreCommitTopology<Boolean, Void> {
>> >
>> > The parametrizations of the above interfaces contradict each other:
>> >
>> > - TwoPhaseCommittingSinkWithPreCommitTopology
>> >   - Input - Integer
>> >   - WriterResult - Long
>> >   - Committable - String
>> > - WithPreCommitTopology
>> >   - WriterResult - Boolean
>> >   - Committable - Void
>> >
>> > Sadly, I was not able to find a solution where we could notify the user
>> > at job startup time. The first error the user will get is when the first
>> > record is processed/committed. I talked with Gyula Fóra, and we discussed
>> > the possibility of using the TypeExtractor to get the types. We have
>> > decided that it could work in some cases, but would not be a generic
>> > solution. See the "NOTES FOR USERS OF THIS CLASS" [1].
>> >
>> > This missing feature would justify abandoning the mixin solution, and
>> > sticking to creating individual interfaces, like:
>> >
>> > - *TwoPhaseCommittingSink* - When no pre-commit topology is needed -
>> >   kept because it is enough for most of the use-cases.
>> > - *TwoPhaseCommittingSinkWithPreCommitTopology* - When a pre-commit
>> >   topology is needed with a transformation in the pre-commit stage - the
>> >   new generic interface (could be internal)
>> > - *WithPreWriteTopology* - kept as it is
>> > - *WithPreCommitTopology* - extends
>> >   TwoPhaseCommittingSinkWithPreCommitTopology with the transformation
>> >   method (classes from the streaming package are needed, so it cannot be
>> >   merged with TwoPhaseCommittingSinkWithPreCommitTopology)
>> > - *WithPostCommitTopology* - kept as it is (extends only
>> >   TwoPhaseCommittingSink, so no pre-commit topology is allowed)
>> > - *WithPostCommitTopologyWithPreCommitTopology* - extends
>> >   WithPreCommitTopology with the same method as WithPostCommitTopology
>> >
>> > I don't really like the complex
>> > `WithPostCommitTopologyWithPreCommitTopology` interface, and if we start
>> > adding new features then the number of interfaces could grow
>> > exponentially, but I agree that the type checking is important. I don't
>> > have a strong opinion, but I am inclined to vote for moving in the
>> > direction of the individual interfaces.
>> >
>> > What do you prefer?
>> >
>> > 1. Go with the mixin approach
>> >    1. Better extendability
>> >    2. Fewer interfaces (only by 1 now, but later this could be more)
>> >    3. Easier to understand (IMHO)
>> > 2. Stick with the combined interfaces approach (some mixin, like
>> >    WithPreWriteTopology, some combined, like
>> >    WithPostCommitTopologyWithPreCommitTopology)
>> >    1. Better error messages
>> >    2. Less disruptive change (still breaking for implementations of
>> >       WithPreCommitTopology)
>> > 3. Do you have a better idea?
>> >
>> > Thanks,
>> > Peter
>> >
>> > CC: Jiabao Sun - as he might be interested in this discussion
>> >
>> > [1] - https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/TypeExtractor.html
>> >
>> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Wed, Oct 25, 2023, 16:02):
>> >
>> > > Hi Gordon,
>> > >
>> > > Thanks for the review, here are my thoughts:
>> > >
>> > > > In terms of the abstraction layering, I was wondering if you've also
>> > > > considered this approach which I've quickly sketched in my local fork:
>> > > > https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>> > >
>> > > I think we have a few design issues here:
>> > > - How to handle the old interface where the transformation is not
>> > >   needed in the pre-commit phase? - As you have proposed, a default
>> > >   method implementation is a nice solution here, as we do not really
>> > >   have to change everything in the transformation process.
>> > > - How to handle the WithPostCommitTopology interface? - Currently the
>> > >   parent interface for the sink with a post-commit topology is strictly
>> > >   a single interface (TwoPhaseCommittingSink) and we want to add this to
>> > >   both types of sinks (new - with transformation / old - without
>> > >   transformation). In this case we could get away with creating
>> > >   OldTwoPhaseCommittingSinkWithPostCommitTopology and
>> > >   NewTwoPhaseCommittingSinkWithPostCommitTopology, but this is not a
>> > >   good approach for future extensibility. I tend to prefer a real mixin
>> > >   approach to creating multiple interfaces for this.
>> > >
>> > > > Quick thought: regarding the awkwardness you mention in the end with
>> > > > sinks that have post commit topologies, but no pre-commit topologies -
>> > > > Alternative to the mixin approach in the FLIP, it might make sense to
>> > > > consider a builder approach for constructing 2PC sinks
>> > >
>> > > TBH, after providing the possibility to transform in the pre-commit
>> > > phase, I have started to think about the possible different
>> > > generalizations:
>> > > - Why not have the possibility to have a different return type for the
>> > >   pre-write phase? - While we have the possibility to transform the data
>> > >   in a preceding map phase before the Sink, some Sinks might want to
>> > >   encapsulate these transformations before the writes.
>> > > - Why not have the explicit possibility to change the return type of the
>> > >   committer? - We might not want to emit the incoming Committable; we
>> > >   might want to use the commit hash - or any other data generated by the
>> > >   committer - in the post-commit topology. So in some cases it might
>> > >   make sense for the committer to emit elements with different types
>> > >   than the input.
>> > > - Why not have everything as a mixin interface and define a Sink this
>> > >   way (very, very similar to your builder approach)?
>> > >
>> > > But I currently do not see explicit requirements for these features,
>> > > and it would result in another full rewrite of the Sink API, which has
>> > > had a really troubled history with several rewrites in the recent
>> > > releases, so I decided against these big changes and kept the changes
>> > > minimal.
>> > >
>> > > So, while I personally would love to see the Builder solution, I am
>> > > afraid that the Flink community needs some stability around the Sink
>> > > API for now, so the different Sinks could start to use this new feature.
>> > >
>> > > What do you think?
>> > >
>> > > Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote (on Wed, Oct 25, 2023, 2:01):
>> > >
>> > >> Hi Peter,
>> > >>
>> > >> Thanks a lot for starting this FLIP!
>> > >>
>> > >> I agree that the current TwoPhaseCommittingSink interface is limiting
>> > >> in that it assumes 1) committers have the same parallelism as writers,
>> > >> and 2) writers immediately produce finalized committables. This FLIP
>> > >> captures the problem pretty well, and I do think there are use cases
>> > >> for a more general, flexible interface outside of the Iceberg connector
>> > >> as well.
>> > >>
>> > >> In terms of the abstraction layering, I was wondering if you've also
>> > >> considered this approach which I've quickly sketched in my local fork:
>> > >> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>> > >>
>> > >> With this approach, the sink translator always expects that 2PC sink
>> > >> implementations extend `TwoPhaseCommittingSinkBase` and therefore
>> > >> assumes that a pre-commit topology always exists. For simple 2PC sinks
>> > >> that do not require transforming committables, we would ship (for
>> > >> convenience) an additional `SimpleTwoPhaseCommittingSinkBase` where the
>> > >> pre-commit topology is a no-op passthrough. With that we avoid some of
>> > >> the "boilerplate" where 2PC sinks with a pre-commit topology require
>> > >> implementing two interfaces, as proposed in the FLIP.
>> > >>
>> > >> Quick thought: regarding the awkwardness you mention in the end with
>> > >> sinks that have post commit topologies, but no pre-commit topologies -
>> > >> Alternative to the mixin approach in the FLIP, it might make sense to
>> > >> consider a builder approach for constructing 2PC sinks, which should
>> > >> also give users type-safety at compile time while not having the
>> > >> awkwardness with all the types involved.
>> > >> Something along the lines of:
>> > >>
>> > >> ```
>> > >> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
>> > >>     // Function<DataStream<WriterResultT>, DataStream<CommT>>
>> > >>     .withPreCommitTopology(writerResultsStream -> ...)
>> > >>     // Consumer<DataStream<CommT>>
>> > >>     .withPostCommitTopology(committablesStream -> ...)
>> > >>     // Function<DataStream<InputT>, DataStream<InputT>>
>> > >>     .withPreWriteTopology(inputStream -> ...)
>> > >>     .build();
>> > >> ```
>> > >>
>> > >> We could probably do some validation in the build() method, e.g. if
>> > >> the writer and committer have different types, then clearly a
>> > >> pre-commit topology should have been defined to transform intermediate
>> > >> writer results.
>> > >>
>> > >> Obviously, this would take generalization of the TwoPhaseCommittingSink
>> > >> interface to the extreme, where we just have one interface with all of
>> > >> the pre-commit / pre-write / post-commit methods built-in, and users
>> > >> would use the builder as the entrypoint to opt-in / opt-out as needed.
>> > >> The upside is that the SinkTransformationTranslator logic will become
>> > >> much less cluttered.
>> > >>
>> > >> I'll need to experiment with the builder approach a bit more to see if
>> > >> it makes sense at all, but wanted to throw out the idea earlier to see
>> > >> what you think.
>> > >>
>> > >> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com>
>> > >> wrote:
>> > >>
>> > >> > Hi Team,
>> > >> >
>> > >> > I did some experimenting and found the originally proposed solution
>> > >> > to be a bit awkward for cases where WithPostCommitTopology was needed
>> > >> > but we do not need the WithPreCommitTopology transformation.
>> > >> > The flexibility of the new API would be better if we used a
>> > >> > mixin-like approach.
>> > >> > The interfaces would only be used to define the specific required
>> > >> > methods, and they would not need to extend the original
>> > >> > TwoPhaseCommittingSink interface either.
>> > >> >
>> > >> > Since the WithPreCommitTopology and WithPostCommitTopology
>> > >> > interfaces are still Experimental, after talking to Gyula offline, I
>> > >> > have updated the FLIP to use this new approach.
>> > >> >
>> > >> > Any comments or thoughts are welcome.
>> > >> >
>> > >> > Thanks,
>> > >> > Peter
>> > >> >
>> > >> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Thu, Oct 5, 2023, 16:04):
>> > >> >
>> > >> > > Hi Team,
>> > >> > >
>> > >> > > In my previous email [1] I have described our challenges migrating
>> > >> > > the existing Iceberg SinkFunction based implementation to the new
>> > >> > > SinkV2 based implementation.
>> > >> > >
>> > >> > > As a result of the discussion around that topic, I have created
>> > >> > > FLIP-371 [2] to address the Committer related changes, and now I
>> > >> > > have created a companion FLIP-372 [3] to address the
>> > >> > > WithPreCommitTopology related issues.
>> > >> > >
>> > >> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to
>> > >> > > alter the type of the Committable
>> > >> > >
>> > >> > > The main goal of FLIP-372 is to extend the currently existing
>> > >> > > TwoPhaseCommittingSink API by adding the possibility to have a
>> > >> > > PreCommitTopology where the input and output types of the
>> > >> > > pre-commit transformation are different.
>> > >> > >
>> > >> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
>> > >> > > WithPreCommitTopology to alter the type of the Committable
>> > >> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
>> > >> > >
>> > >> > > Please use this thread to discuss the FLIP related questions,
>> > >> > > proposals, and feedback.
>> > >> > >
>> > >> > > Thanks,
>> > >> > > Peter
>> > >> > >
>> > >> > > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
>> > >> > > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
>> > >> > > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable