[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205877#comment-16205877 ]

Till Rohrmann commented on FLINK-7153:
--------------------------------------

Why don't we make the scheduling dependent on the future locations of the input 
tasks? This would mean that {{ExecutionVertex#getPreferredLocations}} would 
return a {{Collection<CompletableFuture<TaskManagerLocation>>}} which contains 
the future locations of the inputs. When trying to allocate a slot for a given 
task, the {{Scheduler}} will retrieve the future locations of the inputs, and 
only upon their completion will it allocate the slot for the requested task. 
This ensures that we have all the information needed to calculate the 
scheduling. I've quickly prototyped a working version, which you can find 
here [1].
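
Concretely, the consumer side could look roughly like this (a minimal sketch; 
{{scheduler.allocateSlotFor}} is a hypothetical hook standing in for the actual 
allocation call, see [1] for the real prototype):
{code}
// preferred locations are now futures that complete once the producers are placed
Collection<CompletableFuture<TaskManagerLocation>> locationFutures =
    vertex.getPreferredLocations();

// wait until all input locations are known before allocating this task's slot
CompletableFuture
    .allOf(locationFutures.toArray(new CompletableFuture<?>[0]))
    .thenRun(() -> {
        // all futures are complete at this point, so join() does not block
        Collection<TaskManagerLocation> inputLocations = locationFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        // hypothetical call: allocate with the now-exact locality preferences
        scheduler.allocateSlotFor(vertex, inputLocations);
    });
{code}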

The downside of this approach is that we lose the nice property that, in eager 
mode, we only start deploying tasks if we are sure that we have enough slots 
available. The reason is that we have to call {{Execution#deployToSlot}} in 
order to complete the location future of an {{Execution}}. We can solve this 
problem, however, by separating the slot assignment and the actual deploy call 
into two methods, e.g. {{Execution#assignSlot}} and {{Execution#deployToSlot}}. 
I've prototyped this version here [2]. Before calling 
{{Execution#deployToSlot}}, one has to explicitly call 
{{Execution#tryAssignSlot}}. This complicates the API a little bit, but it 
allows us to fail without deploying tasks in the eager deployment mode.
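
In eager mode, the scheduling loop could then look roughly like this (a minimal 
sketch; {{tryAssignSlot}} as proposed above, while the surrounding names and 
signatures are hypothetical):
{code}
// phase 1: allocate and assign slots for all executions; nothing has been
// deployed yet, so a missing slot can still fail the job cleanly
List<SimpleSlot> slots = new ArrayList<>();
for (Execution execution : executionsInTopologicalOrder) {
    SimpleSlot slot = execution.allocateSlotForExecution(slotProvider);
    if (slot == null || !execution.tryAssignSlot(slot)) {
        throw new NoResourceAvailableException("Not enough slots to run the job");
    }
    // assigning the slot completes this execution's location future, which
    // unblocks the location-based allocation of its consumers
    slots.add(slot);
}

// phase 2: all slots are assigned, so deployment can no longer fail
// for lack of slots
for (int i = 0; i < executionsInTopologicalOrder.size(); i++) {
    executionsInTopologicalOrder.get(i).deployToSlot(slots.get(i));
}
{code}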

What do you think?

[1] https://github.com/tillrohrmann/flink/tree/fixGroupScheduling
[2] https://github.com/tillrohrmann/flink/tree/fixGroupScheduling2

> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates slots for the 
> ExecutionJobVertices one by one by calling 
> ExecutionJobVertex.allocateResourcesForAll(). There are two problems with it:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() will always return an 
> empty collection, because `sourceSlot` stays null until the `ExecutionVertex` 
> has been deployed via `Execution.deployToSlot()`. So allocating resources 
> based on preferred locations can't work correctly; perhaps we need to set the 
> slot info for the `Execution` as soon as Execution.allocateSlotForExecution() 
> has completed successfully?
> 2. The current allocation strategy can't allocate slots optimally. Here is 
> the test case:
> {code}
> JobVertexID jid1 = new JobVertexID();  // vertex ids assumed for this snippet
> JobVertexID jid2 = new JobVertexID();
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE,
>         ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and 
> three remote partitions. But actually, it should be 2 local partitions and 2 
> remote partitions.
> The cause of the above problems is that the current strategy allocates the 
> resources for the executions one by one (if an execution can get a slot from 
> the SlotSharingGroup it takes it, otherwise it asks for a new one). Changing 
> the allocation strategy to two steps would solve this problem; below is the 
> pseudo code:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
>     // step 1: try to allocate from the SlotSharingGroup based on the inputs,
>     // one by one (allocating resources based on location only)
>     // step 2: allocate slots for the remaining executions
> }
> {code}
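>
> A rough sketch of how these two steps could look (hypothetical helper 
> methods, purely to illustrate the intent):
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
>     // step 1: place every execution that already has input-based location
>     // preferences, so co-location with its inputs is decided first
>     for (ExecutionVertex ev : ejv.getTaskVertices()) {
>         if (hasInputBasedPreference(ev)) {        // hypothetical predicate
>             allocateWithLocalityPreference(ev);   // hypothetical helper
>         }
>     }
>     // step 2: place the remaining executions into whatever slots are left
>     for (ExecutionVertex ev : ejv.getTaskVertices()) {
>         if (ev.getCurrentAssignedResource() == null) {
>             allocateAnySlot(ev);                  // hypothetical helper
>         }
>     }
> }
> {code}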



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
