I have updated FLIP-372 [1] based on the comments from multiple
sources. I moved to the mixin approach, as this seems to be the consensus
based on this thread [2].
I also created a draft implementation PR [3] so I can test the changes and
the default implementations (no new tests yet).
Please provide your feedback so I can address your questions and comments,
and then we can move forward to voting.

Thanks,
Peter

[1] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
[2] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
[3] - https://github.com/apache/flink/pull/23912

On Mon, Dec 11, 2023, 14:28 Péter Váry <peter.vary.apa...@gmail.com> wrote:

> We identified another issue with the current Sink API in a thread [1]
> related to FLIP-371 [2]. Currently, it is not possible to evolve the
> Sink.createWriter method with deprecation, because StatefulSink and
> TwoPhaseCommittingSink have methods with the same name and parameters, but
> with narrowed-down return types (StatefulSinkWriter, PrecommittingSinkWriter).
>
> To make the Sink API evolvable, we have to remove these narrowed methods, at a minimum.
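To make the return-type narrowing concrete, here is a minimal sketch. It uses simplified, hypothetical stand-ins for the SinkV2 types (the real Flink interfaces have more methods and different parameters), and `NewContext` in the comment is a hypothetical name. Each subinterface re-declares `createWriter` with a narrower return type, so a sink implementing both must return a writer that satisfies both. The comment on `StatefulTwoPhaseSink` describes one plausible way this blocks evolution with deprecation, not necessarily the exact failure discussed in the linked thread.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins for the SinkV2 types; not the real Flink API.
interface InitContext {}

interface SinkWriter<InputT> {
    void write(InputT element);
}

interface StatefulSinkWriter<InputT, StateT> extends SinkWriter<InputT> {
    List<StateT> snapshotState();
}

interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
    List<CommT> prepareCommit();
}

interface Sink<InputT> {
    SinkWriter<InputT> createWriter(InitContext context);
}

// Each subinterface re-declares the same method with a narrower return type.
interface StatefulSink<InputT, StateT> extends Sink<InputT> {
    @Override
    StatefulSinkWriter<InputT, StateT> createWriter(InitContext context);
}

interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
    @Override
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context);
}

// A writer that satisfies both narrowed return types.
class BufferingWriter
        implements StatefulSinkWriter<String, String>, PrecommittingSinkWriter<String, String> {
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String element) {
        buffer.add(element);
    }

    @Override
    public List<String> snapshotState() {
        return new ArrayList<>(buffer);
    }

    @Override
    public List<String> prepareCommit() {
        List<String> committables = new ArrayList<>(buffer);
        buffer.clear();
        return committables;
    }
}

// Must override createWriter with a type compatible with BOTH narrowed signatures.
// If StatefulSink and TwoPhaseCommittingSink each added a default method for a new
// createWriter(NewContext) overload, this class would inherit two conflicting
// defaults and would stop compiling until it overrode the new method itself.
class StatefulTwoPhaseSink
        implements StatefulSink<String, String>, TwoPhaseCommittingSink<String, String> {
    @Override
    public BufferingWriter createWriter(InitContext context) {
        return new BufferingWriter();
    }
}
```

Existing connectors that implement both subinterfaces are exactly the classes that would break, which is why removing the narrowed methods looks like a prerequisite for a deprecation-friendly evolution.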
>
> The participants there also pointed out that the Source API also uses
> mixin interfaces (SupportsHandleExecutionAttemptSourceEvent,
> SupportsIntermediateNoMoreSplits) in some cases. My observation is that it
> uses inheritance as well (ReaderOutput, ExternallyInducedSourceReader).
>
> I have created a draft API along these lines in a branch [3] where only
> the last commit is relevant [4]. This implementation would follow the same
> patterns as the current Source API.
>
> I see two different general approaches here, and I would like to hear your
> preferences:
> - Keep the changes minimal and stick to the current Sink API design. We
> introduce the required new combination of interfaces
> (TwoPhaseCommittingSinkWithPreCommitTopology,
> WithPostCommitTopologyWithPreCommitTopology), and do not change the API
> structure.
>      - Pros:
>           - Minimal change - smaller rewrite on the connector side
>           - Type checks happen at compile time
>      - Cons:
>           - Harder to evolve
>           - The number of interfaces grows with the number of possible
> combinations
>           - API patterns are inconsistent with the Source API, which makes it
> harder for developers to understand
> - Migrate to a model similar to the Source API. We create mixin interfaces
> for SupportsCommitter, SupportsWriterState, SupportsPreCommitTopology,
> SupportsPostCommitTopology, SupportsPreWriteTopology.
>     - Pros:
>         - Better evolvability
>         - Consistent with the Source API
>     - Cons:
>         - The connectors need to change their inheritance patterns (after
> the deprecation period) if they are using any of the more complicated Sinks.
>         - Type checks need to use `instanceof`, which could happen at DAG
> generation time. Also, if the developer fails to correctly match the
> generic types on the mixin interfaces, the error will only surface at
> execution time, when the job tries to process the first record.
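A minimal sketch of the mixin style in the second option. The interfaces are hypothetical and heavily simplified (plain Java types instead of Flink's DataStream API; only the mixin names follow the proposal, and `restoredState`, `transform` and `SinkTranslator` are illustrative names). A translator discovers a sink's optional features with `instanceof` when the job graph is built.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; only the mixin names follow the proposal.
interface SinkWriter<InputT> {
    void write(InputT element);
}

interface Committer<CommT> {
    void commit(List<CommT> committables);
}

interface Sink<InputT> {
    SinkWriter<InputT> createWriter();
}

// Mixins: each adds one optional capability and does not extend Sink.
interface SupportsCommitter<CommT> {
    Committer<CommT> createCommitter();
}

interface SupportsWriterState<StateT> {
    List<StateT> restoredState();
}

interface SupportsPreCommitTopology<WriteResultT, CommT> {
    List<CommT> transform(List<WriteResultT> writeResults);
}

// Decides which operators to create by checking the mixins when the graph is built.
// Type arguments are erased, so only the raw interfaces can be checked here.
final class SinkTranslator {
    static List<String> plan(Sink<?> sink) {
        List<String> operators = new ArrayList<>();
        operators.add(sink instanceof SupportsWriterState ? "stateful-writer" : "writer");
        if (sink instanceof SupportsPreCommitTopology) {
            operators.add("pre-commit-topology");
        }
        if (sink instanceof SupportsCommitter) {
            operators.add("committer");
        }
        return operators;
    }
}

// Example sink that opts in to a committer only.
class CommittingSink implements Sink<String>, SupportsCommitter<String> {
    final List<String> committed = new ArrayList<>();

    @Override
    public SinkWriter<String> createWriter() {
        return element -> { };
    }

    @Override
    public Committer<String> createCommitter() {
        return committed::addAll;
    }
}
```

Because generic type arguments are erased, `instanceof SupportsCommitter` cannot tell whether the committer's type argument matches what the writer produces, which is the runtime-only failure mode listed in the cons.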
>
> I personally prefer the mixin approach for easier evolvability and better
> consistency, but I would like to hear your thoughts, so I can flesh out the
> chosen approach in FLIP-372.
>
> Thanks,
> Peter
>
> [1] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
> [2] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> [3] - https://github.com/pvary/flink/tree/mixin_demo
> [4] - https://github.com/pvary/flink/commit/acfc09f4c846f983f633bbf0c902aea49aa6b156
>
>
> On Fri, Nov 24, 2023, 11:38 Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi Peter!
>>
>> Thank you for the analysis of the options.
>>
>> I don't really have a strong opinion, but in general I am in favor of
>> mixin-style interfaces.
>> We follow the same approach for table sources / sinks as well.
>>
>> Some other benefits I see in addition to what you mentioned:
>>  - Easier to introduce new experimental / public-evolving interfaces in the future
>>  - Easier to declare parts of the API stable going forward, as it's not all or nothing
>>
>> The loss of proper compile-time validation is definitely a downside,
>> but I believe this should mostly just make initial development a little harder.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Nov 23, 2023 at 1:25 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>> > We had a longer discussion with Gordon yesterday.
>> > The main conclusion was that moving to a completely new interface is not
>> > justified, and we should try to improve the current one.
>> >
>> > Gordon also asked me to check when the user would be notified if the
>> > parameter types are incorrect when using the mixin approach.
>> > Imagine the type definition below:
>> >
>> > private static class TestTwoPhaseCommittingSinkWithPreCommitTopologyWrongMixin
>> >         implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
>> >                 WithPreCommitTopology<Boolean, Void> {
>> >     // method implementations omitted
>> > }
>> >
>> > The type parameters of the above interfaces contradict each other:
>> >
>> >    - TwoPhaseCommittingSinkWithPreCommitTopology
>> >       - Input - Integer
>> >       - WriteResult - Long
>> >       - Committable - String
>> >    - WithPreCommitTopology
>> >       - WriteResult - Boolean
>> >       - Committable - Void
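The sketch below, again with hypothetical, simplified interfaces rather than the real Flink ones (`PreCommitMixin` stands in for WithPreCommitTopology, `MixinJobRunner` for the runtime wiring), reproduces why such a contradiction compiles and only fails once data flows. The runner only knows the raw mixin types, and the `ClassCastException` comes from the compiler-generated bridge method of `transform` when the first write result reaches it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the two interfaces in the example above.
interface PrecommittingWriter<InputT, WriteResultT> {
    List<WriteResultT> prepareCommit(List<InputT> inputs);
}

interface SinkWithWriter<InputT, WriteResultT> {
    PrecommittingWriter<InputT, WriteResultT> createWriter();
}

interface PreCommitMixin<WriteResultT, CommT> {
    CommT transform(WriteResultT writeResult);
}

// Writer emits Long, but the mixin declares Boolean as its input. javac accepts
// this, because nothing ties the type parameters of the two interfaces together.
class WrongMixinSink implements SinkWithWriter<Integer, Long>, PreCommitMixin<Boolean, String> {
    @Override
    public PrecommittingWriter<Integer, Long> createWriter() {
        return inputs -> {
            List<Long> results = new ArrayList<>();
            for (Integer input : inputs) {
                results.add(input.longValue());
            }
            return results;
        };
    }

    @Override
    public String transform(Boolean writeResult) {
        return writeResult ? "yes" : "no";
    }
}

// A generic runner only knows the raw mixin types, so it must use unchecked calls.
final class MixinJobRunner {
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<Object> run(Object sink, List<?> records) {
        PrecommittingWriter writer = ((SinkWithWriter) sink).createWriter();
        List<Object> writeResults = writer.prepareCommit(records);
        PreCommitMixin preCommit = (PreCommitMixin) sink;
        List<Object> committables = new ArrayList<>();
        for (Object writeResult : writeResults) {
            // The compiler-generated bridge transform(Object) casts to Boolean,
            // so the first Long write result triggers a ClassCastException here.
            committables.add(preCommit.transform(writeResult));
        }
        return committables;
    }
}
```

With an empty input, nothing is ever cast, so the mismatch goes unnoticed; it only shows up with the first real record.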
>> >
>> >
>> > Sadly, I was not able to find a solution where we could notify the user at
>> > job startup time. The first error the user gets is when the first
>> > record is processed/committed. I talked with Gyula Fora, and we discussed
>> > the possibility of using the TypeExtractor to get the types. We decided
>> > that it could work in some cases, but would not be a generic solution. See
>> > the "NOTES FOR USERS OF THIS CLASS" section in [1].
>> >
>> > This missing feature would justify abandoning the mixin solution and
>> > sticking to creating individual interfaces, like:
>> >
>> >    - *TwoPhaseCommittingSink* - When no pre-commit topology is needed;
>> >    kept because it is enough for most of the use cases.
>> >    - *TwoPhaseCommittingSinkWithPreCommitTopology* - When a pre-commit
>> >    topology is needed with a transformation in the pre-commit stage; the
>> >    new generic interface (could be internal)
>> >    - *WithPreWriteTopology* - kept as it is
>> >    - *WithPreCommitTopology* - extends
>> >    TwoPhaseCommittingSinkWithPreCommitTopology with the transformation
>> >    method (classes from the streaming package are needed, so it cannot be
>> >    merged with TwoPhaseCommittingSinkWithPreCommitTopology)
>> >    - *WithPostCommitTopology* - kept as it is (extends only
>> >    TwoPhaseCommittingSink, so no pre-commit topology is allowed)
>> >    - *WithPostCommitTopologyWithPreCommitTopology* - extends
>> >    WithPreCommitTopology with the same method as WithPostCommitTopology
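For comparison, a sketch of the combined-interfaces option under the same kind of simplifications (Lists instead of DataStreams; all signatures, plus `SummingSink` and `TypedRunner`, are hypothetical). Because `WithPreCommitTopology` extends the sink interface and reuses its type parameters, a contradiction like the one in the `WrongMixin` example cannot be expressed, and no casts are needed when wiring the stages together.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch of the individual-interfaces option.
interface PrecommittingWriter<InputT, WriteResultT> {
    List<WriteResultT> prepareCommit(List<InputT> inputs);
}

interface Committer<CommT> {
    void commit(List<CommT> committables);
}

interface TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriteResultT, CommT> {
    PrecommittingWriter<InputT, WriteResultT> createWriter();

    Committer<CommT> createCommitter();
}

// Reuses the parent's type parameters, so the transformation's input is bound
// to the writer's output type and its output to the committer's input type.
interface WithPreCommitTopology<InputT, WriteResultT, CommT>
        extends TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriteResultT, CommT> {
    List<CommT> addPreCommitTopology(List<WriteResultT> writeResults);
}

// Changing addPreCommitTopology to take List<Boolean> would not compile here.
class SummingSink implements WithPreCommitTopology<Integer, Long, String> {
    final List<String> committed = new ArrayList<>();

    @Override
    public PrecommittingWriter<Integer, Long> createWriter() {
        return inputs -> {
            List<Long> results = new ArrayList<>();
            for (Integer input : inputs) {
                results.add(input.longValue());
            }
            return results;
        };
    }

    @Override
    public Committer<String> createCommitter() {
        return committed::addAll;
    }

    @Override
    public List<String> addPreCommitTopology(List<Long> writeResults) {
        long sum = 0;
        for (long result : writeResults) {
            sum += result;
        }
        List<String> committables = new ArrayList<>();
        committables.add("sum=" + sum);
        return committables;
    }
}

// Wiring the stages needs no casts: javac checks that the types line up.
final class TypedRunner {
    static <InputT, WriteResultT, CommT> void run(
            WithPreCommitTopology<InputT, WriteResultT, CommT> sink, List<InputT> records) {
        List<WriteResultT> writeResults = sink.createWriter().prepareCommit(records);
        sink.createCommitter().commit(sink.addPreCommitTopology(writeResults));
    }
}
```

The price is visible in the list above: every new combination of optional features needs its own interface to keep this compile-time binding.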
>> >
>> > I don't really like the complex `WithPostCommitTopologyWithPreCommitTopology`
>> > interface, and if we start adding new features, the number of interfaces
>> > could grow exponentially, but I agree that type checking is important.
>> > I don't have a strong opinion, but I am inclined to vote for moving in
>> > the direction of the individual interfaces.
>> >
>> > What do you prefer?
>> >
>> >    1. Go with the mixin approach
>> >       1. Better extensibility
>> >       2. Fewer interfaces (only one fewer now, but later this could be more)
>> >       3. Easier to understand (IMHO)
>> >    2. Stick with the combined interfaces approach (some mixins, like
>> >    WithPreWriteTopology, some combined, like
>> >    WithPostCommitTopologyWithPreCommitTopology)
>> >       1. Better error messages
>> >       2. Less disruptive change (still breaking for implementations of
>> >       WithPreCommitTopology)
>> >    3. Do you have a better idea?
>> >
>> >
>> > Thanks,
>> > Peter
>> >
>> > CC: Jiabao Sun, as he might be interested in this discussion
>> >
>> > [1] - https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/TypeExtractor.html
>> >
>> >
>> > On Wed, Oct 25, 2023, 16:02 Péter Váry <peter.vary.apa...@gmail.com> wrote:
>> >
>> > > Hi Gordon,
>> > >
>> > > Thanks for the review, here are my thoughts:
>> > >
>> > > > In terms of the abstraction layering, I was wondering if you've also
>> > > > considered this approach which I've quickly sketched in my local fork:
>> > > > https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>> > >
>> > > I think we have a few design issues here:
>> > > - How to handle the old interface where the transformation is not needed
>> > > in the pre-commit phase? As you proposed, a default method
>> > > implementation is a nice solution here, as we do not have to change
>> > > everything in the transformation process.
>> > > - How to handle the WithPostCommitTopology interface? Currently, the
>> > > parent interface for a sink with a post-commit topology is strictly a
>> > > single interface (TwoPhaseCommittingSink), and we want to add this to both
>> > > types of sinks (new, with transformation / old, without transformation).
>> > > In this case, we could get away with creating
>> > > OldTwoPhaseCommittingSinkWithPostCommitTopology and
>> > > NewTwoPhaseCommittingSinkWithPostCommitTopology,
>> > > but this is not a good approach for future extensibility. I tend to prefer
>> > > a real mixin approach to creating multiple interfaces for this.
>> > >
>> > > > Quick thought: regarding the awkwardness you mention in the end with
>> > > > sinks that have post commit topologies, but no pre-commit topologies -
>> > > > Alternative to the mixin approach in the FLIP, it might make sense to
>> > > > consider a builder approach for constructing 2PC sinks
>> > >
>> > > TBH, after providing the possibility to transform in the pre-commit phase,
>> > > I started to think about other possible generalizations:
>> > > - Why not allow a different return type for the pre-write phase? We can
>> > > already transform the data in a map phase preceding the Sink, but some
>> > > Sinks might want to encapsulate these transformations before the writes.
>> > > - Why not explicitly allow changing the return type of the committer? We
>> > > might not want to emit the incoming Committable; we might want to use the
>> > > commit hash (or any other data generated by the committer) in the
>> > > post-commit topology. So in some cases, it might make sense for the
>> > > committer to emit elements with different types than the input.
>> > > - Why not have everything as a mixin interface and define a Sink this way
>> > > (very similar to your builder approach)?
>> > >
>> > > But I currently do not see explicit requirements for these features, and
>> > > they would result in another full rewrite of the Sink API, which has had a
>> > > troubled history with several rewrites in recent releases. So I decided
>> > > against these big changes and kept the changes minimal.
>> > >
>> > > So, while I personally would love to see the Builder solution, I am afraid
>> > > that the Flink community needs some stability around the Sink API for now,
>> > > so that the different Sinks can start to use this new feature.
>> > >
>> > > What do you think?
>> > >
>> > > On Wed, Oct 25, 2023, 2:01 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>> > >
>> > >> Hi Peter,
>> > >>
>> > >> Thanks a lot for starting this FLIP!
>> > >>
>> > >> I agree that the current TwoPhaseCommittingSink interface is limiting in
>> > >> that it assumes 1) committers have the same parallelism as writers, and
>> > >> 2) writers immediately produce finalized committables. This FLIP captures
>> > >> the problem pretty well, and I do think there are use cases for a more
>> > >> general, flexible interface outside of the Iceberg connector as well.
>> > >>
>> > >> In terms of the abstraction layering, I was wondering if you've also
>> > >> considered this approach which I've quickly sketched in my local fork:
>> > >> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>> > >>
>> > >> With this approach, the sink translator always expects 2PC sink
>> > >> implementations to extend `TwoPhaseCommittingSinkBase`, and therefore
>> > >> assumes that a pre-commit topology always exists. For simple 2PC sinks
>> > >> that do not require transforming committables, we would ship (for
>> > >> convenience) an additional `SimpleTwoPhaseCommittingSinkBase` where the
>> > >> pre-commit topology is a no-op passthrough. With that, we avoid some of the
>> > >> "boilerplate" where 2PC sinks with a pre-commit topology require
>> > >> implementing two interfaces, as proposed in the FLIP.
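A rough sketch of this base-class layering, with hypothetical, simplified signatures: Lists stand in for DataStreams, and `runOnce` and `UpperCaseSink` are illustrative names, with `runOnce` standing in for what the translator would wire up. The point is that the translator can always assume a pre-commit step, while simple sinks inherit a passthrough.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch of the base-class layering; Lists stand in for DataStreams.
interface PrecommittingWriter<InputT, WriteResultT> {
    List<WriteResultT> prepareCommit(List<InputT> inputs);
}

interface Committer<CommT> {
    void commit(List<CommT> committables);
}

abstract class TwoPhaseCommittingSinkBase<InputT, WriteResultT, CommT> {
    abstract PrecommittingWriter<InputT, WriteResultT> createWriter();

    abstract Committer<CommT> createCommitter();

    // Always present, so the translator can assume a pre-commit step exists.
    abstract List<CommT> addPreCommitTopology(List<WriteResultT> writeResults);

    // Illustrative stand-in for the translator: writer -> pre-commit -> committer.
    final void runOnce(List<InputT> records) {
        List<WriteResultT> writeResults = createWriter().prepareCommit(records);
        createCommitter().commit(addPreCommitTopology(writeResults));
    }
}

// Convenience base: write results already are committables, so pre-commit is a passthrough.
abstract class SimpleTwoPhaseCommittingSinkBase<InputT, CommT>
        extends TwoPhaseCommittingSinkBase<InputT, CommT, CommT> {
    @Override
    final List<CommT> addPreCommitTopology(List<CommT> writeResults) {
        return writeResults;
    }
}

// A simple sink only implements the writer and the committer.
class UpperCaseSink extends SimpleTwoPhaseCommittingSinkBase<String, String> {
    final List<String> committed = new ArrayList<>();

    @Override
    PrecommittingWriter<String, String> createWriter() {
        return inputs -> {
            List<String> results = new ArrayList<>();
            for (String input : inputs) {
                results.add(input.toUpperCase());
            }
            return results;
        };
    }

    @Override
    Committer<String> createCommitter() {
        return committed::addAll;
    }
}
```

Because `SimpleTwoPhaseCommittingSinkBase` fixes the write result type to the committable type, the passthrough is type-correct without any casts.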
>> > >>
>> > >> Quick thought: regarding the awkwardness you mention at the end with sinks
>> > >> that have post-commit topologies but no pre-commit topologies: as an
>> > >> alternative to the mixin approach in the FLIP, it might make sense to
>> > >> consider a builder approach for constructing 2PC sinks, which should also
>> > >> give users type-safety at compile time while avoiding the awkwardness of
>> > >> all the types involved. Something along the lines of:
>> > >>
>> > >> ```
>> > >> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
>> > >>     .withPreCommitTopology(writerResultsStream -> ...)   // Function<DataStream<WriterResultT>, DataStream<CommT>>
>> > >>     .withPostCommitTopology(committablesStream -> ...)   // Consumer<DataStream<CommT>>
>> > >>     .withPreWriteTopology(inputStream -> ...)            // Function<DataStream<InputT>, DataStream<InputT>>
>> > >>     .build();
>> > >> ```
>> > >>
>> > >> We could probably do some validation in the build() method, e.g., if the
>> > >> writer and committer have different types, then clearly a pre-commit
>> > >> topology should have been defined to transform intermediate writer results.
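One possible shape for the `build()` validation, as a hypothetical sketch. Lists stand in for DataStreams, the writer/committer suppliers are omitted, and the builder takes explicit `Class` tokens, which is an assumption made here only so that the check can run at all; `build()` simply returns the resolved pre-commit step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical builder; Lists stand in for DataStreams and the writer/committer
// suppliers are omitted. The Class tokens are an assumption that enables the check.
final class TwoPhaseCommittingSinkBuilder<InputT, WriteResultT, CommT> {
    private final Class<WriteResultT> writeResultType;
    private final Class<CommT> committableType;
    private Function<List<WriteResultT>, List<CommT>> preCommitTopology;

    TwoPhaseCommittingSinkBuilder(Class<WriteResultT> writeResultType, Class<CommT> committableType) {
        this.writeResultType = writeResultType;
        this.committableType = committableType;
    }

    TwoPhaseCommittingSinkBuilder<InputT, WriteResultT, CommT> withPreCommitTopology(
            Function<List<WriteResultT>, List<CommT>> topology) {
        this.preCommitTopology = topology;
        return this;
    }

    // Returns the resolved pre-commit step, or fails if one is required but missing.
    Function<List<WriteResultT>, List<CommT>> build() {
        if (preCommitTopology != null) {
            return preCommitTopology;
        }
        if (!committableType.isAssignableFrom(writeResultType)) {
            throw new IllegalStateException(
                    "Writer emits " + writeResultType.getSimpleName()
                            + " but the committer expects " + committableType.getSimpleName()
                            + "; a pre-commit topology must be defined");
        }
        // Passthrough is safe: every WriteResultT is also a CommT.
        return writeResults -> {
            List<CommT> committables = new ArrayList<>();
            for (WriteResultT result : writeResults) {
                committables.add(committableType.cast(result));
            }
            return committables;
        };
    }
}
```

A staged builder whose generic parameters change with each `with...` call could move this check to compile time instead; the runtime variant is shown only because it matches the "validation in build()" idea.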
>> > >>
>> > >> Obviously, this would take generalization of the TwoPhaseCommittingSink
>> > >> interface to the extreme, where we just have one interface with all of the
>> > >> pre-commit / pre-write / post-commit methods built in, and users would use
>> > >> the builder as the entry point to opt in / opt out as needed. The upside is
>> > >> that the SinkTransformationTranslator logic will become much less cluttered.
>> > >>
>> > >> I'll need to experiment with the builder approach a bit more to see if it
>> > >> makes sense at all, but I wanted to throw out the idea early to see what
>> > >> you think.
>> > >>
>> > >> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>> > >>
>> > >> > Hi Team,
>> > >> >
>> > >> > I did some experimenting and found the originally proposed solution to
>> > >> > be a bit awkward for cases where WithPostCommitTopology is needed but the
>> > >> > WithPreCommitTopology transformation is not.
>> > >> > The new API would be more flexible if we used a mixin-like approach.
>> > >> > The interfaces would only be used to define the specific required
>> > >> > methods, and they would not need to extend the original
>> > >> > TwoPhaseCommittingSink interface too.
>> > >> >
>> > >> > Since the WithPreCommitTopology and WithPostCommitTopology interfaces
>> > >> > are still Experimental, after talking to Gyula offline, I have updated
>> > >> > the FLIP to use this new approach.
>> > >> >
>> > >> > Any comments and thoughts are welcome.
>> > >> >
>> > >> > Thanks,
>> > >> > Peter
>> > >> >
>> > >> > On Thu, Oct 5, 2023, 16:04 Péter Váry <peter.vary.apa...@gmail.com> wrote:
>> > >> >
>> > >> > > Hi Team,
>> > >> > >
>> > >> > > In my previous email [1], I described our challenges migrating the
>> > >> > > existing Iceberg SinkFunction-based implementation to the new
>> > >> > > SinkV2-based implementation.
>> > >> > >
>> > >> > > As a result of the discussion around that topic, I created
>> > >> > > FLIP-371 [2] to address the Committer-related changes, and now I have
>> > >> > > created a companion FLIP-372 [3] to address the
>> > >> > > WithPreCommitTopology-related issues.
>> > >> > >
>> > >> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter
>> > >> > > the type of the Committable
>> > >> > >
>> > >> > > The main goal of FLIP-372 is to extend the currently existing
>> > >> > > TwoPhaseCommittingSink API by adding the possibility to have a
>> > >> > > PreCommitTopology where the input and output types of the pre-commit
>> > >> > > transformation are different.
>> > >> > >
>> > >> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
>> > >> > > WithPreCommitTopology to alter the type of the Committable
>> > >> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
>> > >> > >
>> > >> > > Please use this thread to discuss FLIP-related questions, proposals,
>> > >> > > and feedback.
>> > >> > >
>> > >> > > Thanks,
>> > >> > > Peter
>> > >> > >
>> > >> > > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
>> > >> > > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
>> > >> > > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
