tomncooper commented on code in PR #918: URL: https://github.com/apache/flink-kubernetes-operator/pull/918#discussion_r1854159148
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ########## @@ -202,6 +215,65 @@ public FlinkOperatorConfiguration getOperatorConfiguration( getDefaultConfig(namespace, flinkVersion)); } + /** + * This method will search the keys of the supplied map and find any that contain a flink + * version string that is relevant to the supplied flink version. + * + * <p>Relevance is defined as any key with the {@link + * KubernetesOperatorConfigOptions#VERSION_CONF_PREFIX} followed by either the supplied flink + * version (with or without the {@link + * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}) or a lower flink version + * string followed by the {@link + * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}. + * + * <p>Prefixes are returned in ascending order of flink version. + * + * @param baseConfMap The configuration map that should be searched for relevant Flink version + * prefixes. + * @return A list of relevant Flink version prefixes in order of ascending Flink version. 
+ */ + protected static List<String> getRelevantVersionPrefixes( + Map<String, String> baseConfMap, FlinkVersion flinkVersion) { + SortedMap<FlinkVersion, String> greaterThanVersionPrefixes = new TreeMap<>(); + + for (Map.Entry<String, String> entry : baseConfMap.entrySet()) { + Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey()); + if (versionMatcher.matches() && versionMatcher.group("gt") != null) { + try { + FlinkVersion keyFlinkVersion = + FlinkVersion.fromMajorMinor( + Integer.parseInt(versionMatcher.group("major")), + Integer.parseInt(versionMatcher.group("minor"))); + if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) { + greaterThanVersionPrefixes.put( + keyFlinkVersion, + VERSION_CONF_PREFIX + + keyFlinkVersion + + KubernetesOperatorConfigOptions + .FLINK_VERSION_GREATER_THAN_SUFFIX + + "."); + } + } catch (NumberFormatException numberFormatException) { + LOG.warn("Unable to parse version number in config key: {}", entry.getKey()); + } catch (IllegalArgumentException illegalArgumentException) { + LOG.warn("Unknown Flink version in config key: {}", entry.getKey()); + } + } + } Review Comment: Do you mean that, for every supported version, we would create all possible prefixes and then loop through them for the given `FlinkVersion`? That would simplify things, but it would result in wasted calls to `applyDefault`, which scans the whole base config each time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org