This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ea4b952af0 [INLONG-8743][Sort] Support more types of ddl in all migration (#8744)
ea4b952af0 is described below

commit ea4b952af0e7ae5ba86061fe0faf483e12e42d17
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Wed Aug 16 19:39:35 2023 +0800

    [INLONG-8743][Sort] Support more types of ddl in all migration (#8744)
---
 .../inlong/sort/protocol/ddl/enums/AlterType.java  |  4 +-
 .../sort/protocol/ddl/expressions/AlterColumn.java |  6 +++
 .../sort/cdc/mysql/utils/OperationUtils.java       | 11 ++++
 .../org/apache/inlong/sort/cdc/TestOperation.java  | 62 ++++++++++++++++++++++
 4 files changed, 82 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
index c6813b3998..5dfb48301e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/enums/AlterType.java
@@ -26,6 +26,8 @@ public enum AlterType {
     ADD_COLUMN,
     DROP_COLUMN,
     MODIFY_COLUMN,
-    CHANGE_COLUMN
+    CHANGE_COLUMN,
+    DROP_CONSTRAINT,
+    ADD_CONSTRAINT
 
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
index c8f30d5184..f76195fb9f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/ddl/expressions/AlterColumn.java
@@ -50,6 +50,12 @@ public class AlterColumn {
         this.oldColumn = oldColumn;
     }
 
+    public AlterColumn(@JsonProperty("alterType") AlterType alterType,
+            @JsonProperty("newColumn") Column newColumn) {
+        this.alterType = alterType;
+        this.newColumn = newColumn;
+    }
+
     public AlterColumn(@JsonProperty("alterType") AlterType alterType) {
         this.alterType = alterType;
     }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
index 028a98c912..7c5559f578 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/OperationUtils.java
@@ -112,12 +112,23 @@ public class OperationUtils {
         statement.getAlterExpressions().forEach(alterExpression -> {
             switch (alterExpression.getOperation()) {
                 case DROP:
+                    if (alterExpression.getConstraintName() != null) {
+                        alterColumns.add(new AlterColumn(AlterType.DROP_CONSTRAINT,
+                                new Column(reformatName(alterExpression.getConstraintName()))));
+                        break;
+                    }
                     alterColumns.add(new AlterColumn(AlterType.DROP_COLUMN,
                             null,
                             Column.builder().name(reformatName(alterExpression.getColumnName()))
                                     .build()));
                     break;
                 case ADD:
+                    if (alterExpression.getIndex() != null) {
+                        // only support constraint type now
+                        // the sink connector doesn't support add constraint
+                        alterColumns.add(new AlterColumn(AlterType.ADD_CONSTRAINT));
+                        break;
+                    }
                     alterColumns.add(new AlterColumn(AlterType.ADD_COLUMN,
                             parseColumnWithPosition(isFirst, sqlType,
                                     alterExpression.getColDataTypeList().get(0)),
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
index baf3de8f68..963e13954d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/test/java/org/apache/inlong/sort/cdc/TestOperation.java
@@ -23,6 +23,7 @@ import org.apache.inlong.sort.protocol.ddl.enums.OperationType;
 import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
 import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
 import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
 import org.apache.inlong.sort.protocol.ddl.operations.Operation;
 import org.apache.inlong.sort.protocol.ddl.operations.UnsupportedOperation;
 
@@ -46,6 +47,15 @@ public class TestOperation {
         Assert.assertEquals(operation.getOperationType(), OperationType.RENAME);
     }
 
+    @Test
+    public void testRenameTableByAlter() {
+        String sql = "alter table a rename to b";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        assert operation != null;
+        Assert.assertTrue(operation instanceof AlterOperation);
+    }
+
     @Test
     public void testDropTableOperation() {
         String sql = "drop table `tv3`";
@@ -95,4 +105,56 @@ public class TestOperation {
         Assert.assertEquals(operation.getOperationType(), OperationType.OTHER);
     }
 
+    @Test
+    public void testCreateTableWithCharacterConstraint() {
+        String sql = "create table a (b int) engine=innodb character "
+                + "set=utf8 collate=utf8_bin row_format=dynamic";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        sqlType.put("b", 1);
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        assert operation != null;
+        Assert.assertTrue(operation instanceof CreateTableOperation);
+    }
+
+    @Test
+    public void alterTableChangeType() {
+        String sql = "ALTER TABLE test CHANGE COLUMN name name1 "
+                + "mediumtext character set utf8mb4 COLLATE=utf8 NULL";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        sqlType.put("name1", 1);
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        assert operation != null;
+        Assert.assertTrue(operation instanceof AlterOperation);
+        AlterColumn alterColumn = ((AlterOperation) operation).getAlterColumns().get(0);
+        Assert.assertEquals(alterColumn.getAlterType(), AlterType.CHANGE_COLUMN);
+        Assert.assertEquals(alterColumn.getNewColumn().getName(), "name1");
+        Assert.assertEquals(alterColumn.getOldColumn().getName(), "name");
+    }
+
+    @Test
+    public void dropTableConstraint() {
+        String sql = "ALTER TABLE test drop constraint a";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        sqlType.put("name1", 1);
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        assert operation != null;
+        Assert.assertTrue(operation instanceof AlterOperation);
+        Assert.assertEquals(operation.getOperationType(), OperationType.ALTER);
+        Assert.assertEquals(((AlterOperation) operation).getAlterColumns().get(0).getAlterType(),
+                AlterType.DROP_CONSTRAINT);
+    }
+
+    @Test
+    public void addConstraint() {
+        String sql = "ALTER TABLE test add constraint primary key (a)";
+        HashMap<String, Integer> sqlType = new HashMap<>();
+        sqlType.put("name1", 1);
+        Operation operation = OperationUtils.generateOperation(sql, sqlType);
+        assert operation != null;
+        Assert.assertTrue(operation instanceof AlterOperation);
+        Assert.assertEquals(operation.getOperationType(), OperationType.ALTER);
+        Assert.assertEquals(((AlterOperation) operation).getAlterColumns().get(0).getAlterType(),
+                AlterType.ADD_CONSTRAINT);
+    }
+
 }

Reply via email to