lvyanquan commented on code in PR #3668: URL: https://github.com/apache/flink-cdc/pull/3668#discussion_r1905378005
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java: ########## @@ -489,4 +529,75 @@ private static Optional<WatermarkKind> getWatermarkKind(SourceRecord record) { } return Optional.empty(); } + + /** + * This utility method checks if given source record is a gh-ost/pt-osc initiated schema change + * event by checking the "alter" ddl. + */ + public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { + if (!isSchemaChangeEvent(record)) { + return false; + } + Struct value = (Struct) record.value(); + ObjectMapper mapper = new ObjectMapper(); + try { + // There will be these schema change events generated in total during one transaction. + // + // gh-ost: + // DROP TABLE IF EXISTS `db`.`_tb1_gho` + // DROP TABLE IF EXISTS `db`.`_tb1_del` + // DROP TABLE IF EXISTS `db`.`_tb1_ghc` + // create /* gh-ost */ table `db`.`_tb1_ghc` ... + // create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1` + // alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255) + // create /* gh-ost */ table `db`.`_tb1_del` ... + // DROP TABLE IF EXISTS `db`.`_tb1_del` + // rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del` + // rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1` + // DROP TABLE IF EXISTS `db`.`_tb1_ghc` + // DROP TABLE IF EXISTS `db`.`_tb1_del` + // + // pt-osc: + // CREATE TABLE `db`.`_test_tb1_new` + // ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50) + // CREATE TRIGGER `pt_osc_db_test_tb1_del`... + // CREATE TRIGGER `pt_osc_db_test_tb1_upd`... + // CREATE TRIGGER `pt_osc_db_test_tb1_ins`... + // ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */ + // RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO + // `db`.`test_tb1` + // DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */ + // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del` + // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd` + // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins` + // + // Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new` + // table. + String ddl = + mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) + .get("ddl") + .asText() + .toLowerCase(); + if (ddl.toLowerCase().startsWith("alter")) { Review Comment: This toLowerCase() is unnecessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org