Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5767#discussion_r177684059
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
    @@ -645,6 +637,56 @@ private void 
registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void>
                        jobManagerRunnerTerminationFuture));
        }
     
    +   private CompletableFuture<JobMasterGateway> 
getJobMasterGatewayFuture(JobID jobId) {
    +           final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
    +
    +           if (jobManagerRunner == null) {
    +                   return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
    +           } else {
    +                   final CompletableFuture<JobMasterGateway> 
leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
    +                   return leaderGatewayFuture.thenApplyAsync(
    +                           (JobMasterGateway jobMasterGateway) -> {
    +                                   // check whether the retrieved 
JobMasterGateway belongs still to a running JobMaster
    +                                   if 
(jobManagerRunners.containsKey(jobId)) {
    +                                           return jobMasterGateway;
    +                                   } else {
    +                                           throw new 
CompletionException(new FlinkJobNotFoundException(jobId));
    +                                   }
    +                           },
    +                           getMainThreadExecutor());
    +           }
    +   }
    +
    +   private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> 
optionalCollection) {
    +           return 
optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
    +   }
    +
    +   @Nonnull
    +   private <T> List<CompletableFuture<Optional<T>>> 
queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> 
queryFunction) {
    +           final int numberJobsRunning = jobManagerRunners.size();
    +
    +           ArrayList<CompletableFuture<Optional<T>>> 
optionalJobInformation = new ArrayList<>(
    +                   numberJobsRunning);
    +
    +           for (JobID jobId : jobManagerRunners.keySet()) {
    +                   final CompletableFuture<JobMasterGateway> 
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
    +
    +                   final CompletableFuture<Optional<T>> optionalRequest = 
jobMasterGatewayFuture
    +                           .thenCompose(queryFunction::apply)
    +                           .handle(
    +                                   (T value, Throwable throwable) -> {
    +                                           if (throwable != null) {
    --- End diff --
    
    This is equivalent to `.handle((T value, Throwable throwable) -> 
Optional.ofNullable(value));`


---

Reply via email to