This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ea4b952af0 [INLONG-8743][Sort] Support more types of ddl in all migration (#8744) ea4b952af0 is described below commit ea4b952af0e7ae5ba86061fe0faf483e12e42d17 Author: Sting <zpen...@connect.ust.hk> AuthorDate: Wed Aug 16 19:39:35 2023 +0800 [INLONG-8743][Sort] Support more types of ddl in all migration (#8744) --- .../inlong/sort/protocol/ddl/enums/AlterType.java | 4 +- .../sort/protocol/ddl/expressions/AlterColumn.java | 6 +++ .../sort/cdc/mysql/utils/OperationUtils.java | 11 ++++ .../org/apache/inlong/sort/cdc/TestOperation.java | 62 ++++++++++++++++++++++ 4 files changed, 82 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java index c6813b3998..5dfb48301e 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java @@ -26,6 +26,8 @@ public enum AlterType { ADD_COLUMN, DROP_COLUMN, MODIFY_COLUMN, - CHANGE_COLUMN + CHANGE_COLUMN, + DROP_CONSTRAINT, + ADD_CONSTRAINT } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java index c8f30d5184..f76195fb9f 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java @@ -50,6 +50,12 @@ public class AlterColumn { this.oldColumn = oldColumn; } + public AlterColumn(@JsonProperty("alterType") AlterType alterType, + @JsonProperty("newColumn") Column newColumn) { + this.alterType = alterType; + this.newColumn = newColumn; + } + public AlterColumn(@JsonProperty("alterType") AlterType alterType) { this.alterType = alterType; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java index 028a98c912..7c5559f578 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java @@ -112,12 +112,23 @@ public class OperationUtils { statement.getAlterExpressions().forEach(alterExpression -> { switch (alterExpression.getOperation()) { case DROP: + if (alterExpression.getConstraintName() != null) { + alterColumns.add(new AlterColumn(AlterType.DROP_CONSTRAINT, + new Column(reformatName(alterExpression.getConstraintName())))); + break; + } alterColumns.add(new AlterColumn(AlterType.DROP_COLUMN, null, Column.builder().name(reformatName(alterExpression.getColumnName())) .build())); break; case ADD: + if (alterExpression.getIndex() != null) { + // only support constraint type now + // the sink connector doesn't support add constraint + alterColumns.add(new AlterColumn(AlterType.ADD_CONSTRAINT)); + break; + } alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN, parseColumnWithPosition(isFirst, sqlType, alterExpression.getColDataTypeList().get(0)), diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java index baf3de8f68..963e13954d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java @@ -23,6 +23,7 @@ import org.apache.inlong.sort.protocol.ddl.enums.OperationType; import org.apache.inlong.sort.protocol.ddl.enums.PositionType; import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation; +import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation; import org.apache.inlong.sort.protocol.ddl.operations.Operation; import org.apache.inlong.sort.protocol.ddl.operations.UnsupportedOperation; @@ -46,6 +47,15 @@ public class TestOperation { Assert.assertEquals(operation.getOperationType(), OperationType.RENAME); } + @Test + public void testRenameTableByAlter() { + String sql = "alter table a rename to b"; + HashMap<String, Integer> sqlType = new HashMap<>(); + Operation operation = OperationUtils.generateOperation(sql, sqlType); + assert operation != null; + Assert.assertTrue(operation instanceof AlterOperation); + } + @Test public void testDropTableOperation() { String sql = "drop table `tv3`"; @@ -95,4 +105,56 @@ public class TestOperation { Assert.assertEquals(operation.getOperationType(), OperationType.OTHER); } + @Test + public void testCreateTableWithCharacterConstraint() { + String sql = "create table a (b int) engine=innodb character " + + "set=utf8 collate=utf8_bin row_format=dynamic"; + HashMap<String, Integer> sqlType = new HashMap<>(); + sqlType.put("b", 1); + Operation operation = OperationUtils.generateOperation(sql, sqlType); + assert operation != null; + Assert.assertTrue(operation instanceof CreateTableOperation); + } + + @Test + public void alterTableChangeType() { + String sql = "ALTER TABLE test CHANGE COLUMN name name1 " + + "mediumtext character set utf8mb4 COLLATE=utf8 NULL"; + HashMap<String, Integer> sqlType = new HashMap<>(); + sqlType.put("name1", 1); + Operation operation = OperationUtils.generateOperation(sql, sqlType); + assert operation != null; + Assert.assertTrue(operation instanceof AlterOperation); + AlterColumn alterColumn = ((AlterOperation) operation).getAlterColumns().get(0); + Assert.assertEquals(alterColumn.getAlterType(), AlterType.CHANGE_COLUMN); + Assert.assertEquals(alterColumn.getNewColumn().getName(), "name1"); + Assert.assertEquals(alterColumn.getOldColumn().getName(), "name"); + } + + @Test + public void dropTableConstraint() { + String sql = "ALTER TABLE test drop constraint a"; + HashMap<String, Integer> sqlType = new HashMap<>(); + sqlType.put("name1", 1); + Operation operation = OperationUtils.generateOperation(sql, sqlType); + assert operation != null; + Assert.assertTrue(operation instanceof AlterOperation); + Assert.assertEquals(operation.getOperationType(), OperationType.ALTER); + Assert.assertEquals(((AlterOperation) operation).getAlterColumns().get(0).getAlterType(), + AlterType.DROP_CONSTRAINT); + } + + @Test + public void addConstraint() { + String sql = "ALTER TABLE test add constraint primary key (a)"; + HashMap<String, Integer> sqlType = new HashMap<>(); + sqlType.put("name1", 1); + Operation operation = OperationUtils.generateOperation(sql, sqlType); + assert operation != null; + Assert.assertTrue(operation instanceof AlterOperation); + Assert.assertEquals(operation.getOperationType(), OperationType.ALTER); + Assert.assertEquals(((AlterOperation) operation).getAlterColumns().get(0).getAlterType(), + AlterType.ADD_CONSTRAINT); + } + }