Hi Scott,

It's great to see further adoption of the Sink V2 architecture.
Happy to answer your questions.

1. The sink architecture should ensure that Committer::commit is always
called with all committables from a subtask for a given subtaskId. There is
an open issue where users have reported a problem with that assumption [1],
but we haven't been able to track the problem down. Behind the scenes, if
you use a SinkWriter->Committer topology, the committables are transferred
via the checkpoint barrier channel to the committer (not transferring the
committables results in failing the checkpoint) and checkpointed by the
committer before Committer::commit is called. This means that when
Committer::commit is called, it reads the committables from the local state.

2. Committer::commit is called on what we call in Flink
notifyCheckpointComplete, which is based on an RPC call that the JobManager
makes to all TaskManagers when a checkpoint is finished. There is no
guarantee about when, or whether, the notification for a particular
checkpoint arrives, only that Committer::commit is called eventually. If
some of the RPCs are delayed or do not reach the TaskManager, the Committer
will accumulate committables from multiple checkpoints.
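
To illustrate the consequence for a Committer implementation, here is a
rough sketch rather than actual Flink code. It assumes a hypothetical
committable type (PendingFile) that itself records the checkpoint it was
created for. Flink does not give you that for free, so both the class and
the helper name groupByCheckpoint are made up for the example. The point is
only that a single commit call should not assume its batch belongs to one
checkpoint.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MultiCheckpointBatchSketch {

    /** Hypothetical committable that remembers which checkpoint produced it. */
    static final class PendingFile {
        final long checkpointId;
        final String path;

        PendingFile(long checkpointId, String path) {
            this.checkpointId = checkpointId;
            this.path = path;
        }
    }

    /**
     * Groups one commit batch by checkpoint id, oldest checkpoint first,
     * so the caller can commit the groups in checkpoint order.
     */
    static Map<Long, List<PendingFile>> groupByCheckpoint(Collection<PendingFile> batch) {
        // TreeMap keeps the keys sorted in ascending checkpoint order.
        Map<Long, List<PendingFile>> grouped = new TreeMap<>();
        for (PendingFile file : batch) {
            grouped.computeIfAbsent(file.checkpointId, id -> new ArrayList<>()).add(file);
        }
        return grouped;
    }
}
```

If the notification for one checkpoint was delayed, the batch passed to
groupByCheckpoint would simply contain two or more groups instead of one.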

3. I am not sure I fully understand that point. I see two different
requirements. First, you could skip a committable if you do not want to
commit it, which you could do with calling
CommitRequest::signalAlreadyCommitted [2]. It's not the primary purpose of
the method, but it should suffice. The second point is having a
communication mechanism between SinkWriter and Committer, which at the
moment does not exist. I would love to hear more details about why the
rewrite is necessary; maybe we can model the sink differently to achieve
that requirement.
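
For the first requirement, a minimal sketch of skipping a committable could
look like the following. To keep it self-contained, it uses a small stand-in
CommitRequest interface that mirrors only getCommittable and
signalAlreadyCommitted from Flink's Committer.CommitRequest. The
RecordingRequest class, the shouldSkip predicate, and the returned list are
assumptions made for the example, not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

final class SkipCommittableSketch {

    /** Stand-in mirroring two methods of Flink's Committer.CommitRequest. */
    interface CommitRequest<T> {
        T getCommittable();

        void signalAlreadyCommitted();
    }

    /** Hypothetical test double that records whether a request was skipped. */
    static final class RecordingRequest<T> implements CommitRequest<T> {
        private final T committable;
        boolean skipped = false;

        RecordingRequest(T committable) {
            this.committable = committable;
        }

        @Override
        public T getCommittable() {
            return committable;
        }

        @Override
        public void signalAlreadyCommitted() {
            skipped = true;
        }
    }

    /**
     * Commits every request except those matched by shouldSkip and returns
     * the committables that were actually committed.
     */
    static <T> List<T> commit(
            Collection<? extends CommitRequest<T>> requests, Predicate<T> shouldSkip) {
        List<T> committed = new ArrayList<>();
        for (CommitRequest<T> request : requests) {
            T committable = request.getCommittable();
            if (shouldSkip.test(committable)) {
                // Mark the request as done without committing anything.
                request.signalAlreadyCommitted();
                continue;
            }
            // A real committer would perform the external commit here.
            committed.add(committable);
        }
        return committed;
    }
}
```

In this sketch the skip decision has to be derivable from the committable
itself, since the SinkWriter has no other channel to the Committer.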

Can you explain more about the relation between your questions and using
the SupportsPreCommitTopology::addPreCommitTopology? It sounds like you are
not planning to use the Committer but a custom globalCommitter.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-25920
[2] https://github.com/apache/flink/blob/7fc3aac774f5deb9b48727ba5f916c78085b49b9/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java#L100
