[ https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815276#comment-16815276 ]

Till Rohrmann commented on FLINK-11813:
---------------------------------------

Before going into the details of your questions and the problems I see with the 
current implementation, let me quickly try to describe the bigger picture:

In HA mode, the cluster should try to execute a job until it reaches a terminal 
state. In case of a component failover, the cluster needs to check whether a job 
needs to be restarted or not. That's where the {{RunningJobsRegistry}} comes 
into play: it tells the cluster what the state of a given job is.

In the case of a session cluster, the {{SubmittedJobGraphStore}} might already 
be enough if we define that {{JobGraphs}} are removed once they have reached a 
terminal state. However, in the job mode, the job won't get recovered from the 
{{SubmittedJobGraphStore}} but is already part of the cluster. Therefore, we 
need the {{RunningJobsRegistry}}.
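
Concretely (just as a reference point, roughly following the current 
{{RunningJobsRegistry}} interface), the failover check could look like this 
sketch; the helper class and its wiring are made up:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;

// Illustrative sketch only: decide on failover whether a job must be (re-)executed.
final class FailoverCheck {

    /** Returns true if the recovering component should (re-)execute the job. */
    static boolean shouldExecute(RunningJobsRegistry registry, JobID jobId) throws IOException {
        final JobSchedulingStatus status = registry.getJobSchedulingStatus(jobId);
        // DONE: the job already reached a globally terminal state -> do not run it again.
        // PENDING / RUNNING: the job has not finished -> recover and execute it.
        return status != JobSchedulingStatus.DONE;
    }
}
{code}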

Now to a somewhat more controversial point: I think that, for the sake of 
simplicity and better-defined semantics, each job should have a {{JobID}} 
which uniquely identifies it in the context of a cluster. Moreover, the cluster 
should accept a job submission (with a specific {{JobID}}) exactly once. If one 
wants to submit the same topology again, then this needs to happen as a 
different job (meaning a different {{JobID}} is assigned). The reason is that 
we isolate jobs better if they are required to have different {{JobIDs}}. For 
example, it can then no longer happen that a different topology is submitted 
under the {{JobID}} of another job, which could lead to checkpoint recovery 
failures.
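
A minimal sketch of such a submission guard (hypothetical helper, not the 
actual {{Dispatcher}} code) could look as follows:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;

// Illustrative only: rejects any JobID the cluster has already seen, no matter
// whether the previous execution is still running or has already finished.
final class SubmissionGuard {

    static void acceptExactlyOnce(RunningJobsRegistry registry, JobID jobId) throws IOException {
        final JobSchedulingStatus status = registry.getJobSchedulingStatus(jobId);
        if (status != JobSchedulingStatus.PENDING) {
            throw new IllegalStateException(
                "Job " + jobId + " was already submitted to this cluster.");
        }
        // Note: check-then-set is not atomic; a real implementation would have to
        // serialize concurrent submissions of the same JobID (e.g. in the
        // Dispatcher's main thread).
        registry.setJobRunning(jobId);
    }
}
{code}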

What we do at the moment is that a cluster accepts jobs with the same {{JobID}} 
as long as their executions don't overlap. Hence, it is possible to submit the 
same job again after the first execution has terminated. In that sense, the 
current behaviour of the job mode with stand-by JMs is actually correct: After 
a JM finishes the execution of a job and terminates, another JM gains 
leadership. This JM re-executes the job because it "considers" it to be a 
re-submission, which is correct since the two executions don't overlap.

So in order to solve the problem, I would like to change the semantics so that 
a cluster (session as well as job mode) only executes a job (identified by a 
{{JobID}}) exactly once during its lifetime. How is the lifetime of a cluster 
defined?

Well, one option could be to say that each cluster, identified by a unique 
cluster id, has an eternal lifetime, independent of whether it is running or 
not. In that case, we could never clean up the persisted data of the 
{{RunningJobsRegistry}} of this cluster. This would effectively mean that we 
can never clean up the respective ZooKeeper znode. Moreover, in order to 
execute the same job again, one would need to spin up a new cluster.

Another option (and a more practical one) would be to say that the lifetime of 
a cluster is as long as the cluster is running. When the cluster shuts down, it 
will clean up all persisted HA data to leave a clean slate. However, this means 
that until we actually shut down, we must not clean up the 
{{RunningJobsRegistry}}, because we want to execute each job at most once 
during the lifetime of the cluster. With these semantics, it would be possible 
to start a new cluster with the same cluster id and execute a job again which 
has previously been executed on a cluster with the same cluster id (keep this 
part in mind for the later discussion of corner cases).

Assuming the latter semantics, how would this actually work? In the case of no 
stand-by components, this becomes trivial: Both the session cluster (when it 
receives the graceful shut-down call) and the job mode cluster (when it 
finishes the job execution) will simply clean up the {{RunningJobsRegistry}}. 
The clean up should be done by the central cluster component (currently the 
{{Dispatcher}}).
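
As a sketch (method and wiring made up), the shut-down path of the central 
component would then be the only place that wipes the per-job entries:

{code:java}
import java.io.IOException;
import java.util.Collection;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;

// Illustrative only: clear the registry for all jobs the cluster has executed,
// but only as part of the graceful cluster shut down.
final class ClusterCleanup {

    static void onGracefulShutDown(RunningJobsRegistry registry, Collection<JobID> executedJobs) {
        for (JobID jobId : executedJobs) {
            try {
                registry.clearJob(jobId); // e.g. removes the job's ZooKeeper znode
            } catch (IOException e) {
                // best effort: left-over entries only matter if the same cluster id is reused
            }
        }
    }
}
{code}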

The more interesting part is the case with stand-by components: In the absence 
of a central communication means to tell every stand-by component to terminate, 
only the last instance of the central cluster component should actually 
initiate the clean up. This requires some means to detect whether one is the 
last instance or not. One idea could be to look at the list of leadership 
contenders in order to figure this out. This is not a bulletproof strategy, 
though: If a stand-by component loses its connection to ZooKeeper, it will 
disappear from the list of leadership contenders. If the cluster is shut down 
during this time, another component will think that it is the last instance and 
initiate the clean up. When the other stand-by component reconnects to 
ZooKeeper, it will see that the {{RunningJobsRegistry}} is empty and start 
executing the job. Semantically, this is correct because for an outside 
observer it would not be possible to distinguish this case from starting a new 
cluster with the same cluster id and submitting the same job after the previous 
cluster has terminated. If we want to prevent this case from happening, we must 
not clean up the {{RunningJobsRegistry}} (at the price of cluttering ZooKeeper).
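
Since our ZooKeeper leader election is based on Curator, such a "last instance" 
check could roughly look like the following sketch (again only illustrative, 
and subject to exactly the weakness described above):

{code:java}
import java.util.Collection;

import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.Participant;

// Illustrative only: a contender that temporarily lost its ZooKeeper connection
// drops out of the participant list, so this check can report a wrong result.
final class LastInstanceCheck {

    static boolean isLastContender(LeaderLatch leaderLatch) throws Exception {
        final Collection<Participant> participants = leaderLatch.getParticipants();
        return participants.size() <= 1;
    }
}
{code}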

All right, this was a rather lengthy explanation. Let me now get to your 
questions:

Tison's:
1. With FLINK-11843, we might start another {{Dispatcher}} for each leader 
session. In this case, the clean up won't be executed by the {{Dispatcher}} 
itself but by another component which runs the different {{Dispatcher}} 
instances.
2. Concerning the race condition: This is fine, since the newly started 
contender will be considered a new cluster instance.

Zhu Zhu's:
1. We could say that a job needs to be in {{RUNNING}} to be restarted. In this 
case, we need to make sure that the {{RunningJobsRegistry}} of a job mode 
cluster sets the contained job to {{RUNNING}} (see the sketch after this list).
2. In case of a hard exit of the {{Dispatcher}}, we won't clean up the 
{{RunningJobsRegistry}}. But this also applies to other clean up operations.
3. I think that the {{JobManagerLeaderElectionService}} is still needed, 
because you can potentially have multiple JMs running and you need to figure 
out which one is currently executing the job (e.g. the TaskExecutor needs to 
know in order to offer its slots to the leader).
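
Regarding 1., a job mode cluster would mark its embedded job as {{RUNNING}} 
before executing it, roughly like this (sketch only, the method and its 
surroundings are made up):

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Illustrative only: mark the embedded job as RUNNING so that a later failover
// can distinguish "not finished yet, restart" (RUNNING) from "already completed" (DONE).
final class JobModeStartup {

    static void startEmbeddedJob(RunningJobsRegistry registry, JobGraph jobGraph) throws IOException {
        final JobID jobId = jobGraph.getJobID();
        if (registry.getJobSchedulingStatus(jobId) == JobSchedulingStatus.DONE) {
            return; // already executed during this cluster's lifetime
        }
        registry.setJobRunning(jobId);
        // ... hand the JobGraph over to the JobManager / JobMaster for execution ...
    }
}
{code}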



> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> ---------------------------------------------------------------------
>
>                 Key: FLINK-11813
>                 URL: https://issues.apache.org/jira/browse/FLINK-11813
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode 
> will restart a terminated job after they gained leadership. The problem is 
> that we currently clear the {{RunningJobsRegistry}} once a job has reached a 
> globally terminal state. After the leading {{Dispatcher}} terminates, a 
> standby {{Dispatcher}} will gain leadership. Without having the information 
> from the {{RunningJobsRegistry}} it cannot tell whether the job has been 
> executed or whether the {{Dispatcher}} needs to re-execute the job. At the 
> moment, the {{Dispatcher}} will assume that there was a fault and hence 
> re-execute the job. This can lead to duplicate results.
> I think we need some way to tell standby {{Dispatchers}} that a certain job 
> has been successfully executed. One trivial solution could be to not clean up 
> the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
