[ 
https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16234604#comment-16234604
 ] 

ASF GitHub Bot commented on FLINK-7153:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148358215
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider 
slotProvider, final Time timeout) {
                // that way we do not have any operation that can fail between 
allocating the slots
                // and adding them to the list. If we had a failure in between 
there, that would
                // cause the slots to get lost
    -           final ArrayList<ExecutionAndSlot[]> resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
                final boolean queued = allowQueuedScheduling;
     
    -           // we use this flag to handle failures in a 'finally' clause
    -           // that allows us to not go through clumsy cast-and-rethrow 
logic
    -           boolean successful = false;
    +           // collecting all the slots may resize and fail in that 
operation without slots getting lost
    +           final ArrayList<CompletableFuture<Execution>> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -           try {
    -                   // collecting all the slots may resize and fail in that 
operation without slots getting lost
    -                   final ArrayList<CompletableFuture<SimpleSlot>> 
slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +           // allocate the slots (obtain all their futures
    +           for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +                   // these calls are not blocking, they only return 
futures
    --- End diff --
    
    Having a scheduling mechanism that tries to satisfy both state locality and 
input locality could be interesting. Input locality may extend to locality of 
input partitions (in Kafka for example) as well, which makes it even more 
complicated.
    
    I think the current state of the heuristic is: previous location first 
(later: state locality first), if that leaves freedom, try to schedule based on 
inputs.
    
    I can see an extended variant where we first collect all vertices with 
constraints, try to satisfy those. They may in turn add more constraints (or 
preferences) which should be satisfied next. Repeat until all are satisfied, or 
it is not possible to satisfy the preferences any more. But that is a pretty 
big change, that we should discus and design properly, not push it into a bug 
fix.


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocate for ExecutionJobVertex 
> one by one via calling ExecutionJobVertex.allocateResourcesForAll(), here is 
> two problem about it:
> 1. The ExecutionVertex.getPreferredLocationsBasedOnInputs will always return 
> empty, cause `sourceSlot` always be null until `ExectionVertex` has been 
> deployed via 'Execution.deployToSlot()'. So allocate resource base on 
> prefered location can't work correctly, we need to set the slot info for 
> `Execution` as soon as Execution.allocateSlotForExecution() called 
> successfully?
> 2. Current allocate strategy can't allocate the slot optimize.  Here is the 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocate for v1,v2, we got a local partition and three 
> remote partition. But actually, it should be 2 local partition and 2 remote 
> partition. 
> The causes of the above problems is becuase that the current allocate 
> strategy is allocate the resource for execution one by one(if the execution 
> can allocate from SlotGroup than get it, Otherwise ask for a new one for it). 
> If we change the allocate strategy to two step will solve this problem, below 
> is the Pseudo code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from SlothGroup base on inputs one by one (which 
> only allocate resource base on location).
> //step 2: allocate for the remain execution.
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to