pnowojski commented on a change in pull request #16582: URL: https://github.com/apache/flink/pull/16582#discussion_r754394153
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ########## @@ -1405,59 +1406,80 @@ private void declineCheckpoint( } public void notifyCheckpointComplete(final long checkpointID) { - final TaskInvokable invokable = this.invokable; - - if (executionState == ExecutionState.RUNNING) { - checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable"); - try { - ((CheckpointableTask) invokable).notifyCheckpointCompleteAsync(checkpointID); - } catch (RejectedExecutionException ex) { - // This may happen if the mailbox is closed. It means that the task is shutting - // down, so we just ignore it. - LOG.debug( - "Notify checkpoint complete {} for {} ({}) was rejected by the mailbox", - checkpointID, - taskNameWithSubtask, - executionId); - } catch (Throwable t) { - if (getExecutionState() == ExecutionState.RUNNING) { - // fail task if checkpoint confirmation failed. - failExternally(new RuntimeException("Error while confirming checkpoint", t)); - } - } - } else { - LOG.debug( - "Ignoring checkpoint commit notification for non-running task {}.", - taskNameWithSubtask); - } + notifyCheckpoint( + checkpointID, + CheckpointStoreUtil.INVALID_CHECKPOINT_ID, + NotifyCheckpointOperation.COMPLETE); } public void notifyCheckpointAborted( final long checkpointID, final long latestCompletedCheckpointId) { - final TaskInvokable invokable = this.invokable; + notifyCheckpoint( + checkpointID, latestCompletedCheckpointId, NotifyCheckpointOperation.ABORT); + } - if (executionState == ExecutionState.RUNNING) { + public void notifyCheckpointSubsumed(long checkpointID) { + notifyCheckpoint( + checkpointID, + CheckpointStoreUtil.INVALID_CHECKPOINT_ID, + NotifyCheckpointOperation.SUBSUME); + } + + private void notifyCheckpoint( + long checkpointId, + long latestCompletedCheckpointId, + NotifyCheckpointOperation notifyCheckpointOperation) { + TaskInvokable invokable = this.invokable; + + if (executionState == 
ExecutionState.RUNNING && invokable != null) { checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable"); try { - ((CheckpointableTask) invokable) - .notifyCheckpointAbortAsync(checkpointID, latestCompletedCheckpointId); + switch (notifyCheckpointOperation) { + case ABORT: + ((CheckpointableTask) invokable) + .notifyCheckpointAbortAsync( + checkpointId, latestCompletedCheckpointId); + break; + case COMPLETE: + ((CheckpointableTask) invokable) + .notifyCheckpointCompleteAsync(checkpointId); + break; + case SUBSUME: + ((CheckpointableTask) invokable) + .notifyCheckpointSubsumedAsync(checkpointId); + } } catch (RejectedExecutionException ex) { // This may happen if the mailbox is closed. It means that the task is shutting // down, so we just ignore it. LOG.debug( - "Notify checkpoint abort {} for {} ({}) was rejected by the mailbox", - checkpointID, + "Notify checkpoint {}} {} for {} ({}) was rejected by the mailbox.", + notifyCheckpointOperation, + checkpointId, taskNameWithSubtask, executionId); } catch (Throwable t) { - if (getExecutionState() == ExecutionState.RUNNING) { - // fail task if checkpoint aborted notification failed. - failExternally(new RuntimeException("Error while aborting checkpoint", t)); + switch (notifyCheckpointOperation) { + case ABORT: + case COMPLETE: + if (getExecutionState() == ExecutionState.RUNNING) { + failExternally( + new RuntimeException( + String.format( + "Error while notify checkpoint %s.", + notifyCheckpointOperation), + t)); + } + break; + case SUBSUME: + // just rethrow the throwable out as we do not expect notification of + // subsume could fail the task. + ExceptionUtils.rethrow(t); Review comment: I'm a little bit torn here. Wouldn't the same logic apply to `notifyCheckpointAbort()`? On the other hand, even in the case of an exception during `notifyCheckpointComplete()`, as long as the checkpoint is failed, there won't be any data loss.
Data will be committed in the next checkpoint OR, if there are enough checkpoint failures, the whole job will fail over because the number of tolerable checkpoint failures has been exceeded. I think it could be argued in both directions for any of those methods (abort/complete/subsume), so maybe for the sake of simplicity and consistency it would be best to keep them behaving in the same way? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org