[ https://issues.apache.org/jira/browse/FLINK-13497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900732#comment-16900732 ]
Piotr Nowojski commented on FLINK-13497:
----------------------------------------

As far as we (me, [~carp84] and [~StephanEwen]) understand this, the root problem here is a race condition on the JobManager between failing a checkpoint and completing another checkpoint, caused by {{FailJobCallback CheckpointFailureManager::failureCallback}} executing an asynchronous operation:
{code:java}
cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause))
{code}
Possible race condition between:

Decline checkpoint
# JM receives {{JobMaster#declineCheckpoint}}
# In {{LegacyScheduler#declineCheckpoint}} we asynchronously schedule {{checkpointCoordinator.receiveDeclineMessage}} on the {{ioExecutor}}
# This eventually reaches {{CheckpointFailureManager#handleCheckpointException}}, which again decides to asynchronously fail the job ({{ExecutionGraph#failGlobal}} on {{getJobMasterMainThreadExecutor}})
# {{failGlobal}} cancels all pending checkpoints

Acknowledge checkpoint
# JM receives {{JobMaster#acknowledgeCheckpoint}}
# In {{LegacyScheduler#acknowledgeCheckpoint}} we asynchronously schedule {{checkpointCoordinator.receiveAcknowledgeMessage}} on the {{ioExecutor}}
# This completes some pending checkpoint

If the "Acknowledge checkpoint" path executes and completes while {{failGlobal}} from "Decline checkpoint" step 4 is still awaiting execution, this leads to a checkpoint completing AFTER we failed the job. There might be more race conditions of this kind introduced by this {{failureCallback}}.

I'm not sure if there is an easy fix for that, because the whole design of {{CheckpointCoordinator}} seems strange: for example, why are {{checkpointCoordinator}} methods executed on the {{ioExecutor}}? I think the proper solution would need to clean up the threading model here. {{CheckpointCoordinator}} should be mostly single-threaded, executed only on the JobMaster's main thread executor (not on the {{ioExecutor}}). The {{ioExecutor}} should only be used for IO operations, like deleting/moving/touching files. The same applies to {{CheckpointFailureManager}}. That should remove/limit the concurrency issues.

A minor point for refactoring is that we could probably cut the cyclic dependency between {{CheckpointFailureManager}} and {{CheckpointCoordinator}} by removing {{CheckpointFailureManager::failureCallback}} and changing, for example,
{code:java}
void CheckpointFailureManager::handleCheckpointException(ex)
{code}
to
{code:java}
// return true if an exception should fail job
boolean CheckpointFailureManager::shouldCheckpointExceptionFailJob(ex)
{code}
(a rough illustrative sketch of this is given below).

However, as far as we know, this is not a blocker issue, although it might be surprising that a checkpoint has completed after the job has failed. For now, I'm removing the fix for this from the 1.9 release. I hope that this issue is not the tip of an iceberg and that we are not missing some other bugs/problems here.
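To illustrate the proposed decoupling, here is a minimal, hypothetical sketch rather than actual Flink code: the class shapes, constructor parameters and the {{onCheckpointException}} / {{handleCheckpointSuccess}} helpers are simplified assumptions; only {{shouldCheckpointExceptionFailJob}} and the idea that the caller fails the job on the main thread executor come from the proposal above.
{code:java}
import java.util.concurrent.Executor;

// Hypothetical, simplified sketch. The failure manager no longer holds a
// FailJobCallback; it only answers whether a checkpoint exception should
// fail the job, and the caller performs the failure itself.
class CheckpointFailureManager {
    private final int tolerableFailures;
    private int continuousFailureCounter = 0;

    CheckpointFailureManager(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Returns true if the given checkpoint exception should fail the job. */
    boolean shouldCheckpointExceptionFailJob(Exception exception) {
        continuousFailureCounter++;
        return continuousFailureCounter > tolerableFailures;
    }

    /** Resets the failure counter on a successful checkpoint. */
    void handleCheckpointSuccess() {
        continuousFailureCounter = 0;
    }
}

// Hypothetical, simplified stand-in for the coordinator side.
class CheckpointCoordinator {
    private final CheckpointFailureManager failureManager;
    private final Executor mainThreadExecutor; // JobMaster's main thread executor
    private final Runnable failJob;            // e.g. () -> executionGraph.failGlobal(cause)

    CheckpointCoordinator(
            CheckpointFailureManager failureManager,
            Executor mainThreadExecutor,
            Runnable failJob) {
        this.failureManager = failureManager;
        this.mainThreadExecutor = mainThreadExecutor;
        this.failJob = failJob;
    }

    void onCheckpointException(Exception exception) {
        // The failure manager does not call back into the coordinator/graph.
        // The coordinator asks whether the job should fail and, if so, fails it
        // on the main thread executor, where pending checkpoints can also be
        // aborted consistently before further acknowledgements are processed.
        if (failureManager.shouldCheckpointExceptionFailJob(exception)) {
            mainThreadExecutor.execute(failJob);
        }
    }
}
{code}
With this shape there is no callback handed from the coordinator to the failure manager, so the cyclic dependency disappears, and the decision to fail the job is taken on the same thread that aborts the pending checkpoints.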
> Checkpoints can complete after CheckpointFailureManager fails job
> -----------------------------------------------------------------
>
>                 Key: FLINK-13497
>                 URL: https://issues.apache.org/jira/browse/FLINK-13497
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.9.0
>
> I think that with FLINK-12364 we introduced an inconsistency wrt job termination and checkpointing. In FLINK-9900 it was discovered that checkpoints can complete even after the {{CheckpointFailureManager}} decided to fail a job.
> I think the expected behaviour should be that we fail all pending checkpoints once the {{CheckpointFailureManager}} decides to fail the job.