Hi Martijn,

Thanks for the prompt answer, good to hear that there is interest in
the community to improve things around the APIs!

*First and foremost, we still plan to use the PostCommitTopology* to do the
compaction for the Iceberg Sink. That is on our roadmap, but the first step
is the SinkV2 migration, where we ran into several difficulties, as I
mentioned in my previous email.

Here is how I envision the Flink Sink flow in the long term:

   1. We need a rebalancing step before the writers, to redistribute the
   incoming data - *WithPreWriteTopology* will be used for this step
   (parallelism N)
   2. We will write out the data files, and collect the statistics and
   other metadata about the files - *SinkWriter* will be used for this step
   (parallelism N)
   3. We will aggregate the data file names and the metadata about the data
   files generated in this checkpoint - *WithPreCommitTopology* will be
   used (parallelism 1)
   4. We commit the changes aggregated during the checkpoint - *Committer*
   will be used (parallelism 1)
   5. After a few commits we do quick incremental compaction for these
   commits - *WithPostCommitTopology* will be used (parallelism 1?)
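
To make steps 1-4 more concrete, here is a minimal, Flink-free sketch in
plain Java. It does not use the SinkV2 API at all; every name in it
(SinkFlowSketch, DataFile, AggregatedCommit, rebalance, write, aggregate,
commit, runCheckpoint) is hypothetical and only illustrates how the data
could move between the stages, including the type change in step 3.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of steps 1-4; none of these names are SinkV2 API. */
final class SinkFlowSketch {

    /** Assumed shape of what one writer reports about one data file. */
    static final class DataFile {
        final String name;
        final long recordCount;

        DataFile(String name, long recordCount) {
            this.name = name;
            this.recordCount = recordCount;
        }
    }

    /** Assumed shape of the single committable produced per checkpoint. */
    static final class AggregatedCommit {
        final long checkpointId;
        final List<DataFile> files;

        AggregatedCommit(long checkpointId, List<DataFile> files) {
            this.checkpointId = checkpointId;
            this.files = List.copyOf(files);
        }

        long totalRecords() {
            return files.stream().mapToLong(f -> f.recordCount).sum();
        }
    }

    // Step 1 (pre-write topology, parallelism N): round-robin the records over N writers.
    static List<List<String>> rebalance(List<String> records, int writers) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < writers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            partitions.get(i % writers).add(records.get(i));
        }
        return partitions;
    }

    // Step 2 (writer, parallelism N): "write" one data file and report its metadata.
    static DataFile write(long checkpointId, int writerId, List<String> records) {
        return new DataFile("data-" + checkpointId + "-" + writerId, records.size());
    }

    // Step 3 (pre-commit topology, parallelism 1): fold N writer results into one
    // committable. The input type (DataFile) differs from the output type.
    static AggregatedCommit aggregate(long checkpointId, List<DataFile> files) {
        return new AggregatedCommit(checkpointId, files);
    }

    // Step 4 (committer, parallelism 1): append the commit to an in-memory "table".
    static void commit(List<AggregatedCommit> table, AggregatedCommit aggregated) {
        table.add(aggregated);
    }

    /** Runs steps 1-4 for a single checkpoint and returns what was committed. */
    static AggregatedCommit runCheckpoint(
            List<AggregatedCommit> table, long checkpointId, List<String> records, int writers) {
        List<List<String>> partitions = rebalance(records, writers);
        List<DataFile> files = new ArrayList<>();
        for (int writerId = 0; writerId < partitions.size(); writerId++) {
            files.add(write(checkpointId, writerId, partitions.get(writerId)));
        }
        AggregatedCommit aggregated = aggregate(checkpointId, files);
        commit(table, aggregated);
        return aggregated;
    }

    public static void main(String[] args) {
        List<AggregatedCommit> table = new ArrayList<>();
        AggregatedCommit c = runCheckpoint(table, 1L, List.of("a", "b", "c", "d", "e"), 2);
        System.out.println("checkpoint " + c.checkpointId + ": "
                + c.files.size() + " files, " + c.totalRecords() + " records");
    }
}
```

In the real sink the stages would of course be Flink operators connected by
DataStreams, and the writers would run in parallel rather than in a loop.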

I have a PR available for steps 1-4; I am still looking for the best
solution for step 5. Currently I am leaning toward passing only minimal
data (like the number of commits or files) from the committer to this
step, and relying on the Iceberg table data/metadata for the compaction
itself. If this does not change, then I think the current
WithPostCommitTopology would be enough for our case.
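
As a rough illustration of the "minimal data" idea for step 5, the sketch
below decides when to compact from nothing but a commit counter. The class
CompactionTrigger and its onCommit method are hypothetical names, not part
of Flink or Iceberg, and the sketch ignores checkpointing of the counter.

```java
/** Hypothetical helper: decides when to compact using only a commit counter. */
final class CompactionTrigger {
    private final int commitsPerCompaction;
    private int commitsSinceLastCompaction;

    CompactionTrigger(int commitsPerCompaction) {
        if (commitsPerCompaction < 1) {
            throw new IllegalArgumentException("commitsPerCompaction must be >= 1");
        }
        this.commitsPerCompaction = commitsPerCompaction;
    }

    /** Called once per successful commit; returns true when a compaction should start. */
    boolean onCommit() {
        commitsSinceLastCompaction++;
        if (commitsSinceLastCompaction >= commitsPerCompaction) {
            // Reset so that the next compaction only covers the commits that follow.
            commitsSinceLastCompaction = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CompactionTrigger trigger = new CompactionTrigger(3);
        for (int commit = 1; commit <= 7; commit++) {
            if (trigger.onCommit()) {
                System.out.println("commit " + commit + ": start incremental compaction");
            }
        }
    }
}
```

With a threshold of 3, the example starts a compaction after commits 3 and
6. Which files to compact would then be looked up from the Iceberg table
itself, not passed along from the committer.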

I am in the CET time zone, but we can have a quick chat on Slack, Webex, or
whatever your poison is :)

Thanks,
Peter

 - [1] Iceberg design doc:
https://docs.google.com/document/d/1K1M4wb9r_Tr-SDsUvqLyBaI5F14eRcqe3-ud6io0Da0/edit

Martijn Visser <martijnvis...@apache.org> wrote (on Fri, Sep 29, 2023,
2:44):

> Hi Peter,
>
> I actually had a session this week at Current, talking about the state
> of the Flink connector ecosystem. Afterwards, I had a lot of
> conversations with people who were interested in the compaction with
> Iceberg and I actually was planning to look at what the current status
> was on the Iceberg topic. Your timing is great :)
>
> I'm looping in Gordon, since we talked about the stabilization of
> these experimental interfaces offline. Also looping in Guowei Ma and
> Yun Gao since they helped a lot in the discussion for the Sink V2 API.
>
> When we discussed FLIP-191 [1], my understanding was that Iceberg
> "writes the files immediately and the post topology will take
> care of compacting the already written files and updating the file log
> after the compaction" [2]. Looking at the Iceberg proposal, the entire
> PostCommit wouldn't be used by Iceberg. I'm wondering if that means we
> should remove the entire PostCommit topology support API, since as far
> as I can tell there's no other connector that needs it. Or is there a
> plan to use that at a later stage by Iceberg?
>
> I need to think a bit more about what the potential options are to
> improve these interfaces, but I already wanted to loop in some people in
> the thread.
>
> Best regards,
>
> Martijn
>
> [1] https://lists.apache.org/thread/zjc4p47k4sxjcbnntt8od2grnmx51xg0
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction#FLIP191:ExtendunifiedSinkinterfacetosupportsmallfilecompaction-Alternative1GlobalSinkCoordinator
>
> On Thu, Sep 28, 2023 at 7:56 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
> >
> > Hi Flink Team,
> >
> > I am working on implementing a new Iceberg Sink which would use the new
> > SinkV2 API [1], [2]. The main goal is to make incremental small-file
> > compaction possible inside a Flink job using the new Iceberg Sink.
> >
> > During the implementation I have found several places where the SinkV2
> > API is missing features, some of which have already been discussed on
> > this mailing list [3].
> >
> > Here is the abbreviated list (the full descriptions of the missing
> > features and the proposed workarounds are available in the design doc [1]):
> >
> >    - Committer
> >       - Metrics are not available - No workaround - Would be a simple
> >       additional change
> >       - No init method - Simple workaround - Would be a simple additional
> >       change
> >       - Parallelism - Simple, ugly workaround - Would be a simple(?)
> >       additional change
> >    - WithPreCommitTopology
> >       - Transformation input/output parameters are the same - Very ugly
> >       workaround - I see multiple ways to solve this, and I would like
> >       to hear the opinion of the Community. See the discussion below.
> >
> > Let's just focus on the precommit topology. It is currently defined
> > like this:
> >
> >     @Experimental
> >     public interface WithPreCommitTopology<InputT, CommT>
> >             extends TwoPhaseCommittingSink<InputT, CommT> {
> >
> >         DataStream<CommittableMessage<CommT>> addPreCommitTopology(
> >                 DataStream<CommittableMessage<CommT>> committables);
> >     }
> >
> > What Iceberg, and IMHO anyone who really wants a PreCommitTopology, needs
> > is a real transformation where the input and the output types are
> > different:
> >
> >     @Experimental
> >     public interface WithPreCommitTopology<InputT, WriterResult, Committable>
> >             extends TwoPhaseCommittingSink<InputT, ???????> {
> >
> >         DataStream<CommittableMessage<Committable>> addPreCommitTopology(
> >                 DataStream<CommittableMessage<WriterResult>> committables);
> >     }
> >
> > The issue is that the *TwoPhaseCommittingSink* interface defines 2
> > methods which prevent us from doing this (the writer returns a *CommT*
> > which is used as the input type for the committer, and we want to be
> > able to use different types there):
> >
> >     @PublicEvolving
> >     public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
> >
> >         PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context)
> >                 throws IOException;
> >
> >         Committer<CommT> createCommitter() throws IOException;
> >     }
> >
> >
> > I see 3 ways of handling this issue:
> >
> >    1. We can decide that the use-case is not supported by the SinkV2 API.
> >    Users of the API could still do what I did in the PR [2]: they can
> >    create and use a Tuple2, or some POJO, as the *CommT*, where some of
> >    the attributes are only populated before the transformation and the
> >    others only after the *addPreCommitTopology* method
> >    2. We can add a new *TwoPhaseCommittingSinkWithPreCommitTopology*
> >    interface and remove the *WithPreCommitTopology*. We can use the same
> >    *SinkTransformationTranslator* and methods to create the topology for
> >    sinks implementing this interface
> >    3. We can enhance/change the *TwoPhaseCommittingSink* to accommodate
> >    the new requirements, but that would introduce several methods which
> >    are not needed in the non-transforming case
> >
> > For the record this is the sketch of the methods we would need:
> >
> >     PrecommittingSinkWriter<InputT, Committable> createWriter(InitContext context)
> >             throws IOException;
> >
> >     Committer<Committable> createCommitter() throws IOException;
> >
> >     SimpleVersionedSerializer<WriterResult> getWriterResultSerializer();
> >
> >     SimpleVersionedSerializer<Committable> getCommittableSerializer();
> >
> >
> > Currently, I am leaning towards the 2nd solution for the following
> > reasons:
> >
> >    - Backward compatible
> >    - Probably most of the SinkV2 implementations do not need the
> >    *WithPreCommitTopology* anyway
> >    - If we need the *WithPreCommitTopology*, then we need to provide the
> >    full flexibility for the users
> >
> > Anyone else facing similar issues with the SinkV2 API?
> > What are your thoughts?
> > Any other possible solutions/suggestions?
> >
> > If we agree on the general direction, I would be happy to put together a
> > FLIP, and start the official discussion.
> >
> > Thanks,
> > Peter
> >
> > References:
> >
> >    - [1] Iceberg design doc:
> >    https://docs.google.com/document/d/1K1M4wb9r_Tr-SDsUvqLyBaI5F14eRcqe3-ud6io0Da0/edit
> >    - [2] Iceberg PR: https://github.com/apache/iceberg/pull/8653
> >    - [3] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
>
