Yun Tang created FLINK-23471:
--------------------------------

             Summary: Try best to ensure all operators and state manager handle the checkpoint notification
                 Key: FLINK-23471
                 URL: https://issues.apache.org/jira/browse/FLINK-23471
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing, Runtime / Task
            Reporter: Yun Tang
            Assignee: Yun Tang
             Fix For: 1.14.0


The current implementation of {{SubtaskCheckpointCoordinatorImpl#notifyCheckpointComplete}} is shown below:

{code:java}
    @Override
    public void notifyCheckpointComplete(
            long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning)
            throws Exception {
        if (!isRunning.get()) {
            LOG.debug(
                    "Ignoring notification of complete checkpoint {} for not-running task {}",
                    checkpointId,
                    taskName);
        } else if (operatorChain.isFinishedOnRestore()) {
            LOG.debug(
                    "Ignoring notification of complete checkpoint {} for finished on restore task {}",
                    checkpointId,
                    taskName);
        } else {
            LOG.debug("Notification of completed checkpoint {} for task {}", checkpointId, taskName);

            // An exception thrown by any operator propagates immediately, so the remaining
            // operators and the TaskStateManager call below are skipped.
            for (StreamOperatorWrapper<?, ?> operatorWrapper :
                    operatorChain.getAllOperators(true)) {
                operatorWrapper.notifyCheckpointComplete(checkpointId);
            }
        }
        env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
    }
{code}

If any operator in the operator chain throws an exception, the subsequent operators and the {{TaskStateManager}} no longer receive the notification.
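A minimal, self-contained sketch of the "try best" behaviour, assuming a hypothetical {{CheckpointCompleteListener}} interface that stands in for both {{StreamOperatorWrapper}} and {{TaskStateManager}} (this is not Flink's actual code or API). Every listener is notified, the first exception is kept, later ones are attached to it via {{Throwable#addSuppressed}}, and the first exception is rethrown only after the state manager has been notified. Flink's {{ExceptionUtils#firstOrSuppressed}} follows a similar first-or-suppressed pattern.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch contrasting fail-fast and best-effort checkpoint-complete notification. */
public class BestEffortNotification {

    /** Hypothetical stand-in for an operator wrapper or the TaskStateManager. */
    @FunctionalInterface
    public interface CheckpointCompleteListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /** Mirrors the current behaviour: the first exception aborts all remaining notifications. */
    public static void notifyFailFast(
            long checkpointId,
            List<CheckpointCompleteListener> operators,
            CheckpointCompleteListener stateManager)
            throws Exception {
        for (CheckpointCompleteListener operator : operators) {
            operator.notifyCheckpointComplete(checkpointId);
        }
        stateManager.notifyCheckpointComplete(checkpointId);
    }

    /** Best effort: notify everyone, then rethrow the first failure with later ones suppressed. */
    public static void notifyBestEffort(
            long checkpointId,
            List<CheckpointCompleteListener> operators,
            CheckpointCompleteListener stateManager)
            throws Exception {
        Exception first = null;
        for (CheckpointCompleteListener operator : operators) {
            first = notifySafely(operator, checkpointId, first);
        }
        // The state manager is notified even if an operator failed.
        first = notifySafely(stateManager, checkpointId, first);
        if (first != null) {
            throw first;
        }
    }

    /** Returns the first exception seen so far, attaching any new one as suppressed. */
    private static Exception notifySafely(
            CheckpointCompleteListener listener, long checkpointId, Exception previous) {
        try {
            listener.notifyCheckpointComplete(checkpointId);
            return previous;
        } catch (Exception e) {
            if (previous == null) {
                return e;
            }
            previous.addSuppressed(e);
            return previous;
        }
    }

    public static void main(String[] args) {
        List<String> notified = new ArrayList<>();
        List<CheckpointCompleteListener> operators =
                List.of(
                        id -> notified.add("op1"),
                        id -> {
                            throw new IllegalStateException("op2 failed");
                        },
                        id -> notified.add("op3"));
        CheckpointCompleteListener stateManager = id -> notified.add("stateManager");

        try {
            notifyFailFast(42L, operators, stateManager);
        } catch (Exception e) {
            // op3 and the state manager were never reached.
            System.out.println("Fail-fast notified: " + notified);
        }

        notified.clear();
        try {
            notifyBestEffort(42L, operators, stateManager);
        } catch (Exception e) {
            System.out.println("Best-effort rethrown: " + e.getMessage());
            System.out.println("Best-effort notified: " + notified);
        }
    }
}
{code}

Running {{main}} prints {{Fail-fast notified: [op1]}}, then {{Best-effort rethrown: op2 failed}} and {{Best-effort notified: [op1, op3, stateManager]}}. Deferring the rethrow keeps the failure visible to the caller, while the suppressed exceptions preserve every secondary failure for diagnosis.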




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
