lvyanquan commented on code in PR #3433: URL: https://github.com/apache/flink-cdc/pull/3433#discussion_r1839617689
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-jdbc/src/main/java/org/apache/flink/cdc/connectors/jdbc/config/JdbcPropertiesFormatter.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.jdbc.config; + +import java.util.Properties; + +/** Jdbc Properties Formatter. */ +public class JdbcPropertiesFormatter { + public static Properties formatJdbcProperties(String jdbcPropertiesString) { + Properties properties = new Properties(); + if (jdbcPropertiesString == null || jdbcPropertiesString.isEmpty()) { + return properties; + } + + String[] keyValuePairs = jdbcPropertiesString.split(","); + for (String keyValuePair : keyValuePairs) { + String[] keyValue = keyValuePair.split("="); + if (keyValue.length == 2) { + properties.setProperty(keyValue[0].trim(), keyValue[1].trim()); + } Review Comment: Add some exception or log here to warn user when the input string is ill-formed. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-jdbc/src/main/java/org/apache/flink/cdc/connectors/jdbc/sink/JdbcMetadataApplier.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.jdbc.sink; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.connectors.jdbc.catalog.Catalog; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Class that implements the MetadataApplier interface, used to apply JDBC metadata. */ +public class JdbcMetadataApplier implements MetadataApplier { + private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataApplier.class); + + // Catalog is unSerializable. + private final Catalog catalog; + + public JdbcMetadataApplier(Catalog catalog) { + this.catalog = catalog; + } + + @Override + public void applySchemaChange(SchemaChangeEvent event) { + try { + // send schema change op to doris + if (event instanceof CreateTableEvent) { + applyCreateTableEvent((CreateTableEvent) event); + } else if (event instanceof AddColumnEvent) { + applyAddColumnEvent((AddColumnEvent) event); + } else if (event instanceof DropColumnEvent) { + applyDropColumnEvent((DropColumnEvent) event); + } else if (event instanceof RenameColumnEvent) { + applyRenameColumnEvent((RenameColumnEvent) event); + } else if (event instanceof AlterColumnTypeEvent) { + applyAlterColumnType((AlterColumnTypeEvent) event); + } + } catch (Exception ex) { Review Comment: We already support DropTableEvent and TruncateTableEvent; could you please add methods to handle these events as well? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-jdbc/src/main/java/org/apache/flink/cdc/connectors/jdbc/config/JdbcPropertiesFormatter.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.jdbc.config; + +import java.util.Properties; + +/** Jdbc Properties Formatter. */ +public class JdbcPropertiesFormatter { Review Comment: This can be removed as it doesn't seem to have been used. ########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToMySqlE2eITCase.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.pipeline.tests.utils.MySqlContainer; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.lifecycle.Startables; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlSinkTestBase.assertEqualsInAnyOrder; + +/** */ +@RunWith(Parameterized.class) +public class MySqlToMySqlE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MySqlToMySqlE2eITCase.class); + public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + public static final int TESTCASE_TIMEOUT_SECONDS = 60; + public static final String TEST_USERNAME = "root"; + public static final String TEST_PASSWORD = "test"; + public static final String TEST_DB = "db_source"; + // public static final String TEST_SINK_DB = "db_sink"; + public static final String SOURCE_NETWORK_NAME = "mysqlsource"; + public static final String SINK_NETWORK_NAME = "mysqlsink"; + protected Path jdbcJar; + + @ClassRule + public static final MySqlContainer MYSQL_SOURCE = + new MySqlContainer(MySqlVersion.V8_0, NETWORK, SOURCE_NETWORK_NAME, TEST_DB); + + @ClassRule + public static final MySqlContainer MYSQL_SINK = + new MySqlContainer(MySqlVersion.V8_0, NETWORK, SINK_NETWORK_NAME, TEST_DB); + + @BeforeClass + public static void initializeContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL_SOURCE)).join(); + Startables.deepStart(Stream.of(MYSQL_SINK)).join(); + LOG.info("Waiting for backends to be available"); + long startWaitingTimestamp = System.currentTimeMillis(); + + // MYSQL_SOURCE.waitForLog("", 1, 60); + + while (!MYSQL_SOURCE.checkMySqlAvailability()) { + try { + if (System.currentTimeMillis() - startWaitingTimestamp + > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + throw new RuntimeException("MYSQL_SOURCE startup timed out."); + } + LOG.info("Waiting for MYSQL_SOURCE to be available"); + Thread.sleep(1000); + } catch (InterruptedException ignored) { + // ignore and check next round + } + } + + while (!MYSQL_SINK.checkMySqlAvailability()) { + try { + if 
(System.currentTimeMillis() - startWaitingTimestamp + > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + throw new RuntimeException("MYSQL_SINK startup timed out."); + } + LOG.info("Waiting for MYSQL_SINK to be available"); + Thread.sleep(1000); + } catch (InterruptedException ignored) { + // ignore and check next round + } + } + LOG.info("Containers are started."); + } + + @Before + public void before() throws Exception { + super.before(); + MYSQL_SOURCE.createDatabase(TEST_DB); + MYSQL_SOURCE.createAndInitialize("mysql_inventory"); + MYSQL_SINK.createDatabase(TEST_DB); + jdbcJar = TestUtils.getResource(getJdbcConnectorResourceName()); + // MYSQL_SINK.createAndInitialize("mysql_inventory"); + } + + protected String getJdbcConnectorResourceName() { + return String.format("jdbc-connector_%s.jar", flinkVersion); + } + + @After + public void after() { + super.after(); + MYSQL_SOURCE.dropDatabase(TEST_DB); + MYSQL_SINK.dropDatabase(TEST_DB); + } + + @Test + public void test() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: " + + SOURCE_NETWORK_NAME + + "\n" + + " port: 3306 \n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + " jdbc.properties.useSSL: false\n" + + " jdbc.properties.allowPublicKeyRetrieval: true\n" + + "\n" + + "sink:\n" + + " type: mysql-writer\n" + + " hostname: " + + SINK_NETWORK_NAME + + "\n" + + " port: 3306 \n" + + " username: %s\n" + + " password: %s\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1", + TEST_USERNAME, + TEST_PASSWORD, + TEST_DB, + TEST_USERNAME, + TEST_PASSWORD); + + System.out.println(pipelineJob); + + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path jdbcCdcJar = TestUtils.getResource("jdbc-cdc-pipeline-connector.jar"); + Path hikariJar = TestUtils.getResource("HikariCP.jar"); Review Comment: We've included this HikariCP dependency into mysql-cdc-pipeline-connector.jar package or jdbc-cdc-pipeline-connector.jar, so we don't need to upload this jar? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-jdbc/pom.xml: ########## @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-pipeline-connectors</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>flink-cdc-pipeline-connector-jdbc</artifactId> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> Review Comment: This is unnecessary, as users may want to compile with JDK 11. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-jdbc/src/main/java/org/apache/flink/cdc/connectors/jdbc/sink/JdbcMetadataApplier.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.jdbc.sink; + +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.sink.EventSinkProvider; +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.connectors.jdbc.catalog.Catalog; +import org.apache.flink.cdc.connectors.jdbc.config.JdbcSinkConfig; +import org.apache.flink.cdc.connectors.jdbc.sink.v2.EventRecordSerializationSchema; +import org.apache.flink.cdc.connectors.jdbc.sink.v2.JdbcSink; +import org.apache.flink.cdc.connectors.jdbc.sink.v2.JdbcSinkBuilder; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; + +import java.io.Serializable; + +/** JdbcDataSink class is used to write data to the database. */ +public class JdbcDataSink implements DataSink, Serializable { + // Catalog is unSerializable. + private final Catalog catalog; Review Comment: We can pass `JdbcSinkConfig` to here as `JdbcSinkConfig` is `Serializable`, and then use `JdbcSinkConfig` to create a `Catalog`. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-jdbc/src/main/java/org/apache/flink/cdc/connectors/jdbc/sink/v2/EventRecordSerializationSchema.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.jdbc.sink.v2; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.connectors.jdbc.sink.utils.JsonWrapper; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Serializer for the input {@link Event}. It will serialize a row to a map. */ +public class EventRecordSerializationSchema implements RecordSerializationSchema<Event> { + /** keep the relationship of TableId and table information. */ + private final Map<TableId, TableInfo> tableInfoMap; + + private final JsonWrapper jsonWrapper; + + public EventRecordSerializationSchema() { + tableInfoMap = new HashMap<>(); + jsonWrapper = new JsonWrapper(); + } + + @Override + public JdbcRowData serialize(Event record) throws IOException { Review Comment: Can we add a test for this method? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSinkFactory.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.factory; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.jdbc.options.JdbcSinkOptions; +import org.apache.flink.cdc.connectors.jdbc.sink.JdbcDataSink; +import org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkConfig; +import org.apache.flink.cdc.connectors.mysql.sink.MysqlPooledDataSinkFactory; +import org.apache.flink.cdc.connectors.mysql.sink.catalog.MySqlCatalog; +import org.apache.flink.cdc.connectors.mysql.sink.catalog.MySqlCatalogFactory; +import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.cdc.connectors.jdbc.options.JdbcSinkOptions.JDBC_PROPERTIES_PROP_PREFIX; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.CONNECTION_POOL_SIZE; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.CONNECT_MAX_RETRIES; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.DRIVER_CLASS_NAME; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.HOSTNAME; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.PORT; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.SERVER_TIME_ZONE; +import static org.apache.flink.cdc.connectors.mysql.sink.MySqlDataSinkOptions.USERNAME; + +/** Factory for creating configured instance of {@link DataSinkFactory}. 
*/ +public class MySqlDataSinkFactory implements DataSinkFactory { + private static final Logger LOG = LoggerFactory.getLogger(MySqlDataSinkFactory.class); + + public static final String IDENTIFIER = "mysql-writer"; + + @Override + public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context) + .validateExcept(JDBC_PROPERTIES_PROP_PREFIX); + + final Configuration config = context.getFactoryConfiguration(); + MySqlDataSinkConfig.Builder builder = new MySqlDataSinkConfig.Builder(); + + config.getOptional(HOSTNAME).ifPresent(builder::hostname); + config.getOptional(PORT).ifPresent(builder::port); + config.getOptional(USERNAME).ifPresent(builder::username); + config.getOptional(PASSWORD).ifPresent(builder::password); + builder.serverTimeZone(config.getOptional(SERVER_TIME_ZONE).orElseGet(() -> "UTC")); + builder.connectTimeout( + config.getOptional(CONNECT_TIMEOUT).orElseGet(() -> Duration.ofSeconds(30))); + builder.connectionPoolSize(config.getOptional(CONNECTION_POOL_SIZE).orElseGet(() -> 20)); + builder.connectMaxRetries(config.getOptional(CONNECT_MAX_RETRIES).orElseGet(() -> 3)); + // driver class name + builder.driverClassName( + config.getOptional(DRIVER_CLASS_NAME).orElseGet(() -> "com.mysql.cj.jdbc.Driver")); + // builder jdbc properties + Properties properties = new Properties(); + Map<String, String> jdbcProperties = + JdbcSinkOptions.getPropertiesByPrefix(config, JDBC_PROPERTIES_PROP_PREFIX); + properties.putAll(jdbcProperties); + builder.jdbcProperties(properties); + // get jdbc url + String jdbcUrl = MysqlPooledDataSinkFactory.INSTANCE.getJdbcUrl(builder.build()); + builder.connUrl(jdbcUrl); + // print configs + Map<String, String> map = config.toMap(); + OptionUtils.printOptions(IDENTIFIER, map); + + MySqlDataSinkConfig sinkConfig = builder.build(); + MySqlCatalog catalog = MySqlCatalogFactory.INSTANCE.createCatalog(sinkConfig); + + return new JdbcDataSink(catalog, sinkConfig); Review Comment: Can we add an ItTest to write event using this JDBCDataSink and verify the result to make sure that it can work as expected? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-jdbc/src/main/java/org/apache/flink/cdc/connectors/jdbc/catalog/AbstractJdbcCatalog.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.jdbc.catalog; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.StringUtils; +import org.apache.flink.cdc.connectors.jdbc.config.JdbcSinkConfig; +import org.apache.flink.cdc.connectors.jdbc.conn.JdbcConnectionFactory; +import org.apache.flink.cdc.connectors.jdbc.conn.JdbcConnectionPoolFactory; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** AbstractJdbcCatalog. */ +public abstract class AbstractJdbcCatalog extends AbstractCatalog { + private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); + protected final JdbcConnectionFactory connectionFactory; + + public AbstractJdbcCatalog( + String name, JdbcSinkConfig sinkConfig, JdbcConnectionPoolFactory poolFactory) { + super(name); + connectionFactory = new JdbcConnectionFactory(sinkConfig, poolFactory); + } + + @Override + public void close() throws CatalogException {} + + @Override + public void createTable(TableId tableId, Schema schema, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + checkTableIdArguments(tableId); + + String createTableSql = buildCreateTableSql(tableId, schema, ignoreIfExists); + + try { + executeUpdate(createTableSql); + LOG.info( + "Success to create table {}.{}, sql: {}", + tableId.getSchemaName(), + tableId.getTableName(), + createTableSql); + } catch (Exception e) { + LOG.error( + "Failed to create table {}.{}, sql: {},reason:{}", + tableId.getSchemaName(), + tableId.getTableName(), + createTableSql, + e.getMessage()); + } + } + + @Override + public void addColumn(TableId tableId, AddColumnEvent addedColumns) + throws TableNotExistException, CatalogException { + checkTableIdArguments(tableId); + + Preconditions.checkArgument( + !addedColumns.getAddedColumns().isEmpty(), "Added columns should not be empty."); + + String addColumnSql = buildAlterAddColumnsSql(tableId, addedColumns.getAddedColumns()); + try { + executeUpdate(addColumnSql); + LOG.info( + "Success to add column on table {}.{}, sql: {}", + tableId.getSchemaName(), + tableId.getTableName(), + addColumnSql); + } catch (Exception e) { + LOG.error( + "Success to add column on table {}.{}, sql: {},reason:{}", + tableId.getSchemaName(), + tableId.getTableName(), + addColumnSql, + e.getMessage()); + } + } + + @Override + public void dropColumn(TableId tableId, DropColumnEvent addedColumns) + throws TableNotExistException, CatalogException { + checkTableIdArguments(tableId); + + List<String> droppedColumnNames = 
addedColumns.getDroppedColumnNames(); + Preconditions.checkArgument( + !droppedColumnNames.isEmpty(), "Dropped columns should not be empty."); + + droppedColumnNames.forEach( + column -> { + String dropColumnSql = buildDropColumnSql(tableId, column); + try { + executeUpdate(dropColumnSql); + LOG.info( + "Success to drop column on table {}.{}, sql: {}", + tableId.getSchemaName(), + tableId.getTableName(), + dropColumnSql); + } catch (Exception e) { + LOG.error( + "Failed to drop column on table {}.{}, sql: {},reason:{}", + tableId.getSchemaName(), + tableId.getTableName(), + dropColumnSql, + e.getMessage()); + } + }); + } + + @Override + public void renameColumn(TableId tableId, RenameColumnEvent addedColumns) + throws TableNotExistException, CatalogException { + checkTableIdArguments(tableId); + + Map<String, String> nameMapping = addedColumns.getNameMapping(); + for (Map.Entry<String, String> entry : nameMapping.entrySet()) { + String renameColumnSql = + buildRenameColumnSql(tableId, entry.getKey(), entry.getValue()); + try { + executeUpdate(renameColumnSql); + LOG.info( + "Success to rename column table {}.{}, sql: {}", + tableId.getSchemaName(), + tableId.getTableName(), + renameColumnSql); + } catch (Exception e) { + LOG.error( + "Failed to rename column table {}.{}, sql: {},reason:{}", + tableId.getSchemaName(), + tableId.getTableName(), + renameColumnSql, + e.getMessage()); + } + } + } + + @Override + public void alterColumnTyp(TableId tableId, AlterColumnTypeEvent alterColumns) + throws TableNotExistException, CatalogException { + checkTableIdArguments(tableId); + + Map<String, DataType> typeMapping = alterColumns.getTypeMapping(); + for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) { + String alterColumnTypeSql = + buildAlterColumnTypeSql(tableId, entry.getKey(), entry.getValue()); + + try { + executeUpdate(alterColumnTypeSql); + LOG.info( + "Success to alert column table {}.{}, sql: {}", + tableId.getSchemaName(), + tableId.getTableName(), + alterColumnTypeSql); + } catch (Exception e) { + LOG.error( + "Failed to alert column table {}.{}, sql: {},reason:{}", + tableId.getSchemaName(), + tableId.getTableName(), + alterColumnTypeSql, + e.getMessage()); + } + } + } + + @Override + public String getUpsertStatement(TableId tableId, Schema schema) { + checkTableIdArguments(tableId); + + return buildUpsertSql(tableId, schema); + } + + @Override + public String getDeleteStatement(TableId tableId, List<String> primaryKeys) { + checkTableIdArguments(tableId); + + return buildDeleteSql(tableId, primaryKeys); + } + + protected void checkTableIdArguments(TableId tableId) { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableId.getSchemaName()), + "database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableId.getTableName()), + "table name cannot be null or empty."); + } + + protected abstract String buildUpsertSql(TableId tableId, Schema schema); + + protected abstract String buildDeleteSql(TableId tableId, List<String> primaryKeys); + + protected abstract String buildCreateTableSql( + TableId tableId, Schema schema, boolean ignoreIfExists); + + protected abstract String buildAlterAddColumnsSql( + TableId tableId, List<AddColumnEvent.ColumnWithPosition> addedColumns); + + protected abstract String buildRenameColumnSql(TableId tableId, String oldName, String newName); + + protected abstract String buildDropColumnSql(TableId tableId, String column); + + protected abstract String buildAlterColumnTypeSql( 
+ TableId tableId, String columnName, DataType columnType); + + protected void executeUpdate(String sql) throws SQLException { + try (Connection connection = connectionFactory.connect(); + Statement statement = connection.createStatement()) { + statement.executeUpdate(sql); + } + } + + /** Process ResultSet objects. */ + @FunctionalInterface + public interface ResultSetConsumer<T> { + T apply(ResultSet rs) throws SQLException; + } + + public <T> Optional<List<T>> executeQuery(String sql, ResultSetConsumer<T> consumer) Review Comment: This method is unused; is it necessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
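Re: the comment on JdbcPropertiesFormatter#formatJdbcProperties above (warn the user about ill-formed input): a minimal sketch of that suggestion, assuming the class is kept at all and reusing the SLF4J logging already used elsewhere in the connector. Throwing an IllegalArgumentException instead of logging would also work if a hard failure is preferred.

package org.apache.flink.cdc.connectors.jdbc.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/** Jdbc Properties Formatter that warns about ill-formed key=value pairs. */
public class JdbcPropertiesFormatter {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcPropertiesFormatter.class);

    public static Properties formatJdbcProperties(String jdbcPropertiesString) {
        Properties properties = new Properties();
        if (jdbcPropertiesString == null || jdbcPropertiesString.isEmpty()) {
            return properties;
        }
        for (String keyValuePair : jdbcPropertiesString.split(",")) {
            String[] keyValue = keyValuePair.split("=");
            if (keyValue.length == 2) {
                properties.setProperty(keyValue[0].trim(), keyValue[1].trim());
            } else {
                // Surface ill-formed pairs instead of silently dropping them.
                LOG.warn(
                        "Ignoring ill-formed JDBC properties pair '{}', expected 'key=value'.",
                        keyValuePair);
            }
        }
        return properties;
    }
}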
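Re: the comment on JdbcMetadataApplier#applySchemaChange above (DropTableEvent and TruncateTableEvent): a rough sketch of the two extra branches and handlers. DropTableEvent and TruncateTableEvent come from org.apache.flink.cdc.common.event; the catalog.dropTable(...) and catalog.truncateTable(...) calls are hypothetical placeholders for whatever operations the Catalog interface in this PR ends up exposing.

            // Additional branches inside applySchemaChange(...):
            } else if (event instanceof DropTableEvent) {
                applyDropTableEvent((DropTableEvent) event);
            } else if (event instanceof TruncateTableEvent) {
                applyTruncateTableEvent((TruncateTableEvent) event);
            }

    private void applyDropTableEvent(DropTableEvent event) {
        // Placeholder call: drop the physical table identified by the event's table id.
        catalog.dropTable(event.tableId(), true);
    }

    private void applyTruncateTableEvent(TruncateTableEvent event) {
        // Placeholder call: truncate the physical table identified by the event's table id.
        catalog.truncateTable(event.tableId());
    }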
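Re: the comment on JdbcDataSink holding a non-serializable Catalog: a sketch of the suggested direction, where only the Serializable JdbcSinkConfig is kept in the sink and the Catalog is rebuilt from it on demand. The nested CatalogFactory interface is a hypothetical stand-in for a dialect-specific factory such as the MySqlCatalogFactory used in MySqlDataSinkFactory; the DataSink methods are omitted for brevity.

package org.apache.flink.cdc.connectors.jdbc.sink;

import org.apache.flink.cdc.connectors.jdbc.catalog.Catalog;
import org.apache.flink.cdc.connectors.jdbc.config.JdbcSinkConfig;

import java.io.Serializable;

/** Sketch: keep the serializable config and rebuild the non-serializable Catalog lazily. */
public class JdbcDataSink implements Serializable {

    /** Hypothetical serializable factory that builds a Catalog from the sink config. */
    public interface CatalogFactory extends Serializable {
        Catalog createCatalog(JdbcSinkConfig config);
    }

    private final JdbcSinkConfig sinkConfig;
    private final CatalogFactory catalogFactory;

    // Rebuilt after deserialization instead of being shipped through Flink's serialization.
    private transient Catalog catalog;

    public JdbcDataSink(JdbcSinkConfig sinkConfig, CatalogFactory catalogFactory) {
        this.sinkConfig = sinkConfig;
        this.catalogFactory = catalogFactory;
    }

    private Catalog getOrCreateCatalog() {
        if (catalog == null) {
            catalog = catalogFactory.createCatalog(sinkConfig);
        }
        return catalog;
    }
}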