[ https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814246#comment-16814246 ]
Till Rohrmann commented on FLINK-11813:
---------------------------------------

I'm not 100% sure that this approach works. I think treating the {{DONE}} and {{NONE}} cases identically leads to problems:

1. Submitting two jobs with the same {{JobID}} would not be detected if the first completed before the second was submitted.
2. How will a stand-by JM in job mode decide whether it should start executing the job if it sees {{NONE}}? It could be either the first JM to gain leadership, or a later one that gained leadership after another JM completed the job and set the job's state back to {{NONE}}. In the first case, it should start executing the job. In the second case, it should terminate.

I think the main problem is that the entries of the {{RunningJobsRegistry}} are bound to the lifecycle of the {{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher}}, and how we clean up the {{DONE}} state.

I would propose the following:

1. Have the states {{NONE}}/{{null}}, {{RUNNING}} and {{DONE}}.
2. The {{Dispatcher}} is responsible for updating the {{RunningJobsRegistry}} entries.
3. Whenever a {{Dispatcher}} gains leadership, it restarts the jobs which are in state {{NONE}} or {{RUNNING}}.
4. A {{Dispatcher}} only accepts job submissions for jobs which are in state {{NONE}}.
5. When a job submission is accepted and the job is persisted to the {{SubmittedJobGraphStore}}, the state is set to {{RUNNING}}.
6. If a job reaches a terminal state, the {{Dispatcher}} sets the state to {{DONE}}.
7. When the {{Dispatcher}} terminates, it tries to clean up the {{RunningJobsRegistry}}, but only if there are no other leader contenders for the cluster id of the cluster.

What do you think?
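To make the proposed state handling easier to discuss, here is a minimal in-memory sketch of steps 3 to 7 of the proposal. It is only an illustration under assumptions: {{JobRegistrySketch}}, its {{Status}} enum and all method names are hypothetical and are not Flink's actual {{RunningJobsRegistry}} API, job IDs are plain strings, and the leader-contender check is reduced to a boolean flag that the {{Dispatcher}} would have to supply.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the registry; not Flink's actual RunningJobsRegistry API. */
final class JobRegistrySketch {

    /** The three states from the proposal; NONE is represented by a missing entry. */
    enum Status { NONE, RUNNING, DONE }

    private final Map<String, Status> entries = new HashMap<>();

    Status statusOf(String jobId) {
        return entries.getOrDefault(jobId, Status.NONE);
    }

    /** Steps 4 and 5: accept a submission only in state NONE, then mark the job RUNNING. */
    boolean submit(String jobId) {
        if (statusOf(jobId) != Status.NONE) {
            return false; // duplicate JobID, whether still running or already done
        }
        entries.put(jobId, Status.RUNNING);
        return true;
    }

    /** Step 3: a new leader restarts jobs in NONE or RUNNING, never in DONE. */
    boolean shouldRecover(String jobId) {
        return statusOf(jobId) != Status.DONE;
    }

    /** Step 6: record that the job reached a terminal state. */
    void markDone(String jobId) {
        entries.put(jobId, Status.DONE);
    }

    /** Step 7: clean up on termination only if no other leader contender remains. */
    void cleanUpOnTermination(boolean otherContendersExist) {
        if (!otherContendersExist) {
            entries.clear();
        }
    }

    public static void main(String[] args) {
        JobRegistrySketch registry = new JobRegistrySketch();
        System.out.println(registry.submit("job-1"));        // true: first submission
        registry.markDone("job-1");
        System.out.println(registry.submit("job-1"));        // false: duplicate after completion
        System.out.println(registry.shouldRecover("job-1")); // false: standby must not re-run it
    }
}
```

Because {{DONE}} stays distinct from {{NONE}} until the last contender cleans up, the sketch covers both problem cases from the comment: a resubmission after completion is rejected, and a standby that gains leadership afterwards does not re-execute the job.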
> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> ---------------------------------------------------------------------
>
>                 Key: FLINK-11813
>                 URL: https://issues.apache.org/jira/browse/FLINK-11813
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>   Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode
> will restart a terminated job after they gained leadership. The problem is
> that we currently clear the {{RunningJobsRegistry}} once a job has reached a
> globally terminal state. After the leading {{Dispatcher}} terminates, a
> standby {{Dispatcher}} will gain leadership. Without having the information
> from the {{RunningJobsRegistry}} it cannot tell whether the job has been
> executed or whether the {{Dispatcher}} needs to re-execute the job. At the
> moment, the {{Dispatcher}} will assume that there was a fault and hence
> re-execute the job. This can lead to duplicate results.
>
> I think we need some way to tell standby {{Dispatchers}} that a certain job
> has been successfully executed. One trivial solution could be to not clean up
> the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)