tomncooper commented on code in PR #918:
URL: https://github.com/apache/flink-kubernetes-operator/pull/918#discussion_r1854159148


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -202,6 +215,65 @@ public FlinkOperatorConfiguration getOperatorConfiguration(
                 getDefaultConfig(namespace, flinkVersion));
     }
 
+    /**
+     * This method will search the keys of the supplied map and find any that contain a flink
+     * version string that is relevant to the supplied flink version.
+     *
+     * <p>Relevance is defined as any key with the {@link
+     * KubernetesOperatorConfigOptions#VERSION_CONF_PREFIX} followed by either the supplied flink
+     * version (with or without the {@link
+     * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}) or a lower flink version
+     * string followed by the {@link
+     * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}.
+     *
+     * <p>Prefixes are returned in ascending order of flink version.
+     *
+     * @param baseConfMap The configuration map that should be searched for relevant Flink version
+     *     prefixes.
+     * @return A list of relevant Flink version prefixes in order of ascending Flink version.
+     */
+    protected static List<String> getRelevantVersionPrefixes(
+            Map<String, String> baseConfMap, FlinkVersion flinkVersion) {
+        SortedMap<FlinkVersion, String> greaterThanVersionPrefixes = new TreeMap<>();
+
+        for (Map.Entry<String, String> entry : baseConfMap.entrySet()) {
+            Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey());
+            if (versionMatcher.matches() && versionMatcher.group("gt") != null) {
+                try {
+                    FlinkVersion keyFlinkVersion =
+                            FlinkVersion.fromMajorMinor(
+                                    Integer.parseInt(versionMatcher.group("major")),
+                                    Integer.parseInt(versionMatcher.group("minor")));
+                    if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) {
+                        greaterThanVersionPrefixes.put(
+                                keyFlinkVersion,
+                                VERSION_CONF_PREFIX
+                                        + keyFlinkVersion
+                                        + KubernetesOperatorConfigOptions
+                                                .FLINK_VERSION_GREATER_THAN_SUFFIX
+                                        + ".");
+                    }
+                } catch (NumberFormatException numberFormatException) {
+                    LOG.warn("Unable to parse version number in config key: {}", entry.getKey());
+                } catch (IllegalArgumentException illegalArgumentException) {
+                    LOG.warn("Unknown Flink version in config key: {}", entry.getKey());
+                }
+            }
+        }

Review Comment:
   Do you mean that, for every supported version, we create all possible 
prefixes (for example, in the `FlinkVersion` enum) and then loop through them 
for the given `FlinkVersion`? That would simplify things, but it would add 
wasted calls to `applyDefault`, which scans the whole base config each time, 
for every prefix that has no matching keys.
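
   To make the trade-off concrete, here is a rough sketch of the "generate 
every candidate prefix" approach. It is illustrative only: `Version` is a 
hypothetical stand-in for the `FlinkVersion` enum, the prefix and suffix 
strings are placeholders rather than the operator's real constants, and 
`wastedScans` is a made-up helper that counts candidates matching no key 
(each of which would still cost one full pass over the base config).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrefixSketch {
    // Hypothetical stand-in for the FlinkVersion enum, declared in ascending order.
    enum Version { v1_17, v1_18, v1_19, v1_20 }

    // Placeholder values, assumed for illustration only.
    static final String VERSION_CONF_PREFIX = "example.flink-version.";
    static final String GREATER_THAN_SUFFIX = "+";

    /** Every "this version and above" prefix that could apply, in ascending version order. */
    static List<String> candidatePrefixes(Version current) {
        List<String> prefixes = new ArrayList<>();
        for (Version v : Version.values()) {
            if (v.compareTo(current) <= 0) {
                prefixes.add(VERSION_CONF_PREFIX + v + GREATER_THAN_SUFFIX + ".");
            }
        }
        return prefixes;
    }

    /** Counts candidate prefixes that match no key in the config. */
    static long wastedScans(Map<String, String> conf, Version current) {
        return candidatePrefixes(current).stream()
                .filter(p -> conf.keySet().stream().noneMatch(k -> k.startsWith(p)))
                .count();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(VERSION_CONF_PREFIX + "v1_18+.some.option", "a");
        conf.put("unrelated.option", "b");
        System.out.println(candidatePrefixes(Version.v1_19));
        System.out.println(wastedScans(conf, Version.v1_19));
    }
}
```

   With only one versioned key in the config, two of the three candidate 
prefixes for `v1_19` match nothing, and that gap grows as more versions are 
supported. Scanning the keys once, as the current change does, avoids that.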


