Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148363944

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
         // Miscellaneous
         // --------------------------------------------------------------------------------------------

    +    /**
    +     * Calculates the preferred locations based on the location preference constraint.
    +     *
    +     * @param locationPreferenceConstraint constraint for the location preference
    +     * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +     *         have been a resource assigned.
    +     */
    +    @VisibleForTesting
    +    public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +        final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +        final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    +
    +        switch(locationPreferenceConstraint) {
    +            case ALL:
    +                preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +                break;
    +            case ANY:
    --- End diff --

    If I read it correctly, case `ANY` can complete without any input being ready (none of the input futures being done yet), returning a completed future with an empty collection. Is that the intended semantics?
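
To make the concern concrete, here is a minimal sketch, not the code from this PR. It assumes the `ANY` branch (cut off in the quoted diff) gathers only the input futures that are already done. `AnyPreferenceSketch` and `completedOnly` are hypothetical names, and `String` stands in for `TaskManagerLocation`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

class AnyPreferenceSketch {

    // Hypothetical stand-in for the ANY branch: keep only the values of
    // futures that have already completed normally, and never wait.
    static <T> CompletableFuture<Collection<T>> completedOnly(
            Collection<CompletableFuture<T>> futures) {
        Collection<T> ready = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            if (future.isDone() && !future.isCompletedExceptionally()) {
                ready.add(future.getNow(null));
            }
        }
        // The result is always an already-completed future, even if `ready` is empty.
        return CompletableFuture.completedFuture(ready);
    }

    public static void main(String[] args) {
        // Two inputs, neither of which has a location assigned yet.
        CompletableFuture<String> inputA = new CompletableFuture<>();
        CompletableFuture<String> inputB = new CompletableFuture<>();

        CompletableFuture<Collection<String>> result =
                completedOnly(Arrays.asList(inputA, inputB));

        System.out.println(result.isDone() + " " + result.join()); // prints "true []"
    }
}
```

Under that assumption the caller cannot tell "no input has a location yet" apart from "there are no preferred locations", which is why the question about the intended semantics matters.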
---