Hi Flink Team,

I am working on implementing a new Iceberg Sink which would use the new SinkV2 API [1], [2]. The main goal is to open up the possibility of doing incremental small-file compaction inside a Flink job using the new Iceberg Sink.

During the implementation I have found several places where the SinkV2 API is missing features, some of which have already been discussed on this mailing list [3]. Here is the abbreviated list (the full description of the missing features and the proposed workarounds is available in the design doc [1]):

- Committer
  - Metrics are not available
    - No workaround
    - Would be a simple additional change
  - No init method
    - Simple workaround
    - Would be a simple additional change
  - Parallelism
    - Simple, ugly workaround
    - Would be a simple(?) additional change
- WithPreCommitTopology
  - Transformation input/output parameters are the same
    - Very ugly workaround
    - I see multiple ways to solve this, and I would like to hear the opinion of the Community. See the discussion below.

Let's just focus on the precommit topology. It is currently defined like this:

    @Experimental
    public interface WithPreCommitTopology<InputT, CommT>
            extends TwoPhaseCommittingSink<InputT, CommT> {
        DataStream<CommittableMessage<CommT>> addPreCommitTopology(
                DataStream<CommittableMessage<CommT>> committables);
    }

What Iceberg, and IMHO anyone who really wants a PreCommitTopology, needs is a real transformation where the input and the output types are different:

    @Experimental
    public interface WithPreCommitTopology<InputT, WriterResult, Committable>
            extends TwoPhaseCommittingSink<InputT, ???????> {
        DataStream<CommittableMessage<Committable>> addPreCommitTopology(
                DataStream<CommittableMessage<WriterResult>> committables);
    }

The issue is that the *TwoPhaseCommittingSink* interface defines 2 methods which prevent us from doing this (the writer returns a *CommT*, which is used as the input type for the committer, and we want to be able to use different types there):

    @PublicEvolving
    public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
        PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
        Committer<CommT> createCommitter() throws IOException;
    }

I see 3 ways of handling this issue:

1. We can decide that the use-case is not supported by the SinkV2 API. The users of the API could still do what I did in the PR [2]: create and use a Tuple2, or some POJO, as the *CommT*, where some of the attributes are only populated before the transformation and some are only populated after the *addPreCommitTopology* method.
2. We can add a new *TwoPhaseCommittingSinkWithPreCommitTopology* interface and remove *WithPreCommitTopology*. We can use the same *SinkTransformationTranslator* and methods to create the topology for Sinks implementing this interface.
3. We can enhance/change the *TwoPhaseCommittingSink* to accommodate the new requirements, but that would introduce several methods which are not needed in the non-transforming case. For the record, this is the sketch of the methods we would need:

        PrecommittingSinkWriter<InputT, WriterResult> createWriter(InitContext context) throws IOException;
        Committer<Committable> createCommitter() throws IOException;
        SimpleVersionedSerializer<WriterResult> getWriterResultSerializer();
        SimpleVersionedSerializer<Committable> getCommittableSerializer();

Currently, I am leaning towards the 2nd solution for the following reasons:

- Backward compatible
- Probably most of the SinkV2 implementations do not need the *WithPreCommitTopology* anyway
- If we need the *WithPreCommitTopology*, then we need to provide full flexibility for the users

Is anyone else facing similar issues with the SinkV2 API? What are your thoughts? Any other possible solutions/suggestions?

If we agree on the general direction, I would be happy to put together a FLIP and start the official discussion.

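To make the 2nd option a bit more concrete, here is a minimal, self-contained sketch of the shape I have in mind. It is not the real Flink API: plain `List`s stand in for `DataStream<CommittableMessage<...>>`, the `Writer` and `Committer` interfaces are simplified stand-ins, and `DataFile`, `Manifest`, `DemoSink` and `run` are hypothetical names used only for illustration. The only point is that the writer result type and the committable type are allowed to differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: none of these types are the real Flink classes. */
public class PreCommitTopologySketch {

    // Simplified stand-in for PrecommittingSinkWriter.
    interface Writer<InputT, WriterResultT> {
        WriterResultT prepareCommit(List<InputT> records);
    }

    // Simplified stand-in for Committer.
    interface Committer<CommittableT> {
        void commit(List<CommittableT> committables);
    }

    // Hypothetical shape of option 2: three type parameters instead of two.
    interface TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriterResultT, CommittableT> {
        Writer<InputT, WriterResultT> createWriter();

        Committer<CommittableT> createCommitter();

        // A List stands in for DataStream<CommittableMessage<...>> here.
        List<CommittableT> addPreCommitTopology(List<WriterResultT> writerResults);
    }

    // Hypothetical writer result: one data file written by one writer subtask.
    static final class DataFile {
        final long recordCount;

        DataFile(long recordCount) {
            this.recordCount = recordCount;
        }
    }

    // Hypothetical committable: the aggregated result of several data files.
    static final class Manifest {
        final int fileCount;
        final long recordCount;

        Manifest(int fileCount, long recordCount) {
            this.fileCount = fileCount;
            this.recordCount = recordCount;
        }
    }

    static final class DemoSink
            implements TwoPhaseCommittingSinkWithPreCommitTopology<String, DataFile, Manifest> {
        final List<Manifest> committed = new ArrayList<>();

        @Override
        public Writer<String, DataFile> createWriter() {
            return records -> new DataFile(records.size());
        }

        @Override
        public Committer<Manifest> createCommitter() {
            return committed::addAll;
        }

        @Override
        public List<Manifest> addPreCommitTopology(List<DataFile> writerResults) {
            // The real transformation: many DataFiles in, one Manifest out.
            long total = 0;
            for (DataFile file : writerResults) {
                total += file.recordCount;
            }
            return List.of(new Manifest(writerResults.size(), total));
        }
    }

    // Wires writer -> pre-commit topology -> committer, one writer per subtask input.
    static <I, W, C> List<C> run(
            TwoPhaseCommittingSinkWithPreCommitTopology<I, W, C> sink,
            List<List<I>> inputsPerSubtask) {
        List<W> writerResults = new ArrayList<>();
        for (List<I> input : inputsPerSubtask) {
            writerResults.add(sink.createWriter().prepareCommit(input));
        }
        List<C> committables = sink.addPreCommitTopology(writerResults);
        sink.createCommitter().commit(committables);
        return committables;
    }

    public static void main(String[] args) {
        DemoSink sink = new DemoSink();
        List<Manifest> result = run(sink, List.of(List.of("a", "b"), List.of("c")));
        Manifest manifest = result.get(0);
        System.out.println(manifest.fileCount + " files, " + manifest.recordCount + " records");
    }
}
```

With the current two-parameter interface, `DataFile` and `Manifest` would have to be squeezed into a single *CommT* (the Tuple2/POJO workaround from option 1); with three type parameters each stage only sees the type it actually needs.
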
Thanks,
Peter

References:
- [1] Iceberg design doc: https://docs.google.com/document/d/1K1M4wb9r_Tr-SDsUvqLyBaI5F14eRcqe3-ud6io0Da0/edit
- [2] Iceberg PR: https://github.com/apache/iceberg/pull/8653
- [3] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj