TisonKun commented on a change in pull request #9832: [FLINK-11843] Bind lifespan of Dispatcher to leader session URL: https://github.com/apache/flink/pull/9832#discussion_r336505248
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -869,78 +823,6 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { return optionalJobInformation; } - //------------------------------------------------------ - // Leader contender - //------------------------------------------------------ - - /** - * Callback method when current resourceManager is granted leadership. - * - * @param newLeaderSessionID unique leadershipID - */ - @Override - public void grantLeadership(final UUID newLeaderSessionID) { - runAsyncWithoutFencing( - () -> { - log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); - - final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoveryOperation.thenApplyAsync( - FunctionUtils.uncheckedFunction(ignored -> recoverJobs()), - getRpcService().getExecutor()); - - final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync( - (Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs), - getUnfencedMainThreadExecutor()); - - final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync( - recoveredJobsFuture, - BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> { - if (confirmLeadership) { - leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress()); - } else { - for (JobGraph recoveredJob : recoveredJobs) { - jobGraphStore.releaseJobGraph(recoveredJob.getJobID()); - } - } - return null; - }), - getRpcService().getExecutor()); - - confirmationFuture.whenComplete( - (Void ignored, Throwable throwable) -> { - if (throwable != null) { - onFatalError( - new DispatcherException( - String.format("Failed to take leadership with session id %s.", newLeaderSessionID), - (ExceptionUtils.stripCompletionException(throwable)))); - } - }); - - recoveryOperation = 
confirmationFuture; - }); - } - - private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID newLeaderSessionID, Collection<JobGraph> recoveredJobs) { - final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID); - - if (leaderElectionService.hasLeadership(newLeaderSessionID)) { - log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId); - setNewFencingToken(dispatcherId); - - Collection<CompletableFuture<?>> runFutures = new ArrayList<>(recoveredJobs.size()); - - for (JobGraph recoveredJob : recoveredJobs) { - final CompletableFuture<?> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob); - runFutures.add(runFuture); - } - - return FutureUtils.waitForAll(runFutures).thenApply(ignored -> true); - } else { - log.debug("Dispatcher {} lost leadership before accepting it. Stop recovering jobs for fencing token {}.", getAddress(), dispatcherId); - return CompletableFuture.completedFuture(false); - } - } - private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) { Review comment: We no longer need to wait for a previous Dispatcher to terminate its JobManagers. If we ever do have to wait for a terminating JobManager before submitting a job, the Dispatcher is in a wrong state IMO. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services