AHeise commented on code in PR #26433:
URL: https://github.com/apache/flink/pull/26433#discussion_r2036031020


##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##########
@@ -151,24 +149,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
 
     @Override
     public void endInput() throws Exception {
-        endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be committed here
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-            commitAndEmitCheckpoints();
+        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
     }
 
-    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
-        long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
+    private void commitAndEmitCheckpoints(long checkpointId)
+            throws IOException, InterruptedException {
+        lastCompletedCheckpointId = checkpointId;

Review Comment:
   In general, transient state is lost on error, so it doesn't matter whether we update the field before or after the loop: an exception leads to a fail-over, and everything is recalculated on recovery. And because everything is called from the main task thread (the mailbox thread), this call cannot interleave with another call such as `endInput`.
   
   In this specific case, `lastCompletedCheckpointId` refers to the checkpoint id that has completed for Flink as a whole. Since this value is primarily set through `notifyCheckpointComplete`, the checkpoint is already complete before the method starts. So I'd like to keep the assignment as the first statement; it's easier to read than an assignment at the end of the method.
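
   To make the ordering argument concrete, here is a minimal, Flink-free sketch of the pattern. `CommitterSketch`, `addCommittable`, the `pending` map and the `committed` list are hypothetical names invented for illustration; they are not the real `CommitterOperator` internals, and the actual commit logic is more involved. The sketch only assumes what the diff shows: both entry points funnel into `commitAndEmitCheckpoints(long)`, which records the id first and then commits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the operator; not Flink's actual CommitterOperator. */
class CommitterSketch {
    // Transient (non-checkpointed) field: after a fail-over it is simply rebuilt,
    // so a half-finished update can never be observed by later code.
    private long lastCompletedCheckpointId = -1L;

    // Pending committables, keyed by the checkpoint id they belong to.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    // Everything that has been committed so far, in commit order.
    final List<String> committed = new ArrayList<>();

    void addCommittable(long checkpointId, String committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    // Assumed to run on a single thread (the mailbox thread in Flink),
    // so it cannot interleave with endInput().
    void notifyCheckpointComplete(long checkpointId) {
        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
    }

    // No final checkpoint will arrive: commit everything up to the next id.
    void endInput() {
        commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
    }

    private void commitAndEmitCheckpoints(long checkpointId) {
        // First statement: the checkpoint has already completed by the time we get here.
        lastCompletedCheckpointId = checkpointId;

        // Commit every committable whose checkpoint id is <= checkpointId.
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> committables : ready.values()) {
            committed.addAll(committables);
        }
        ready.clear(); // the view is backed by `pending`, so this removes the entries
    }

    long lastCompletedCheckpointId() {
        return lastCompletedCheckpointId;
    }
}
```

   In this sketch, a late notification for an older checkpoint is a no-op because of the `Math.max`, and a subsequent `endInput()` commits whatever belongs to the next id. If the loop threw, the in-memory value of `lastCompletedCheckpointId` would be discarded with the failed task anyway, which is why placing the assignment first costs nothing.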



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
