GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4916

    [FLINK-7153] Re-introduce preferred locations for scheduling

    ## What is the purpose of the change
    
    This PR makes the `TaskManagerLocation` accessible for asynchronous 
scheduling.
    
    Due to changes for Flink 1.3 where we introduced asynchronous scheduling, 
it was not always guaranteed that the scheduler knew about the scheduling 
locations of producer tasks. Especially the eager scheduling mode was affected 
since the slot allocation happened concurrently.
    
    In order to fix this problem, this PR adds a `TaskManagerLocationFuture` to 
each `Execution`. In eager scheduling mode, a slot will only be requested for a 
task if all its inputs have a slot assigned (e.g. their 
`TaskManagerLocationFuture` is completed). In lazy scheduling mode, we don't 
wait for the completion of all inputs, but take those inputs which are already 
known.
    
    In order to distinguish whether we want to wait for all or take all 
available task manager locations, we add a `LocationPreferenceConstraint` which 
has the values `ALL` and `ANY`. `ALL` means that we have to wait for all inputs 
to have a location assigned, and `ANY` means that we take what's currently 
known.
    
    In order to not deploy slots prematurely in eager mode, the slot assignment 
has been factored out into its own step. Before, one had to call 
`Execution#deployToSlot(SimpleSlot)` which assigned the given slot and started 
the deployment. Now, one has to call `Execution#tryAssignResource` before one 
can call `Execution#deploy`.
    
    Moreover this PR fixes that the `FailoverRegions` are topologically sorted 
which is important for non queued scheduling.
    
    FYI @StephanEwen 
    
    ## Brief change log
    
    - Introduce `LocationPreferenceConstraint` to distinguish the waiting 
behaviour for the preferred locations
    - Split slot assignment and deployment into two separate steps
    - Moved preferred location calculation into the Execution to reduce code 
duplication between the `Scheduler` and the `SlotPool`
    - Changed preferred location calculation to be blocking if 
`LocationPreferenceConstraint#ALL` and not all input locations are known
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - Added `ExecutionTest` to check the correct assigned slot release in case 
of cancellation and to check the correct preferred location calculation
    - Added 
`ExecutionGraphDeploymentTest#testEagerSchedulingWaitsOnAllInputPreferredLocations`
 to check that eager scheduling waits for all inputs to be assigned before 
scheduling consumer tasks
    - Moreover, the scheduler is being tested by existing tests such as 
`SchedulerSlotSharingTest`, `ScheduleWithCoLocationHintTest` and many IT cases 
for lazy scheduling (batch case)
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixGroupScheduling2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4916
    
----
commit 32eb1812583b84d80091d1a278d53ed663d8a065
Author: Till <till.rohrm...@gmail.com>
Date:   2017-10-16T12:04:13Z

    [FLINK-7153] Re-introduce preferred locations for scheduling

commit 8c0c9aeaa7ca995247f2b9f9e63723e52d839a12
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2017-10-27T07:47:03Z

    [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling
    
    The LocationPreferenceConstraint defines whether all or any preferred 
locations
    have to be taken into consideration when scheduling tasks. Especially for 
batch
    jobs where we do lazy scheduling not all input locations might be known for 
a
    consumer task. Therefore, we set the location preference constraint to any 
which
    means that only those location are taken into consideration which are known 
at
    scheduling time.

commit c821e67529deaaed96f183fc22bc0a9fe246fa23
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2017-10-26T16:22:43Z

    [hotfix] Make failover region topological sorted

commit 67baeade85e26758978bcdf7982576a2f4192aae
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2017-10-27T17:08:15Z

    [FLINK-7153] Add test cases

----


---

Reply via email to