Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5767#discussion_r177728701 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -645,6 +637,56 @@ private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void> jobManagerRunnerTerminationFuture)); } + private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) { + final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); + + if (jobManagerRunner == null) { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } else { + final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture(); + return leaderGatewayFuture.thenApplyAsync( + (JobMasterGateway jobMasterGateway) -> { + // check whether the retrieved JobMasterGateway belongs still to a running JobMaster + if (jobManagerRunners.containsKey(jobId)) { + return jobMasterGateway; + } else { + throw new CompletionException(new FlinkJobNotFoundException(jobId)); + } + }, + getMainThreadExecutor()); + } + } + + private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) { + return optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList()); + } + + @Nonnull + private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) { + final int numberJobsRunning = jobManagerRunners.size(); + + ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>( + numberJobsRunning); + + for (JobID jobId : jobManagerRunners.keySet()) { + final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); + + final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture + .thenCompose(queryFunction::apply) + .handle( + (T value, Throwable throwable) -> { + if (throwable != null) { --- End diff -- Good catch. Will change it :-)
---