fapaul commented on code in PR #25456:
URL: https://github.com/apache/flink/pull/25456#discussion_r1803152409


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java:
##########
@@ -47,10 +47,50 @@
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
-        implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {
+/**
+ * Implements the {@code GlobalCommitter}.
+ *
+ * <p>This operator usually trails behind a {@code CommitterOperator}. In this case, the global
+ * committer will receive committables from the committer operator through {@link
+ * #processElement(StreamRecord)}. Once all committables from all subtasks have been received, the
+ * global committer will commit them.
+ *
+ * <p>That means that the global committer will not wait for {@link
+ * #notifyCheckpointComplete(long)}. In many cases, it receives the callback before the actual
+ * committables anyway. So it would effectively globally commit one checkpoint later.
+ *
+ * <p>However, we can leverage the following observation: the global committer will only receive
+ * committables iff the respective checkpoint was completed and upstream committers received the
+ * {@link #notifyCheckpointComplete(long)}. So by waiting for all committables of a given
+ * checkpoint, we implicitly know that the checkpoint was successful and the global committer is
+ * supposed to globally commit.
+ *
+ * <p>Note that committables of checkpoint X are not checkpointed in X because the global committer
+ * is trailing behind the checkpoint. They are replayed from the committer state in case of an
+ * error. The state only includes incomplete checkpoints coming from upstream committers not
+ * receiving {@link #notifyCheckpointComplete(long)}. All committables received are successful.
+ *
+ * <p>In rare cases, the GlobalCommitterOperator may be connected to a writer directly. In this

Review Comment:
   In theory, the global committer can sit at any point of the custom topology hooks, behind some custom operator.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java:
##########
@@ -127,38 +165,67 @@ public void initializeState(StateInitializationContext context) throws Exception
                             });
             lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commit(lastCompletedCheckpointId);
+            commit();
         }
     }
 
+    private SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>>
+            getCommitterStateSerializer() {
+        final CommittableCollectorSerializer<CommT> committableCollectorSerializer =
+                new CommittableCollectorSerializer<>(
+                        committableSerializer,
+                        getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+                        getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
+                        metricGroup);
+        return new GlobalCommitterSerializer<>(
+                committableCollectorSerializer, globalCommittableSerializer, metricGroup);
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commit(lastCompletedCheckpointId);
-    }
-
-    private void commit(long checkpointId) throws IOException, InterruptedException {
-        for (CheckpointCommittableManager<CommT> checkpoint :
-                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
-            checkpoint.commit(committer);
+        if (!commitOnInput) {
+            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);

Review Comment:
   Nit: Let's move the calculation of `lastCompletedCheckpointId` into the `commit()` method and add a parameter to the `commit()` method for the current checkpoint.
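
   A minimal sketch of the suggested shape, under stated assumptions: `CommitSketch`, its `pending` map, and the string committables are hypothetical stand-ins for the operator and its `committableCollector`, not the actual Flink classes. Only the bookkeeping matters here: `commit(long)` owns the `Math.max` update, so callers just pass the checkpoint they were notified about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the operator; only the checkpoint bookkeeping is modeled. */
class CommitSketch {
    private long lastCompletedCheckpointId = -1;
    private final boolean commitOnInput;
    // checkpoint id -> pending committables (placeholder strings in this sketch)
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    CommitSketch(boolean commitOnInput) {
        this.commitOnInput = commitOnInput;
    }

    void addCommittable(long checkpointId, String committable) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(committable);
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (!commitOnInput) {
            commit(checkpointId);
        }
    }

    /** The max() calculation lives here, so callers cannot forget or duplicate it. */
    private void commit(long checkpointId) {
        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
        // View of all checkpoints up to and including the last completed one.
        NavigableMap<Long, List<String>> upTo = pending.headMap(lastCompletedCheckpointId, true);
        for (List<String> committables : upTo.values()) {
            committed.addAll(committables);
        }
        upTo.clear(); // clearing the view also removes the entries from the backing map
    }

    long getLastCompletedCheckpointId() {
        return lastCompletedCheckpointId;
    }
}
```

   With this shape, an out-of-order notification for an older checkpoint leaves `lastCompletedCheckpointId` unchanged without any extra code at the call site.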



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME;
+
+/**
+ * A {@link TransformationTranslator} for the {@link GlobalCommitterOperator}. The main purpose is
+ * to detect whether we set {@link GlobalCommitterOperator#commitOnInput} or not.
+ */
+@Internal
+public class GlobalCommitterTransformationTranslator<CommT>
+        implements TransformationTranslator<Void, GlobalCommitterTransform<CommT>> {
+
+    @Override
+    public Collection<Integer> translateForBatch(
+            GlobalCommitterTransform<CommT> transformation, Context context) {
+        return translateInternal(transformation, true);
+    }
+
+    @Override
+    public Collection<Integer> translateForStreaming(
+            GlobalCommitterTransform<CommT> transformation, Context context) {
+        return translateInternal(transformation, false);
+    }
+
+    private Collection<Integer> translateInternal(
+            GlobalCommitterTransform<CommT> globalCommitterTransform, boolean batch) {
+        DataStream<CommittableMessage<CommT>> inputStream =
+                globalCommitterTransform.getInputStream();
+        boolean checkpointingEnabled =
+                inputStream
+                        .getExecutionEnvironment()
+                        .getCheckpointConfig()
+                        .isCheckpointingEnabled();
+        boolean commitOnInput = batch || !checkpointingEnabled || hasUpstreamCommitter(inputStream);
+
+        // Create a global shuffle and add the global committer with parallelism 1.
+        final PhysicalTransformation<Void> transformation =
+                (PhysicalTransformation<Void>)
+                        inputStream
+                                .global()
+                                .transform(
+                                        GLOBAL_COMMITTER_TRANSFORMATION_NAME,
+                                        Types.VOID,
+                                        new GlobalCommitterOperator<>(
+                                                globalCommitterTransform.getCommitterFactory(),
+                                                globalCommitterTransform.getCommittableSerializer(),
+                                                commitOnInput))
+                                .getTransformation();
+        transformation.setChainingStrategy(ChainingStrategy.ALWAYS);
+        transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
+        transformation.setParallelism(1);
+        transformation.setMaxParallelism(1);
+        return Collections.emptyList();
+    }
+
+    /**
+     * Looks for a committer in the pipeline and aborts on writer. The GlobalCommitter behaves
+     * differently if there is a committer after the writer.
+     */
+    private static boolean hasUpstreamCommitter(DataStream<?> ds) {

Review Comment:
   How safe is this really? Users could have placed different custom operators in front of the global committer, and those operators may follow either writer or committer semantics.
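
   To make the concern concrete, here is a sketch of an upstream search in the spirit of the javadoc above ("looks for a committer ... and aborts on writer"). The method body is not part of this hunk, so everything below is an assumption: `Node`, `Kind`, and the traversal order are hypothetical and only model the idea of classifying operators by their known factory type.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

class UpstreamSearchSketch {
    /** Hypothetical operator classification; CUSTOM is anything that is not a known factory. */
    enum Kind { WRITER, COMMITTER, CUSTOM }

    /** Hypothetical stand-in for a transformation with its inputs. */
    static final class Node {
        final Kind kind;
        final List<Node> inputs = new ArrayList<>();

        Node(Kind kind, Node... inputs) {
            this.kind = kind;
            for (Node in : inputs) {
                this.inputs.add(in);
            }
        }
    }

    /** Breadth-first walk upstream: true on the first committer, false on the first writer. */
    static boolean hasUpstreamCommitter(Node start) {
        Queue<Node> pending = new ArrayDeque<>();
        Set<Node> seen = new HashSet<>();
        pending.add(start);
        while (!pending.isEmpty()) {
            Node current = pending.poll();
            if (!seen.add(current)) {
                continue; // already visited
            }
            if (current.kind == Kind.COMMITTER) {
                return true;
            }
            if (current.kind == Kind.WRITER) {
                return false;
            }
            pending.addAll(current.inputs);
        }
        return false;
    }
}
```

   In this model, a custom operator that emits committables with committer semantics but sits directly behind a writer is classified as "no upstream committer", which is exactly the case I am unsure about.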



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java:
##########
@@ -168,13 +169,33 @@ public Collection<CommittableWithLineage<CommT>> 
commit(Committer<CommT> committ
         return committed;
     }
 
-    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean onlyIfFullyReceived) {
+    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean assertFull) {
         return subtasksCommittableManagers.values().stream()
-                .filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll())
+                .peek(subtask -> assertReceivedAll(assertFull, subtask))
                 .flatMap(SubtaskCommittableManager::getPendingRequests)
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Sinks don't use unaligned checkpoints, so we receive all committables of a given upstream

Review Comment:
   This javadoc is now a bit confusing, since the global committer and the committer follow different strategies for receiving and committing committables.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java:
##########
@@ -127,38 +165,67 @@ public void initializeState(StateInitializationContext context) throws Exception
                             });
             lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commit(lastCompletedCheckpointId);
+            commit();
         }
     }
 
+    private SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>>
+            getCommitterStateSerializer() {
+        final CommittableCollectorSerializer<CommT> committableCollectorSerializer =
+                new CommittableCollectorSerializer<>(
+                        committableSerializer,
+                        getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+                        getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
+                        metricGroup);
+        return new GlobalCommitterSerializer<>(
+                committableCollectorSerializer, globalCommittableSerializer, metricGroup);
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commit(lastCompletedCheckpointId);
-    }
-
-    private void commit(long checkpointId) throws IOException, InterruptedException {
-        for (CheckpointCommittableManager<CommT> checkpoint :
-                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
-            checkpoint.commit(committer);
+        if (!commitOnInput) {
+            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
+            commit();
         }
-        committableCollector.compact();
     }
 
-    @Override
-    public void endInput() throws Exception {
-        final CheckpointCommittableManager<CommT> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        if (endOfInputCommittable != null) {
-            do {
-                endOfInputCommittable.commit(committer);
-            } while (!committableCollector.isFinished());
-        }
+    private void commit() throws IOException, InterruptedException {
+        // this is true for the last commit and we need to make sure that all committables are
+        // indeed committed as this function will never be invoked again
+        boolean waitForAllCommitted =
+                lastCompletedCheckpointId == EOI

Review Comment:
   Can the following scenario happen?
   
   On `EOI` with `commitOnInput=true`, we enter an infinite loop because we have not yet received all committables for `EOI` from the upstream task (the committer).
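
   A simplified model of the loop I have in mind. This is not the operator code (the rest of `commit()` is cut off in this hunk); `commitUntilFinished` and its parameters are hypothetical, and the iteration cap exists only so the sketch terminates.

```java
class EoiLoopSketch {
    /**
     * Models a "commit until everything is committed" loop.
     *
     * @param expected committables announced by the upstream summary for EOI
     * @param received committables that have actually arrived so far
     * @param maxIterations safety cap for this sketch; the real loop would have none
     * @return the number of commit passes, or -1 if the cap was hit (the loop would spin)
     */
    static int commitUntilFinished(int expected, int received, int maxIterations) {
        int committed = 0;
        int iterations = 0;
        // "Finished" only once every expected committable has been committed.
        while (committed < expected) {
            if (iterations == maxIterations) {
                return -1;
            }
            // One commit pass can only commit what has been received so far.
            committed = received;
            iterations++;
        }
        return iterations;
    }
}
```

   If the wait is triggered while `received < expected`, no number of passes makes progress, because new input is not processed while the loop is running.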



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
