asfgit closed pull request #7203: [FLINK-10149[mesos] Don't allocate extra mesos port for TM unless configured to do so URL: https://github.com/apache/flink/pull/7203
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java index 426a891e814..0c4e1f6bcba 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java @@ -136,8 +136,9 @@ /** * Config parameter to configure which configuration keys will dynamically get a port assigned through Mesos. */ - public static final ConfigOption<String> PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments") - .defaultValue("") + public static final ConfigOption<String> PORT_ASSIGNMENTS = + key("mesos.resourcemanager.tasks.port-assignments") + .noDefaultValue() .withDescription(Description.builder() .text("Comma-separated list of configuration keys which represent a configurable port. " + "All port keys will dynamically get a port assigned through Mesos.") diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index 84ec2229a2a..637442c899d 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -64,12 +65,13 @@ public class LaunchableMesosWorker implements LaunchableTask { protected static final Logger LOG = LoggerFactory.getLogger(LaunchableMesosWorker.class); + /** * The set of configuration keys to be dynamically configured with a port allocated from Mesos. */ - static final String[] TM_PORT_KEYS = { + static final Set<String> TM_PORT_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( "taskmanager.rpc.port", - "taskmanager.data.port"}; + "taskmanager.data.port"))); private final MesosArtifactResolver resolver; private final ContainerSpecification containerSpec; @@ -342,16 +344,18 @@ public String toString() { * @return A deterministically ordered Set of port keys to expose from the TM container */ static Set<String> extractPortKeys(Configuration config) { - final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS)); + final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(TM_PORT_KEYS); final String portKeys = config.getString(PORT_ASSIGNMENTS); - Arrays.stream(portKeys.split(",")) - .map(String::trim) - .peek(key -> LOG.debug("Adding port key " + key + " to mesos request")) - .forEach(tmPortKeys::add); + if (portKeys != null) { + Arrays.stream(portKeys.split(",")) + .map(String::trim) + .peek(key -> LOG.debug("Adding port key {} to mesos request")) + .forEach(tmPortKeys::add); + } - return tmPortKeys; + return Collections.unmodifiableSet(tmPortKeys); } @Override diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java index 6784e427c1f..48a436cb995 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java @@ -23,11 +23,14 @@ import org.junit.Test; -import java.util.Iterator; +import java.util.Arrays; +import java.util.HashSet; import java.util.Set; import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; /** * Test that mesos config are extracted correctly from the configuration. @@ -36,20 +39,31 @@ @Test public void canGetPortKeys() { + // Setup + Set<String> additionalPorts = new HashSet<>(Arrays.asList("someport.here", "anotherport")); + + Configuration config = new Configuration(); + config.setString(PORT_ASSIGNMENTS, String.join(",", additionalPorts)); + + // Act + Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config); + + // Assert + Set<String> expectedPorts = new HashSet<>(LaunchableMesosWorker.TM_PORT_KEYS); + expectedPorts.addAll(additionalPorts); + assertThat(portKeys, is(equalTo(expectedPorts))); + } + + @Test + public void canGetNoPortKeys() { // Setup Configuration config = new Configuration(); - config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport"); // Act Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config); // Assert - assertEquals("Must get right number of port keys", 4, portKeys.size()); - Iterator<String> iterator = portKeys.iterator(); - assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next()); - assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next()); - assertEquals("port key must be correct", "someport.here", iterator.next()); - assertEquals("port key must be correct", "anotherport", iterator.next()); + assertThat(portKeys, is(equalTo(LaunchableMesosWorker.TM_PORT_KEYS))); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services