Hi Scott,

It's great to see further adoption of the Sink V2 architecture. Happy to answer your questions.

1. The sink architecture should ensure that Committer::commit is always called with all committables from a subtask for a given subtaskId. There is an open issue where users have reported a problem with that assumption [1], but we haven't been able to track the problem down. Behind the scenes, if you use a SinkWriter -> Committer topology, the committables are transferred to the committer via the checkpoint barrier channel (failing to transfer them fails the checkpoint) and are checkpointed by the committer before Committer::commit is called. This means that when Committer::commit is called, it reads the committables from its local state.

2. Committer::commit is called on what we call notifyCheckpointComplete in Flink, which is based on an RPC that the JobManager makes to all TaskManagers when a checkpoint is finished. There is no guarantee about when, or even whether, this call arrives for a given checkpoint; it only happens eventually. If some of the RPCs are delayed or do not reach a TaskManager, the Committer will accumulate committables from multiple checkpoints.

3. I am not sure I fully understand that point. I see two different requirements. First, you could skip a committable that you do not want to commit by calling CommitRequest::signalAlreadyCommitted [2]. That is not the primary purpose of the method, but it should suffice. The second requirement is a communication mechanism between SinkWriter and Committer, which does not exist at the moment. I would love to hear more details about why the rewrite is necessary; maybe we can model the sink differently to achieve that requirement.

Can you explain more about the relation between your questions and using SupportsPreCommitTopology::addPreCommitTopology? It sounds like you are not planning to use the Committer but a custom globalCommitter.
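
To make the first option in point 3 a bit more concrete, here is a minimal sketch of a committer that skips unwanted committables by marking them as already committed. To keep it compilable without Flink on the classpath, CommitRequest below is a local stand-in that only mirrors the two method names used from Committer.CommitRequest; RecordingRequest, SkippingCommitter, the String committable type and the "skip-" prefix rule are all hypothetical names made up for illustration. Because of point 2, the loop has to cope with a batch that spans several checkpoints.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

public class SkipCommitSketch {

    /** Local stand-in for the subset of Flink's Committer.CommitRequest used here (not the real interface). */
    public interface CommitRequest<CommT> {
        CommT getCommittable();

        void signalAlreadyCommitted();
    }

    /** Hypothetical in-memory request that records whether it was marked as already committed. */
    public static final class RecordingRequest<CommT> implements CommitRequest<CommT> {
        private final CommT committable;
        private boolean alreadyCommitted;

        public RecordingRequest(CommT committable) {
            this.committable = committable;
        }

        @Override
        public CommT getCommittable() {
            return committable;
        }

        @Override
        public void signalAlreadyCommitted() {
            alreadyCommitted = true;
        }

        public boolean isAlreadyCommitted() {
            return alreadyCommitted;
        }
    }

    /** Hypothetical committer: skips committables matching a predicate, commits the rest. */
    public static final class SkippingCommitter {
        private final Predicate<String> shouldSkip;
        private final List<String> committed = new ArrayList<>();

        public SkippingCommitter(Predicate<String> shouldSkip) {
            this.shouldSkip = shouldSkip;
        }

        /** The batch may contain committables accumulated from more than one checkpoint. */
        public void commit(Collection<? extends CommitRequest<String>> requests) {
            for (CommitRequest<String> request : requests) {
                String committable = request.getCommittable();
                if (shouldSkip.test(committable)) {
                    // Assumption: marking the request as already committed is enough to drop it.
                    request.signalAlreadyCommitted();
                } else {
                    // Placeholder for the real commit against the external system.
                    committed.add(committable);
                }
            }
        }

        public List<String> committed() {
            return committed;
        }
    }

    public static void main(String[] args) {
        SkippingCommitter committer = new SkippingCommitter(c -> c.startsWith("skip-"));
        List<RecordingRequest<String>> batch = List.of(
                new RecordingRequest<>("file-1"),
                new RecordingRequest<>("skip-file-2"),
                new RecordingRequest<>("file-3"));
        committer.commit(batch);
        System.out.println(committer.committed());
    }
}
```

In a real sink the same loop would live in your Committer implementation and operate on the requests Flink passes to commit; how the committer decides what to skip is the open question, since there is no channel from the SinkWriter other than the committable itself.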

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-25920
[2] https://github.com/apache/flink/blob/7fc3aac774f5deb9b48727ba5f916c78085b49b9/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java#L100