Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/5114#discussion_r155000484 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -217,15 +222,13 @@ public String toString() { dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostname); } - // use the assigned ports for the TM - if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) { - throw new IllegalArgumentException("unsufficient # of ports assigned"); - } - for (int i = 0; i < TM_PORT_KEYS.length; i++) { - int port = assignment.getAssignedPorts().get(i); - String key = TM_PORT_KEYS[i]; - taskInfo.addResources(ranges("ports", mesosConfiguration.frameworkInfo().getRole(), range(port, port))); - dynamicProperties.setInteger(key, port); + // take needed ports for the TM + List<Protos.Resource> portResources = allocation.takeRanges("ports", TM_PORT_KEYS.length, roles); + taskInfo.addAllResources(portResources); + Iterator<String> portsToAssign = Iterators.forArray(TM_PORT_KEYS); + rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port)); --- End diff -- A call to `takeRanges` won't give more than requested so it won't happen.
---