Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5114#discussion_r155000484
  
    --- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
    @@ -217,15 +222,13 @@ public String toString() {
                        
dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 
taskManagerHostname);
                }
     
    -           // use the assigned ports for the TM
    -           if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) 
{
    -                   throw new IllegalArgumentException("unsufficient # of 
ports assigned");
    -           }
    -           for (int i = 0; i < TM_PORT_KEYS.length; i++) {
    -                   int port = assignment.getAssignedPorts().get(i);
    -                   String key = TM_PORT_KEYS[i];
    -                   taskInfo.addResources(ranges("ports", 
mesosConfiguration.frameworkInfo().getRole(), range(port, port)));
    -                   dynamicProperties.setInteger(key, port);
    +           // take needed ports for the TM
    +           List<Protos.Resource> portResources = 
allocation.takeRanges("ports", TM_PORT_KEYS.length, roles);
    +           taskInfo.addAllResources(portResources);
    +           Iterator<String> portsToAssign = 
Iterators.forArray(TM_PORT_KEYS);
    +           rangeValues(portResources).forEach(port -> 
dynamicProperties.setLong(portsToAssign.next(), port));
    --- End diff --
    
    A call to `takeRanges` won't give more than requested so it won't happen.


---

Reply via email to