GitHub user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202338296

--- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java ---
@@ -29,6 +29,46 @@
  private static final String[] EMPTY = new String[0];
+ /**
+  * Get job manager's heap memory.
+  *
+  * This method will check the new key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
+  * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
+  *
+  * @param configuration the configuration object
+  * @return the memory size of job manager's heap memory.
+  */
+ public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
+     if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
+         return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
+     } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
+         return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
+     } else {
+         throw new RuntimeException("Can not find config key : " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()
--- End diff --

Use `FlinkRuntimeException`.
---