skymilong commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1552023129
########## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java: ########## @@ -20,17 +20,24 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import java.nio.file.Files; import java.nio.file.Path; import java.util.List; /** Utilities for handling Flink configuration and environment. */ public class FlinkEnvironmentUtils { private static final String FLINK_CONF_DIR = "conf"; - private static final String FLINK_CONF_FILENAME = "flink-conf.yaml"; + private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml"; + private static final String NEW_FLINK_CONF_FILENAME = "config.yaml"; public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception { - Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME); + Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME); + // If the old version of the configuration file does not exist, then attempt to use the new + // version of the file name. + if (!Files.exists(flinkConfPath)) { + flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(NEW_FLINK_CONF_FILENAME); + } return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath); Review Comment: Thank you for your feedback. I have rechecked the code and I agree with you. The new config.yaml is not in a map format anymore, so rewriting the parsing logic would be necessary. I am willing to take on this task and submit the necessary changes. In addition, I think it might be beneficial to handle the old version of the configuration file through exception handling. Here is a possible way to do so: ```java private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml"; private static final String FLINK_CONF_FILENAME = "config.yaml"; public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception { Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME); try { return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath); } catch (FileNotFoundException e) { LOG.warn( "Failed to load the new configuration file:{}. Trying to load the old configuration file:{}.", FLINK_CONF_FILENAME, OLD_FLINK_CONF_FILENAME); return ConfigurationUtils.loadMapFormattedConfig( flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME)); } } ``` This is my idea for exception handling. We first attempt to load the new configuration file. If an exception occurs (which could be due to the absence of the new configuration file), we then try to load the old configuration file. This approach not only ensures backward compatibility and allows for a smoother transition between different versions of Flink, but also results in two log notifications from the `loadMapFormattedConfig` method. I think this will make the notifications more user-friendly. Do you think this approach is feasible? Or do you have any better suggestions or ideas? I look forward to your feedback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org