tillrohrmann commented on a change in pull request #9832: [FLINK-11843] Bind 
lifespan of Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r337432603
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##########
 @@ -869,78 +823,6 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
                return optionalJobInformation;
        }
 
-       //------------------------------------------------------
-       // Leader contender
-       //------------------------------------------------------
-
-       /**
-        * Callback method when current resourceManager is granted leadership.
-        *
-        * @param newLeaderSessionID unique leadershipID
-        */
-       @Override
-       public void grantLeadership(final UUID newLeaderSessionID) {
-               runAsyncWithoutFencing(
-                       () -> {
-                               log.info("Dispatcher {} was granted leadership 
with fencing token {}", getAddress(), newLeaderSessionID);
-
-                               final CompletableFuture<Collection<JobGraph>> 
recoveredJobsFuture = recoveryOperation.thenApplyAsync(
-                                       FunctionUtils.uncheckedFunction(ignored 
-> recoverJobs()),
-                                       getRpcService().getExecutor());
-
-                               final CompletableFuture<Boolean> 
fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
-                                       (Collection<JobGraph> recoveredJobs) -> 
tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
-                                       getUnfencedMainThreadExecutor());
-
-                               final CompletableFuture<Void> 
confirmationFuture = fencingTokenFuture.thenCombineAsync(
-                                       recoveredJobsFuture,
-                                       
BiFunctionWithException.unchecked((Boolean confirmLeadership, 
Collection<JobGraph> recoveredJobs) -> {
-                                               if (confirmLeadership) {
-                                                       
leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
-                                               } else {
-                                                       for (JobGraph 
recoveredJob : recoveredJobs) {
-                                                               
jobGraphStore.releaseJobGraph(recoveredJob.getJobID());
-                                                       }
-                                               }
-                                               return null;
-                                       }),
-                                       getRpcService().getExecutor());
-
-                               confirmationFuture.whenComplete(
-                                       (Void ignored, Throwable throwable) -> {
-                                               if (throwable != null) {
-                                                       onFatalError(
-                                                               new 
DispatcherException(
-                                                                       
String.format("Failed to take leadership with session id %s.", 
newLeaderSessionID),
-                                                                       
(ExceptionUtils.stripCompletionException(throwable))));
-                                               }
-                                       });
-
-                               recoveryOperation = confirmationFuture;
-                       });
-       }
-
-       private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID 
newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
-               final DispatcherId dispatcherId = 
DispatcherId.fromUuid(newLeaderSessionID);
-
-               if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
-                       log.debug("Dispatcher {} accepted leadership with 
fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
-                       setNewFencingToken(dispatcherId);
-
-                       Collection<CompletableFuture<?>> runFutures = new 
ArrayList<>(recoveredJobs.size());
-
-                       for (JobGraph recoveredJob : recoveredJobs) {
-                               final CompletableFuture<?> runFuture = 
waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, 
this::runJob);
-                               runFutures.add(runFuture);
-                       }
-
-                       return 
FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
-               } else {
-                       log.debug("Dispatcher {} lost leadership before 
accepting it. Stop recovering jobs for fencing token {}.", getAddress(), 
dispatcherId);
-                       return CompletableFuture.completedFuture(false);
-               }
-       }
-
        private CompletableFuture<Void> waitForTerminatingJobManager(JobID 
jobId, JobGraph jobGraph, FunctionWithException<JobGraph, 
CompletableFuture<Void>, ?> action) {
 
 Review comment:
   This is not strictly true atm. Imagine that someone (e.g. a test) submits 
the same job after the first one has completed. Since the termination can take 
some time, we still need to check this. Changing this now, would mean a 
behavioural change.
   
   I would like to change this later as part of FLINK-11719 but not as part of 
this PR. The scope is already too large.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to