pnowojski commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r755767863



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1405,59 +1406,80 @@ private void declineCheckpoint(
     }
 
     public void notifyCheckpointComplete(final long checkpointID) {
-        final TaskInvokable invokable = this.invokable;
-
-        if (executionState == ExecutionState.RUNNING) {
-            checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable");
-            try {
-                ((CheckpointableTask) invokable).notifyCheckpointCompleteAsync(checkpointID);
-            } catch (RejectedExecutionException ex) {
-                // This may happen if the mailbox is closed. It means that the task is shutting
-                // down, so we just ignore it.
-                LOG.debug(
-                        "Notify checkpoint complete {} for {} ({}) was rejected by the mailbox",
-                        checkpointID,
-                        taskNameWithSubtask,
-                        executionId);
-            } catch (Throwable t) {
-                if (getExecutionState() == ExecutionState.RUNNING) {
-                    // fail task if checkpoint confirmation failed.
-                    failExternally(new RuntimeException("Error while confirming checkpoint", t));
-                }
-            }
-        } else {
-            LOG.debug(
-                    "Ignoring checkpoint commit notification for non-running task {}.",
-                    taskNameWithSubtask);
-        }
+        notifyCheckpoint(
+                checkpointID,
+                CheckpointStoreUtil.INVALID_CHECKPOINT_ID,
+                NotifyCheckpointOperation.COMPLETE);
     }
 
     public void notifyCheckpointAborted(
             final long checkpointID, final long latestCompletedCheckpointId) {
-        final TaskInvokable invokable = this.invokable;
+        notifyCheckpoint(
+                checkpointID, latestCompletedCheckpointId, NotifyCheckpointOperation.ABORT);
+    }
 
-        if (executionState == ExecutionState.RUNNING) {
+    public void notifyCheckpointSubsumed(long checkpointID) {
+        notifyCheckpoint(
+                checkpointID,
+                CheckpointStoreUtil.INVALID_CHECKPOINT_ID,
+                NotifyCheckpointOperation.SUBSUME);
+    }
+
+    private void notifyCheckpoint(
+            long checkpointId,
+            long latestCompletedCheckpointId,
+            NotifyCheckpointOperation notifyCheckpointOperation) {
+        TaskInvokable invokable = this.invokable;
+
+        if (executionState == ExecutionState.RUNNING && invokable != null) {
             checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable");
             try {
-                ((CheckpointableTask) invokable)
-                        .notifyCheckpointAbortAsync(checkpointID, latestCompletedCheckpointId);
+                switch (notifyCheckpointOperation) {
+                    case ABORT:
+                        ((CheckpointableTask) invokable)
+                                .notifyCheckpointAbortAsync(
+                                        checkpointId, latestCompletedCheckpointId);
+                        break;
+                    case COMPLETE:
+                        ((CheckpointableTask) invokable)
+                                .notifyCheckpointCompleteAsync(checkpointId);
+                        break;
+                    case SUBSUME:
+                        ((CheckpointableTask) invokable)
+                                .notifyCheckpointSubsumedAsync(checkpointId);
+                }
             } catch (RejectedExecutionException ex) {
                 // This may happen if the mailbox is closed. It means that the task is shutting
                 // down, so we just ignore it.
                 LOG.debug(
-                        "Notify checkpoint abort {} for {} ({}) was rejected by the mailbox",
-                        checkpointID,
+                        "Notify checkpoint {}} {} for {} ({}) was rejected by the mailbox.",
+                        notifyCheckpointOperation,
+                        checkpointId,
                         taskNameWithSubtask,
                         executionId);
             } catch (Throwable t) {
-                if (getExecutionState() == ExecutionState.RUNNING) {
-                    // fail task if checkpoint aborted notification failed.
-                    failExternally(new RuntimeException("Error while aborting checkpoint", t));
+                switch (notifyCheckpointOperation) {
+                    case ABORT:
+                    case COMPLETE:
+                        if (getExecutionState() == ExecutionState.RUNNING) {
+                            failExternally(
+                                    new RuntimeException(
+                                            String.format(
+                                                    "Error while notify checkpoint %s.",
+                                                    notifyCheckpointOperation),
+                                            t));
+                        }
+                        break;
+                    case SUBSUME:
+                        // just rethrow the throwable out as we do not expect notification of
+                        // subsume could fail the task.
+                        ExceptionUtils.rethrow(t);

Review comment:
   Now I remember some past discussion about whether or not to fail tasks during checkpointing. Our previous conclusion was indeed not to fail the job unless necessary. We have to fail the job if the exception happened in, for example, the sync phase of snapshotting, as otherwise operators, state backends or user functions might end up in an inconsistent state. However, failures from, for example, the async phase can be handled by the `CheckpointCoordinator` and `CheckpointFailureManager`.
   
   `notifyCheckpointComplete` and `notifyCheckpointAborted` have to fail the job, at least in some or most cases, because if the failure originated in user code, that user code might be left in an invalid state. For example:
   ```java
   void notifyCheckpointComplete(long checkpointId) throws Exception {
     a++;
     maybeThrowException();
     b++;
   }
   ```
   If `maybeThrowException()` throws, `a` has already been incremented but `b` has not, so the two are left inconsistent.
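   The partial update can be reproduced with a small self-contained program. `CountingListener` and `PartialUpdateDemo` are hypothetical stand-ins; the method name mirrors `CheckpointListener#notifyCheckpointComplete(long)`, but nothing here depends on Flink.

   ```java
   // Hypothetical stand-in for a user function with a checkpoint-complete callback.
   // The method name mirrors CheckpointListener#notifyCheckpointComplete(long),
   // but this class is self-contained and does not depend on Flink.
   final class CountingListener {
       long a;
       long b;
       private final boolean failInTheMiddle;

       CountingListener(boolean failInTheMiddle) {
           this.failInTheMiddle = failInTheMiddle;
       }

       void notifyCheckpointComplete(long checkpointId) throws Exception {
           a++;
           maybeThrowException(checkpointId);
           b++; // never reached when maybeThrowException throws
       }

       private void maybeThrowException(long checkpointId) throws Exception {
           if (failInTheMiddle) {
               throw new Exception("failure while confirming checkpoint " + checkpointId);
           }
       }

       boolean isConsistent() {
           return a == b;
       }
   }

   final class PartialUpdateDemo {
       public static void main(String[] args) {
           CountingListener listener = new CountingListener(true);
           try {
               listener.notifyCheckpointComplete(42L);
           } catch (Exception e) {
               // Merely logging here would let the function keep running with a != b.
               System.out.println("caught: " + e.getMessage());
           }
           System.out.println(
                   "a=" + listener.a + ", b=" + listener.b
                           + ", consistent=" + listener.isConsistent());
       }
   }
   ```

   Running `PartialUpdateDemo` prints `a=1, b=0, consistent=false` after the caught message. If the exception were only logged, the function would keep running with that broken invariant, which is why failing the task is the safe default for user-facing callbacks.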
   
   As we don't expose `notifyCheckpointSubsumed()` to users, this argument doesn't hold here. So the only question is: are our internal data structures expected to keep working correctly if an exception does happen in this call?
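   For comparison, the per-operation policy from the diff can be modelled in isolation. This is a simplified sketch: `NotificationFailureHandler`, the `running` flag and the `failTask` callback are hypothetical stand-ins for `Task`, its execution state and `failExternally`, and `rethrowUnchecked` assumes `ExceptionUtils.rethrow` passes unchecked throwables through and wraps checked ones in a `RuntimeException`.

   ```java
   import java.util.function.Consumer;

   // Mirrors the NotifyCheckpointOperation enum introduced in the diff above.
   enum NotifyCheckpointOperation { ABORT, COMPLETE, SUBSUME }

   // Hypothetical, simplified model of the catch (Throwable t) branch in Task#notifyCheckpoint.
   final class NotificationFailureHandler {
       private NotificationFailureHandler() {}

       static void handle(
               NotifyCheckpointOperation operation,
               boolean running,
               Throwable t,
               Consumer<Throwable> failTask) {
           switch (operation) {
               case ABORT:
               case COMPLETE:
                   // User code may have run part-way, so its state cannot be trusted:
                   // fail the task, but only if it is still running.
                   if (running) {
                       failTask.accept(
                               new RuntimeException(
                                       "Error while notifying checkpoint " + operation + ".", t));
                   }
                   break;
               case SUBSUME:
                   // Not exposed to user code: propagate to the caller instead of
                   // failing the task. Safe only if internal structures tolerate this.
                   rethrowUnchecked(t);
                   break;
               default:
                   throw new IllegalArgumentException("Unknown operation: " + operation);
           }
       }

       // Assumed behaviour of ExceptionUtils.rethrow: unchecked throwables as-is,
       // checked exceptions wrapped in a RuntimeException.
       private static void rethrowUnchecked(Throwable t) {
           if (t instanceof RuntimeException) {
               throw (RuntimeException) t;
           }
           if (t instanceof Error) {
               throw (Error) t;
           }
           throw new RuntimeException(t);
       }
   }
   ```

   Under this model a throwing `SUBSUME` never fails the task; it only surfaces to whoever delivered the notification, which is only acceptable if the internal structures touched by the subsume path stay consistent when it throws midway.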




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

