Github user summerleafs commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r147891494

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	// Miscellaneous
     	// --------------------------------------------------------------------------------------------

    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    --- End diff --

    Hi Till, is `getPreferredLocations()` not invoked here because Flink doesn't yet support reading checkpoint data locally? I have created an issue for Flink to read checkpoint data locally [here](https://issues.apache.org/jira/browse/FLINK-7873?filter=-1). Once it is complete, I wonder if we could invoke `getPreferredLocations()` instead of `getPreferredLocationsBasedOnInputs()`.
---