tillrohrmann commented on a change in pull request #13217: URL: https://github.com/apache/flink/pull/13217#discussion_r475556728
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -427,21 +389,39 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner } } } else { - log.debug("There is a newer JobManagerRunner for the job {}.", jobId); + log.debug("Job {} is not registered anymore at dispatcher", jobId); } - return null; }, getMainThreadExecutor())); + } - jobManagerRunner.start(); - - return jobManagerRunner; + CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) { + final RpcService rpcService = getRpcService(); + return CompletableFuture.supplyAsync( + () -> { + try { + JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner( + jobGraph, + configuration, + rpcService, + highAvailabilityServices, + heartbeatServices, + jobManagerSharedServices, + new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), + fatalErrorHandler); + runner.start(); + return runner; + } catch (Exception e) { + throw new CompletionException(new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e)); + } + }, + rpcService.getExecutor()); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation Review comment: Great, thanks. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org