Hi Peter,

I had a session this week at Current about the state of the Flink connector ecosystem. Afterwards, I had a lot of conversations with people who were interested in compaction with Iceberg, and I was actually planning to look into the current status of the Iceberg work. Your timing is great :)
I'm looping in Gordon, since we talked offline about stabilizing these experimental interfaces. I'm also looping in Guowei Ma and Yun Gao, since they helped a lot in the discussion of the Sink V2 API.

When we discussed FLIP-191 [1], my understanding was that the Iceberg sink "writes the files immediately and the post topology will take care of compacting the already written files and updating the file log after the compaction" [2]. Looking at the Iceberg proposal, Iceberg wouldn't use the PostCommit topology at all. I'm wondering whether that means we should remove the entire PostCommit topology support API, since as far as I can tell no other connector needs it. Or does Iceberg plan to use it at a later stage?

I need to think a bit more about the potential options for improving these interfaces, but I wanted to loop some people into the thread already.

Best regards,

Martijn

[1] https://lists.apache.org/thread/zjc4p47k4sxjcbnntt8od2grnmx51xg0
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction#FLIP191:ExtendunifiedSinkinterfacetosupportsmallfilecompaction-Alternative1GlobalSinkCoordinator:

On Thu, Sep 28, 2023 at 7:56 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
> Hi Flink Team,
>
> I am working on implementing a new Iceberg Sink which would use the new
> SinkV2 API [1], [2]. The main goal is to open up the possibility of doing
> incremental small-file compaction inside a Flink job using the new
> Iceberg Sink.
>
> During the implementation I found several places where the SinkV2 API
> is missing features, some of which have already been discussed on
> this mailing list [3].
>
> Here is the abbreviated list (the full description of the missing features
> and the proposed workarounds is available in the design doc [1]):
>
> - Committer
>   - Metrics are not available - No workaround - Would be a simple
>     additional change
>   - No init method - Simple workaround - Would be a simple additional
>     change
>   - Parallelism - Simple, ugly workaround - Would be a simple(?)
>     additional change
> - WithPreCommitTopology
>   - Transformation input/output parameters are the same - Very ugly
>     workaround - I see multiple ways to solve this, and I would like to hear
>     the opinion of the Community. See the discussion below.
>
> Let's just focus on the precommit topology. It is currently defined as
> follows:
>
>   @Experimental
>   public interface WithPreCommitTopology<InputT, CommT>
>           extends TwoPhaseCommittingSink<InputT, CommT> {
>
>       DataStream<CommittableMessage<CommT>> addPreCommitTopology(
>               DataStream<CommittableMessage<CommT>> committables);
>   }
>
> What Iceberg, and IMHO anyone who really wants a PreCommitTopology, needs is
> a real transformation where the input and the output types are different:
>
>   @Experimental
>   public interface WithPreCommitTopology<InputT, WriterResult, Committable>
>           extends TwoPhaseCommittingSink<InputT, ???????> {
>
>       DataStream<CommittableMessage<Committable>> addPreCommitTopology(
>               DataStream<CommittableMessage<WriterResult>> committables);
>   }
>
> The issue is that the TwoPhaseCommittingSink interface defines 2 methods
> which prevent us from doing this (the writer returns a CommT, which is
> used as the input type for the committer, and we want to be able to use
> different types there):
>
>   @PublicEvolving
>   public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
>
>       PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context)
>               throws IOException;
>
>       Committer<CommT> createCommitter() throws IOException;
>   }
>
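To make the single-CommT constraint concrete, here is a dependency-free Java sketch (Java 16+ for records). SimpleTwoPhaseSink, SimpleWithPreCommitTopology, FileCommittable and CompactingSink are hypothetical, simplified stand-ins that only mirror the shape of the TwoPhaseCommittingSink and WithPreCommitTopology interfaces quoted above; they are not Flink classes, a java.util.List replaces DataStream, and CommittableMessage, checkpointing and serializers are left out. Because writer output and committer input share one type, a pre-commit step that turns small files into a compaction result needs a combined type whose fields are each filled in at only one stage, roughly the kind of "very ugly workaround" mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SingleCommTypeSketch {

    // Hypothetical stand-in for TwoPhaseCommittingSink: one CommT type is
    // shared by the writer output and the committer input.
    interface SimpleTwoPhaseSink<InputT, CommT> {
        List<CommT> write(List<InputT> records);  // plays the writer role
        void commit(List<CommT> committables);    // plays the Committer role
    }

    // Hypothetical stand-in for WithPreCommitTopology: the pre-commit step
    // must map CommT to CommT, so it cannot change the element type.
    interface SimpleWithPreCommitTopology<InputT, CommT>
            extends SimpleTwoPhaseSink<InputT, CommT> {
        List<CommT> addPreCommitTopology(List<CommT> committables);
    }

    // Workaround type: dataFile is set only by the writer,
    // compactedFiles is set only by the pre-commit step.
    record FileCommittable(String dataFile, List<String> compactedFiles) {}

    static class CompactingSink
            implements SimpleWithPreCommitTopology<String, FileCommittable> {
        final List<String> committedFiles = new ArrayList<>();

        @Override
        public List<FileCommittable> write(List<String> records) {
            // Pretend each record lands in its own small file (illustrative only).
            List<FileCommittable> out = new ArrayList<>();
            for (int i = 0; i < records.size(); i++) {
                out.add(new FileCommittable("data-" + i + ".parquet", null));
            }
            return out;
        }

        @Override
        public List<FileCommittable> addPreCommitTopology(List<FileCommittable> committables) {
            // Fold all writer-stage entries into one post-transformation entry.
            List<String> files = committables.stream()
                    .map(FileCommittable::dataFile)
                    .collect(Collectors.toList());
            return List.of(new FileCommittable(null, files));
        }

        @Override
        public void commit(List<FileCommittable> committables) {
            for (FileCommittable c : committables) {
                if (c.compactedFiles() == null) {
                    // Nothing in the types stops an untransformed writer result from arriving here.
                    throw new IllegalStateException("pre-commit step did not run for " + c.dataFile());
                }
                committedFiles.add("compacted(" + String.join(",", c.compactedFiles()) + ")");
            }
        }
    }

    public static void main(String[] args) {
        CompactingSink sink = new CompactingSink();
        sink.commit(sink.addPreCommitTopology(sink.write(List.of("a", "b", "c"))));
        System.out.println(sink.committedFiles);
    }
}
```

Note that `sink.commit(sink.write(...))` also compiles, skipping the pre-commit step entirely; only the runtime null check in the committer catches it, which is the fragility that separate writer-result and committable types would remove.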
> I see 3 ways of handling this issue:
>
> 1. We can decide that the use case is not supported by the SinkV2 API.
>    Users of the API could still do what I did in the PR [2]: create and
>    use a Tuple2, or some POJO, as the CommT, where some of the attributes
>    are populated only before the transformation and the others only after
>    the addPreCommitTopology method.
> 2. We can add a new TwoPhaseCommittingSinkWithPreCommitTopology interface
>    and remove WithPreCommitTopology. We can use the same
>    SinkTransformationTranslator and methods to create the topology for
>    sinks implementing this interface.
> 3. We can enhance/change TwoPhaseCommittingSink to accommodate the
>    new requirements, but that would introduce several methods which are not
>    needed in the non-transforming case.
>
> For the record, this is the sketch of the methods we would need:
>
>   PrecommittingSinkWriter<InputT, WriterResult> createWriter(InitContext context) throws IOException;
>   Committer<Committable> createCommitter() throws IOException;
>   SimpleVersionedSerializer<WriterResult> getWriterResultSerializer();
>   SimpleVersionedSerializer<Committable> getCommittableSerializer();
>
> Currently, I am leaning towards the 2nd solution for the following
> reasons:
>
> - It is backward compatible
> - Most SinkV2 implementations probably do not need
>   WithPreCommitTopology anyway
> - If we need WithPreCommitTopology, then we need to give users
>   full flexibility
>
> Is anyone else facing similar issues with the SinkV2 API?
> What are your thoughts?
> Any other possible solutions/suggestions?
>
> If we agree on the general direction, I would be happy to put together a
> FLIP and start the official discussion.
>
> Thanks,
> Peter
>
> References:
>
> - [1] Iceberg design doc:
>   https://docs.google.com/document/d/1K1M4wb9r_Tr-SDsUvqLyBaI5F14eRcqe3-ud6io0Da0/edit
> - [2] Iceberg PR: https://github.com/apache/iceberg/pull/8653
> - [3] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
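The second option in Peter's list, a separate TwoPhaseCommittingSinkWithPreCommitTopology interface, can be illustrated with a similarly simplified, dependency-free Java sketch (Java 16+ for records). SimpleSinkWithPreCommitTopology, DataFile, CompactedCommit and ToyCompactingSink are hypothetical names invented for this illustration, not Flink or Iceberg APIs; lists stand in for DataStream, and the CommittableMessage wrappers, checkpointing and the two SimpleVersionedSerializer getters from Peter's method sketch are left out. The point is only that the pre-commit step becomes a real WriterResult to Committable transformation, so neither type needs half-populated fields.

```java
import java.util.ArrayList;
import java.util.List;

public class PreCommitTopologySketch {

    // Hypothetical, simplified shape of option 2: the writer output type and
    // the committer input type are independent, and the pre-commit step maps
    // one to the other.
    interface SimpleSinkWithPreCommitTopology<InputT, WriterResultT, CommittableT> {
        List<WriterResultT> write(List<InputT> records);
        List<CommittableT> addPreCommitTopology(List<WriterResultT> writerResults);
        void commit(List<CommittableT> committables);
    }

    // Illustrative writer result: one small data file.
    record DataFile(String path, long recordCount) {}

    // Illustrative committable: the small files folded into one commit.
    record CompactedCommit(List<String> sourcePaths, long totalRecords) {}

    static class ToyCompactingSink
            implements SimpleSinkWithPreCommitTopology<String, DataFile, CompactedCommit> {
        final List<CompactedCommit> tableLog = new ArrayList<>();

        @Override
        public List<DataFile> write(List<String> records) {
            // Pretend each record lands in its own small file (illustrative only).
            List<DataFile> files = new ArrayList<>();
            for (int i = 0; i < records.size(); i++) {
                files.add(new DataFile("data-" + i + ".parquet", 1));
            }
            return files;
        }

        @Override
        public List<CompactedCommit> addPreCommitTopology(List<DataFile> writerResults) {
            // A genuine type change: DataFile -> CompactedCommit.
            List<String> paths = new ArrayList<>();
            long total = 0;
            for (DataFile f : writerResults) {
                paths.add(f.path());
                total += f.recordCount();
            }
            return List.of(new CompactedCommit(List.copyOf(paths), total));
        }

        @Override
        public void commit(List<CompactedCommit> committables) {
            // The committer only ever receives fully built CompactedCommit values.
            tableLog.addAll(committables);
        }
    }

    public static void main(String[] args) {
        ToyCompactingSink sink = new ToyCompactingSink();
        List<DataFile> written = sink.write(List.of("a", "b", "c"));
        sink.commit(sink.addPreCommitTopology(written));
        System.out.println(sink.tableLog);
    }
}
```

Unlike the single-CommT version, `sink.commit(sink.write(...))` no longer compiles here, because a `List<DataFile>` is not a `List<CompactedCommit>`. A sink that needs no pre-commit step would keep implementing the existing TwoPhaseCommittingSink, which is the backward-compatibility argument given for option 2.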