[ 
https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815027#comment-16815027
 ] 

Zhu Zhu edited comment on FLINK-11813 at 4/11/19 2:47 AM:
----------------------------------------------------------

Hi Till, for your 2 questions above:

1. If one job with jobID xxx terminates, and later another job with the same 
jobID is submitted, I think Flink can regard it as a valid submission. 
Currently in our production use, there is a way that the client re-submit 
previously generated JobGraph to speed up the job launching, when the previous 
job is FAILED. In this case, job with the same ID are seen as different 
attempts.

   We did not handle the unexpected duplicated submission if the second 
submission comes after the first one is completed. Not sure in what case this 
may happen?

2. The process would be like this
 # submitting job -> setting status in RunningJobsRegistry to be pending in 
*Dispatcher* (null/NONE -> PENDING)
 # creating and launching JobManagerRunner which will try to acquire the HA 
leadership
 # once a JobManager is granted leadership, it changes the job status in 
RunningJobsRegistry to RUNNING  and starts the JobMaster(or creating a new 
JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
 # when this job terminates, the JobManager removes the job from the 
RunningJobsRegistry (RUNNING -> NONE)

           So if it is the first time to launch the JM, the job status is 
PENDING so the job will be started. If it is a second time leadership gaining, 
and the first is completed, the job status would be NONE. Besides, if JM 
failover happens during the PENDING/RUNNING status, the new leader will also 
restart the job.

 

I totally agree that "the main problem is that the entries of the 
{{RunningJobsRegistry}} are bound to the lifecycle of the 
{{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher"}}. I think the job 
submission in the Dispatcher is the beginning of lifecycle.

 

I agree with your proposal too, which can well handle the unexpected submission 
duplications.

One thing to confirm is that, as in stage 5 the job status is changed to be 
RUNNING already in job submission, in stage 3 should we restart the job only if 
it is RUNNING?

 

 


was (Author: zhuzh):
Hi Till, for your 2 questions above:

1. If one job with jobID xxx terminates, and later another job with the same 
jobID is submitted, I think Flink can regard it as a valid submission. 
Currently in our production use, there is a way that the client re-submit 
previously generated JobGraph to speed up the job launching, when the previous 
job is FAILED. In this case, job with the same ID are seen as different 
attempts.

   We did not handle the unexpected duplicated submission if the second 
submission comes after the first one is completed. Not sure in what case this 
may happen?

2. The process would be like this
 # submitting job -> setting status in RunningJobsRegistry to be pending in 
*Dispatcher* (null/NONE -> PENDING)
 # creating and launching JobManagerRunner which will try to acquire the HA 
leadership
 # once a JobManager is granted leadership, it changes the job status in 
RunningJobsRegistry to RUNNING  and starts the JobMaster(or creating a new 
JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
 # when this job terminates, the JobManager removes the job from the 
RunningJobsRegistry (RUNNING -> NONE)

           So if it is the first time to launch the JM, the job status is 
PENDING so the job will be started. If it is a second time leadership gaining, 
and the first is completed, the job status would be NONE. Besides, if JM 
failover happens during the PENDING/RUNNING status, the new leader will also 
restart the job.

 

I totally agree that "the main problem is that the entries of the 
{{RunningJobsRegistry}} are bound to the lifecycle of the 
{{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher"}}. I think the job 
submission in the Dispatcher is the beginning of lifecycle.

 

I think with your proposal if fine, which can handle the unexpected submission 
duplications.

One thing to confirm is that, as in stage 5 the job status is changed to be 
RUNNING already in job submission, in stage 3 should we restart the job only if 
it is RUNNING?

 

 

> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> ---------------------------------------------------------------------
>
>                 Key: FLINK-11813
>                 URL: https://issues.apache.org/jira/browse/FLINK-11813
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode 
> will restart a terminated job after they gained leadership. The problem is 
> that we currently clear the {{RunningJobsRegistry}} once a job has reached a 
> globally terminal state. After the leading {{Dispatcher}} terminates, a 
> standby {{Dispatcher}} will gain leadership. Without having the information 
> from the {{RunningJobsRegistry}} it cannot tell whether the job has been 
> executed or whether the {{Dispatcher}} needs to re-execute the job. At the 
> moment, the {{Dispatcher}} will assume that there was a fault and hence 
> re-execute the job. This can lead to duplicate results.
> I think we need some way to tell standby {{Dispatchers}} that a certain job 
> has been successfully executed. One trivial solution could be to not clean up 
> the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to