[ https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814246#comment-16814246 ]

Till Rohrmann commented on FLINK-11813:
---------------------------------------

I'm not 100% sure that this approach works. I think treating the {{DONE}} and 
{{NONE}} cases identically leads to two problems: 

1. Submitting two jobs with the same {{JobID}} would go undetected if the 
first one completes before the second one is submitted. 
2. How should a stand-by JM in job mode decide whether to start executing the 
job when it sees {{NONE}}? It could be either the first JM gaining leadership 
or a second JM taking over after another one has completed the job and reset 
the job's state to {{NONE}}. In the first case, it should start executing the 
job; in the second case, it should terminate.
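To make both problems concrete, here is a minimal Java model (not actual Flink code) of a registry in which a job's completion resets its entry back to {{NONE}}. The class names {{CollapsedRegistryDemo}} and {{Registry}}, the two-value {{State}} enum, and the job id are all hypothetical; the sketch only shows that a never-run job and a completed job look identical to a stand-by JM, and that a second submission with the finished job's id would be accepted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: a registry in which DONE has been collapsed into NONE. */
public class CollapsedRegistryDemo {
    // Only two states remain; a finished job is indistinguishable from a new one.
    enum State { NONE, RUNNING }

    static final class Registry {
        private final Map<String, State> entries = new HashMap<>();

        // A missing entry is reported as NONE.
        State get(String jobId) {
            return entries.getOrDefault(jobId, State.NONE);
        }

        void markRunning(String jobId) {
            entries.put(jobId, State.RUNNING);
        }

        // Completion resets the entry, so no trace of the finished run remains.
        void markFinished(String jobId) {
            entries.put(jobId, State.NONE);
        }
    }

    /** What a stand-by JM observes for a job that nobody has executed yet. */
    static State freshJob() {
        Registry registry = new Registry();
        return registry.get("job-1");
    }

    /** What a stand-by JM observes after another JM has completed the job. */
    static State completedJob() {
        Registry registry = new Registry();
        registry.markRunning("job-1");
        registry.markFinished("job-1");
        return registry.get("job-1");
    }

    /** Problem 1: a duplicate check that only accepts NONE lets the same id through again. */
    static boolean acceptsResubmissionOfFinishedJob() {
        Registry registry = new Registry();
        registry.markRunning("job-1");
        registry.markFinished("job-1");
        return registry.get("job-1") == State.NONE;
    }

    public static void main(String[] args) {
        System.out.println("fresh: " + freshJob() + ", completed: " + completedJob());
        System.out.println("resubmission accepted: " + acceptsResubmissionOfFinishedJob());
    }
}
```

Running {{main}} prints {{NONE}} for both cases and reports that the resubmission is accepted, which is exactly the ambiguity described in points 1 and 2.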

I think the main problem lies in two things: the entries of the 
{{RunningJobsRegistry}} are bound to the lifecycle of the 
{{JobManagerRunner}}/{{Job}} instead of that of the {{Dispatcher}}, and the way 
we clean up the {{DONE}} state. I would propose the following:

1. Use the states {{NONE}}/{{null}}, {{RUNNING}} and {{DONE}}.
2. The {{Dispatcher}} is responsible for updating the {{RunningJobsRegistry}} 
entries.
3. Whenever a {{Dispatcher}} gains leadership, it restarts the jobs which are 
in state {{NONE}} or {{RUNNING}}.
4. A {{Dispatcher}} only accepts job submissions for jobs which are in state 
{{NONE}}.
5. When a job submission is accepted, i.e. the job has been persisted to the 
{{SubmittedJobGraphStore}}, the state is set to {{RUNNING}}.
6. If a job reaches a terminal state, the {{Dispatcher}} sets the state to 
{{DONE}}.
7. When the {{Dispatcher}} terminates, it tries to clean up the 
{{RunningJobsRegistry}}, but only if there are no other leader contenders for 
the same cluster id.
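The seven rules above can be sketched as a small Java model. This is an illustration under stated assumptions, not Flink's implementation: {{DispatcherRegistrySketch}}, its {{Registry}} and {{Dispatcher}} classes, and the method names {{onGrantLeadership}}, {{submitJob}}, {{jobReachedTerminalState}} and {{terminate}} are hypothetical; the in-memory map stands in for the shared (e.g. ZooKeeper-backed) registry of a single cluster id, and the {{otherContendersExist}} flag stands in for however leader contenders would actually be detected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the proposal: the Dispatcher owns the registry entries. */
public class DispatcherRegistrySketch {
    // Rule 1: three states; an absent entry counts as NONE.
    enum State { NONE, RUNNING, DONE }

    /** In-memory stand-in for the shared registry of one cluster id (assumption). */
    static final class Registry {
        private final Map<String, State> entries = new HashMap<>();
        State get(String jobId) { return entries.getOrDefault(jobId, State.NONE); }
        void set(String jobId, State state) { entries.put(jobId, state); }
        void clear() { entries.clear(); }
    }

    /** Rule 2: only the Dispatcher writes to the registry. */
    static final class Dispatcher {
        private final Registry registry;

        Dispatcher(Registry registry) { this.registry = registry; }

        /** Rule 3: on gaining leadership, restart every recovered job in NONE or RUNNING. */
        List<String> onGrantLeadership(Collection<String> recoveredJobIds) {
            List<String> toRestart = new ArrayList<>();
            for (String jobId : recoveredJobIds) {
                if (registry.get(jobId) != State.DONE) {
                    toRestart.add(jobId);
                }
            }
            return toRestart;
        }

        /** Rules 4 and 5: accept only NONE jobs, then mark them RUNNING once persisted. */
        boolean submitJob(String jobId) {
            if (registry.get(jobId) != State.NONE) {
                return false; // duplicate submission or already finished job
            }
            // (Persisting the JobGraph to the SubmittedJobGraphStore would happen here.)
            registry.set(jobId, State.RUNNING);
            return true;
        }

        /** Rule 6: a terminal job is marked DONE instead of being removed. */
        void jobReachedTerminalState(String jobId) { registry.set(jobId, State.DONE); }

        /** Rule 7: wipe the cluster's entries only if no other leader contender remains. */
        void terminate(boolean otherContendersExist) {
            if (!otherContendersExist) {
                registry.clear();
            }
        }
    }

    public static void main(String[] args) {
        Registry shared = new Registry();
        Dispatcher leader = new Dispatcher(shared);
        leader.submitJob("job-1");
        leader.jobReachedTerminalState("job-1");
        leader.terminate(true); // a stand-by contender still exists, so DONE is kept

        Dispatcher standby = new Dispatcher(shared);
        System.out.println(standby.onGrantLeadership(Arrays.asList("job-1"))); // nothing to restart
        System.out.println(standby.submitJob("job-1"));                        // rejected
    }
}
```

In this model the stand-by {{Dispatcher}} sees {{DONE}} and neither restarts the job nor accepts a resubmission with the same id. Keeping {{DONE}} until the last contender terminates is what separates "never run" from "already finished" while still bounding how long entries stay in ZooKeeper.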

What do you think?

> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> ---------------------------------------------------------------------
>
>                 Key: FLINK-11813
>                 URL: https://issues.apache.org/jira/browse/FLINK-11813
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per-job mode 
> restart a terminated job after gaining leadership. The problem is 
> that we currently clear the {{RunningJobsRegistry}} once a job has reached a 
> globally terminal state. After the leading {{Dispatcher}} terminates, a 
> standby {{Dispatcher}} will gain leadership. Without the information 
> from the {{RunningJobsRegistry}}, it cannot tell whether the job has already been 
> executed or whether the {{Dispatcher}} needs to re-execute it. At the 
> moment, the {{Dispatcher}} assumes that there was a fault and hence 
> re-executes the job. This can lead to duplicate results.
> I think we need some way to tell standby {{Dispatchers}} that a certain job 
> has been successfully executed. One trivial solution could be to not clean up 
> the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
