[ https://issues.apache.org/jira/browse/FLINK-13497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900732#comment-16900732 ]

Piotr Nowojski commented on FLINK-13497:
----------------------------------------

As far as we (me, [~carp84] and [~StephanEwen]) understand this, the root 
problem here is a race condition on the JobManager between failing the job 
after a declined checkpoint and completing another checkpoint. It is caused by 
the {{FailJobCallback CheckpointFailureManager::failureCallback}} executing an 
asynchronous operation:
{code:java}
cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause))
{code}
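
For context, the callback is essentially just a hop onto the main thread 
executor. Below is a minimal, purely illustrative sketch of such wiring (the 
interface and helper names are placeholders, not the exact Flink code):
{code:java}
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Illustrative only: the failure manager receives a callback that merely
// re-schedules the actual failGlobal() call onto the JobMaster's main thread.
interface FailJobCallbackSketch {
    void failJob(Throwable cause);
}

class FailureCallbackWiringSketch {
    static FailJobCallbackSketch wire(Executor mainThreadExecutor, Consumer<Throwable> failGlobal) {
        // The callback returns immediately; failGlobal runs later, asynchronously.
        return cause -> mainThreadExecutor.execute(() -> failGlobal.accept(cause));
    }
}
{code}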
Possible race condition between:

Decline checkpoint
# JM receives {{JobMaster#declineCheckpoint}}
# In {{LegacyScheduler#declineCheckpoint}} we asynchronously schedule 
{{checkpointCoordinator.receiveDeclineMessage}} on {{ioExecutor}}
# This eventually reaches 
{{CheckpointFailureManager#handleCheckpointException}}, which decides to fail 
the job, again asynchronously ({{ExecutionGraph#failGlobal}} scheduled on 
{{getJobMasterMainThreadExecutor}}).
# {{failGlobal}} cancels all pending checkpoints

Acknowledge checkpoint
# JM receives {{JobMaster#acknowledgeCheckpoint}}
# In {{LegacyScheduler#acknowledgeCheckpoint}} we asynchronously schedule 
{{checkpointCoordinator.receiveAcknowledgeMessage}} on {{ioExecutor}}
# This completes some pending checkpoint

If the "Acknowledge checkpoint" path executes and completes while {{failGlobal}} 
from "Decline checkpoint" step 4 is still awaiting execution, this leads to a 
checkpoint completing AFTER we have failed the job.
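
To make the interleaving concrete, here is a minimal, self-contained sketch. 
The executors, the {{pendingCheckpoints}} map and the printed messages are 
illustrative stand-ins, not the actual Flink classes:
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CheckpointRaceSketch {
    public static void main(String[] args) {
        // Stand-ins for the JobMaster main thread executor and the ioExecutor.
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

        Map<Long, String> pendingCheckpoints = new ConcurrentHashMap<>();
        pendingCheckpoints.put(2L, "pending");

        // "Decline checkpoint" path: the failure callback only *schedules*
        // failGlobal on the main thread executor and returns immediately.
        ioExecutor.execute(() ->
            mainThreadExecutor.execute(() -> {
                pendingCheckpoints.clear(); // failGlobal cancels all pending checkpoints
                System.out.println("job failed");
            }));

        // "Acknowledge checkpoint" path: runs on the ioExecutor and may complete
        // checkpoint 2 before the queued failGlobal task ever runs.
        ioExecutor.execute(() -> {
            if (pendingCheckpoints.remove(2L) != null) {
                System.out.println("checkpoint 2 completed (possibly after the job failed)");
            }
        });

        mainThreadExecutor.shutdown();
        ioExecutor.shutdown();
    }
}
{code}
Depending on thread scheduling, "checkpoint 2 completed" can be printed after 
"job failed", which mirrors the problem described above.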

There might be more race conditions of this kind introduced by this 
{{failureCallback}}.

I'm not sure whether there is an easy fix for that, because the whole design of 
the {{CheckpointCoordinator}} seems strange: for example, why are 
{{checkpointCoordinator}} methods executed on the {{ioExecutor}}? I think the 
proper solution would be to clean up the threading model here.

{{CheckpointCoordinator}} should be mostly single threaded, executed only on the 
JobMaster's main thread executor (not on the {{ioExecutor}}). The {{ioExecutor}} 
should only be used for IO operations, like deleting/moving/touching files. 
The same applies to {{CheckpointFailureManager}}. That should remove, or at 
least limit, the concurrency issues.
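
As an illustration of that threading model, here is a hypothetical sketch (the 
class, method and field names are made up for this example and are not the 
actual {{CheckpointCoordinator}} API): state transitions always happen on the 
main thread executor, and only blocking storage work is handed to the 
{{ioExecutor}}:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical single-threaded coordinator: all state lives on the main thread
// executor; the ioExecutor only runs blocking file/storage operations.
class SingleThreadedCoordinatorSketch {
    private final Executor mainThreadExecutor; // JobMaster's main thread
    private final Executor ioExecutor;         // blocking IO only

    SingleThreadedCoordinatorSketch(Executor mainThreadExecutor, Executor ioExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.ioExecutor = ioExecutor;
    }

    void receiveAcknowledgeMessage(long checkpointId) {
        // State changes happen only on the main thread, so they cannot race with
        // failGlobal(), which runs on the same (single-threaded) executor.
        mainThreadExecutor.execute(() ->
            CompletableFuture
                .runAsync(() -> finalizeCheckpointOnDisk(checkpointId), ioExecutor)
                .thenRunAsync(() -> markCheckpointCompleted(checkpointId), mainThreadExecutor));
    }

    private void finalizeCheckpointOnDisk(long checkpointId) { /* blocking IO */ }

    private void markCheckpointCompleted(long checkpointId) { /* main-thread state update */ }
}
{code}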

A minor point for refactoring is that we could probably cut the cyclic 
dependency between {{CheckpointFailureManager}} and {{CheckpointCoordinator}} 
by removing {{CheckpointFailureManager::failureCallback}} and changing, for 
example,
{code:java}
void CheckpointFailureManager::handleCheckpointException(ex)
{code} 
to
{code:java}
// return true if the exception should fail the job
boolean CheckpointFailureManager::shouldCheckpointExceptionFailJob(ex)
{code}
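
A rough sketch of how the call site could then look (the classes and fields 
below are simplified placeholders, not the real Flink types): the coordinator 
asks the failure manager for a decision and triggers the job failure itself, so 
{{CheckpointFailureManager}} no longer needs a callback:
{code:java}
import java.util.function.Consumer;

// Placeholder caller-side sketch: the coordinator queries the failure manager and
// fails the job itself, removing the callback from CheckpointFailureManager.
class CheckpointCoordinatorSketch {
    private final CheckpointFailureManagerSketch failureManager;
    private final Consumer<Throwable> failJob; // e.g. cause -> failGlobal(cause), owned by the caller

    CheckpointCoordinatorSketch(CheckpointFailureManagerSketch failureManager, Consumer<Throwable> failJob) {
        this.failureManager = failureManager;
        this.failJob = failJob;
    }

    void onCheckpointException(Exception checkpointException) {
        if (failureManager.shouldCheckpointExceptionFailJob(checkpointException)) {
            failJob.accept(checkpointException);
        }
    }
}

class CheckpointFailureManagerSketch {
    private final int tolerableFailures;
    private int continuousFailureCount;

    CheckpointFailureManagerSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    // return true if the exception should fail the job
    boolean shouldCheckpointExceptionFailJob(Exception ex) {
        return ++continuousFailureCount > tolerableFailures;
    }
}
{code}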

As far as we know, this is not a blocker issue, although it might be surprising 
that a checkpoint completes after the job has failed. For now, I'm removing the 
fix for this from the 1.9 release. I hope that this issue is not the tip of an 
iceberg and that we are not missing some other bugs/problems here.

> Checkpoints can complete after CheckpointFailureManager fails job
> -----------------------------------------------------------------
>
>                 Key: FLINK-13497
>                 URL: https://issues.apache.org/jira/browse/FLINK-13497
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.9.0
>
>
> I think that we introduced with FLINK-12364 an inconsistency wrt job 
> termination and checkpointing. In FLINK-9900 it was discovered that checkpoints 
> can complete even after the {{CheckpointFailureManager}} decided to fail a 
> job. I think the expected behaviour should be that we fail all pending 
> checkpoints once the {{CheckpointFailureManager}} decides to fail the job.


