morazow commented on code in PR #3412:
URL: https://github.com/apache/flink-cdc/pull/3412#discussion_r1635917238


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java:
##########
@@ -165,6 +169,76 @@ public void testDatabaseAndTableWithTheSameName() throws 
SQLException {
                                         + 
inventoryDatabase.getDatabaseName()));
     }
 
+    @Test
+    public void testLackRequireOption() {
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), 
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + 
".prod\\.*");
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        List<String> requireKeys =
+                factory.requiredOptions().stream()
+                        .map(ConfigOption::key)
+                        .collect(Collectors.toList());
+        for (String requireKey : requireKeys) {
+            Map<String, String> remainingOptions = new HashMap<>(options);
+            remainingOptions.remove(requireKey);
+            Factory.Context context = new 
MockContext(Configuration.fromMap(remainingOptions));
+
+            assertThatThrownBy(() -> factory.createDataSource(context))
+                    .isInstanceOf(ValidationException.class)
+                    .hasMessageContaining(
+                            String.format(
+                                    "One or more required options are 
missing.\n\n"
+                                            + "Missing required options 
are:\n\n"
+                                            + "%s",
+                                    requireKey));
+        }
+    }
+
+    @Test
+    public void testUnsupportedOption() {
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), 
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + 
".prod\\.*");
+        options.put("unsupported_key", "unsupported_value");
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        Factory.Context context = new 
MockContext(Configuration.fromMap(options));
+
+        assertThatThrownBy(() -> factory.createDataSource(context))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Unsupported options found for 'mysql'.\n\n"
+                                + "Unsupported options:\n\n"
+                                + "unsupported_key");
+    }
+
+    @Test
+    public void testPrefixRequireOption() {
+        inventoryDatabase.createAndInitialize();
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), 
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + 
".prod\\.*");
+        options.put("jdbc.properties.requireSSL", "true");
+        options.put("debezium.snapshot.mode", "initial");
+        Factory.Context context = new 
MockContext(Configuration.fromMap(options));
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        MySqlDataSource dataSource = (MySqlDataSource) 
factory.createDataSource(context);
+        assertThat(dataSource.getSourceConfig().getTableList())
+                .isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + 
".products"));
+    }

Review Comment:
   👍 



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java:
##########
@@ -44,4 +47,49 @@ public void testCreateDataSink() {
                                 conf, conf, 
Thread.currentThread().getContextClassLoader()));
         Assertions.assertTrue(dataSink instanceof KafkaDataSink);
     }
+
+    @Test
+    public void testUnsupportedOption() {
+
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", 
DataSinkFactory.class);
+        Assertions.assertTrue(sinkFactory instanceof KafkaDataSinkFactory);

Review Comment:
   The same applies to all other usages of `Assertions.assertTrue` in this test.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java:
##########
@@ -44,4 +47,49 @@ public void testCreateDataSink() {
                                 conf, conf, 
Thread.currentThread().getContextClassLoader()));
         Assertions.assertTrue(dataSink instanceof KafkaDataSink);
     }
+
+    @Test
+    public void testUnsupportedOption() {
+
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", 
DataSinkFactory.class);
+        Assertions.assertTrue(sinkFactory instanceof KafkaDataSinkFactory);

Review Comment:
   Should we also use AssertJ assertions here?
   
   ```
   assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to