[ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann resolved FLINK-9575.
----------------------------------
    Resolution: Fixed

Fixed via

master:
e984168e2eca59c08da90bd5feeac458eaa91bed
f6b2e8c5ff0304e4835d2dc8c792a0d055679603

1.6.0:
e2b4ffc016da822dda544b31fb3caf679f80a9d9
b9fe077d221bdb013ed57f2555405c9fe4a96aa1

1.5.2:
1bf77cfe17bc046772d02b22d6347388de359ff6
9c4b40dd0bbb22f8f312b0fc42f54a1a4619bf53

> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>            Reporter: Dominik Wosiński
>            Assignee: Dominik Wosiński
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
> When we remove the _JobGraph_ from the _JobManager_, for example after
> invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> {noformat}
> This removes the job graph asynchronously but removes the BLOB files
> associated with the job's jars synchronously. As far as I understand, this
> means we can fail to remove the job graph from _submittedJobGraphs_ even
> though the BLOBs are already gone. If the JobManager fails and a new leader
> is elected, the new leader can try to recover such a job, but recovery will
> fail with an exception because the associated BLOB was already removed.
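One way to avoid the race described above is to sequence the synchronous BLOB cleanup after the asynchronous job-graph removal has completed, so the HA entry is never left pointing at deleted BLOBs. The sketch below is only an illustration of that ordering, not Flink's actual fix: `removeJobGraph`, `cleanupBlobs`, and `removeJob` are hypothetical stand-ins for the JobManager internals, with a shared log recording execution order.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CleanupOrdering {
  // Records the order in which the two cleanup steps actually run.
  val log = scala.collection.mutable.ArrayBuffer.empty[String]

  // Stand-in for submittedJobGraphs.removeJobGraph: slow, asynchronous HA removal.
  def removeJobGraph(jobID: String): Unit = {
    Thread.sleep(50)
    log.synchronized { log += s"removedGraph:$jobID" }
  }

  // Stand-in for blobServer.cleanupJob: fast, local BLOB deletion.
  def cleanupBlobs(jobID: String): Unit =
    log.synchronized { log += s"cleanedBlobs:$jobID" }

  // Chain the BLOB cleanup onto the job-graph removal future, so BLOBs are
  // only deleted once the graph is gone from the HA store.
  def removeJob(jobID: String): Future[Unit] =
    Future(removeJobGraph(jobID)).map(_ => cleanupBlobs(jobID))

  def main(args: Array[String]): Unit = {
    Await.result(removeJob("job-1"), 5.seconds)
    println(log.mkString(","))
  }
}
```

With this ordering, a JobManager failure between the two steps leaves a recoverable state: the job graph and its BLOBs either both exist or are both gone.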
-- This message was sent by Atlassian JIRA (v7.6.3#76005)