morazow commented on code in PR #3412: URL: https://github.com/apache/flink-cdc/pull/3412#discussion_r1635917238
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java: ########## @@ -165,6 +169,76 @@ public void testDatabaseAndTableWithTheSameName() throws SQLException { + inventoryDatabase.getDatabaseName())); } + @Test + public void testLackRequireOption() { + Map<String, String> options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + List<String> requireKeys = + factory.requiredOptions().stream() + .map(ConfigOption::key) + .collect(Collectors.toList()); + for (String requireKey : requireKeys) { + Map<String, String> remainingOptions = new HashMap<>(options); + remainingOptions.remove(requireKey); + Factory.Context context = new MockContext(Configuration.fromMap(remainingOptions)); + + assertThatThrownBy(() -> factory.createDataSource(context)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + String.format( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + requireKey)); + } + } + + @Test + public void testUnsupportedOption() { + Map<String, String> options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + options.put("unsupported_key", "unsupported_value"); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + Factory.Context context = new 
MockContext(Configuration.fromMap(options)); + + assertThatThrownBy(() -> factory.createDataSource(context)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Unsupported options found for 'mysql'.\n\n" + + "Unsupported options:\n\n" + + "unsupported_key"); + } + + @Test + public void testPrefixRequireOption() { + inventoryDatabase.createAndInitialize(); + Map<String, String> options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + options.put("jdbc.properties.requireSSL", "true"); + options.put("debezium.snapshot.mode", "initial"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + assertThat(dataSource.getSourceConfig().getTableList()) + .isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".products")); + } Review Comment: 👍 ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java: ########## @@ -44,4 +47,49 @@ public void testCreateDataSink() { conf, conf, Thread.currentThread().getContextClassLoader())); Assertions.assertTrue(dataSink instanceof KafkaDataSink); } + + @Test + public void testUnsupportedOption() { + + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", DataSinkFactory.class); + Assertions.assertTrue(sinkFactory instanceof KafkaDataSinkFactory); Review Comment: And in all other usages ########## 
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java: ########## @@ -44,4 +47,49 @@ public void testCreateDataSink() { conf, conf, Thread.currentThread().getContextClassLoader())); Assertions.assertTrue(dataSink instanceof KafkaDataSink); } + + @Test + public void testUnsupportedOption() { + + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", DataSinkFactory.class); + Assertions.assertTrue(sinkFactory instanceof KafkaDataSinkFactory); Review Comment: Should we also use AssertJ assertions here? ``` assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org