GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4916
[FLINK-7153] Re-introduce preferred locations for scheduling

## What is the purpose of the change

This PR makes the `TaskManagerLocation` accessible for asynchronous scheduling. Since asynchronous scheduling was introduced in Flink 1.3, it was not always guaranteed that the scheduler knew the locations of producer tasks. The eager scheduling mode was especially affected because slot allocation happens concurrently.

To fix this problem, this PR adds a `TaskManagerLocationFuture` to each `Execution`. In eager scheduling mode, a slot is only requested for a task once all of its inputs have a slot assigned (i.e. their `TaskManagerLocationFuture` is completed). In lazy scheduling mode, we don't wait for the completion of all inputs, but take those input locations which are already known.

To distinguish whether we want to wait for all task manager locations or take those currently available, we add a `LocationPreferenceConstraint` with the values `ALL` and `ANY`. `ALL` means that we have to wait for all inputs to have a location assigned, and `ANY` means that we take what is currently known.

To avoid premature deployment in eager mode, the slot assignment has been factored out into its own step. Before, one had to call `Execution#deployToSlot(SimpleSlot)`, which assigned the given slot and started the deployment. Now, one has to call `Execution#tryAssignResource` before one can call `Execution#deploy`.

Moreover, this PR ensures that the `FailoverRegions` are topologically sorted, which is important for non-queued scheduling.
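The difference between `ALL` and `ANY` can be illustrated with a minimal, standalone sketch. This is not the code from this PR: the class `PreferredLocationsSketch` and the method `preferredLocations` are hypothetical names, and locations are plain strings instead of `TaskManagerLocation`. It only assumes that each input exposes its location as a `CompletableFuture`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; none of these names are taken from the Flink code base
// except the enum name and its two values, which the PR description mentions.
final class PreferredLocationsSketch {

    enum LocationPreferenceConstraint { ALL, ANY }

    /**
     * Combines the location futures of a task's inputs.
     * Assumption: a location is modelled as a String for brevity.
     */
    static CompletableFuture<List<String>> preferredLocations(
            Collection<CompletableFuture<String>> inputLocationFutures,
            LocationPreferenceConstraint constraint) {

        if (constraint == LocationPreferenceConstraint.ANY) {
            // ANY: take only the locations that are known right now, never wait.
            List<String> known = new ArrayList<>();
            for (CompletableFuture<String> future : inputLocationFutures) {
                if (future.isDone() && !future.isCompletedExceptionally()) {
                    known.add(future.getNow(null));
                }
            }
            return CompletableFuture.completedFuture(known);
        }

        // ALL: the result completes only once every input has a location.
        CompletableFuture<?>[] all =
                inputLocationFutures.toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(all).thenApply(ignored -> {
            List<String> locations = new ArrayList<>();
            for (CompletableFuture<String> future : inputLocationFutures) {
                locations.add(future.join()); // already completed at this point
            }
            return locations;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = CompletableFuture.completedFuture("tm-1");
        CompletableFuture<String> second = new CompletableFuture<>(); // no slot assigned yet
        List<CompletableFuture<String>> inputs = Arrays.asList(first, second);

        // Lazy scheduling: use what is known.
        System.out.println(
                preferredLocations(inputs, LocationPreferenceConstraint.ANY).join()); // [tm-1]

        // Eager scheduling: wait until every input has a location.
        CompletableFuture<List<String>> all =
                preferredLocations(inputs, LocationPreferenceConstraint.ALL);
        System.out.println(all.isDone()); // false
        second.complete("tm-2");
        System.out.println(all.join()); // [tm-1, tm-2]
    }
}
```

In this sketch a slot request for a consumer would be chained onto the returned future, so under `ALL` the request is issued only after all producers have a location, while under `ANY` it is issued immediately with whatever is available.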

FYI @StephanEwen

## Brief change log

- Introduce `LocationPreferenceConstraint` to distinguish the waiting behaviour for the preferred locations
- Split slot assignment and deployment into two separate steps
- Move preferred location calculation into the `Execution` to reduce code duplication between the `Scheduler` and the `SlotPool`
- Change preferred location calculation to block if the constraint is `LocationPreferenceConstraint#ALL` and not all input locations are known

## Verifying this change

This change added tests and can be verified as follows:

- Added `ExecutionTest` to check that an assigned slot is released correctly in case of cancellation and that the preferred locations are calculated correctly
- Added `ExecutionGraphDeploymentTest#testEagerSchedulingWaitsOnAllInputPreferredLocations` to check that eager scheduling waits for all inputs to be assigned before scheduling consumer tasks
- Moreover, the scheduler is covered by existing tests such as `SchedulerSlotSharingTest`, `ScheduleWithCoLocationHintTest` and many IT cases for lazy scheduling (batch case)

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented?
(not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixGroupScheduling2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4916

----
commit 32eb1812583b84d80091d1a278d53ed663d8a065
Author: Till <till.rohrm...@gmail.com>
Date:   2017-10-16T12:04:13Z

    [FLINK-7153] Re-introduce preferred locations for scheduling

commit 8c0c9aeaa7ca995247f2b9f9e63723e52d839a12
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2017-10-27T07:47:03Z

    [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

    The LocationPreferenceConstraint defines whether all or any preferred locations
    have to be taken into consideration when scheduling tasks. Especially for batch
    jobs where we do lazy scheduling not all input locations might be known for a
    consumer task. Therefore, we set the location preference constraint to any which
    means that only those location are taken into consideration which are known at
    scheduling time.

commit c821e67529deaaed96f183fc22bc0a9fe246fa23
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2017-10-26T16:22:43Z

    [hotfix] Make failover region topological sorted

commit 67baeade85e26758978bcdf7982576a2f4192aae
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2017-10-27T17:08:15Z

    [FLINK-7153] Add test cases

----
---