AHeise commented on code in PR #25456:
URL: https://github.com/apache/flink/pull/25456#discussion_r1805994539


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java:
##########
@@ -47,10 +47,50 @@
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
-        implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {
+/**
+ * Implements the {@code GlobalCommitter}.
+ *
+ * <p>This operator usually trails behind a {@code CommitterOperator}. In this case, the global
+ * committer will receive committables from the committer operator through {@link
+ * #processElement(StreamRecord)}. Once all committables from all subtasks have been received, the
+ * global committer will commit them.
+ *
+ * <p>That means that the global committer will not wait for {@link
+ * #notifyCheckpointComplete(long)}. In many cases, it receives the callback before the actual
+ * committables anyway. So it would effectively globally commit one checkpoint later.
+ *
+ * <p>However, we can leverage the following observation: the global committer will only receive
+ * committables iff the respective checkpoint was completed and upstream committers received the
+ * {@link #notifyCheckpointComplete(long)}. So by waiting for all committables of a given
+ * checkpoint, we implicitly know that the checkpoint was successful and the global committer is
+ * supposed to globally commit.
+ *
+ * <p>Note that committables of checkpoint X are not checkpointed in X because the global committer
+ * is trailing behind the checkpoint. They are replayed from the committer state in case of an
+ * error. The state only includes incomplete checkpoints coming from upstream committers not
+ * receiving {@link #notifyCheckpointComplete(long)}. All committables received are successful.
+ *
+ * <p>In rare cases, the GlobalCommitterOperator may be connected to a writer directly. In this

Review Comment:
   Yes, you are right. I'll clarify the comment; the code already acknowledges that.
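
The Javadoc's central idea can be illustrated in isolation. The global committer commits a checkpoint once it holds every committable from every upstream subtask, which implies the checkpoint already completed upstream, so it never needs its own `notifyCheckpointComplete`. The standalone Java sketch below models that idea. It is a hypothetical simplification, not Flink's `GlobalCommitterOperator`. The class `GlobalCommitSketch`, the methods `onSummary` and `onCommittable`, and the assumption that each subtask first announces how many committables it will send for a checkpoint are all illustrative. Committables are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, simplified model of "globally commit once all committables of a checkpoint have
 * arrived". Not Flink's API: all names here are illustrative.
 */
class GlobalCommitSketch {

    /** Per-checkpoint bookkeeping: counts announced per subtask, counts received, payloads. */
    private static final class CheckpointState {
        final Map<Integer, Integer> expectedPerSubtask = new HashMap<>();
        final Map<Integer, Integer> receivedPerSubtask = new HashMap<>();
        final List<String> committables = new ArrayList<>();
    }

    private final int numberOfSubtasks;
    private final Map<Long, CheckpointState> pending = new HashMap<>();

    GlobalCommitSketch(int numberOfSubtasks) {
        this.numberOfSubtasks = numberOfSubtasks;
    }

    /** Assumed protocol step: a subtask announces how many committables it sends for a checkpoint. */
    public Optional<List<String>> onSummary(long checkpointId, int subtaskId, int count) {
        state(checkpointId).expectedPerSubtask.put(subtaskId, count);
        return tryComplete(checkpointId);
    }

    /** A single committable arrives from a subtask (it may arrive before that subtask's summary). */
    public Optional<List<String>> onCommittable(long checkpointId, int subtaskId, String committable) {
        CheckpointState s = state(checkpointId);
        s.receivedPerSubtask.merge(subtaskId, 1, Integer::sum);
        s.committables.add(committable);
        return tryComplete(checkpointId);
    }

    private CheckpointState state(long checkpointId) {
        return pending.computeIfAbsent(checkpointId, id -> new CheckpointState());
    }

    /** Returns the committables to globally commit once every subtask is fully accounted for. */
    private Optional<List<String>> tryComplete(long checkpointId) {
        CheckpointState s = pending.get(checkpointId);
        if (s.expectedPerSubtask.size() < numberOfSubtasks) {
            return Optional.empty(); // some subtask has not announced its count yet
        }
        for (Map.Entry<Integer, Integer> e : s.expectedPerSubtask.entrySet()) {
            int received = s.receivedPerSubtask.getOrDefault(e.getKey(), 0);
            if (received < e.getValue()) {
                return Optional.empty(); // still waiting for committables from this subtask
            }
        }
        pending.remove(checkpointId);
        // Having everything implies the checkpoint completed upstream, so no separate
        // checkpoint-complete notification is consulted here.
        return Optional.of(s.committables);
    }

    public static void main(String[] args) {
        GlobalCommitSketch sketch = new GlobalCommitSketch(2);
        System.out.println(sketch.onSummary(1L, 0, 1)); // Optional.empty
        System.out.println(sketch.onCommittable(1L, 0, "a")); // Optional.empty (subtask 1 missing)
        System.out.println(sketch.onSummary(1L, 1, 1)); // Optional.empty (its committable is missing)
        System.out.println(sketch.onCommittable(1L, 1, "b")); // Optional[[a, b]]
    }
}
```

Keying the state by checkpoint ID keeps checkpoints independent. A subtask that announces zero committables still counts toward completeness, because the sketch waits for every subtask's announcement and not merely for any committables to arrive.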



-- 
This is an automated message from the Apache Git Service.