Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r203000806 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java --- @@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception { assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager)); } + /** + * Tests the specifying heap memory for job manager and task manager. + */ + @Test + public void testHeapMemoryProperty() throws Exception { + //without unit + String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" }; + + FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( + new Configuration(), + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); + + CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); + + ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine); + + assertThat(clusterSpecification.getMasterMemoryMB(), is(1024)); + assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048)); + + //with unit "m" + args = new String[] { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" }; + commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); + clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine); + + assertThat(clusterSpecification.getMasterMemoryMB(), is(1024)); + assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048)); + + //with unit non "m" + args = new String[] { "-yn", "2", "-yjm", "1g", "-ytm", "2g" }; + commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); + clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine); + + assertThat(clusterSpecification.getMasterMemoryMB(), is(1024)); + assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048)); + + //specify old config key + Configuration configuration = new Configuration(); + configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048); + configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096); + + flinkYarnSessionCli = new FlinkYarnSessionCli( + configuration, + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); + + commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false); + + clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine); + + assertThat(clusterSpecification.getMasterMemoryMB(), is(2048)); + assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(4096)); + + //set nothing use default value --- End diff -- This should be a separate test
---