Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4734#discussion_r143973446

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -242,6 +248,86 @@ public void start() throws Exception {
     	}
     
     	@Override
    +	public CompletableFuture<JobStatusesWithIdsOverview> requestJobIdsOverview(@RpcTimeout Time timeout) {
    +		final int numJobs = jobManagerRunners.size();
    +
    +		ArrayList<CompletableFuture<Tuple2<JobID, JobStatus>>> jobStatuses = new ArrayList<>(numJobs);
    +		for (Map.Entry<JobID, JobManagerRunner> jobManagerRunnerEntry : jobManagerRunners.entrySet()) {
    +			CompletableFuture<JobStatus> jobStatusFuture =
    +				jobManagerRunnerEntry.getValue().getJobManagerGateway().requestJobStatus(timeout);
    +
    +			jobStatuses.add(jobStatusFuture.thenApply(jobStatus -> Tuple2.of(jobManagerRunnerEntry.getKey(), jobStatus)));
    +		}
    +
    +		CompletableFuture<Collection<Tuple2<JobID, JobStatus>>> combinedJobStatusesFuture = FutureUtils.combineAll(jobStatuses);
    +
    +		return combinedJobStatusesFuture.thenApply(
    +			jobStatusesWithIds -> {
    +				List<JobID> jobsCreated = new LinkedList<>();
    +				List<JobID> jobsRunning = new LinkedList<>();
    +				List<JobID> jobsFinished = new LinkedList<>();
    +				List<JobID> jobsCancelling = new LinkedList<>();
    +				List<JobID> jobsCancelled = new LinkedList<>();
    +				List<JobID> jobsFailing = new LinkedList<>();
    +				List<JobID> jobsFailed = new LinkedList<>();
    +				List<JobID> jobsRestarting = new LinkedList<>();
    +				List<JobID> jobsSuspended = new LinkedList<>();
    +				List<JobID> jobsReconciling = new LinkedList<>();
    --- End diff --

I think we might need some micro benchmarking here to be sure, as it may depend on the actual list to return. But I could stay with `ArrayList` (I think that was what we were using anyway.)
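
For illustration only, here is a minimal sketch of what the per-status grouping might look like with `ArrayList`. It is not the code in this PR: `Status`, `JobWithStatus`, and `groupByStatus` are hypothetical stand-ins for `JobStatus`, `Tuple2<JobID, JobStatus>`, and the lambda body above, and a plain `String` plays the role of `JobID`. Using an `EnumMap` in place of one local list per status is also an assumption of the sketch, not something proposed in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class JobStatusGrouping {

    // Hypothetical stand-in for Flink's JobStatus; only a few states are listed.
    enum Status { CREATED, RUNNING, FINISHED, FAILED }

    // Hypothetical stand-in for Tuple2<JobID, JobStatus>; a String plays the JobID.
    static final class JobWithStatus {
        final String jobId;
        final Status status;

        JobWithStatus(String jobId, Status status) {
            this.jobId = jobId;
            this.status = status;
        }
    }

    // Groups job IDs by status, backing every bucket with an ArrayList.
    static Map<Status, List<String>> groupByStatus(List<JobWithStatus> jobs) {
        Map<Status, List<String>> grouped = new EnumMap<>(Status.class);
        // Pre-create one bucket per status so that empty states are still present.
        for (Status status : Status.values()) {
            grouped.put(status, new ArrayList<>());
        }
        // Append each job ID to the bucket of its status, preserving input order.
        for (JobWithStatus job : jobs) {
            grouped.get(job.status).add(job.jobId);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<JobWithStatus> jobs = Arrays.asList(
            new JobWithStatus("job-a", Status.RUNNING),
            new JobWithStatus("job-b", Status.FAILED),
            new JobWithStatus("job-c", Status.RUNNING));
        System.out.println(groupByStatus(jobs));
    }
}
```

Both list types append in (amortized) constant time, so any difference here would come from allocation and memory layout, which is the sort of thing a micro benchmark would have to settle.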
---