Yun Tang created FLINK-23471:
--------------------------------

    Summary: Try best to ensure all operators and state manager handle the checkpoint notification
    Key: FLINK-23471
    URL: https://issues.apache.org/jira/browse/FLINK-23471
    Project: Flink
    Issue Type: Improvement
    Components: Runtime / Checkpointing, Runtime / Task
    Reporter: Yun Tang
    Assignee: Yun Tang
    Fix For: 1.14.0
The current {{SubtaskCheckpointCoordinatorImpl#notifyCheckpointComplete}} is implemented as follows:
{code:java}
@Override
public void notifyCheckpointComplete(
        long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning)
        throws Exception {
    if (!isRunning.get()) {
        LOG.debug(
                "Ignoring notification of complete checkpoint {} for not-running task {}",
                checkpointId,
                taskName);
    } else if (operatorChain.isFinishedOnRestore()) {
        LOG.debug(
                "Ignoring notification of complete checkpoint {} for finished on restore task {}",
                checkpointId,
                taskName);
    } else {
        LOG.debug(
                "Notification of completed checkpoint {} for task {}", checkpointId, taskName);
        for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
            // An exception thrown here aborts the loop and skips the TaskStateManager call below.
            operatorWrapper.notifyCheckpointComplete(checkpointId);
        }
    }
    env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
}
{code}
If any operator in the chain throws an exception from {{notifyCheckpointComplete}}, the loop is aborted: the operators after it, as well as the {{TaskStateManager}}, never receive the notification.
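One possible shape for the "try best" behavior is a minimal sketch, assuming a "notify everyone first, rethrow afterwards" policy: every listener is called even if an earlier one fails, the first exception is rethrown at the end, and later ones are attached to it as suppressed exceptions. {{CheckpointListener}}, {{notifyAllListeners}} and the lambdas are hypothetical stand-ins for {{StreamOperatorWrapper}} and {{TaskStateManager}}, not Flink API; an actual patch might instead use a helper such as Flink's {{ExceptionUtils#firstOrSuppressed}}.
{code:java}
import java.util.ArrayList;
import java.util.List;

public class BestEffortCheckpointNotification {

    /** Hypothetical stand-in for anything that must hear about a completed checkpoint. */
    public interface CheckpointListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /**
     * Notifies every listener even if earlier ones fail. The first failure is rethrown
     * only after all listeners were called; later failures are added as suppressed.
     */
    public static void notifyAllListeners(
            List<? extends CheckpointListener> listeners, long checkpointId) throws Exception {
        Exception first = null;
        for (CheckpointListener listener : listeners) {
            try {
                listener.notifyCheckpointComplete(checkpointId);
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<Long> notified = new ArrayList<>();
        List<CheckpointListener> chain = new ArrayList<>();
        chain.add(id -> notified.add(id)); // healthy operator
        chain.add(id -> { throw new IllegalStateException("operator 2 failed"); });
        chain.add(id -> notified.add(id)); // healthy operator after the failing one
        chain.add(id -> notified.add(id)); // stand-in for the TaskStateManager, always last
        try {
            notifyAllListeners(chain, 42L);
        } catch (Exception e) {
            System.out.println("rethrown: " + e.getMessage());
        }
        System.out.println("healthy listeners notified: " + notified.size());
    }
}
{code}
Rethrowing after the loop keeps the failure visible to the caller (so the task can still fail) while no longer letting one faulty operator starve the rest of the chain and the state manager of the notification.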