yuxiqian commented on code in PR #3823: URL: https://github.com/apache/flink-cdc/pull/3823#discussion_r1903736412
########## flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java: ########## @@ -145,6 +146,31 @@ void testPipelineExecuting() throws Exception { assertThat(executionInfo.getDescription()).isEqualTo("fake-description"); } + @Test + void testPipelineExecutingWithFlinkConfig() throws Exception { + CliExecutor executor = + createExecutor( + pipelineDef(), + "--flink-home", + flinkHome(), + "--global-config", + globalPipelineConfig(), + "--flink-conf", + "execution.target=yarn-session", + "--flink-conf", + "rest.bind-port=42689", + "-fc", + "yarn.application.id=application_1714009558476_3563", + "-fc", + "rest.bind-address=10.1.140.140"); + Map<String, String> configMap = executor.getFlinkConfig().toMap(); + assertThat(configMap.get("execution.target")).isEqualTo("yarn-session"); + assertThat(configMap.get("rest.bind-port")).isEqualTo("42689"); + assertThat(configMap.get("yarn.application.id")) + .isEqualTo("application_1714009558476_3563"); + assertThat(configMap.get("rest.bind-address")).isEqualTo("10.1.140.140"); Review Comment: ```suggestion assertThat(configMap) .containsEntry("execution.target", "yarn-session") .containsEntry("rest.bind-port", "42689") .containsEntry("yarn.application.id", "application_1714009558476_3563") .containsEntry("rest.bind-address", "10.1.140.140"); ``` ########## flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java: ########## @@ -145,6 +146,31 @@ void testPipelineExecuting() throws Exception { assertThat(executionInfo.getDescription()).isEqualTo("fake-description"); } + @Test + void testPipelineExecutingWithFlinkConfig() throws Exception { Review Comment: Please also add tests for malformed arguments (like when `=` is missing, or `=` is part of value, etc.) ########## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java: ########## @@ -114,6 +120,21 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception { savepointSettings); } + private static void overrideFlinkConfiguration( + Configuration flinkConfig, CommandLine commandLine) { + String[] flinkConfigs = commandLine.getOptionValues(FLINK_CONFIG); + if (flinkConfigs != null) { + LOG.info("Find flink config items: {}", String.join(",", flinkConfigs)); + for (String config : flinkConfigs) { + String key = config.split("=")[0].trim(); + String value = config.split("=")[1].trim(); Review Comment: We can split just once and verify the format first. ########## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java: ########## @@ -114,6 +120,21 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception { savepointSettings); } + private static void overrideFlinkConfiguration( + Configuration flinkConfig, CommandLine commandLine) { + String[] flinkConfigs = commandLine.getOptionValues(FLINK_CONFIG); + if (flinkConfigs != null) { + LOG.info("Find flink config items: {}", String.join(",", flinkConfigs)); Review Comment: ```suggestion LOG.info("Dynamic flink config items found: {}", flinkConfigs); ``` ########## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java: ########## @@ -94,6 +94,15 @@ public class CliFrontendOptions { + "program that was part of the program when the savepoint was triggered.") .build(); + public static final Option FLINK_CONFIG = + Option.builder("fc") Review Comment: [SQL Client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-startup-options) provides `-D` to pass extra Flink options dynamically. Maybe we can follow the same naming here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org