GOODBOY008 commented on code in PR #3668: URL: https://github.com/apache/flink-cdc/pull/3668#discussion_r1905613729
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java: ########## @@ -384,6 +390,40 @@ public static TableId getTableId(SourceRecord dataRecord) { return new TableId(dbName, null, tableName); } + public static SourceRecord setTableId( + SourceRecord dataRecord, TableId originalTableId, TableId tableId) { + Struct value = (Struct) dataRecord.value(); + Document historyRecordDocument; + try { + historyRecordDocument = getHistoryRecord(dataRecord).document(); + } catch (IOException e) { + throw new RuntimeException(e); + } + HistoryRecord newHistoryRecord = + new HistoryRecord( + historyRecordDocument.set( + "ddl", + historyRecordDocument + .get("ddl") Review Comment: ```suggestion .get(HistoryRecord.Fields.DDL_STATEMENTS) ``` ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java: ########## @@ -384,6 +390,40 @@ public static TableId getTableId(SourceRecord dataRecord) { return new TableId(dbName, null, tableName); } + public static SourceRecord setTableId( + SourceRecord dataRecord, TableId originalTableId, TableId tableId) { + Struct value = (Struct) dataRecord.value(); + Document historyRecordDocument; + try { + historyRecordDocument = getHistoryRecord(dataRecord).document(); + } catch (IOException e) { + throw new RuntimeException(e); + } + HistoryRecord newHistoryRecord = + new HistoryRecord( + historyRecordDocument.set( + "ddl", Review Comment: Same as above: please replace the hard-coded `"ddl"` key here with `HistoryRecord.Fields.DDL_STATEMENTS`. ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java: ########## @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.table; + +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import 
org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT; + +/** + * IT case for Evolving MySQL schema with gh-ost/pt-osc utility. See <a + * href="https://github.com/github/gh-ost">github/gh-ost</a>/<a + * href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">doc/pt-osc</a> for + * more details. 
+ */ +public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit"; + + protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER = + createPerconaToolkitContainer(); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + private static final String GH_OST_DOWNLOAD_LINK = + DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") + ? 
"https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join(); + Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join(); + LOG.info("Containers are started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping containers..."); + MYSQL8_CONTAINER.stop(); + PERCONA_TOOLKIT_CONTAINER.close(); + LOG.info("Containers are stopped."); + } + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + customerDatabase.createAndInitialize(); + } + + @After + public void after() { + customerDatabase.dropDatabase(); + } + + private static void installGhOstCli(Container<?> container) { + try { + execInContainer( + container, + "download gh-ost tarball", + "curl", + "-L", + "-o", + "/tmp/gh-ost.tar.gz", + GH_OST_DOWNLOAD_LINK); + execInContainer( + container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static GenericContainer<?> createPerconaToolkitContainer() { + GenericContainer<?> perconaToolkit = + new GenericContainer<>(PERCONA_TOOLKIT) + // keep container alive + .withCommand("tail", "-f", "/dev/null") + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + return perconaToolkit; + } + + @Test + public void testGhOstSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + LOG.info("Step 2: Start pipeline job"); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " `id` INT NOT NULL," 
+ + " name STRING," + + " address STRING," + + " phone_number STRING," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'" + + ")", + MYSQL8_CONTAINER.getHost(), + MYSQL8_CONTAINER.getDatabasePort(), + TEST_USER, + TEST_PASSWORD, + customerDatabase.getDatabaseName(), + "customers", + true, + getServerId()); + + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM debezium_source"); + + // wait for the source startup, we don't have a better way to wait it, use sleep for now + TestCaseUtils.repeatedCheck( + () -> result.getJobClient().get().getJobStatus().get().equals(RUNNING), + DEFAULT_TIMEOUT, + DEFAULT_INTERVAL, + Arrays.asList(InterruptedException.class, NoSuchElementException.class)); + + CloseableIterator<Row> iterator = result.collect(); + + { + String[] expected = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, 
user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + // Wait for a little while until we're in Binlog streaming mode. + Thread.sleep(5_000); + + { + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=add column ext int first", + "--allow-on-master", // because we don't have a replica + "--initially-drop-old-table", // drop previously generated temporary tables + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');", + customerDatabase.getDatabaseName())); + } + + String[] expected = + new String[] { + "+I[10000, Alice, Beijing, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + { + LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=modify column ext double", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');", + 
customerDatabase.getDatabaseName())); + } + + String[] expected = + new String[] { + "+I[10001, Bob, Chongqing, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + { + LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=drop column ext", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + } + + String[] expected = + new String[] { + "+I[10002, Cicada, Urumqi, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + } + + @Test + public void testPtOscSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Start pipeline job"); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " `id` INT NOT NULL," + + " name STRING," + + " address STRING," + + " phone_number STRING," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'" + + ")", + MYSQL8_CONTAINER.getHost(), + MYSQL8_CONTAINER.getDatabasePort(), + TEST_USER, + TEST_PASSWORD, + customerDatabase.getDatabaseName(), + "customers", + true, + getServerId()); + + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = 
tEnv.executeSql("SELECT * FROM debezium_source"); + + // wait for the source startup, we don't have a better way to wait it, use sleep for now + TestCaseUtils.repeatedCheck( + () -> result.getJobClient().get().getJobStatus().get().equals(RUNNING), + DEFAULT_TIMEOUT, + DEFAULT_INTERVAL, + Arrays.asList(InterruptedException.class, NoSuchElementException.class)); + + CloseableIterator<Row> iterator = result.collect(); + { + String[] expected = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + // Wait for a little while until we're in Binlog streaming mode. 
+ Thread.sleep(5_000); + + LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "add column ext int FIRST", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');", + customerDatabase.getDatabaseName())); + + String[] expected = + new String[] { + "+I[10000, Alice, Beijing, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "modify column ext double", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');", + customerDatabase.getDatabaseName())); + String[] expected = + new String[] { + "+I[10001, Bob, Chongqing, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + LOG.info("Step 4: Evolve schema 
with pt-osc - DROP COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "drop column ext", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + String[] expected = + new String[] { + "+I[10002, Cicada, Urumqi, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + } + + private static void execInContainer(Container<?> container, String prompt, String... commands) + throws IOException, InterruptedException { + { + LOG.info( + "Starting to {} with the following command: `{}`", + prompt, + String.join(" ", commands)); + Container.ExecResult execResult = container.execInContainer(commands); + if (execResult.getExitCode() == 0) { + LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout()); + } else { + LOG.error( + "Failed to {}. 
Exit code: {}, Stdout: {}, Stderr: {}", + prompt, + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr()); + throw new IOException("Failed to execute commands: " + Arrays.toString(commands)); Review Comment: ```suggestion throw new IOException("Failed to execute commands: " + String.join(" ", commands)); ``` ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java: ########## @@ -0,0 +1,591 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.FlinkSourceProvider; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.util.CloseableIterator; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.testcontainers.DockerClientFactory; +import 
org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId; +import static org.junit.Assert.assertEquals; + +/** + * IT case for Evolving MySQL schema with gh-ost/pt-osc utility. See <a + * href="https://github.com/github/gh-ost">github/gh-ost</a>/<a + * href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">doc/pt-osc</a> for + * more details. 
+ */ +public class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + + protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER = + createPerconaToolkitContainer(); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final String GH_OST_DOWNLOAD_LINK = + DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") + ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql8 containers..."); + Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join(); + Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join(); + LOG.info("Container MySql8 is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql8 containers..."); + MYSQL8_CONTAINER.stop(); + PERCONA_TOOLKIT_CONTAINER.stop(); + LOG.info("Container MySql8 is stopped."); + } + + @Before + public void before() { + customerDatabase.createAndInitialize(); + TestValuesTableFactory.clearAllData(); + env.setParallelism(4); + env.enableCheckpointing(200); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @After + public void after() { + customerDatabase.dropDatabase(); + } + + private static void installGhOstCli(Container<?> container) { + try { + execInContainer( + container, + "download gh-ost tarball", + "curl", + "-L", + "-o", + 
"/tmp/gh-ost.tar.gz", + GH_OST_DOWNLOAD_LINK); + execInContainer( + container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static GenericContainer<?> createPerconaToolkitContainer() { + GenericContainer<?> perconaToolkit = + new GenericContainer<>(PERCONA_TOOLKIT) + // keep container alive + .withCommand("tail", "-f", "/dev/null") + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + return perconaToolkit; + } + + @Test + public void testGhOstSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + LOG.info("Step 2: Start pipeline job"); + env.setParallelism(1); + TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers"); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(customerDatabase.getDatabaseName()) + .tableList(customerDatabase.getDatabaseName() + "\\.customers") + .startupOptions(StartupOptions.initial()) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()) + .parseOnLineSchemaChanges(true); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + List<Event> expected = new ArrayList<>(); + Schema schemaV1 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, 
"flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("id")) + .build(); + expected.add(new CreateTableEvent(tableId, schemaV1)); + expected.addAll(getSnapshotExpected(tableId, schemaV1)); + List<Event> actual = fetchResults(events, expected.size()); + assertEqualsInAnyOrder( + expected.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + + // Wait for a little while until we're in Binlog streaming mode. + Thread.sleep(5_000); + + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=add column ext int", + "--allow-on-master", // because we don't have a replica + "--initially-drop-old-table", // drop previously generated temporary tables + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10000, 'Alice', 'Beijing', '123567891234', 17);", + customerDatabase.getDatabaseName())); + } + + Schema schemaV2 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .physicalColumn("ext", DataTypes.INT()) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("ext", DataTypes.INT(), null)))), + 
DataChangeEvent.insertEvent( + tableId, + generate(schemaV2, 10000, "Alice", "Beijing", "123567891234", 17))), + fetchResults(events, 2)); + + LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=modify column ext double", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10001, 'Bob', 'Chongqing', '123567891234', 2.718281828);", + customerDatabase.getDatabaseName())); + } + + Schema schemaV3 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .physicalColumn("ext", DataTypes.DOUBLE()) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("ext", DataTypes.DOUBLE())), + DataChangeEvent.insertEvent( + tableId, + generate( + schemaV3, + 10001, + "Bob", + "Chongqing", + "123567891234", + 2.718281828))), + fetchResults(events, 2)); + + LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=drop column ext", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) 
{ + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + } + + Schema schemaV4 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new DropColumnEvent(tableId, Collections.singletonList("ext")), + DataChangeEvent.insertEvent( + tableId, + generate(schemaV4, 10002, "Cicada", "Urumqi", "123567891234"))), + fetchResults(events, 2)); + } + + @Test + public void testPtOscSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Start pipeline job"); + + env.setParallelism(1); + TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers"); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(customerDatabase.getDatabaseName()) + .tableList(customerDatabase.getDatabaseName() + "\\.customers") + .startupOptions(StartupOptions.initial()) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()) + .parseOnLineSchemaChanges(true); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + List<Event> expected = new ArrayList<>(); + Schema schemaV1 = + Schema.newBuilder() + .physicalColumn("id", 
DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("id")) + .build(); + expected.add(new CreateTableEvent(tableId, schemaV1)); + expected.addAll(getSnapshotExpected(tableId, schemaV1)); + List<Event> actual = fetchResults(events, expected.size()); + assertEqualsInAnyOrder( + expected.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + + // Wait for a little while until we're in Binlog streaming mode. + Thread.sleep(5_000); + + LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "add column ext int", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10000, 'Alice', 'Beijing', '123567891234', 17);", + customerDatabase.getDatabaseName())); + } + + Schema schemaV2 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .physicalColumn("ext", DataTypes.INT()) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new AddColumnEvent( + tableId, + 
Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("ext", DataTypes.INT(), null)))), + DataChangeEvent.insertEvent( + tableId, + generate(schemaV2, 10000, "Alice", "Beijing", "123567891234", 17))), + fetchResults(events, 2)); + + LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "modify column ext double", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10001, 'Bob', 'Chongqing', '123567891234', 2.718281828);", + customerDatabase.getDatabaseName())); + } + + Schema schemaV3 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .physicalColumn("ext", DataTypes.DOUBLE()) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("ext", DataTypes.DOUBLE())), + DataChangeEvent.insertEvent( + tableId, + generate( + schemaV3, + 10001, + "Bob", + "Chongqing", + "123567891234", + 2.718281828))), + fetchResults(events, 2)); + + LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + 
customerDatabase.getDatabaseName(), + "--alter", + "drop column ext", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + } + + Schema schemaV4 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new DropColumnEvent(tableId, Collections.singletonList("ext")), + DataChangeEvent.insertEvent( + tableId, + generate(schemaV4, 10002, "Cicada", "Urumqi", "123567891234"))), + fetchResults(events, 2)); + } + + private static void execInContainer(Container<?> container, String prompt, String... commands) + throws IOException, InterruptedException { + { + LOG.info( + "Starting to {} with the following command: `{}`", + prompt, + String.join(" ", commands)); + Container.ExecResult execResult = container.execInContainer(commands); + if (execResult.getExitCode() == 0) { + LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout()); + } else { + LOG.error( + "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}", + prompt, + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr()); + throw new IOException("Failed to execute commands: " + Arrays.toString(commands)); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org