rkhachatryan commented on a change in pull request #14662: URL: https://github.com/apache/flink/pull/14662#discussion_r560402071
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -237,19 +241,29 @@ private void handleExecutionException(Exception e) {
                             + '.',
                     e);
 
-            // We only report the exception for the original cause of fail and cleanup.
-            // Otherwise this followup exception could race the original exception in failing
-            // the task.
-            try {
-                taskEnvironment.declineCheckpoint(
+            if (isTaskRunning.get()) {
+                // We only report the exception for the original cause of fail and cleanup.
+                // Otherwise this followup exception could race the original exception in
+                // failing the task.
+                try {
+                    taskEnvironment.declineCheckpoint(
+                            checkpointMetaData.getCheckpointId(),
+                            new CheckpointException(
+                                    CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION,
+                                    checkpointException));
+                } catch (Exception unhandled) {
+                    AsynchronousException asyncException = new AsynchronousException(unhandled);
+                    asyncExceptionHandler.handleAsyncException(
+                            "Failure in asynchronous checkpoint materialization",
+                            asyncException);
+                }
+            } else {
+                // We never decline checkpoint after task is not running to avoid unexpected job
+                // failover, which caused by exceeding checkpoint tolerable failure threshold.
+                LOG.warn(

Review comment:
Could you explain the connection of this change to the original PR motivation?
What will happen to such a checkpoint? Will it time out?
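The second question (what happens to a checkpoint that is neither acknowledged nor declined) can be illustrated with a toy model. This is a minimal sketch, assuming a coordinator that expires any checkpoint still pending after a fixed timeout; `ToyCoordinator`, `handleAsyncFailure`, the `Outcome` states and the 10 s timeout are all hypothetical and are not Flink's `CheckpointCoordinator` API. Whether an expired checkpoint counts toward the tolerable-failure threshold depends on the coordinator's failure policy and is deliberately not modeled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class DeclineGatingSketch {

    // Hypothetical lifecycle states of a checkpoint as seen by the toy coordinator.
    enum Outcome { PENDING, DECLINED, EXPIRED }

    // Hypothetical stand-in for a checkpoint coordinator (NOT Flink's CheckpointCoordinator API).
    static final class ToyCoordinator {
        private final long timeoutMillis;
        private final Map<Long, Long> triggerTimes = new HashMap<>();
        private final Map<Long, Outcome> outcomes = new HashMap<>();

        ToyCoordinator(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        void trigger(long checkpointId, long nowMillis) {
            triggerTimes.put(checkpointId, nowMillis);
            outcomes.put(checkpointId, Outcome.PENDING);
        }

        // A decline resolves a pending checkpoint immediately.
        void decline(long checkpointId) {
            if (outcomes.get(checkpointId) == Outcome.PENDING) {
                outcomes.put(checkpointId, Outcome.DECLINED);
            }
        }

        // Expires every checkpoint still pending once the timeout has elapsed.
        // The clock is advanced manually so the example is deterministic.
        void advanceClock(long nowMillis) {
            for (Map.Entry<Long, Long> e : triggerTimes.entrySet()) {
                long id = e.getKey();
                boolean timedOut = nowMillis - e.getValue() >= timeoutMillis;
                if (outcomes.get(id) == Outcome.PENDING && timedOut) {
                    outcomes.put(id, Outcome.EXPIRED);
                }
            }
        }

        Outcome outcome(long checkpointId) {
            return outcomes.get(checkpointId);
        }
    }

    // Mirrors the gating in the diff: decline only while the task is running, otherwise just log.
    static void handleAsyncFailure(
            AtomicBoolean isTaskRunning, ToyCoordinator coordinator, long checkpointId) {
        if (isTaskRunning.get()) {
            coordinator.decline(checkpointId);
        } else {
            System.out.println("Task not running; not declining checkpoint " + checkpointId);
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator(10_000L);
        AtomicBoolean running = new AtomicBoolean(true);

        // Checkpoint 1 fails while the task is running: it is declined right away.
        coordinator.trigger(1L, 0L);
        handleAsyncFailure(running, coordinator, 1L);

        // Checkpoint 2 fails after the task stopped: nothing declines it.
        running.set(false);
        coordinator.trigger(2L, 0L);
        handleAsyncFailure(running, coordinator, 2L);

        coordinator.advanceClock(5_000L);
        System.out.println("cp2 at 5s: " + coordinator.outcome(2L));
        coordinator.advanceClock(10_000L);
        System.out.println(
                "cp1: " + coordinator.outcome(1L) + ", cp2 at 10s: " + coordinator.outcome(2L));
    }
}
```

In this model, skipping the decline does not resolve checkpoint 2; it stays pending until the coordinator's timeout expires it.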