Also, one more meta comment regarding the interplay of interface changes across FLIP-371 and FLIP-372:

With FLIP-371, we're already thinking about some signature changes on the TwoPhaseCommittingSink interface, such as 1) introducing `createCommitter(CommitterInitContext)`, and 2) renaming the writer-side `InitContext` to `WriterInitContext` to align its naming with the new `CommitterInitContext`. Since we're likely going to have to introduce a completely new interface with FLIP-372 anyway (and deprecate the existing TwoPhaseCommittingSink), what do you think about introducing the above signature changes only in the new interface and leaving the old one as is, rather than trying to adapt the existing TwoPhaseCommittingSink interface? Access to the new features (transforming committables, and a runtime context for committer initialization) would motivate implementations to migrate as soon as possible.

On Tue, Oct 24, 2023 at 5:00 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Peter,
>
> Thanks a lot for starting this FLIP!
>
> I agree that the current TwoPhaseCommittingSink interface is limiting in that it assumes 1) committers have the same parallelism as writers, and 2) writers immediately produce finalized committables. This FLIP captures the problem pretty well, and I do think there are use cases for a more general, flexible interface outside of the Iceberg connector as well.
>
> In terms of the abstraction layering, I was wondering if you've also considered this approach, which I've quickly sketched in my local fork: https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>
> With this approach, the sink translator always expects 2PC sink implementations to extend `TwoPhaseCommittingSinkBase`, and therefore assumes that a pre-commit topology always exists. For simple 2PC sinks that do not require transforming committables, we would ship (for convenience) an additional `SimpleTwoPhaseCommittingSinkBase` where the pre-commit topology is a no-op passthrough.
> With that, we avoid some of the "boilerplate" where 2PC sinks with a pre-commit topology require implementing two interfaces, as proposed in the FLIP.
>
> Quick thought regarding the awkwardness you mention at the end, with sinks that have post-commit topologies but no pre-commit topologies: as an alternative to the mixin approach in the FLIP, it might make sense to consider a builder approach for constructing 2PC sinks. This should also give users type safety at compile time, without the awkwardness of all the types involved. Something along the lines of:
>
> ```
> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
>     .withPreCommitTopology(writerResultsStream -> ...)  // Function<DataStream<WriterResultT>, DataStream<CommT>>
>     .withPostCommitTopology(committablesStream -> ...)  // Consumer<DataStream<CommT>>
>     .withPreWriteTopology(inputStream -> ...)           // Function<DataStream<InputT>, DataStream<InputT>>
>     .build();
> ```
>
> We could probably do some validation in the build() method; e.g., if the writer and committer have different types, then clearly a pre-commit topology should have been defined to transform the intermediate writer results.
>
> Obviously, this would take the generalization of the TwoPhaseCommittingSink interface to the extreme: we would have just one interface with all of the pre-commit / pre-write / post-commit methods built in, and users would use the builder as the entry point to opt in or out as needed. The upside is that the SinkTransformationTranslator logic would become much less cluttered.
>
> I'll need to experiment with the builder approach a bit more to see if it makes sense at all, but I wanted to throw the idea out early to see what you think.
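To make the builder idea concrete, here is a minimal, runnable Java sketch under stated assumptions. Everything in it is hypothetical: `TwoPhaseCommittingSinkBuilder` exists only as the proposal in this thread, an in-memory `List` stands in for a Flink `DataStream`, and the writer and committer are plain functions rather than `SinkWriter`/`Committer` suppliers. Because Java erases generic types at runtime, the sketch also assumes the caller passes `Class` tokens for the writer-result and committable types, so that `build()` can reject a writer/committer type mismatch when no pre-commit topology was set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the proposed builder; not a Flink API.
// A "stream" is modeled as an in-memory List so the example runs without Flink.
final class TwoPhaseCommittingSinkBuilder<InputT, WriterResultT, CommT> {
    private final Class<WriterResultT> writerResultType; // assumption: only used by build()
    private final Class<CommT> committableType;
    private final Function<InputT, WriterResultT> writer;
    private final Consumer<CommT> committer;
    // Optional stages default to no-ops; pre-commit has no safe default unless types match.
    private Function<List<InputT>, List<InputT>> preWrite = Function.identity();
    private Function<List<WriterResultT>, List<CommT>> preCommit; // null means "not set"
    private Consumer<List<CommT>> postCommit = committables -> {};

    TwoPhaseCommittingSinkBuilder(
            Class<WriterResultT> writerResultType,
            Class<CommT> committableType,
            Function<InputT, WriterResultT> writer,
            Consumer<CommT> committer) {
        this.writerResultType = Objects.requireNonNull(writerResultType);
        this.committableType = Objects.requireNonNull(committableType);
        this.writer = Objects.requireNonNull(writer);
        this.committer = Objects.requireNonNull(committer);
    }

    TwoPhaseCommittingSinkBuilder<InputT, WriterResultT, CommT> withPreWriteTopology(
            Function<List<InputT>, List<InputT>> topology) {
        this.preWrite = Objects.requireNonNull(topology);
        return this;
    }

    TwoPhaseCommittingSinkBuilder<InputT, WriterResultT, CommT> withPreCommitTopology(
            Function<List<WriterResultT>, List<CommT>> topology) {
        this.preCommit = Objects.requireNonNull(topology);
        return this;
    }

    TwoPhaseCommittingSinkBuilder<InputT, WriterResultT, CommT> withPostCommitTopology(
            Consumer<List<CommT>> topology) {
        this.postCommit = Objects.requireNonNull(topology);
        return this;
    }

    /** Validates the configuration and returns a runner that processes one bounded batch. */
    Consumer<List<InputT>> build() {
        Function<List<WriterResultT>, List<CommT>> preCommitStage = preCommit;
        if (preCommitStage == null) {
            // Handing writer results straight to the committer only works if the types match.
            if (!writerResultType.equals(committableType)) {
                throw new IllegalStateException(
                        "Writer emits " + writerResultType.getSimpleName()
                                + " but committer expects " + committableType.getSimpleName()
                                + "; a pre-commit topology is required.");
            }
            preCommitStage = results -> {
                List<CommT> committables = new ArrayList<>();
                for (WriterResultT result : results) {
                    committables.add(committableType.cast(result));
                }
                return committables;
            };
        }
        // Capture the stages as they are now, so later builder calls don't change this sink.
        Function<List<InputT>, List<InputT>> preWriteStage = preWrite;
        Function<List<WriterResultT>, List<CommT>> finalPreCommit = preCommitStage;
        Consumer<List<CommT>> postCommitStage = postCommit;
        return input -> {
            List<WriterResultT> writerResults = new ArrayList<>();
            for (InputT element : preWriteStage.apply(input)) {
                writerResults.add(writer.apply(element));
            }
            List<CommT> committables = finalPreCommit.apply(writerResults);
            committables.forEach(committer);
            postCommitStage.accept(committables);
        };
    }
}
```

In this sketch the generic parameters give compile-time checking of each stage's input and output types, while the `Class` tokens back only the runtime check in `build()`. Optional stages default to no-ops, which is what lets a sink opt into a post-commit step without declaring a pre-commit one.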
>
> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I did some experimenting and found the originally proposed solution to be a bit awkward for cases where WithPostCommitTopology is needed but the WithPreCommitTopology transformation is not.
>> The new API would be more flexible if we used a mixin-like approach. The interfaces would only define the specific required methods, and they would not need to extend the original TwoPhaseCommittingSink interface.
>>
>> Since the WithPreCommitTopology and WithPostCommitTopology interfaces are still Experimental, after talking to Gyula offline, I have updated the FLIP to use this new approach.
>>
>> Any comments or thoughts are welcome.
>>
>> Thanks,
>> Peter
>>
>> Péter Váry <peter.vary.apa...@gmail.com> wrote (on Thu, Oct 5, 2023, 16:04):
>>
>> > Hi Team,
>> >
>> > In my previous email [1], I described our challenges migrating the existing SinkFunction-based Iceberg implementation to the new SinkV2-based implementation.
>> >
>> > As a result of the discussion around that topic, I created FLIP-371 [2] to address the Committer-related changes, and now I have created a companion FLIP-372 [3] to address the WithPreCommitTopology-related issues.
>> >
>> > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable
>> >
>> > The main goal of FLIP-372 is to extend the existing TwoPhaseCommittingSink API by adding the possibility to have a PreCommitTopology where the input and output types of the pre-commit transformation are different.
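A minimal Java sketch of the mixin-style layout from the Oct 9 message, combined with the FLIP-372 goal of a pre-commit step whose input and output types differ. All names (`SketchTwoPhaseSink`, `PreCommitMixin`, `PostCommitMixin`, `SketchTranslator`, and the two example sinks) are hypothetical: they mirror the roles of `WithPreCommitTopology` and `WithPostCommitTopology` but not their actual Flink signatures, and an in-memory `List` stands in for a `DataStream`. The point it illustrates is that each mixin declares only its own method and does not extend the core sink interface, so a sink can add a post-commit step without also declaring a pre-commit topology.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not Flink's actual SinkV2 interfaces; a List models a DataStream.

/** Core contract: the writer emits WriterResultT, the committer consumes CommT. */
interface SketchTwoPhaseSink<InputT, WriterResultT, CommT> {
    WriterResultT write(InputT element);

    void commit(CommT committable);
}

/** Mixin: declares only the pre-commit step and does not extend SketchTwoPhaseSink. */
interface PreCommitMixin<WriterResultT, CommT> {
    List<CommT> preCommit(List<WriterResultT> writerResults);
}

/** Mixin: declares only the post-commit step, usable without any pre-commit step. */
interface PostCommitMixin<CommT> {
    void postCommit(List<CommT> committables);
}

final class SketchTranslator {
    /** Runs one bounded batch, wiring in whichever mixins the sink implements. */
    @SuppressWarnings("unchecked")
    static <InputT, WriterResultT, CommT> List<CommT> translate(
            SketchTwoPhaseSink<InputT, WriterResultT, CommT> sink, List<InputT> input) {
        List<WriterResultT> writerResults = new ArrayList<>();
        for (InputT element : input) {
            writerResults.add(sink.write(element));
        }
        List<CommT> committables;
        if (sink instanceof PreCommitMixin) {
            // The pre-commit step may change the type (Integer -> String in the example below).
            committables = ((PreCommitMixin<WriterResultT, CommT>) sink).preCommit(writerResults);
        } else {
            // Without the mixin, writer results are assumed to already be committables.
            committables = (List<CommT>) (List<?>) writerResults;
        }
        committables.forEach(sink::commit);
        if (sink instanceof PostCommitMixin) {
            ((PostCommitMixin<CommT>) sink).postCommit(committables);
        }
        return committables;
    }
}

/** Writer results are sizes (Integer); pre-commit folds them into one summary (String). */
final class SummarizingSink
        implements SketchTwoPhaseSink<String, Integer, String>, PreCommitMixin<Integer, String> {
    final List<String> committed = new ArrayList<>();

    @Override
    public Integer write(String element) {
        return element.length();
    }

    @Override
    public List<String> preCommit(List<Integer> writerResults) {
        int total = 0;
        for (int size : writerResults) {
            total += size;
        }
        return List.of("files=" + writerResults.size() + ",bytes=" + total);
    }

    @Override
    public void commit(String committable) {
        committed.add(committable);
    }
}

/** Post-commit only: no pre-commit mixin, so writer results are committed unchanged. */
final class PostCommitOnlySink
        implements SketchTwoPhaseSink<String, String, String>, PostCommitMixin<String> {
    final List<String> committed = new ArrayList<>();
    final List<String> postCommitted = new ArrayList<>();

    @Override
    public String write(String element) {
        return element.toUpperCase();
    }

    @Override
    public void commit(String committable) {
        committed.add(committable);
    }

    @Override
    public void postCommit(List<String> committables) {
        postCommitted.addAll(committables);
    }
}
```

Here the translator discovers the optional stages with `instanceof` checks, which is one plausible way (an assumption of this sketch) for a translator to wire them up; the unchecked casts are the cost of that runtime discovery.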
>> >
>> > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable
>> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
>> >
>> > Please use this thread to discuss FLIP-related questions, proposals, and feedback.
>> >
>> > Thanks,
>> > Peter
>> >
>> > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
>> > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
>> > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable