AHeise commented on code in PR #25456:
URL: https://github.com/apache/flink/pull/25456#discussion_r1806181486


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java:
##########
@@ -127,38 +165,67 @@ public void initializeState(StateInitializationContext context) throws Exception
                             });
             lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commit(lastCompletedCheckpointId);
+            commit();
         }
     }
 
+    private SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>>
+            getCommitterStateSerializer() {
+        final CommittableCollectorSerializer<CommT> committableCollectorSerializer =
+                new CommittableCollectorSerializer<>(
+                        committableSerializer,
+                        getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+                        getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
+                        metricGroup);
+        return new GlobalCommitterSerializer<>(
+                committableCollectorSerializer, globalCommittableSerializer, metricGroup);
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commit(lastCompletedCheckpointId);
-    }
-
-    private void commit(long checkpointId) throws IOException, InterruptedException {
-        for (CheckpointCommittableManager<CommT> checkpoint :
-                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
-            checkpoint.commit(committer);
+        if (!commitOnInput) {
+            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
+            commit();
         }
-        committableCollector.compact();
     }
 
-    @Override
-    public void endInput() throws Exception {
-        final CheckpointCommittableManager<CommT> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        if (endOfInputCommittable != null) {
-            do {
-                endOfInputCommittable.commit(committer);
-            } while (!committableCollector.isFinished());
-        }
+    private void commit() throws IOException, InterruptedException {
+        // this is true for the last commit and we need to make sure that all committables are
+        // indeed committed as this function will never be invoked again
+        boolean waitForAllCommitted =
+                lastCompletedCheckpointId == EOI

Review Comment:
   Yes, that is a likely scenario. Hence the second part of this condition, which is true if and only if there is an EOI committable and it has received all messages.
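   To make the guard concrete, here is a minimal, hypothetical model of the `waitForAllCommitted` condition as the comment describes it. The diff is cut off after `lastCompletedCheckpointId == EOI`, so the second operand is an assumption based on the comment's wording; `FinalCommitGuard`, `EoiState`, `hasReceivedAll` and the `Long.MAX_VALUE` sentinel are illustrative names, not Flink's API.

```java
/** Hypothetical model of the final-commit guard; not Flink code. */
public class FinalCommitGuard {
    // Assumption: a sentinel checkpoint id that marks end of input.
    static final long EOI = Long.MAX_VALUE;

    /** Hypothetical bookkeeping for the EOI committable: how many subtasks have reported. */
    static final class EoiState {
        private final int expectedSubtasks;
        private final int receivedSubtasks;

        EoiState(int expectedSubtasks, int receivedSubtasks) {
            this.expectedSubtasks = expectedSubtasks;
            this.receivedSubtasks = receivedSubtasks;
        }

        boolean hasReceivedAll() {
            return receivedSubtasks >= expectedSubtasks;
        }
    }

    /**
     * True only for the very last commit: the last completed checkpoint is EOI AND an EOI
     * committable exists (non-null) that has received messages from all upstream subtasks.
     */
    static boolean waitForAllCommitted(long lastCompletedCheckpointId, EoiState eoi) {
        return lastCompletedCheckpointId == EOI && eoi != null && eoi.hasReceivedAll();
    }

    public static void main(String[] args) {
        // Regular checkpoint: never the final commit.
        System.out.println(waitForAllCommitted(42L, new EoiState(2, 2))); // false
        // EOI reached, but no EOI committable has arrived yet.
        System.out.println(waitForAllCommitted(EOI, null)); // false
        // EOI committable present, but one subtask is still missing.
        System.out.println(waitForAllCommitted(EOI, new EoiState(2, 1))); // false
        // Final commit: EOI and all subtasks have reported.
        System.out.println(waitForAllCommitted(EOI, new EoiState(2, 2))); // true
    }
}
```

   The point of the second operand in this model is that reaching EOI alone does not force the "wait until everything is committed" mode; it only kicks in once the EOI committable is known to be complete.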
   
   Then, the only way this could turn into an infinite loop is if committables arrive out of order (with respect to the checkpointId), which is possible with async retries. However, this also applies to the old version that loops in `endInput` (remember that we need to emit committables after EOI for the final checkpoint).
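   A simplified, hypothetical simulation of the out-of-order case, shaped like the old `endInput` loop: commit the EOI committable, then check whether the collector is finished. `OutOfOrderCommitDemo` and its methods are illustrative only, not Flink's `CommittableCollector`, and the loop is capped by `maxRounds` so the demo terminates. In this model, a retried committable for an earlier checkpoint that shows up after the EOI one is never committed by the loop, so `isFinished()` stays false.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simulation of a "commit EOI until finished" loop; not Flink code. */
public class OutOfOrderCommitDemo {
    // Assumption: sentinel checkpoint id for end of input.
    static final long EOI = Long.MAX_VALUE;

    // checkpointId -> number of pending (uncommitted) committables
    private final Map<Long, Integer> pending = new HashMap<>();

    /** Registers one committable for the given checkpoint. */
    void add(long checkpointId) {
        pending.merge(checkpointId, 1, Integer::sum);
    }

    /** Commits everything currently registered for one checkpoint id. */
    void commitCheckpoint(long checkpointId) {
        pending.remove(checkpointId);
    }

    /** Finished only when no checkpoint has pending committables. */
    boolean isFinished() {
        return pending.isEmpty();
    }

    /**
     * Mimics the old loop shape: only the EOI bucket is committed, but termination depends
     * on ALL buckets. Bounded by maxRounds; returns whether the loop actually finished.
     */
    boolean commitEoiUntilFinished(int maxRounds) {
        for (int round = 0; round < maxRounds; round++) {
            commitCheckpoint(EOI);
            if (isFinished()) {
                return true;
            }
        }
        return false; // an unbounded loop would spin here forever
    }

    public static void main(String[] args) {
        OutOfOrderCommitDemo inOrder = new OutOfOrderCommitDemo();
        inOrder.add(1);
        inOrder.commitCheckpoint(1); // checkpoint 1 committed before EOI
        inOrder.add(EOI);
        System.out.println(inOrder.commitEoiUntilFinished(3)); // true

        OutOfOrderCommitDemo late = new OutOfOrderCommitDemo();
        late.add(EOI);
        late.add(1); // retried committable for checkpoint 1 arrives after EOI
        System.out.println(late.commitEoiUntilFinished(3)); // false
    }
}
```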
   
   This will be solved by the next PR, which removes async retries.



-- 
This is an automated message from the Apache Git Service.