Hi Martijn,

Thanks for the prompt reply, it is good to hear that there is interest in the community to improve things around the APIs!
*First and foremost, we still plan to use the PostCommitTopology* to do the compaction for the Iceberg Sink. That is on our roadmap, but the first step is to do the SinkV2 migration, during which we have found several difficulties, as I mentioned in my previous email.

Here is how I envision the Flink Sink flow in the long term:

   1. We need a rebalancing step before the writers to redistribute the incoming data - *WithPreWriteTopology* will be used for this step (parallelism N)
   2. We write out the data files and collect the statistics and other metadata about the files - *SinkWriter* will be used for this step (parallelism N)
   3. We aggregate the data file names and the metadata about the data files generated in this checkpoint - *WithPreCommitTopology* will be used (parallelism 1)
   4. We commit the changes aggregated during the checkpoint - *Committer* will be used (parallelism 1)
   5. After a few commits we do a quick incremental compaction for these commits - *WithPostCommitTopology* will be used (parallelism 1?)

I have a PR available for steps 1-4; I am still in the process of finding the best solution for step 5. Currently I am leaning toward using only minimal data (like the number of commits, or files) coming from the committer in this step, and relying on the Iceberg table data/metadata for the compaction itself. If this does not change, then I think the current WithPostCommitTopology would be enough for our case.
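To make this a bit more concrete, here is a minimal sketch of how such a sink could be put together with the current SinkV2 interfaces. The *FilesCommittable* type and all the Sketch* writer/aggregator/committer/trigger classes below are just placeholders (not the classes from the PR), the input type is assumed to be *RowData*, and rebalance() only stands in for the real partitioning logic:

import java.io.IOException;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public class SketchIcebergSink
        implements WithPreWriteTopology<RowData>,
                WithPreCommitTopology<RowData, FilesCommittable>,
                WithPostCommitTopology<RowData, FilesCommittable> {

    // Step 1 (parallelism N): redistribute the incoming records before the writers.
    @Override
    public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
        // The real implementation would partition by the table's partition spec.
        return input.rebalance();
    }

    // Step 2 (parallelism N): write the data files, collect statistics and file metadata.
    @Override
    public PrecommittingSinkWriter<RowData, FilesCommittable> createWriter(Sink.InitContext context)
            throws IOException {
        return new SketchDataFileWriter(context);
    }

    // Step 3 (parallelism 1): aggregate the file names and metadata of the checkpoint.
    // Note: with the current API the input and output element type must be the same
    // CommT, which is exactly the limitation discussed in my previous email.
    @Override
    public DataStream<CommittableMessage<FilesCommittable>> addPreCommitTopology(
            DataStream<CommittableMessage<FilesCommittable>> committables) {
        return committables.global().process(new SketchCheckpointAggregator());
    }

    // Step 4 (parallelism 1): commit the aggregated changes to the Iceberg table.
    @Override
    public Committer<FilesCommittable> createCommitter() throws IOException {
        return new SketchFilesCommitter();
    }

    // Step 5 (parallelism 1?): trigger incremental compaction after a few commits,
    // based only on minimal information coming from the committer.
    @Override
    public void addPostCommitTopology(DataStream<CommittableMessage<FilesCommittable>> committables) {
        committables.global().process(new SketchCompactionTrigger());
    }

    // Required by TwoPhaseCommittingSink.
    @Override
    public SimpleVersionedSerializer<FilesCommittable> getCommittableSerializer() {
        return new SketchFilesCommittableSerializer();
    }
}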
I am in CET TZ, but we can have a quick chat on Slack or Webex or whatever your poison is :)

Thanks,
Peter

[1] Iceberg design doc: https://docs.google.com/document/d/1K1M4wb9r_Tr-SDsUvqLyBaI5F14eRcqe3-ud6io0Da0/edit

On Fri, Sep 29, 2023 at 2:44, Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Peter,
>
> I actually had a session this week at Current, talking about the state of
> the Flink connector ecosystem. Afterwards, I had a lot of conversations
> with people who were interested in the compaction with Iceberg, and I was
> actually planning to look at the current status of the Iceberg topic.
> Your timing is great :)
>
> I'm looping in Gordon, since we talked about the stabilization of these
> experimental interfaces offline. Also looping in Guowei Ma and Yun Gao,
> since they helped a lot in the discussion for the Sink V2 API.
>
> When we discussed FLIP-191 [1], my understanding was that Iceberg would
> "writes the files immediately and the post topology will take care of
> compacting the already written files and updating the file log after the
> compaction" [2]. Looking at the Iceberg proposal, the entire PostCommit
> topology wouldn't be used by Iceberg. I'm wondering if that means we
> should remove the entire PostCommit topology support from the API, since
> as far as I can tell there's no other connector that needs it. Or is
> there a plan for Iceberg to use it at a later stage?
>
> I need to think a bit more about potential options to improve these
> interfaces, but I wanted to already loop in some people in the thread.
>
> Best regards,
>
> Martijn
>
> [1] https://lists.apache.org/thread/zjc4p47k4sxjcbnntt8od2grnmx51xg0
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction#FLIP191:ExtendunifiedSinkinterfacetosupportsmallfilecompaction-Alternative1GlobalSinkCoordinator
>
> On Thu, Sep 28, 2023 at 7:56 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >
> > Hi Flink Team,
> >
> > I am working on implementing a new Iceberg Sink which would use the new
> > SinkV2 API [1], [2]. The main goal is to open up the possibility of
> > doing incremental small file compaction inside a Flink job using the
> > new Iceberg Sink.
> >
> > During the implementation I have found several places where the SinkV2
> > API has some missing features, some of which have already been
> > discussed on this mailing list [3].
> >
> > Here is the abbreviated list (the full description of the missing
> > features and the proposed workarounds is available in the design doc [1]):
> >
> >    - Committer
> >       - Metrics are not available - No workaround - Would be a simple
> >       additional change
> >       - No init method - Simple workaround - Would be a simple
> >       additional change
> >       - Parallelism - Simple, ugly workaround - Would be a simple(?)
> >       additional change
> >    - WithPreCommitTopology
> >       - Transformation input/output parameters are the same - Very ugly
> >       workaround - I see multiple ways to solve this, and I would like
> >       to hear the opinion of the community. See the discussion below.
> >
> > Let's focus on the pre-commit topology. It is currently defined like this:
> >
> > @Experimental
> > public interface WithPreCommitTopology<InputT, CommT>
> >         extends TwoPhaseCommittingSink<InputT, CommT> {
> >
> >     DataStream<CommittableMessage<CommT>> addPreCommitTopology(
> >             DataStream<CommittableMessage<CommT>> committables);
> > }
> >
> > What Iceberg, and IMHO anyone who really wants a PreCommitTopology,
> > needs is a real transformation where the input and the output types are
> > different:
> >
> > @Experimental
> > public interface WithPreCommitTopology<InputT, WriterResult, Committable>
> >         extends TwoPhaseCommittingSink<InputT, ???????> {
> >
> >     DataStream<CommittableMessage<Committable>> addPreCommitTopology(
> >             DataStream<CommittableMessage<WriterResult>> committables);
> > }
> >
> > The issue is that the *TwoPhaseCommittingSink* interface defines 2
> > methods which prevent us from doing this (the writer returns a *CommT*
> > which is used as the input type for the committer, and we want to be
> > able to use different types there):
> >
> > @PublicEvolving
> > public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
> >
> >     PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
> >
> >     Committer<CommT> createCommitter() throws IOException;
> > }
> >
> > I see 3 ways of handling this issue:
> >
> >    1. We can decide that the use-case is not supported by the SinkV2
> >    API. The users of the API could still do what I did in the PR [2]:
> >    they can create and use a Tuple2, or some POJO, as the *CommT*, where
> >    some of the attributes are only populated before the transformation,
> >    and some of the attributes are only populated after the
> >    *addPreCommitTopology* method.
> >    2. We can add a new *TwoPhaseCommittingSinkWithPreCommitTopology*
> >    interface and remove *WithPreCommitTopology*. We can use the same
> >    *SinkTransformationTranslator* and methods to create the topology for
> >    sinks implementing this interface.
> >    3. We can enhance/change the *TwoPhaseCommittingSink* to accommodate
> >    the new requirements, but that would introduce several methods which
> >    are not needed in the non-transforming case.
> >
> > For the record, this is a sketch of the methods we would need:
> >
> >     PrecommittingSinkWriter<InputT, WriterResult> createWriter(InitContext context) throws IOException;
> >
> >     Committer<Committable> createCommitter() throws IOException;
> >
> >     SimpleVersionedSerializer<WriterResult> getWriterResultSerializer();
> >
> >     SimpleVersionedSerializer<Committable> getCommittableSerializer();
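> >
> > To make option 2 a bit more concrete, the new interface could look
> > something like this - just a sketch, the interface name, the generic
> > parameter names and whether it should extend *Sink* directly are all
> > up for discussion:
> >
> > @Experimental
> > public interface TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriterResult, Committable>
> >         extends Sink<InputT> {
> >
> >     // The writer emits the intermediate WriterResult type
> >     PrecommittingSinkWriter<InputT, WriterResult> createWriter(InitContext context) throws IOException;
> >
> >     // The pre-commit topology transforms the writer results into the committables
> >     DataStream<CommittableMessage<Committable>> addPreCommitTopology(
> >             DataStream<CommittableMessage<WriterResult>> writerResults);
> >
> >     // The committer receives the already transformed committables
> >     Committer<Committable> createCommitter() throws IOException;
> >
> >     // Both types cross operator boundaries, so both need serializers
> >     SimpleVersionedSerializer<WriterResult> getWriterResultSerializer();
> >
> >     SimpleVersionedSerializer<Committable> getCommittableSerializer();
> > }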
> >
> > Currently, I am leaning towards the 2nd solution for the following
> > reasons:
> >
> >    - Backward compatible
> >    - Probably most of the SinkV2 implementations do not need the
> >    *WithPreCommitTopology* anyway
> >    - If we need the *WithPreCommitTopology*, then we need to provide the
> >    full flexibility for the users
> >
> > Anyone else facing similar issues with the SinkV2 API?
> > What are your thoughts?
> > Any other possible solutions/suggestions?
> >
> > If we agree on the general direction, I would be happy to put together
> > a FLIP and start the official discussion.
> >
> > Thanks,
> > Peter
> >
> > References:
> >
> >    - [1] Iceberg design doc:
> >    https://docs.google.com/document/d/1K1M4wb9r_Tr-SDsUvqLyBaI5F14eRcqe3-ud6io0Da0/edit
> >    - [2] Iceberg PR: https://github.com/apache/iceberg/pull/8653
> >    - [3] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
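For completeness, the kind of workaround option 1 above implies - a single *CommT* type where some fields are only populated before, and some only after, the *addPreCommitTopology* step - would look roughly like this (the class and field names are made up for illustration, they are not the ones from the PR):

public class SketchCommittable implements java.io.Serializable {

    // Populated by the writer, read by the pre-commit aggregation (placeholder type)
    private WriteMetadata writerResult;

    // Populated by the pre-commit aggregation, read by the committer (placeholder type)
    private AggregatedCommit aggregatedCommit;

    public WriteMetadata writerResult() {
        return writerResult;
    }

    public void writerResult(WriteMetadata writerResult) {
        this.writerResult = writerResult;
    }

    public AggregatedCommit aggregatedCommit() {
        return aggregatedCommit;
    }

    public void aggregatedCommit(AggregatedCommit aggregatedCommit) {
        this.aggregatedCommit = aggregatedCommit;
    }
}

Each stage has to ignore the fields that are not meant for it, which is exactly the ugliness that the 2nd solution would avoid.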