[ 
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9575.
----------------------------------
    Resolution: Fixed

Fixed via
master:
e984168e2eca59c08da90bd5feeac458eaa91bed
f6b2e8c5ff0304e4835d2dc8c792a0d055679603

1.6.0:
e2b4ffc016da822dda544b31fb3caf679f80a9d9
b9fe077d221bdb013ed57f2555405c9fe4a96aa1

1.5.2:
1bf77cfe17bc046772d02b22d6347388de359ff6
9c4b40dd0bbb22f8f312b0fc42f54a1a4619bf53

> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>            Reporter: Dominik Wosiński
>            Assignee: Dominik Wosiński
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> When we remove the _JobGraph_ from the _JobManager_, for example after 
> invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>
>       futureOption
>     } else {
>       None
>     }
>
>     currentJobs.remove(jobID)
>
>     result
>   case None => None
> }
>
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
>
> futureOption
> {noformat}
> This removes the job graph asynchronously but removes the BLOB files 
> associated with the job synchronously. As far as I understand, this means 
> there is a potential problem: the removal of the job graph from 
> _submittedJobGraphs_ can fail while the BLOBs are already gone. If the 
> JobManager then fails and a new leader is elected, it may try to recover 
> the job, but recovery will fail with an exception because the associated 
> BLOBs were already removed.
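>
> One way to avoid this window (not the fix applied in the commits listed 
> above; the helper names below are stand-ins for 
> submittedJobGraphs.removeJobGraph and blobServer.cleanupJob) would be to 
> chain the BLOB cleanup onto the future that removes the job graph, so the 
> BLOBs only disappear once the job graph is gone from the HA store. A 
> minimal, self-contained sketch:
> {noformat}
> import scala.concurrent.{Await, ExecutionContext, Future}
> import scala.concurrent.duration._
>
> object RemoveJobOrderingSketch {
>   implicit val ec: ExecutionContext = ExecutionContext.global
>
>   // Stand-in for submittedJobGraphs.removeJobGraph(jobID); may fail when the
>   // ZooKeeper client is shut down concurrently.
>   def removeJobGraphFromHaStore(jobID: String): Unit =
>     println(s"removed job graph $jobID from the HA store")
>
>   // Stand-in for blobServer.cleanupJob(jobID, removeJobFromStateBackend).
>   def cleanupBlobs(jobID: String): Unit =
>     println(s"removed BLOBs for job $jobID")
>
>   // Run the BLOB cleanup only after the job graph removal has completed, so
>   // a job graph recovered by a new leader can never reference BLOBs that are
>   // already gone.
>   def removeJob(jobID: String): Future[Unit] =
>     Future(removeJobGraphFromHaStore(jobID)).map(_ => cleanupBlobs(jobID))
>
>   def main(args: Array[String]): Unit =
>     Await.result(removeJob("job-42"), 10.seconds)
> }
> {noformat}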



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
