tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r475556728



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -427,21 +389,39 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner
                                                        }
                                                }
                                        } else {
-                                               log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
+                                               log.debug("Job {} is not registered anymore at dispatcher", jobId);
                                        }
-
                                        return null;
                                }, getMainThreadExecutor()));
+       }
 
-               jobManagerRunner.start();
-
-               return jobManagerRunner;
+       CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
+               final RpcService rpcService = getRpcService();
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
+                                               jobGraph,
+                                               configuration,
+                                               rpcService,
+                                               highAvailabilityServices,
+                                               heartbeatServices,
+                                               jobManagerSharedServices,
+                                               new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+                                               fatalErrorHandler);
+                                       runner.start();
+                                       return runner;
+                               } catch (Exception e) {
+                                       throw new CompletionException(new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
+                               }
+                       },
+                       rpcService.getExecutor()); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation

Review comment:
       Great, thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to