yuxiqian commented on code in PR #3823:
URL: https://github.com/apache/flink-cdc/pull/3823#discussion_r1903736412


##########
flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java:
##########
@@ -145,6 +146,31 @@ void testPipelineExecuting() throws Exception {
         
assertThat(executionInfo.getDescription()).isEqualTo("fake-description");
     }
 
+    @Test
+    void testPipelineExecutingWithFlinkConfig() throws Exception {
+        CliExecutor executor =
+                createExecutor(
+                        pipelineDef(),
+                        "--flink-home",
+                        flinkHome(),
+                        "--global-config",
+                        globalPipelineConfig(),
+                        "--flink-conf",
+                        "execution.target=yarn-session",
+                        "--flink-conf",
+                        "rest.bind-port=42689",
+                        "-fc",
+                        "yarn.application.id=application_1714009558476_3563",
+                        "-fc",
+                        "rest.bind-address=10.1.140.140");
+        Map<String, String> configMap = executor.getFlinkConfig().toMap();
+        
assertThat(configMap.get("execution.target")).isEqualTo("yarn-session");
+        assertThat(configMap.get("rest.bind-port")).isEqualTo("42689");
+        assertThat(configMap.get("yarn.application.id"))
+                .isEqualTo("application_1714009558476_3563");
+        
assertThat(configMap.get("rest.bind-address")).isEqualTo("10.1.140.140");

Review Comment:
   ```suggestion
           assertThat(configMap)
                   .containsEntry("execution.target", "yarn-session")
                   .containsEntry("rest.bind-port", "42689")
                   .containsEntry("yarn.application.id", 
"application_1714009558476_3563")
                   .containsEntry("rest.bind-address", "10.1.140.140");
   ```



##########
flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java:
##########
@@ -145,6 +146,31 @@ void testPipelineExecuting() throws Exception {
         
assertThat(executionInfo.getDescription()).isEqualTo("fake-description");
     }
 
+    @Test
+    void testPipelineExecutingWithFlinkConfig() throws Exception {

Review Comment:
   Please also add tests for malformed arguments (like when `=` is missing, or 
`=` is part of value, etc.)



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java:
##########
@@ -114,6 +120,21 @@ static CliExecutor createExecutor(CommandLine commandLine) 
throws Exception {
                 savepointSettings);
     }
 
+    private static void overrideFlinkConfiguration(
+            Configuration flinkConfig, CommandLine commandLine) {
+        String[] flinkConfigs = commandLine.getOptionValues(FLINK_CONFIG);
+        if (flinkConfigs != null) {
+            LOG.info("Find flink config items: {}", String.join(",", 
flinkConfigs));
+            for (String config : flinkConfigs) {
+                String key = config.split("=")[0].trim();
+                String value = config.split("=")[1].trim();

Review Comment:
   We can split just once and verify the format first.



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java:
##########
@@ -114,6 +120,21 @@ static CliExecutor createExecutor(CommandLine commandLine) 
throws Exception {
                 savepointSettings);
     }
 
+    private static void overrideFlinkConfiguration(
+            Configuration flinkConfig, CommandLine commandLine) {
+        String[] flinkConfigs = commandLine.getOptionValues(FLINK_CONFIG);
+        if (flinkConfigs != null) {
+            LOG.info("Find flink config items: {}", String.join(",", 
flinkConfigs));

Review Comment:
   ```suggestion
               LOG.info("Dynamic flink config items found: {}", flinkConfigs);
   ```



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java:
##########
@@ -94,6 +94,15 @@ public class CliFrontendOptions {
                                     + "program that was part of the program 
when the savepoint was triggered.")
                     .build();
 
+    public static final Option FLINK_CONFIG =
+            Option.builder("fc")

Review Comment:
   [SQL 
Client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-startup-options)
 provides `-D` to pass extra Flink options dynamically. Maybe we can follow the 
same naming here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to