Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6297#discussion_r202338312 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java --- @@ -29,6 +29,46 @@ private static final String[] EMPTY = new String[0]; + /** + * Get job manager's heap memory. + * + * This method will check the new key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and + * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility. + * + * @param configuration the configuration object + * @return the memory size of job manager's heap memory. + */ + public static MemorySize getJobManagerHeapMemory(Configuration configuration) { + if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) { + return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)); + } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) { + return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m"); + } else { + throw new RuntimeException("Can not find config key : " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + + " or " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB); + } + } + + /** + * Get task manager's heap memory. + * + * This method will check the new key {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY} and + * the old key {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY_MB} for backwards compatibility. + * + * @param configuration the configuration object + * @return the memory size of task manager's heap memory. + */ + public static MemorySize getTaskManagerHeapMemory(Configuration configuration) { + if (configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) { + return MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)); + } else if (configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB.key())) { + return MemorySize.parse(configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) + "m"); + } else { + throw new RuntimeException("Can not find config key : " + TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key() --- End diff -- Use `FlinkRuntimeException`.
---