[ https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815276#comment-16815276 ]
Till Rohrmann commented on FLINK-11813:
---------------------------------------

Before going into the details of your questions and the problems I see with the current implementation, let me quickly try to describe the bigger picture: In HA mode, the cluster should try to execute a job until it reaches a terminal state. In case of component failover, the cluster needs to check whether a job needs to be restarted or not. That's where the {{RunningJobsRegistry}} comes into play and tells the cluster what the state of a given job is. In the case of a session cluster, the {{SubmittedJobGraphStore}} might already be enough if we define that we remove {{JobGraphs}} once they have reached a terminal state. However, in job mode, the job won't get recovered from the {{SubmittedJobGraphStore}} but is already part of the cluster. Therefore, we need the {{RunningJobsRegistry}}.

Now coming to a somewhat more controversial point: I think that, for the sake of simplicity and better-defined semantics, each job should have a {{JobID}} which uniquely identifies it in the context of a cluster. Moreover, the cluster should accept a job submission (with a specific {{JobID}}) exactly once. If one wants to submit the same topology again, then this needs to happen as a different job (meaning a different {{JobID}} is assigned). The reason is that we isolate jobs better if they are required to have different {{JobIDs}}. For example, it cannot happen that we submit a different topology under the same {{JobID}} as another job, leading to checkpoint recovery failures.

What we do at the moment is that a cluster accepts jobs with the same {{JobID}} as long as they don't overlap in time. Hence, it is possible to submit the same job again after the first execution has terminated. In that sense, the current behaviour of the job mode with stand-by JMs is actually correct: after a JM finishes the execution of a job and terminates, another JM gains leadership. This JM re-executes the job because it "considers" it to be a re-submission, and under the current semantics that is correct since the two executions don't overlap.

So in order to solve the problem, I would like to change the semantics so that a cluster (session as well as job mode) executes a job (identified by a {{JobID}}) only once during its lifetime. How is the lifetime of a cluster defined? Well, one option could be to say that each cluster, identified by a unique cluster id, has an eternal lifetime, independent of whether it's running or not. In that case, we could never clean up the persisted data of the {{RunningJobsRegistry}} of this cluster. This would effectively mean that we can never clean up the respective ZooKeeper znode. Moreover, in order to execute the same job again, one would need to spin up a new cluster.

Another option (and a more practical one) would be to say that the lifetime of a cluster is as long as the cluster is running. When the cluster shuts down, it will clean up all persisted HA data to leave a clean slate. However, this means that until we actually shut down, we must not clean up the {{RunningJobsRegistry}}, because we want to execute each job at most once during the lifetime of the cluster. With these semantics, it would be possible to start a new cluster with the same cluster id and execute a job again which has previously been executed on a cluster with the same cluster id (keep this part in mind for the later discussion of corner cases).
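To make the second option a bit more concrete, here is a minimal sketch of how the registry could be driven: a job is marked as finished when it reaches a globally terminal state, but the entries are only cleared when the cluster shuts down. Only the {{RunningJobsRegistry}} calls refer to the existing interface; the surrounding class and method names are made up for illustration.

{code:java}
// Sketch only: the class and its callback methods are hypothetical; only the
// RunningJobsRegistry interface itself exists in Flink.
import java.io.IOException;
import java.util.Set;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;

class ClusterLifetimeSketch {

    private final RunningJobsRegistry runningJobsRegistry;
    private final Set<JobID> jobsOfThisCluster;

    ClusterLifetimeSketch(RunningJobsRegistry runningJobsRegistry, Set<JobID> jobsOfThisCluster) {
        this.runningJobsRegistry = runningJobsRegistry;
        this.jobsOfThisCluster = jobsOfThisCluster;
    }

    /** Called when a job reaches a globally terminal state. */
    void onJobReachedGloballyTerminalState(JobID jobId) throws IOException {
        // Mark the job as DONE but keep the entry, so that a stand-by component
        // which gains leadership later can see that the job was already executed.
        runningJobsRegistry.setJobFinished(jobId);
    }

    /** Called on graceful cluster shutdown, by the last running instance only. */
    void onClusterShutDown() throws IOException {
        // Only now do we leave a clean slate in ZooKeeper.
        for (JobID jobId : jobsOfThisCluster) {
            runningJobsRegistry.clearJob(jobId);
        }
    }
}
{code}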
Assuming the latter semantics, how would this actually work?

In the case of no stand-by components, this becomes trivial: both the session cluster, when it receives the graceful shut down call, and the job mode cluster, when it finishes the job execution, will simply clean up the {{RunningJobsRegistry}}. The clean up should be done by the central cluster component (currently the {{Dispatcher}}).

The more interesting part is the case with stand-by components: in the absence of a central communication means to tell every stand-by component to terminate, only the last instance of the central cluster component should actually initiate the clean up. This requires that there is some means to detect whether one is the last instance or not. One idea could be to look at the list of leadership contenders in order to figure this out. This is not a bulletproof strategy, though: if a stand-by component loses its connection to ZooKeeper, it will disappear from the list of leadership contenders. If the cluster is shut down during this time, another component will think that it is the last instance and initiate the clean up. When the other stand-by component reconnects to ZooKeeper, it will see that the {{RunningJobsRegistry}} is empty and start executing the job. Semantically this is correct, because for an outside observer it would not be possible to distinguish this case from starting a new cluster with the same cluster id and submitting the same job after the previous cluster has terminated. If we want to prevent this case from happening, we must not clean up the {{RunningJobsRegistry}} (this comes at the price of cluttering ZooKeeper).

All right, this was a lengthy explanation. Let me try to get to your questions now.

Tison's:
1. With FLINK-11843, we might start another {{Dispatcher}} for each leader session. In this case, the clean up won't be executed by the {{Dispatcher}} itself but by another component which runs the different {{Dispatcher}} instances.
2. Concerning the race condition: this is fine, since the newly started contender will be considered a new cluster instance.

Zhu Zhu's:
1. We could say that a job needs to be in {{RUNNING}} to be restarted (see the sketch below). In this case, we need to make sure that the {{RunningJobsRegistry}} of a job mode cluster sets the contained job to {{RUNNING}}.
2. In case of a hard exit of the {{Dispatcher}}, we won't clean up the {{RunningJobsRegistry}}. But this also applies to other clean up operations.
3. I think that the {{JobManagerLeaderElectionService}} is still needed, because you can have potentially multiple JMs running and you need to figure out which one is currently executing the job (e.g. the TaskExecutor needs to know in order to offer its slots to the leader).
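As a complement to the answer to Zhu Zhu's first question, here is a minimal sketch of how a {{Dispatcher}} that just gained leadership could use the registry to decide whether a job may be (re-)executed. Again, only the {{RunningJobsRegistry}} calls and the {{JobSchedulingStatus}} values refer to the existing interface; {{onGrantLeadership}}, {{recoverJob}} and {{executeJob}} are placeholders.

{code:java}
// Sketch only: recoverJob/executeJob are placeholders for whatever the
// Dispatcher does today; the RunningJobsRegistry interface and its
// JobSchedulingStatus (PENDING, RUNNING, DONE) exist in Flink.
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;

class LeadershipRecoverySketch {

    private final RunningJobsRegistry runningJobsRegistry;

    LeadershipRecoverySketch(RunningJobsRegistry runningJobsRegistry) {
        this.runningJobsRegistry = runningJobsRegistry;
    }

    void onGrantLeadership(JobID jobId) throws IOException {
        final JobSchedulingStatus status = runningJobsRegistry.getJobSchedulingStatus(jobId);

        switch (status) {
            case DONE:
                // The job already reached a terminal state during this cluster's
                // lifetime; it must not be executed again.
                break;
            case RUNNING:
                // The previous leader died while the job was running; recover it.
                recoverJob(jobId);
                break;
            case PENDING:
                // The job has never been scheduled (e.g. a job mode cluster before
                // the first leader marked it RUNNING); start the first execution.
                runningJobsRegistry.setJobRunning(jobId);
                executeJob(jobId);
                break;
        }
    }

    private void recoverJob(JobID jobId) {
        // placeholder
    }

    private void executeJob(JobID jobId) {
        // placeholder
    }
}
{code}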
> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> ----------------------------------------------------------------------
>
>                 Key: FLINK-11813
>                 URL: https://issues.apache.org/jira/browse/FLINK-11813
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode will restart a terminated job after they gained leadership. The problem is that we currently clear the {{RunningJobsRegistry}} once a job has reached a globally terminal state. After the leading {{Dispatcher}} terminates, a standby {{Dispatcher}} will gain leadership. Without having the information from the {{RunningJobsRegistry}} it cannot tell whether the job has been executed or whether the {{Dispatcher}} needs to re-execute the job. At the moment, the {{Dispatcher}} will assume that there was a fault and hence re-execute the job. This can lead to duplicate results.
> I think we need some way to tell standby {{Dispatchers}} that a certain job has been successfully executed. One trivial solution could be to not clean up the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.