tomncooper commented on code in PR #918:
URL: https://github.com/apache/flink-kubernetes-operator/pull/918#discussion_r1857010349


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -202,6 +215,65 @@ public FlinkOperatorConfiguration getOperatorConfiguration(
                 getDefaultConfig(namespace, flinkVersion));
     }
 
+    /**
+     * This method will search the keys of the supplied map and find any that contain a flink
+     * version string that is relevant to the supplied flink version.
+     *
+     * <p>Relevance is defined as any key with the {@link
+     * KubernetesOperatorConfigOptions#VERSION_CONF_PREFIX} followed by either the supplied flink
+     * version (with or without the {@link
+     * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}) or a lower flink version
+     * string followed by the {@link
+     * KubernetesOperatorConfigOptions#FLINK_VERSION_GREATER_THAN_SUFFIX}.
+     *
+     * <p>Prefixes are returned in ascending order of flink version.
+     *
+     * @param baseConfMap The configuration map that should be searched for relevant Flink version
+     *     prefixes.
+     * @return A list of relevant Flink version prefixes in order of ascending Flink version.
+     */
+    protected static List<String> getRelevantVersionPrefixes(
+            Map<String, String> baseConfMap, FlinkVersion flinkVersion) {
+        SortedMap<FlinkVersion, String> greaterThanVersionPrefixes = new TreeMap<>();
+
+        for (Map.Entry<String, String> entry : baseConfMap.entrySet()) {
+            Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey());
+            if (versionMatcher.matches() && versionMatcher.group("gt") != null) {
+                try {
+                    FlinkVersion keyFlinkVersion =
+                            FlinkVersion.fromMajorMinor(
+                                    Integer.parseInt(versionMatcher.group("major")),
+                                    Integer.parseInt(versionMatcher.group("minor")));
+                    if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) {
+                        greaterThanVersionPrefixes.put(
+                                keyFlinkVersion,
+                                VERSION_CONF_PREFIX
+                                        + keyFlinkVersion
+                                        + KubernetesOperatorConfigOptions
+                                                .FLINK_VERSION_GREATER_THAN_SUFFIX
+                                        + ".");
+                    }
+                } catch (NumberFormatException numberFormatException) {
+                    LOG.warn("Unable to parse version number in config key: {}", entry.getKey());
+                } catch (IllegalArgumentException illegalArgumentException) {
+                    LOG.warn("Unknown Flink version in config key: {}", entry.getKey());
+                }
+            }
+        }

Review Comment:
   I added a cache (a map keyed on Flink version) to the config manager and
   read from it inside `getDefaultConfig`. The cache is cleared in
   `updateDefaultConfig`, so the relevant prefixes are calculated only once
   for each FlinkVersion:baseConfig combination.
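
   For readers following the thread, here is a minimal sketch of that caching
   pattern. It is not the code from this PR: the class name, the `scans`
   counter, and the simplified key scan are all hypothetical, and a plain
   `String` stands in for `FlinkVersion`. It only illustrates "compute once per
   version, clear when the base config changes".

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/** Hypothetical sketch; not the operator's FlinkConfigManager. */
class VersionPrefixCacheSketch {
    private volatile Map<String, String> baseConfig;

    // Cache keyed on Flink version (a String stands in for FlinkVersion here).
    private final Map<String, List<String>> prefixCache = new ConcurrentHashMap<>();

    // Counts how often the key scan actually runs; for illustration only.
    final AtomicInteger scans = new AtomicInteger();

    VersionPrefixCacheSketch(Map<String, String> baseConfig) {
        this.baseConfig = Map.copyOf(baseConfig);
    }

    /** Returns the cached result, scanning the base config only on a cache miss. */
    List<String> relevantPrefixes(String flinkVersion) {
        return prefixCache.computeIfAbsent(flinkVersion, this::scanKeys);
    }

    /** Swaps the base config and drops every cached result derived from the old one. */
    void updateDefaultConfig(Map<String, String> newBaseConfig) {
        this.baseConfig = Map.copyOf(newBaseConfig);
        prefixCache.clear();
    }

    // Simplified stand-in for the real prefix search: keep keys that start
    // with the version string followed by a dot, in sorted order.
    private List<String> scanKeys(String flinkVersion) {
        scans.incrementAndGet();
        return baseConfig.keySet().stream()
                .filter(key -> key.startsWith(flinkVersion + "."))
                .sorted()
                .collect(Collectors.toList());
    }
}
```

   Repeated lookups for the same version reuse the cached list, and the scan
   runs again only after `updateDefaultConfig` is called. The sketch does not
   guard against a lookup racing with an update, which a real implementation
   would need to consider.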


