Hi Flink Team,

I am working on implementing a new Iceberg Sink that uses the new SinkV2
API [1], [2]. The main goal is to make it possible to run incremental
small-file compaction inside a Flink job using the new Iceberg Sink.

During the implementation I found several places where the SinkV2 API is
missing features, some of which have already been discussed on this
mailing list [3].

Here is an abbreviated list (the full description of the missing features
and the proposed workarounds is available in the design doc [1]):

   - Committer
      - Metrics are not available: no workaround; would be a simple
        additional change
      - No init method: simple workaround; would be a simple additional
        change
      - Parallelism: simple but ugly workaround; would be a simple(?)
        additional change
   - WithPreCommitTopology
      - The transformation's input and output types are the same: very ugly
        workaround; I see multiple ways to solve this, and I would like to
        hear the opinion of the community. See the discussion below.

Let's focus on the pre-commit topology. It is currently defined as
follows:


    @Experimental
    public interface WithPreCommitTopology<InputT, CommT>
            extends TwoPhaseCommittingSink<InputT, CommT> {

        DataStream<CommittableMessage<CommT>> addPreCommitTopology(
                DataStream<CommittableMessage<CommT>> committables);
    }
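To make the limitation concrete, here is a minimal, self-contained sketch. It is not the Flink API: a plain `List` stands in for `DataStream<CommittableMessage<CommT>>`, and `SameTypePreCommitDemo`, `PreCommitTopology` and `COMPACT` are hypothetical names. Because the topology maps `CommT` to `CommT`, a compaction step can only emit the same kind of object the writer produced (here a file path), never a different, richer committable.

```java
import java.util.List;

public class SameTypePreCommitDemo {

    // Simplified stand-in for WithPreCommitTopology<InputT, CommT>: a List replaces
    // DataStream<CommittableMessage<CommT>>, and input and output share CommT.
    interface PreCommitTopology<CommT> {
        List<CommT> addPreCommitTopology(List<CommT> committables);
    }

    // Hypothetical "compaction" step over file paths. Whatever it does, it can
    // only emit more CommT values (here: Strings), never a different type.
    static final PreCommitTopology<String> COMPACT =
            committables -> List.of("compacted(" + String.join(",", committables) + ")");

    public static void main(String[] args) {
        List<String> writerOutput = List.of("f1.parquet", "f2.parquet");
        List<String> committerInput = COMPACT.addPreCommitTopology(writerOutput);
        System.out.println(committerInput); // [compacted(f1.parquet,f2.parquet)]
    }
}
```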


What Iceberg (and, IMHO, anyone who really needs a pre-commit topology)
needs is a real transformation where the input and output types differ:



    @Experimental
    public interface WithPreCommitTopology<InputT, WriterResult, Committable>
            extends TwoPhaseCommittingSink<InputT, ???????> {

        DataStream<CommittableMessage<Committable>> addPreCommitTopology(
                DataStream<CommittableMessage<WriterResult>> committables);
    }


The issue is that the *TwoPhaseCommittingSink* interface defines two methods
that prevent us from doing this: the writer produces a *CommT*, which is also
used as the input type of the committer, while we want to be able to use
different types there:

    @PublicEvolving
    public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {

        PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context)
                throws IOException;

        Committer<CommT> createCommitter() throws IOException;
    }


I see 3 ways of handling this issue:

   1. We can decide that this use case is not supported by the SinkV2 API.
   Users of the API could still do what I did in the PR [2]: create and use
   a Tuple2, or some POJO, as *CommT*, where some of the attributes are
   populated only before the transformation and the others only after the
   *addPreCommitTopology* method.
   2. We can add a new *TwoPhaseCommittingSinkWithPreCommitTopology*
   interface and remove *WithPreCommitTopology*. We can use the same
   *SinkTransformationTranslator* and methods to create the topology for
   sinks implementing this interface.
   3. We can enhance/change *TwoPhaseCommittingSink* to accommodate the
   new requirements, but that would introduce several methods that are not
   needed in the non-transforming case.
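Option 1 could look roughly like the sketch below. It is a hypothetical illustration, not the code from the PR [2]: `WorkaroundCommittable` is an invented POJO standing in for *CommT*, the writer fills only `dataFiles`, and the pre-commit step fills only `manifestPath`. Plain lists stand in for Flink streams, and the file and manifest names are made up. It shows why the workaround is awkward: both halves share one type, and the committer has to know which fields are meaningful at which stage.

```java
import java.util.ArrayList;
import java.util.List;

public class CombinedCommittableDemo {

    // Hypothetical POJO used as CommT on both sides of the pre-commit topology.
    static final class WorkaroundCommittable {
        // Populated by the writer only (before the transformation).
        final List<String> dataFiles;
        // Populated by the pre-commit topology only (after the transformation).
        final String manifestPath;

        WorkaroundCommittable(List<String> dataFiles, String manifestPath) {
            this.dataFiles = dataFiles;
            this.manifestPath = manifestPath;
        }
    }

    // Writer side: emits committables with only dataFiles set.
    static WorkaroundCommittable fromWriter(List<String> dataFiles) {
        return new WorkaroundCommittable(dataFiles, null);
    }

    // Pre-commit side: aggregates the writer results and emits a committable
    // with only manifestPath set (dataFiles is left empty).
    static WorkaroundCommittable preCommit(List<WorkaroundCommittable> fromWriters) {
        List<String> all = new ArrayList<>();
        for (WorkaroundCommittable c : fromWriters) {
            all.addAll(c.dataFiles);
        }
        String manifest = "manifest-" + all.size() + "-files.avro";
        return new WorkaroundCommittable(List.of(), manifest);
    }

    public static void main(String[] args) {
        List<WorkaroundCommittable> writerResults = List.of(
                fromWriter(List.of("a.parquet")),
                fromWriter(List.of("b.parquet", "c.parquet")));
        WorkaroundCommittable forCommitter = preCommit(writerResults);
        System.out.println(forCommitter.manifestPath); // manifest-3-files.avro
    }
}
```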

For the record, this is a sketch of the methods we would need:

    PrecommittingSinkWriter<InputT, WriterResult> createWriter(InitContext context)
            throws IOException;

    Committer<Committable> createCommitter() throws IOException;

    SimpleVersionedSerializer<WriterResult> getWriterResultSerializer();

    SimpleVersionedSerializer<Committable> getCommittableSerializer();
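A minimal, self-contained sketch of how option 2 could separate the two types follows. It deliberately avoids the real Flink classes: `Writer`, `Committer` and `SinkWithPreCommitTopology` are hypothetical, simplified stand-ins (lists instead of `DataStream`, no serializers, no `CommittableMessage`), and `DataFile`, `Manifest` and the file names are invented. The point is only that the writer's output type (`WriterResult`) and the committer's input type (`Committable`) can differ, with the pre-commit topology as the single place where one is turned into the other.

```java
import java.util.ArrayList;
import java.util.List;

public class PreCommitTopologySketch {

    // Simplified stand-ins for the Flink writer and committer (not the real API).
    interface Writer<InputT, WriterResult> {
        void write(InputT element);
        List<WriterResult> prepareCommit();
    }

    interface Committer<Committable> {
        void commit(List<Committable> committables);
    }

    // Hypothetical shape of option 2: separate WriterResult and Committable types.
    interface SinkWithPreCommitTopology<InputT, WriterResult, Committable> {
        Writer<InputT, WriterResult> createWriter();
        Committer<Committable> createCommitter();
        List<Committable> addPreCommitTopology(List<WriterResult> writerResults);
    }

    // Hypothetical writer result: a single data file.
    static final class DataFile {
        final String path;
        DataFile(String path) { this.path = path; }
    }

    // Hypothetical committable: a manifest listing several data files.
    static final class Manifest {
        final List<String> files;
        Manifest(List<String> files) { this.files = files; }
    }

    // Toy sink: each input becomes a data file; the pre-commit topology
    // folds all data files into one manifest, which is what gets committed.
    static final class ToySink implements SinkWithPreCommitTopology<String, DataFile, Manifest> {
        final List<Manifest> committed = new ArrayList<>();

        @Override
        public Writer<String, DataFile> createWriter() {
            return new Writer<String, DataFile>() {
                private final List<DataFile> files = new ArrayList<>();

                @Override
                public void write(String element) {
                    files.add(new DataFile(element + ".parquet"));
                }

                @Override
                public List<DataFile> prepareCommit() {
                    return new ArrayList<>(files);
                }
            };
        }

        @Override
        public Committer<Manifest> createCommitter() {
            return committed::addAll;
        }

        @Override
        public List<Manifest> addPreCommitTopology(List<DataFile> writerResults) {
            List<String> paths = new ArrayList<>();
            for (DataFile file : writerResults) {
                paths.add(file.path);
            }
            return List.of(new Manifest(paths));
        }
    }

    // Chains writer -> pre-commit topology -> committer in a single thread.
    static List<Manifest> run(ToySink sink, List<String> input) {
        Writer<String, DataFile> writer = sink.createWriter();
        input.forEach(writer::write);
        List<Manifest> manifests = sink.addPreCommitTopology(writer.prepareCommit());
        sink.createCommitter().commit(manifests);
        return sink.committed;
    }

    public static void main(String[] args) {
        List<Manifest> committed = run(new ToySink(), List.of("a", "b"));
        System.out.println(committed.get(0).files); // [a.parquet, b.parquet]
    }
}
```

With this shape, sinks that do not need a pre-commit step could keep using the existing two-type interface, which is the backward-compatibility argument for option 2.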


Currently, I am leaning towards the 2nd solution for the following reasons:

   - It is backward compatible
   - Most SinkV2 implementations probably do not need
   *WithPreCommitTopology* anyway
   - If a sink needs *WithPreCommitTopology*, then we need to give users
   full flexibility

Is anyone else facing similar issues with the SinkV2 API?
What are your thoughts?
Any other possible solutions/suggestions?

If we agree on the general direction, I would be happy to put together a
FLIP and start the official discussion.

Thanks,
Peter

References:

   - [1] Iceberg design doc:
   https://docs.google.com/document/d/1K1M4wb9r_Tr-SDsUvqLyBaI5F14eRcqe3-ud6io0Da0/edit
   - [2] Iceberg PR: https://github.com/apache/iceberg/pull/8653
   - [3] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj
