Sihua Zhou created FLINK-7153:
---------------------------------

             Summary: JM can't allocate source for ExecutionGraph correctly
                 Key: FLINK-7153
                 URL: https://issues.apache.org/jira/browse/FLINK-7153
             Project: Flink
          Issue Type: Bug
          Components: Core
    Affects Versions: 1.3.1
            Reporter: Sihua Zhou


ExecutionGraph.scheduleEager() allocates resources for each ExecutionJobVertex 
one by one by calling ExecutionJobVertex.allocateResourcesForAll(). There are 
two problems with this:

1. ExecutionVertex.getPreferredLocationsBasedOnInputs always returns an empty 
result, because `sourceSlot` stays null until the `ExecutionVertex` has been 
deployed via 'Execution.deployToSlot()'. So allocating resources based on 
preferred locations can't work correctly. Should we set the slot info for the 
`Execution` as soon as Execution.allocateSlotForExecution() has completed 
successfully?

2. The current allocation strategy does not allocate slots optimally. Here is 
a test case:
{code}
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);

SlotSharingGroup group = new SlotSharingGroup();

v1.setSlotSharingGroup(group);
v2.setSlotSharingGroup(group);

v1.setParallelism(2);
v2.setParallelism(4);

v1.setInvokableClass(BatchTask.class);
v2.setInvokableClass(BatchTask.class);

v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED_BOUNDED);
{code}
Currently, after allocating slots for v1 and v2, we get one local partition and 
three remote partitions. But it should actually be 2 local partitions and 2 
remote partitions. 

The cause of the above problem is that the current strategy allocates resources 
for the executions one by one (if an execution can get a slot from the 
SlotGroup, it takes it; otherwise it asks for a new one). 

Changing the allocation strategy to two steps would solve this problem. Below 
is the pseudocode:

for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 step 1: for each execution of ejv, one by one, try to allocate a slot from 
the SlotGroup based on its inputs (i.e. allocate based on location only).
 step 2: allocate slots for the remaining executions.
}
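
To make the difference concrete, here is a small standalone simulation of the test case above (v1 with parallelism 2, v2 with parallelism 4, POINTWISE). It is only a sketch and not Flink code: the class and method names are made up for illustration, shared slot i is assumed to already hold v1 subtask i, each slot is assumed to sit on its own TaskManager (so "local" means "same slot as the producer"), and preferred locations are assumed to be available even in the one-by-one case.

```java
import java.util.Arrays;

public class TwoStepAllocationSketch {

    // POINTWISE wiring, assuming consumer parallelism is a multiple of producer
    // parallelism: consumer c reads from producer c / (consumers / producers).
    static int producerOf(int c, int producers, int consumers) {
        return c / (consumers / producers);
    }

    // Returns the index of a free shared slot, or -1 if all are taken.
    static int anyFreeSharedSlot(boolean[] used) {
        for (int s = 0; s < used.length; s++) {
            if (!used[s]) {
                return s;
            }
        }
        return -1;
    }

    // One-by-one strategy: take the preferred shared slot if it is free,
    // otherwise any free shared slot, otherwise request a new slot.
    static int[] allocateOneByOne(int producers, int consumers) {
        boolean[] used = new boolean[producers]; // used[i]: slot i already holds a consumer
        int[] slotOf = new int[consumers];
        int nextNewSlot = producers;             // ids >= producers are newly requested slots
        for (int c = 0; c < consumers; c++) {
            int preferred = producerOf(c, producers, consumers);
            int chosen = used[preferred] ? anyFreeSharedSlot(used) : preferred;
            if (chosen >= 0) {
                used[chosen] = true;
            } else {
                chosen = nextNewSlot++;
            }
            slotOf[c] = chosen;
        }
        return slotOf;
    }

    // Two-step strategy from the proposal above.
    static int[] allocateTwoStep(int producers, int consumers) {
        boolean[] used = new boolean[producers];
        int[] slotOf = new int[consumers];
        Arrays.fill(slotOf, -1);
        int nextNewSlot = producers;
        // Step 1: location-based allocation only; skip consumers whose
        // preferred slot is already taken.
        for (int c = 0; c < consumers; c++) {
            int preferred = producerOf(c, producers, consumers);
            if (!used[preferred]) {
                used[preferred] = true;
                slotOf[c] = preferred;
            }
        }
        // Step 2: allocate for the remaining consumers.
        for (int c = 0; c < consumers; c++) {
            if (slotOf[c] >= 0) {
                continue;
            }
            int free = anyFreeSharedSlot(used);
            if (free >= 0) {
                used[free] = true;
                slotOf[c] = free;
            } else {
                slotOf[c] = nextNewSlot++;
            }
        }
        return slotOf;
    }

    // A consumer is local if it shares the slot of the producer it reads from.
    static int countLocal(int[] slotOf, int producers) {
        int local = 0;
        for (int c = 0; c < slotOf.length; c++) {
            if (slotOf[c] == producerOf(c, producers, slotOf.length)) {
                local++;
            }
        }
        return local;
    }

    public static void main(String[] args) {
        int a = countLocal(allocateOneByOne(2, 4), 2);
        int b = countLocal(allocateTwoStep(2, 4), 2);
        System.out.println("one-by-one: " + a + " local, " + (4 - a) + " remote");
        System.out.println("two-step:   " + b + " local, " + (4 - b) + " remote");
    }
}
```

Under these assumptions the one-by-one strategy puts v2 subtask 1 into the slot of v1 subtask 1 (a remote read), which then blocks v2 subtask 2 from being local, giving 1 local and 3 remote. The two-step variant leaves that slot free for v2 subtask 2, giving 2 local and 2 remote.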



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
