AHeise commented on code in PR #25456:
URL: https://github.com/apache/flink/pull/25456#discussion_r1805994539
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java:
##########
@@ -47,10 +47,50 @@
 import java.util.Collections;
 import java.util.List;
+import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
-        implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {
+/**
+ * Implements the {@code GlobalCommitter}.
+ *
+ * <p>This operator usually trails behind a {@code CommitterOperator}. In this case, the global
+ * committer will receive committables from the committer operator through {@link
+ * #processElement(StreamRecord)}. Once all committables from all subtasks have been received, the
+ * global committer will commit them.
+ *
+ * <p>That means that the global committer will not wait for {@link
+ * #notifyCheckpointComplete(long)}. In many cases, it receives the callback before the actual
+ * committables anyway. So it would effectively globally commit one checkpoint later.
+ *
+ * <p>However, we can leverage the following observation: the global committer will only receive
+ * committables iff the respective checkpoint was completed and upstream committers received the
+ * {@link #notifyCheckpointComplete(long)}. So by waiting for all committables of a given
+ * checkpoint, we implicitly know that the checkpoint was successful and the global committer is
+ * supposed to globally commit.
+ *
+ * <p>Note that committables of checkpoint X are not checkpointed in X because the global committer
+ * is trailing behind the checkpoint. They are replayed from the committer state in case of an
+ * error. The state only includes incomplete checkpoints coming from upstream committers not
+ * receiving {@link #notifyCheckpointComplete(long)}. All committables received are successful.
+ *
+ * <p>In rare cases, the GlobalCommitterOperator may be connected to a writer directly. In this

Review Comment:
Yes, you are right. I'll clarify the comment - the code already acknowledges that.