skymilong commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1557102504


##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##########
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path 
configPath) throws Excep
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, 
String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, 
Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value 
pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> 
configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, 
String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." 
+ entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), 
updatedPath, result);
+            } else {

Review Comment:
   > There should be a case for handling `List` type, according to 
https://github.com/apache/flink/blob/5bbcf8de79ce1979412879b919299ffa5a9b62fe/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L301-L307
   
   After reviewing this section of code and the previous PR 
https://github.com/apache/flink-cdc/pull/2681, I find myself a bit uncertain. 
Should we directly reference the relevant code from Flink, or should we copy 
over the YamlParserUtils#toYAMLString part of the code as done in PR 
https://github.com/apache/flink-cdc/pull/2681? I would greatly appreciate your 
advice on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to