Hi Peter,

Thanks a lot for starting this FLIP!

I agree that the current TwoPhaseCommittingSink interface is limiting in
that it assumes 1) committers have the same parallelism as writers, and 2)
writers immediately produce finalized committables. This FLIP captures the
problem pretty well, and I do think there are use cases for a more general,
flexible interface outside of the Iceberg connector as well.

In terms of the abstraction layering, I was wondering if you've also
considered this approach which I've quickly sketched in my local fork:
https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae

With this approach, the sink translator always expects 2PC sink
implementations to extend `TwoPhaseCommittingSinkBase` and therefore
assumes that a pre-commit topology always exists. For simple 2PC sinks that
do not require transforming committables, we would ship (for convenience)
an additional `SimpleTwoPhaseCommittingSinkBase` where the pre-commit
topology is a no-op passthrough. With that we avoid some of the
boilerplate where 2PC sinks with a pre-commit topology are required to
implement two interfaces, as proposed in the FLIP.
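
To make the layering a bit more concrete, here is a minimal, self-contained
sketch of the idea. It is not the code from the commit above: all class names
carry a `Sketch` suffix and are hypothetical, and a plain `List` stands in for
Flink's `DataStream` so the example runs without Flink on the classpath.

```java
import java.util.List;

// NOTE: hypothetical names; List<T> is a stand-in for DataStream<T>.

/** Base for all 2PC sinks: a pre-commit topology always exists. */
abstract class TwoPhaseCommittingSinkBaseSketch<WriterResultT, CommT> {
    /** Turns writer results into the committables handed to the committer. */
    abstract List<CommT> addPreCommitTopology(List<WriterResultT> writerResults);
}

/** Convenience base: writer results already are the committables. */
abstract class SimpleTwoPhaseCommittingSinkBaseSketch<CommT>
        extends TwoPhaseCommittingSinkBaseSketch<CommT, CommT> {
    @Override
    final List<CommT> addPreCommitTopology(List<CommT> writerResults) {
        return writerResults; // no-op passthrough
    }
}

/** A simple sink does not have to write any topology code. */
class SimpleFileSinkSketch extends SimpleTwoPhaseCommittingSinkBaseSketch<String> {}

/** A sink that aggregates per-writer results into one committable. */
class AggregatingSinkSketch extends TwoPhaseCommittingSinkBaseSketch<Integer, String> {
    @Override
    List<String> addPreCommitTopology(List<Integer> writerResults) {
        int total = writerResults.stream().mapToInt(Integer::intValue).sum();
        return List.of("commit-" + total);
    }
}
```

The translator would only ever see the base type, so it can call the
pre-commit hook unconditionally; simple sinks get the passthrough for free.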

Quick thought regarding the awkwardness you mention at the end with sinks
that have post-commit topologies but no pre-commit topologies: as an
alternative to the mixin approach in the FLIP, it might make sense to
consider a builder approach for constructing 2PC sinks, which should also
give users type-safety at compile time while avoiding the awkwardness
with all the types involved. Something along the lines of:

```
new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
    // Function<DataStream<WriterResultT>, DataStream<CommT>>
    .withPreCommitTopology(writerResultsStream -> ...)
    // Consumer<DataStream<CommT>>
    .withPostCommitTopology(committablesStream -> ...)
    // Function<DataStream<InputT>, DataStream<InputT>>
    .withPreWriteTopology(inputStream -> ...)
    .build();
```

We could probably do some validation in the build() method, e.g. if the
writer and committer have different types, then clearly a pre-commit topology
should have been defined to transform the intermediate writer results.
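
A rough sketch of what such a check in build() could look like is below. This
is only an illustration under a few assumptions: the `Sketch`-suffixed names
are hypothetical, `List` again stands in for `DataStream`, and explicit
`Class` tokens are used to compare the types at build time because the
generic parameters are erased at runtime.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// NOTE: hypothetical names; List<T> is a stand-in for DataStream<T>.

/** Result of the builder: only the resolved pre-commit step is modeled. */
final class BuiltSinkSketch<WriterResultT, CommT> {
    final Function<List<WriterResultT>, List<CommT>> preCommit;

    BuiltSinkSketch(Function<List<WriterResultT>, List<CommT>> preCommit) {
        this.preCommit = preCommit;
    }
}

final class TwoPhaseCommittingSinkBuilderSketch<WriterResultT, CommT> {
    private final Class<WriterResultT> writerResultType;
    private final Class<CommT> committableType;
    private Function<List<WriterResultT>, List<CommT>> preCommit; // null = not set

    TwoPhaseCommittingSinkBuilderSketch(
            Class<WriterResultT> writerResultType, Class<CommT> committableType) {
        this.writerResultType = Objects.requireNonNull(writerResultType);
        this.committableType = Objects.requireNonNull(committableType);
    }

    TwoPhaseCommittingSinkBuilderSketch<WriterResultT, CommT> withPreCommitTopology(
            Function<List<WriterResultT>, List<CommT>> topology) {
        this.preCommit = Objects.requireNonNull(topology);
        return this;
    }

    @SuppressWarnings("unchecked")
    BuiltSinkSketch<WriterResultT, CommT> build() {
        if (preCommit != null) {
            return new BuiltSinkSketch<>(preCommit);
        }
        // No topology given: only valid if writer results are already committables.
        if (!writerResultType.equals(committableType)) {
            throw new IllegalStateException(
                    "Writer produces " + writerResultType.getSimpleName()
                            + " but committer expects " + committableType.getSimpleName()
                            + "; a pre-commit topology is required");
        }
        // The types are equal, so passing the results through unchanged is safe.
        Function<List<WriterResultT>, List<CommT>> passthrough =
                results -> (List<CommT>) (List<?>) results;
        return new BuiltSinkSketch<>(passthrough);
    }
}
```

With this shape, a sink with matching types can simply skip
withPreCommitTopology(), while a mismatch without a topology fails fast at
build() rather than at job translation time.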

Obviously, this would take generalization of the TwoPhaseCommittingSink
interface to the extreme, where we just have one interface with all of the
pre-commit / pre-write / post-commit methods built-in, and users would use
the builder as the entrypoint to opt-in / opt-out as needed. The upside is
that the SinkTransformationTranslator logic will become much less cluttered.

I'll need to experiment with the builder approach a bit more to see if it
makes sense at all, but wanted to throw out the idea early to see what you
think.

On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Team,
>
> Did some experimenting and found the originally proposed solution to be a
> bit awkward for cases where WithPostCommitTopology was needed but we do not
> need the WithPreCommitTopology transformation.
> The flexibility of the new API would be better if we would use a mixin like
> approach. The interfaces would only be used to define the specific required
> methods, and they would not need to extend the original
> TwoPhaseCommittingSink interface too.
>
> Since the interfaces WithPreCommitTopology and the WithPostCommitTopology
> interfaces are still Experimental, after talking to Gyula offline, I have
> updated the FLIP to use this new approach.
>
> Any comments, thoughts are welcome.
>
> Thanks,
> Peter
>
> Péter Váry <peter.vary.apa...@gmail.com> wrote (on Thu, Oct 5, 2023,
> 16:04):
>
> > Hi Team,
> >
> > In my previous email[1] I have described our challenges migrating the
> > existing Iceberg SinkFunction based implementation, to the new SinkV2
> > based implementation.
> >
> > As a result of the discussion around that topic, I have created the
> > FLIP-371 [2] to address the Committer related changes, and now I have
> > created a companion FLIP-372 [3] to address the WithPreCommitTopology
> > related issues.
> >
> > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the
> > type of the Committable
> >
> > The main goal of the FLIP-372 is to extend the currently existing
> > TwoPhaseCommittingSink API by adding the possibility to have a
> > PreCommitTopology where the input of and the output types of the pre
> > commit transformation are different.
> >
> > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
> > WithPreCommitTopology to alter the type of the Committable
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
> >
> >
> > Please use this thread to discuss the FLIP related questions, proposals,
> > and feedback.
> >
> > Thanks,
> > Peter
> >
> > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> > - [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> > - [3]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
> >
>
