tomncooper commented on code in PR #918: URL: https://github.com/apache/flink-kubernetes-operator/pull/918#discussion_r1854139768
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ########## @@ -202,6 +215,65 @@ public FlinkOperatorConfiguration getOperatorConfiguration( getDefaultConfig(namespace, flinkVersion)); } + /** + * This method will search the keys of the supplied map and find any that contain a flink + * version string that is relevant to the supplied flink version. + * + * <p>Relevance is defined as any key with the {@link + * KubernetesOperatorConfigOptions#VERSION_CONF_PREFIX} followed by either the supplied flink + * version (with or without the {@link + * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}) or a lower flink version + * string followed by the {@link + * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}. + * + * <p>Prefixes are returned in ascending order of flink version. + * + * @param baseConfMap The configuration map that should be searched for relevant Flink version + * prefixes. + * @return A list of relevant Flink version prefixes in order of ascending Flink version. + */ + protected static List<String> getRelevantVersionPrefixes( + Map<String, String> baseConfMap, FlinkVersion flinkVersion) { + SortedMap<FlinkVersion, String> greaterThanVersionPrefixes = new TreeMap<>(); + + for (Map.Entry<String, String> entry : baseConfMap.entrySet()) { + Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey()); + if (versionMatcher.matches() && versionMatcher.group("gt") != null) { + try { + FlinkVersion keyFlinkVersion = + FlinkVersion.fromMajorMinor( + Integer.parseInt(versionMatcher.group("major")), + Integer.parseInt(versionMatcher.group("minor"))); + if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) { + greaterThanVersionPrefixes.put( + keyFlinkVersion, + VERSION_CONF_PREFIX + + keyFlinkVersion + + KubernetesOperatorConfigOptions + .FLINK_VERSION_GREATER_THAN_SUFFIX + + "."); + } + } catch (NumberFormatException numberFormatException) { + LOG.warn("Unable to parse version number in config key: {}", entry.getKey()); + } catch (IllegalArgumentException illegalArgumentException) { + LOG.warn("Unknown Flink version in config key: {}", entry.getKey()); + } + } + } Review Comment: Good point, we could generate the lists in the `FlinkVersion` enum. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org