AHeise commented on code in PR #26433:
URL: https://github.com/apache/flink/pull/26433#discussion_r2036031020
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##########
@@ -151,24 +149,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     @Override
     public void endInput() throws Exception {
-        endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be committed here
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commitAndEmitCheckpoints();
+        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
     }
 
-    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
-        long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
+    private void commitAndEmitCheckpoints(long checkpointId)
+            throws IOException, InterruptedException {
+        lastCompletedCheckpointId = checkpointId;

Review Comment:
   In general, transient state is lost on error. So it doesn't matter whether we update the field before or after the loop: an exception leads to a fail-over, and everything is recalculated on recovery. Since everything is called from the main task thread (mailbox thread), this call cannot interleave with another call such as `endInput`.

   In this specific case, `lastCompletedCheckpointId` refers to the completed checkpoint id of Flink as a whole. Since this value is primarily set through `notifyCheckpointComplete`, the checkpoint is already completed before the method starts. So I'd like to keep the assignment as the first statement, because that is easier to read than doing it at the end of the method.
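
To illustrate the argument in the review comment, here is a minimal sketch. It is not the actual `CommitterOperator` code: `SketchCommitter`, `commitAndEmit`, the `failDuringCommit` flag, and `recover` are hypothetical names, and recovery is modeled simply as creating a fresh instance. The point it tries to show is that a transient field assigned before a failing commit does not survive the fail-over, so assigning it first or last makes no observable difference.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator; not the real CommitterOperator. */
class SketchCommitter {
    // Transient field (assumption): not part of any snapshot, so it is rebuilt after a fail-over.
    long lastCompletedCheckpointId = -1;
    final List<Long> committed = new ArrayList<>();

    // Assumed to be called from a single mailbox-like thread only, so no other call interleaves.
    void commitAndEmit(long checkpointId, boolean failDuringCommit) {
        // Assigned first: the checkpoint has already completed by the time we are notified.
        lastCompletedCheckpointId = checkpointId;
        if (failDuringCommit) {
            // Stands in for an exception thrown inside the commit loop.
            throw new IllegalStateException("commit failed, task fails over");
        }
        committed.add(checkpointId);
    }

    // Models recovery after a fail-over: a fresh instance with transient fields at their defaults.
    static SketchCommitter recover() {
        return new SketchCommitter();
    }
}
```

In this sketch, the value written before the exception is visible only on the failed instance, which is discarded. The recovered instance starts from its default and sets the field again on the next notification.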