[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235445#comment-16235445 ]

ASF GitHub Bot commented on FLINK-7153:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148477247
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
    @@ -476,14 +482,13 @@ else if (numSources < parallelism) {
         * @return The preferred locations based in input streams, or an empty iterable,
         *         if there is no input-based preference.
         */
    -   public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
    +   public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
                // otherwise, base the preferred locations on the input connections
                if (inputEdges == null) {
                        return Collections.emptySet();
                }
                else {
    -                   Set<TaskManagerLocation> locations = new HashSet<>();
    -                   Set<TaskManagerLocation> inputLocations = new HashSet<>();
    +                   Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
    --- End diff --
    
    This change was not entirely intended, and the previous code makes total sense given your explanation. I'm actually in favour of reverting my changes so as not to change the semantics for the moment.
    
    However, for the future, I'm wondering whether this kind of decision should be made by the `ExecutionVertex`, or whether it should rather be the task of the `Scheduler`.
    
    For example, what if a task has multiple input gates and one of them has exactly one producer? Then it will only return the location of this single producer. Now if this TM has no more slots left, we would basically pick another slot at random, even though there might be other TMs on which another producer for this task would run.
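    
    A minimal sketch of what I mean (hypothetical, not the actual `Scheduler` API; plain `String` stands in for `TaskManagerLocation`): the scheduler would receive the locations of all producers and rank the candidate TMs by how many producers they host, so it can fall back to the next-best TM when the best one has no free slots.
    
    {code}
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class ProducerLocalityRanking {
    
        // Rank candidate TMs by the number of producers they host, best first.
        static List<String> rankByProducerCount(Collection<String> producerLocations) {
            Map<String, Integer> counts = new HashMap<>();
            for (String location : producerLocations) {
                counts.merge(location, 1, Integer::sum);
            }
            List<String> ranked = new ArrayList<>(counts.keySet());
            // If the first TM has no free slots, the scheduler simply tries
            // the next candidate instead of picking a slot at random.
            ranked.sort((a, b) -> counts.get(b) - counts.get(a));
            return ranked;
        }
    }
    {code}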


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates slots for the 
> ExecutionJobVertices one by one by calling 
> ExecutionJobVertex.allocateResourcesForAll(). There are two problems with it:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() will always return an 
> empty result, because `sourceSlot` stays null until the `ExecutionVertex` has 
> been deployed via `Execution.deployToSlot()`. So allocating resources based on 
> preferred locations can't work correctly; shouldn't we set the slot info for 
> the `Execution` as soon as Execution.allocateSlotForExecution() completes 
> successfully? A sketch of this idea follows.
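> A minimal sketch (setAssignedSlot() is a hypothetical helper, and the exact 
> signature and future type of allocateSlotForExecution() may differ):
> {code}
> // Publish the slot on the Execution as soon as the allocation succeeds,
> // instead of waiting for deployToSlot(), so that downstream vertices
> // computing input-based preferred locations can already see it.
> CompletableFuture<SimpleSlot> slotFuture =
>         execution.allocateSlotForExecution(slotProvider);
> slotFuture.thenAccept(slot -> execution.setAssignedSlot(slot));
> {code}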
> 2. The current allocation strategy can't allocate the slots optimally. Here is 
> a test case:
> {code}
> // The JobVertexID declarations are added here so the snippet is self-contained.
> JobVertexID jid1 = new JobVertexID();
> JobVertexID jid2 = new JobVertexID();
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and 
> three remote partitions. But actually, it should be 2 local partitions and 2 
> remote partitions, as the mapping sketch below shows.
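> Illustrative sketch (not Flink code) of the POINTWISE wiring for parallelism 
> 2 -> 4, which is why 2 local partitions are achievable:
> {code}
> // Each of the 2 producers feeds consumers/producers = 2 consecutive
> // consumer subtasks.
> int producers = 2;
> int consumers = 4;
> for (int consumer = 0; consumer < consumers; consumer++) {
>     int producer = consumer / (consumers / producers);
>     // v2#0 and v2#1 read from v1#0; v2#2 and v2#3 read from v1#1.
>     // Placing v2#0 with v1#0 and v2#2 with v1#1 yields 2 local partitions;
>     // v2#1 and v2#3 then consume remotely.
>     System.out.println("v2#" + consumer + " reads from v1#" + producer);
> }
> {code}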
> The cause of the above problems is that the current strategy allocates the 
> resources for the executions one by one (if an execution can get a slot from 
> the SlotSharingGroup, it takes it; otherwise it asks for a new one). Changing 
> the allocation strategy to a two-step approach would solve this problem; below 
> is the pseudo code:
> {code}
> for (ExecutionJobVertex ejv: getVerticesTopologically) {
> //step 1: try to allocate from SlothGroup base on inputs one by one (which 
> only allocate resource base on location).
> //step 2: allocate for the remain execution.
> }
> {code}
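> A slightly more concrete sketch of the two steps (preferredLocationsOf(), 
> tryAllocateAt() and allocateAnySlot() are hypothetical helpers, not existing 
> scheduler methods):
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
>     List<ExecutionVertex> remaining = new ArrayList<>();
>     // step 1: place only the executions that have an input-based location
>     // preference, so they can claim the matching shared slot first.
>     for (ExecutionVertex ev : ejv.getTaskVertices()) {
>         Collection<TaskManagerLocation> preferred = preferredLocationsOf(ev);
>         if (preferred.isEmpty() || !tryAllocateAt(ev, preferred)) {
>             remaining.add(ev);
>         }
>     }
>     // step 2: give the remaining executions any free slot.
>     for (ExecutionVertex ev : remaining) {
>         allocateAnySlot(ev);
>     }
> }
> {code}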



