This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 06df51bcbb [Feature][Transform] Support rename table/column (#8170)
06df51bcbb is described below

commit 06df51bcbbb3294f40ebb5607c70bc031c5175f8
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Thu Dec 5 14:03:10 2024 +0800

    [Feature][Transform] Support rename table/column (#8170)
---
 docs/en/transform-v2/field-rename.md               | 132 ++++++++
 docs/en/transform-v2/table-rename.md               | 132 ++++++++
 plugin-mapping.properties                          |   2 +
 .../connector/TransformSpecificationCheckTest.java |   2 +-
 .../seatunnel/e2e/transform/TestRenameIT.java      |  43 +++
 .../resources/table_field_rename_multi_table.conf  | 228 ++++++++++++++
 .../seatunnel/transform/rename/ConvertCase.java    |  23 ++
 .../transform/rename/FieldRenameConfig.java        | 129 ++++++++
 .../rename/FieldRenameMultiCatalogTransform.java   |  45 +++
 .../transform/rename/FieldRenameTransform.java     | 333 +++++++++++++++++++++
 .../rename/FieldRenameTransformFactory.java        |  58 ++++
 .../transform/rename/TableRenameConfig.java        |  98 ++++++
 .../rename/TableRenameMultiCatalogTransform.java   |  45 +++
 .../transform/rename/TableRenameTransform.java     | 267 +++++++++++++++++
 .../rename/TableRenameTransformFactory.java        |  56 ++++
 .../transform/rename/FieldRenameTransformTest.java | 243 +++++++++++++++
 .../transform/rename/TableRenameTransformTest.java | 150 ++++++++++
 17 files changed, 1985 insertions(+), 1 deletion(-)

diff --git a/docs/en/transform-v2/field-rename.md 
b/docs/en/transform-v2/field-rename.md
new file mode 100644
index 0000000000..faf198b695
--- /dev/null
+++ b/docs/en/transform-v2/field-rename.md
@@ -0,0 +1,132 @@
+# FieldRename
+
+> FieldRename transform plugin
+
+## Description
+
+FieldRename transform plugin for rename field name.
+
+## Options
+
+|          name           | type   | required | default value | Description    
                                                                                
                       |
+|:-----------------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------|
+|      convert_case       | string | no       |               | The case 
conversion type. The options can be `UPPER`, `LOWER`                            
                             |
+|         prefix          | string | no       |               | The prefix to 
be added to the field name                                                      
                        |
+|         suffix          | string | no       |               | The suffix to 
be added to the field name                                                      
                        |
+| replacements_with_regex | array  | no       |               | The array of 
replacement rules with regex. The replacement rule is a map with `replace_from` 
and `replace_to` fields. |
+
+## Examples
+
+### Convert field to uppercase
+
+```
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    MySQL-CDC {
+        plugin_output = "customers_mysql_cdc"
+        
+        username = "root"
+        password = "123456"
+        table-names = ["source.user_shop", "source.user_order"]
+        base-url = "jdbc:mysql://localhost:3306/source"
+    }
+}
+
+transform {
+  FieldRename {
+    plugin_input = "customers_mysql_cdc"
+    plugin_output = "trans_result"
+    
+    convert_case = "UPPER"
+    prefix = "F_"
+    suffix = "_S"
+    replacements_with_regex = [
+      {
+        replace_from = "create_time"
+        replace_to = "SOURCE_CREATE_TIME"
+      }
+    ]
+  }
+}
+
+sink {
+  Jdbc {
+    plugin_input = "trans_result"
+    
+    driver="oracle.jdbc.OracleDriver"
+    url="jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+    user="myuser"
+    password="mypwd"
+    
+    generate_sink_sql = true
+    database = "ORCLCDB"
+    table = "${database_name}.${table_name}"
+    primary_keys = ["${primary_key}"]
+    
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
+```
+
+### Convert field name to lowercase
+
+```
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Oracle-CDC {
+    plugin_output = "customers_oracle_cdc"
+    
+    base-url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB"
+    username = "dbzuser"
+    password = "dbz"
+    database-names = ["ORCLCDB"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["SOURCE.USER_SHOP", "SOURCE.USER_ORDER"]
+  }
+}
+
+transform {
+  FieldRename {
+    plugin_input = "customers_oracle_cdc"
+    plugin_output = "trans_result"
+    
+    convert_case = "LOWER"
+    prefix = "f_"
+    suffix = "_s"
+    replacements_with_regex = [
+      {
+        replace_from = "CREATE_TIME"
+        replace_to = "source_create_time"
+      }
+    ]
+  }
+}
+
+sink {
+  Jdbc {
+    plugin_input = "trans_result"
+    
+    url = "jdbc:mysql://localhost:3306/test"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user_sink"
+    password = "mysqlpw"
+    
+    generate_sink_sql = true
+    database = "${schema_name}"
+    table = "${table_name}"
+    primary_keys = ["${primary_key}"]
+    
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
+```
\ No newline at end of file
diff --git a/docs/en/transform-v2/table-rename.md 
b/docs/en/transform-v2/table-rename.md
new file mode 100644
index 0000000000..6cd1a60de7
--- /dev/null
+++ b/docs/en/transform-v2/table-rename.md
@@ -0,0 +1,132 @@
+# TableRename
+
+> TableRename transform plugin
+
+## Description
+
+TableRename transform plugin for rename table name.
+
+## Options
+
+|          name           | type   | required | default value | Description    
                                                                                
                       |
+|:-----------------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------|
+|      convert_case       | string | no       |               | The case 
conversion type. The options can be `UPPER`, `LOWER`                            
                             |
+|         prefix          | string | no       |               | The prefix to 
be added to the table name                                                      
                        |
+|         suffix          | string | no       |               | The suffix to 
be added to the table name                                                      
                        |
+| replacements_with_regex | array  | no       |               | The array of 
replacement rules with regex. The replacement rule is a map with `replace_from` 
and `replace_to` fields. |
+
+## Examples
+
+### Convert table name to uppercase
+
+```
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    MySQL-CDC {
+        plugin_output = "customers_mysql_cdc"
+        
+        username = "root"
+        password = "123456"
+        table-names = ["source.user_shop", "source.user_order"]
+        base-url = "jdbc:mysql://localhost:3306/source"
+    }
+}
+
+transform {
+  TableRename {
+    plugin_input = "customers_mysql_cdc"
+    plugin_output = "trans_result"
+    
+    convert_case = "UPPER"
+    prefix = "CDC_"
+    suffix = "_TABLE"
+    replacements_with_regex = [
+      {
+        replace_from = "user"
+        replace_to = "U"
+      }
+    ]
+  }
+}
+
+sink {
+  Jdbc {
+    plugin_input = "trans_result"
+    
+    driver="oracle.jdbc.OracleDriver"
+    url="jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+    user="myuser"
+    password="mypwd"
+    
+    generate_sink_sql = true
+    database = "ORCLCDB"
+    table = "${database_name}.${table_name}"
+    primary_keys = ["${primary_key}"]
+    
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
+```
+
+### Convert table name to lowercase
+
+```
+env {
+    parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Oracle-CDC {
+    plugin_output = "customers_oracle_cdc"
+    
+    base-url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB"
+    username = "dbzuser"
+    password = "dbz"
+    database-names = ["ORCLCDB"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["SOURCE.USER_SHOP", "SOURCE.USER_ORDER"]
+  }
+}
+
+transform {
+  TableRename {
+    plugin_input = "customers_oracle_cdc"
+    plugin_output = "trans_result"
+    
+    convert_case = "LOWER"
+    prefix = "cdc_"
+    suffix = "_table"
+    replacements_with_regex = [
+      {
+        replace_from = "USER"
+        replace_to = "u"
+      }
+    ]
+  }
+}
+
+sink {
+  Jdbc {
+    plugin_input = "trans_result"
+    
+    url = "jdbc:mysql://localhost:3306/test"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user_sink"
+    password = "mysqlpw"
+    
+    generate_sink_sql = true
+    database = "${schema_name}"
+    table = "${table_name}"
+    primary_keys = ["${primary_key}"]
+    
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c494686161..32cc214c21 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -155,3 +155,5 @@ seatunnel.transform.LLM = seatunnel-transforms-v2
 seatunnel.transform.Embedding = seatunnel-transforms-v2
 seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
 seatunnel.transform.Metadata = seatunnel-transforms-v2
+seatunnel.transform.FieldRename = seatunnel-transforms-v2
+seatunnel.transform.TableRename = seatunnel-transforms-v2
diff --git 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
index bb3fe1f55b..89f3ec9c48 100644
--- 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
+++ 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
@@ -43,7 +43,7 @@ public class TransformSpecificationCheckTest {
                 FactoryUtil.discoverFactories(
                         Thread.currentThread().getContextClassLoader(),
                         TableTransformFactory.class);
-        Assertions.assertEquals(13, factories.size());
+        Assertions.assertEquals(15, factories.size());
     }
 
     @Test
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java
new file mode 100644
index 0000000000..8b2e55551b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestRenameIT extends TestSuiteBase {
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not multi table transform")
+    @TestTemplate
+    public void testRenameMultiTable(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/table_field_rename_multi_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf
new file mode 100644
index 0000000000..254f2032d4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "source1"
+
+    tables_configs = [
+      {
+        row.num = 3
+        schema = {
+          table = "test.abc"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            }
+          ]
+        }
+      },
+      {
+        row.num = 5
+        schema = {
+          table = "test.xyz"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            }
+          ]
+        }
+      },
+      {
+        row.num = 10
+        schema = {
+          table = "test.www"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            }
+          ]
+        }
+      }
+    ]
+  }
+}
+transform {
+  TableRename {
+    plugin_input = "source1"
+    plugin_output = "transform1"
+
+    table_match_regex = "test.a.*"
+    table_transform = [{
+      table_path = "test.xyz"
+      convert_case = "UPPER"
+      prefix = "P2_"
+      suffix = "_S2"
+      replacements_with_regex = [
+        {
+          replace_from = "z"
+          replace_to = "ZZ"
+        }
+      ]
+    }]
+    convert_case = "UPPER"
+    prefix = "P1_"
+    suffix = "_S1"
+    replacements_with_regex = [
+      {
+        replace_from = "c"
+        replace_to = "CC"
+      }
+    ]
+  }
+
+  FieldRename {
+      plugin_input = "transform1"
+      plugin_output = "transform2"
+
+      table_match_regex = "TEST.P.*"
+      table_transform = [{
+        table_path = "TEST.P2_XYZZ_S2"
+        convert_case = "UPPER"
+        prefix = "F_P2_"
+        suffix = "_S2_F"
+        replacements_with_regex = [
+          {
+            replace_from = "id"
+            replace_to = "ID_1"
+          }
+        ]
+      }]
+      convert_case = "UPPER"
+      prefix = "F_P1_"
+      suffix = "_S1_F"
+      replacements_with_regex = [
+        {
+          replace_from = "name"
+          replace_to = "NAME_1"
+        }
+      ]
+    }
+}
+sink {
+  Assert {
+    plugin_input = "transform2"
+
+    rules =
+      {
+        tables_configs = [
+          {
+            table_path = "TEST.P1_ABCC_S1"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 3
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 3
+              }
+            ],
+            field_rules = [
+                {
+                  field_name = F_P1_ID_S1_F
+                  field_type = bigint
+                  field_value = [
+                    {
+                      rule_type = NOT_NULL
+                    }
+                  ]
+                },
+                {
+                  field_name = F_P1_NAME_1_S1_F
+                  field_type = string
+                  field_value = [
+                    {
+                      rule_type = NOT_NULL
+                    }
+                  ]
+                }
+            ]
+          },
+          {
+            table_path = "TEST.P2_XYZZ_S2"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 5
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 5
+              }
+            ],
+            field_rules = [
+                {
+                  field_name = F_P2_ID_1_S2_F
+                  field_type = bigint
+                  field_value = [
+                    {
+                      rule_type = NOT_NULL
+                    }
+                  ]
+                },
+                {
+                  field_name = F_P2_NAME_S2_F
+                  field_type = string
+                  field_value = [
+                    {
+                      rule_type = NOT_NULL
+                    }
+                  ]
+                }
+            ]
+          },
+          {
+            table_path = "test.www"
+            catalog_table_rule {
+              table_path = "test.www"
+              column_rule = [
+                {
+                  name = "id"
+                  type = "bigint"
+                },
+                {
+                  name = "name"
+                  type = "string"
+                }
+              ]
+            }
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java
new file mode 100644
index 0000000000..5dafb26cd3
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+public enum ConvertCase {
+    LOWER,
+    UPPER
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java
new file mode 100644
index 0000000000..7bd8ec1495
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+public class FieldRenameConfig implements Serializable {
+
+    public static final Option<ConvertCase> CONVERT_CASE =
+            Options.key("convert_case")
+                    .enumType(ConvertCase.class)
+                    .noDefaultValue()
+                    .withDescription("Convert to uppercase or lowercase");
+
+    public static final Option<String> PREFIX =
+            Options.key("prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Add prefix for field name");
+
+    public static final Option<String> SUFFIX =
+            Options.key("suffix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Add suffix for field name");
+
+    public static final Option<List<ReplacementsWithRegex>> 
REPLACEMENTS_WITH_REGEX =
+            Options.key("replacements_with_regex")
+                    .listType(ReplacementsWithRegex.class)
+                    .noDefaultValue()
+                    .withDescription("The regex of replace fields name to ");
+
+    public static final Option<List<SpecificModify>> SPECIFIC =
+            Options.key("specific")
+                    .listType(SpecificModify.class)
+                    .noDefaultValue()
+                    .withDescription("The specific modify field name");
+
+    @JsonAlias("table_match_regex")
+    private String tableMatchRegex;
+
+    @JsonAlias("is_table_match_regex")
+    private Boolean isTableMatchRegex;
+
+    @JsonAlias("match_tables")
+    private List<String> matchTables;
+
+    @JsonAlias("convert_case")
+    private ConvertCase convertCase;
+
+    @JsonAlias("prefix")
+    private String prefix;
+
+    @JsonAlias("suffix")
+    private String suffix;
+
+    @JsonAlias("replacements_with_regex")
+    private List<ReplacementsWithRegex> replacementsWithRegex;
+
+    @JsonAlias("specific")
+    private List<SpecificModify> specific;
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class SpecificModify implements Serializable {
+        @JsonAlias("field_name")
+        private String fieldName;
+
+        @JsonAlias("target_name")
+        private String targetName;
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class ReplacementsWithRegex implements Serializable {
+        @JsonAlias("replace_from")
+        private String replaceFrom;
+
+        @JsonAlias("replace_to")
+        private String replaceTo;
+
+        @JsonAlias("is_regex")
+        private Boolean isRegex;
+    }
+
+    public static FieldRenameConfig of(ReadonlyConfig config) {
+        FieldRenameConfig renameConfig = new FieldRenameConfig();
+        renameConfig.setConvertCase(config.get(CONVERT_CASE));
+        renameConfig.setPrefix(config.get(PREFIX));
+        renameConfig.setSuffix(config.get(SUFFIX));
+        
renameConfig.setReplacementsWithRegex(config.get(REPLACEMENTS_WITH_REGEX));
+        renameConfig.setSpecific(config.get(SPECIFIC));
+        return renameConfig;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
new file mode 100644
index 0000000000..dd0f61e4e4
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+
+import java.util.List;
+
+public class FieldRenameMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
+
+    public FieldRenameMultiCatalogTransform(
+            List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+        super(inputCatalogTables, config);
+    }
+
+    @Override
+    public String getPluginName() {
+        return FieldRenameTransform.PLUGIN_NAME;
+    }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
+            CatalogTable table, ReadonlyConfig config) {
+        return new FieldRenameTransform(FieldRenameConfig.of(config), table);
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java
new file mode 100644
index 0000000000..b151b355ac
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import 
org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
+import 
org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class FieldRenameTransform extends AbstractCatalogSupportMapTransform {
+    public static String PLUGIN_NAME = "FieldRename";
+
+    private CatalogTable inputTable;
+    private final FieldRenameConfig config;
+    private TableSchemaChangeEventHandler tableSchemaChangeEventHandler;
+
+    public FieldRenameTransform(FieldRenameConfig config, CatalogTable table) {
+        super(table);
+        this.config = config;
+        this.inputTable = table;
+        this.tableSchemaChangeEventHandler = new 
TableSchemaChangeEventDispatcher();
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        return inputRow;
+    }
+
+    @Override
+    public SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent event) {
+        TableSchema newTableSchema =
+                
tableSchemaChangeEventHandler.reset(inputTable.getTableSchema()).apply(event);
+        this.inputTable =
+                CatalogTable.of(
+                        inputTable.getTableId(),
+                        newTableSchema,
+                        inputTable.getOptions(),
+                        inputTable.getPartitionKeys(),
+                        inputTable.getComment());
+
+        if (event instanceof AlterTableColumnsEvent) {
+            AlterTableColumnsEvent alterTableColumnsEvent = 
(AlterTableColumnsEvent) event;
+            AlterTableColumnsEvent newEvent =
+                    new AlterTableColumnsEvent(
+                            event.tableIdentifier(),
+                            alterTableColumnsEvent.getEvents().stream()
+                                    .map(this::convertName)
+                                    .collect(Collectors.toList()));
+
+            newEvent.setJobId(event.getJobId());
+            newEvent.setStatement(((AlterTableColumnsEvent) 
event).getStatement());
+            newEvent.setSourceDialectName(((AlterTableColumnsEvent) 
event).getSourceDialectName());
+            if (event.getChangeAfter() != null) {
+                newEvent.setChangeAfter(
+                        CatalogTable.of(
+                                event.getChangeAfter().getTableId(), 
event.getChangeAfter()));
+            }
+            return newEvent;
+        }
+        if (event instanceof AlterTableColumnEvent) {
+            return convertName((AlterTableColumnEvent) event);
+        }
+        return event;
+    }
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        return convertTableSchema(inputTable.getTableSchema());
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return inputTable.getTableId();
+    }
+
+    @VisibleForTesting
+    public String convertName(String name) {
+        if (name == null) {
+            return null;
+        }
+
+        Optional<FieldRenameConfig.SpecificModify> specificValue = 
getSpecificModify(name);
+        if (specificValue.isPresent()) {
+            return specificValue.get().getTargetName();
+        }
+        String replaceTo = null;
+        Map<Integer, Integer> replaceIndex = new LinkedHashMap<>();
+
+        if (CollectionUtils.isNotEmpty(config.getReplacementsWithRegex())) {
+            for (FieldRenameConfig.ReplacementsWithRegex replacementsWithRegex 
:
+                    config.getReplacementsWithRegex()) {
+                Boolean isRegex = replacementsWithRegex.getIsRegex();
+                String replacement = replacementsWithRegex.getReplaceFrom();
+                if (StringUtils.isNotEmpty(replacement)) {
+                    Map<Integer, Integer> matched = new LinkedHashMap<>();
+                    if (BooleanUtils.isNotTrue(isRegex)) {
+                        if (StringUtils.equals(replacement, name)) {
+                            matched.put(0, name.length());
+                        }
+                    } else {
+                        Matcher matcher = 
Pattern.compile(replacement).matcher(name);
+                        while (matcher.find()) {
+                            matched.put(matcher.start(), matcher.end());
+                        }
+                    }
+                    if (!matched.isEmpty()) {
+                        replaceTo = replacementsWithRegex.getReplaceTo();
+                        replaceIndex = matched;
+                    }
+                }
+            }
+        }
+
+        if (config.getConvertCase() != null) {
+            switch (config.getConvertCase()) {
+                case UPPER:
+                    name = name.toUpperCase();
+                    break;
+                case LOWER:
+                    name = name.toLowerCase();
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported convert case: " + 
config.getConvertCase());
+            }
+        }
+        int offset = 0;
+        for (Map.Entry<Integer, Integer> index : replaceIndex.entrySet()) {
+            int indexStart = index.getKey();
+            int indexEnd = index.getValue();
+            name =
+                    name.substring(0, indexStart + offset)
+                            + replaceTo.trim()
+                            + name.substring(indexEnd + offset);
+            offset += replaceTo.trim().length() - (indexEnd - indexStart);
+        }
+        if (StringUtils.isNotBlank(config.getPrefix())) {
+            name = config.getPrefix().trim() + name;
+        }
+        if (StringUtils.isNotBlank(config.getSuffix())) {
+            name = name + config.getSuffix().trim();
+        }
+        return name;
+    }
+
+    private Optional<FieldRenameConfig.SpecificModify> 
getSpecificModify(String oldColumnName) {
+        if (config.getSpecific() == null) {
+            return Optional.empty();
+        }
+        return config.getSpecific().stream()
+                .filter(specific -> 
specific.getFieldName().equals(oldColumnName))
+                .findFirst();
+    }
+
+    @VisibleForTesting
+    public AlterTableColumnEvent convertName(AlterTableColumnEvent event) {
+        AlterTableColumnEvent newEvent = event;
+        switch (event.getEventType()) {
+            case SCHEMA_CHANGE_ADD_COLUMN:
+                AlterTableAddColumnEvent addColumnEvent = 
(AlterTableAddColumnEvent) event;
+                newEvent =
+                        new AlterTableAddColumnEvent(
+                                event.tableIdentifier(),
+                                convertName(addColumnEvent.getColumn()),
+                                addColumnEvent.isFirst(),
+                                convertName(addColumnEvent.getAfterColumn()));
+                break;
+            case SCHEMA_CHANGE_DROP_COLUMN:
+                AlterTableDropColumnEvent dropColumnEvent = 
(AlterTableDropColumnEvent) event;
+                newEvent =
+                        new AlterTableDropColumnEvent(
+                                event.tableIdentifier(), 
convertName(dropColumnEvent.getColumn()));
+                break;
+            case SCHEMA_CHANGE_MODIFY_COLUMN:
+                AlterTableModifyColumnEvent modifyColumnEvent = 
(AlterTableModifyColumnEvent) event;
+                newEvent =
+                        new AlterTableModifyColumnEvent(
+                                event.tableIdentifier(),
+                                convertName(modifyColumnEvent.getColumn()),
+                                modifyColumnEvent.isFirst(),
+                                
convertName(modifyColumnEvent.getAfterColumn()));
+                break;
+            case SCHEMA_CHANGE_CHANGE_COLUMN:
+                AlterTableChangeColumnEvent changeColumnEvent = 
(AlterTableChangeColumnEvent) event;
+                boolean nameChanged =
+                        !changeColumnEvent
+                                .getOldColumn()
+                                
.equals(changeColumnEvent.getColumn().getName());
+                if (nameChanged) {
+                    log.warn(
+                            "FieldRenameTransform does not support changing 
column name, "
+                                    + "old column name: {}, new column name: 
{}",
+                            changeColumnEvent.getOldColumn(),
+                            changeColumnEvent.getColumn().getName());
+                    return changeColumnEvent;
+                }
+
+                newEvent =
+                        new AlterTableChangeColumnEvent(
+                                event.tableIdentifier(),
+                                convertName(changeColumnEvent.getOldColumn()),
+                                convertName(changeColumnEvent.getColumn()),
+                                changeColumnEvent.isFirst(),
+                                
convertName(changeColumnEvent.getAfterColumn()));
+                break;
+            default:
+                log.warn("Unsupported event: {}", event);
+                return event;
+        }
+
+        newEvent.setJobId(event.getJobId());
+        newEvent.setStatement(event.getStatement());
+        newEvent.setSourceDialectName(event.getSourceDialectName());
+        if (event.getChangeAfter() != null) {
+            CatalogTable newChangeAfter =
+                    CatalogTable.of(
+                            event.getChangeAfter().getTableId(),
+                            
convertTableSchema(event.getChangeAfter().getTableSchema()),
+                            event.getChangeAfter().getOptions(),
+                            event.getChangeAfter().getPartitionKeys(),
+                            event.getChangeAfter().getComment());
+            newEvent.setChangeAfter(newChangeAfter);
+        }
+        return newEvent;
+    }
+
+    private Column convertName(Column column) {
+        return column.rename(convertName(column.getName()));
+    }
+
+    private TableSchema convertTableSchema(TableSchema tableSchema) {
+        List<Column> columns =
+                tableSchema.getColumns().stream()
+                        .map(
+                                column -> {
+                                    String newColumnName = 
convertName(column.getName());
+                                    return column.rename(newColumnName);
+                                })
+                        .collect(Collectors.toList());
+        PrimaryKey primaryKey =
+                Optional.ofNullable(tableSchema.getPrimaryKey())
+                        .map(
+                                pk ->
+                                        PrimaryKey.of(
+                                                pk.getPrimaryKey(),
+                                                pk.getColumnNames().stream()
+                                                        .map(this::convertName)
+                                                        
.collect(Collectors.toList()),
+                                                pk.getEnableAutoId()))
+                        .orElse(null);
+        List<ConstraintKey> constraintKeys =
+                Optional.ofNullable(tableSchema.getConstraintKeys())
+                        .map(
+                                keyList ->
+                                        keyList.stream()
+                                                .map(
+                                                        key ->
+                                                                
ConstraintKey.of(
+                                                                        
key.getConstraintType(),
+                                                                        
key.getConstraintName(),
+                                                                        
key.getColumnNames()
+                                                                               
 .stream()
+                                                                               
 .map(
+                                                                               
         column ->
+                                                                               
                 ConstraintKey
+                                                                               
                         .ConstraintKeyColumn
+                                                                               
                         .of(
+                                                                               
                                 convertName(
+                                                                               
                                         column
+                                                                               
                                                 .getColumnName()),
+                                                                               
                                 column
+                                                                               
                                         .getSortType()))
+                                                                               
 .collect(
+                                                                               
         Collectors
+                                                                               
                 .toList())))
+                                                .collect(Collectors.toList()))
+                        .orElse(null);
+        return TableSchema.builder()
+                .columns(columns)
+                .primaryKey(primaryKey)
+                .constraintKey(constraintKeys)
+                .build();
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java
new file mode 100644
index 0000000000..299bbb5255
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+
+import com.google.auto.service.AutoService;
+
+import static 
org.apache.seatunnel.transform.rename.FieldRenameConfig.CONVERT_CASE;
+import static org.apache.seatunnel.transform.rename.FieldRenameConfig.PREFIX;
+import static 
org.apache.seatunnel.transform.rename.FieldRenameConfig.REPLACEMENTS_WITH_REGEX;
+import static org.apache.seatunnel.transform.rename.FieldRenameConfig.SPECIFIC;
+import static org.apache.seatunnel.transform.rename.FieldRenameConfig.SUFFIX;
+
+@AutoService(Factory.class)
+public class FieldRenameTransformFactory implements TableTransformFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return FieldRenameTransform.PLUGIN_NAME;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .optional(CONVERT_CASE, PREFIX, SUFFIX, 
REPLACEMENTS_WITH_REGEX, SPECIFIC)
+                .optional(TransformCommonOptions.MULTI_TABLES)
+                .optional(TransformCommonOptions.TABLE_MATCH_REGEX)
+                .build();
+    }
+
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        return () ->
+                new FieldRenameMultiCatalogTransform(
+                        context.getCatalogTables(), context.getOptions());
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java
new file mode 100644
index 0000000000..be2d6a25e2
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+public class TableRenameConfig implements Serializable {
+
+    public static final Option<ConvertCase> CONVERT_CASE =
+            Options.key("convert_case")
+                    .enumType(ConvertCase.class)
+                    .noDefaultValue()
+                    .withDescription("Convert to uppercase or lowercase");
+
+    public static final Option<String> PREFIX =
+            Options.key("prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Add prefix for table name");
+
+    public static final Option<String> SUFFIX =
+            Options.key("suffix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Add suffix for table name");
+
+    public static final Option<List<ReplacementsWithRegex>> 
REPLACEMENTS_WITH_REGEX =
+            Options.key("replacements_with_regex")
+                    .listType(ReplacementsWithRegex.class)
+                    .noDefaultValue()
+                    .withDescription("The regex of replace table name to ");
+
+    @JsonAlias("convert_case")
+    private ConvertCase convertCase;
+
+    @JsonAlias("prefix")
+    private String prefix;
+
+    @JsonAlias("suffix")
+    private String suffix;
+
+    @JsonAlias("replacements_with_regex")
+    private List<ReplacementsWithRegex> replacementsWithRegex;
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class ReplacementsWithRegex implements Serializable {
+        @JsonAlias("replace_from")
+        private String replaceFrom;
+
+        @JsonAlias("replace_to")
+        private String replaceTo;
+
+        private final Boolean isRegex = true;
+    }
+
+    public static TableRenameConfig of(ReadonlyConfig config) {
+        TableRenameConfig renameConfig = new TableRenameConfig();
+        renameConfig.setConvertCase(config.get(CONVERT_CASE));
+        renameConfig.setPrefix(config.get(PREFIX));
+        renameConfig.setSuffix(config.get(SUFFIX));
+        
renameConfig.setReplacementsWithRegex(config.get(REPLACEMENTS_WITH_REGEX));
+        return renameConfig;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
new file mode 100644
index 0000000000..67cff881da
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+
+import java.util.List;
+
+public class TableRenameMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
+
+    public TableRenameMultiCatalogTransform(
+            List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+        super(inputCatalogTables, config);
+    }
+
+    @Override
+    public String getPluginName() {
+        return TableRenameTransform.PLUGIN_NAME;
+    }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
+            CatalogTable table, ReadonlyConfig config) {
+        return new TableRenameTransform(TableRenameConfig.of(config), table);
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java
new file mode 100644
index 0000000000..d1a3156115
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class TableRenameTransform extends AbstractCatalogSupportMapTransform {
+    public static String PLUGIN_NAME = "TableRename";
+
+    private final CatalogTable inputTable;
+    private final TableRenameConfig config;
+
+    private TablePath outputTablePath;
+    private String outputTableId;
+
+    public TableRenameTransform(TableRenameConfig config, CatalogTable table) {
+        super(table);
+        this.inputTable = table;
+        this.config = config;
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        return TableSchema.builder()
+                .columns(inputTable.getTableSchema().getColumns())
+                .constraintKey(inputTable.getTableSchema().getConstraintKeys())
+                .primaryKey(inputTable.getTableSchema().getPrimaryKey())
+                .build();
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        TablePath inputTablePath = inputTable.getTablePath();
+        String inputDatabaseName = inputTablePath.getDatabaseName();
+        String inputSchemaName = inputTablePath.getSchemaName();
+        String inputTableName = inputTablePath.getTableName();
+
+        String outputDatabaseName =
+                
Optional.ofNullable(inputDatabaseName).map(this::convertCase).orElse(null);
+        String outputSchemaName =
+                
Optional.ofNullable(inputSchemaName).map(this::convertCase).orElse(null);
+        String outputTableName = convertName(inputTableName);
+        TablePath outputTablePath =
+                TablePath.of(outputDatabaseName, outputSchemaName, 
outputTableName);
+        this.outputTablePath = outputTablePath;
+        this.outputTableId = outputTablePath.getFullName();
+        return TableIdentifier.of(inputTable.getCatalogName(), 
outputTablePath);
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        if (inputRow.getTableId() == null) {
+            log.debug("Table id is null, skip renaming");
+            return inputRow;
+        }
+        if (outputTableId.equals(inputRow.getTableId())) {
+            return inputRow;
+        }
+
+        SeaTunnelRow outputRow = inputRow.copy();
+        outputRow.setTableId(outputTableId);
+        return outputRow;
+    }
+
+    @Override
+    public SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent event) {
+        TablePath inputTablePath = event.tablePath();
+        if (inputTablePath == null) {
+            return event;
+        }
+        if (outputTablePath.equals(inputTablePath)) {
+            return event;
+        }
+
+        if (event instanceof AlterTableColumnsEvent) {
+            TableIdentifier newTableIdentifier =
+                    
TableIdentifier.of(event.tableIdentifier().getCatalogName(), outputTablePath);
+            AlterTableColumnsEvent alterTableColumnsEvent = 
(AlterTableColumnsEvent) event;
+            AlterTableColumnsEvent newEvent =
+                    new AlterTableColumnsEvent(
+                            newTableIdentifier,
+                            alterTableColumnsEvent.getEvents().stream()
+                                    .map(this::convertName)
+                                    .collect(Collectors.toList()));
+
+            newEvent.setJobId(event.getJobId());
+            newEvent.setStatement(((AlterTableColumnsEvent) 
event).getStatement());
+            newEvent.setSourceDialectName(((AlterTableColumnsEvent) 
event).getSourceDialectName());
+            if (event.getChangeAfter() != null) {
+                newEvent.setChangeAfter(
+                        CatalogTable.of(newTableIdentifier, 
event.getChangeAfter()));
+            }
+            return newEvent;
+        }
+        if (event instanceof AlterTableColumnEvent) {
+            return convertName((AlterTableColumnEvent) event);
+        }
+        return event;
+    }
+
+    public String convertCase(String name) {
+        if (config.getConvertCase() != null) {
+            switch (config.getConvertCase()) {
+                case UPPER:
+                    return name.toUpperCase();
+                case LOWER:
+                    return name.toLowerCase();
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported convert case: " + 
config.getConvertCase());
+            }
+        }
+        return name;
+    }
+
+    @VisibleForTesting
+    public String convertName(String tableName) {
+        String replaceTo = null;
+        Map<Integer, Integer> replaceIndex = new LinkedHashMap<>();
+
+        if (CollectionUtils.isNotEmpty(config.getReplacementsWithRegex())) {
+            for (TableRenameConfig.ReplacementsWithRegex replacementsWithRegex 
:
+                    config.getReplacementsWithRegex()) {
+                Boolean isRegex = replacementsWithRegex.getIsRegex();
+                String replacement = replacementsWithRegex.getReplaceFrom();
+                if (StringUtils.isNotEmpty(replacement)) {
+                    Map<Integer, Integer> matched = new LinkedHashMap<>();
+                    if (BooleanUtils.isNotTrue(isRegex)) {
+                        if (StringUtils.equals(replacement, tableName)) {
+                            matched.put(0, tableName.length());
+                        }
+                    } else {
+                        Matcher matcher = 
Pattern.compile(replacement).matcher(tableName);
+                        while (matcher.find()) {
+                            matched.put(matcher.start(), matcher.end());
+                        }
+                    }
+                    if (!matched.isEmpty()) {
+                        replaceTo = replacementsWithRegex.getReplaceTo();
+                        replaceIndex = matched;
+                    }
+                }
+            }
+        }
+
+        tableName = convertCase(tableName);
+
+        int offset = 0;
+        for (Map.Entry<Integer, Integer> index : replaceIndex.entrySet()) {
+            int indexStart = index.getKey();
+            int indexEnd = index.getValue();
+            tableName =
+                    tableName.substring(0, indexStart + offset)
+                            + replaceTo.trim()
+                            + tableName.substring(indexEnd + offset);
+            offset += replaceTo.trim().length() - (indexEnd - indexStart);
+        }
+        if (StringUtils.isNotBlank(config.getPrefix())) {
+            tableName = config.getPrefix().trim() + tableName;
+        }
+        if (StringUtils.isNotBlank(config.getSuffix())) {
+            tableName = tableName + config.getSuffix().trim();
+        }
+        return tableName;
+    }
+
+    @VisibleForTesting
+    public AlterTableColumnEvent convertName(AlterTableColumnEvent event) {
+        TableIdentifier newTableIdentifier =
+                TableIdentifier.of(event.tableIdentifier().getCatalogName(), 
outputTablePath);
+        AlterTableColumnEvent newEvent = event;
+        switch (event.getEventType()) {
+            case SCHEMA_CHANGE_ADD_COLUMN:
+                AlterTableAddColumnEvent addColumnEvent = 
(AlterTableAddColumnEvent) event;
+                newEvent =
+                        new AlterTableAddColumnEvent(
+                                newTableIdentifier,
+                                addColumnEvent.getColumn(),
+                                addColumnEvent.isFirst(),
+                                addColumnEvent.getAfterColumn());
+                break;
+            case SCHEMA_CHANGE_DROP_COLUMN:
+                AlterTableDropColumnEvent dropColumnEvent = 
(AlterTableDropColumnEvent) event;
+                newEvent =
+                        new AlterTableDropColumnEvent(
+                                newTableIdentifier, 
dropColumnEvent.getColumn());
+                break;
+            case SCHEMA_CHANGE_MODIFY_COLUMN:
+                AlterTableModifyColumnEvent modifyColumnEvent = 
(AlterTableModifyColumnEvent) event;
+                newEvent =
+                        new AlterTableModifyColumnEvent(
+                                newTableIdentifier,
+                                modifyColumnEvent.getColumn(),
+                                modifyColumnEvent.isFirst(),
+                                modifyColumnEvent.getAfterColumn());
+                break;
+            case SCHEMA_CHANGE_CHANGE_COLUMN:
+                AlterTableChangeColumnEvent changeColumnEvent = 
(AlterTableChangeColumnEvent) event;
+                newEvent =
+                        new AlterTableChangeColumnEvent(
+                                newTableIdentifier,
+                                changeColumnEvent.getOldColumn(),
+                                changeColumnEvent.getColumn(),
+                                changeColumnEvent.isFirst(),
+                                changeColumnEvent.getAfterColumn());
+                break;
+            default:
+                log.warn("Unsupported event: {}", event);
+                return event;
+        }
+
+        newEvent.setJobId(event.getJobId());
+        newEvent.setStatement(event.getStatement());
+        newEvent.setSourceDialectName(event.getSourceDialectName());
+        if (event.getChangeAfter() != null) {
+            newEvent.setChangeAfter(CatalogTable.of(newTableIdentifier, 
event.getChangeAfter()));
+        }
+        return newEvent;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java
new file mode 100644
index 0000000000..840bfff209
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+
+import com.google.auto.service.AutoService;
+
+import static 
org.apache.seatunnel.transform.rename.TableRenameConfig.CONVERT_CASE;
+import static org.apache.seatunnel.transform.rename.TableRenameConfig.PREFIX;
+import static 
org.apache.seatunnel.transform.rename.TableRenameConfig.REPLACEMENTS_WITH_REGEX;
+import static org.apache.seatunnel.transform.rename.TableRenameConfig.SUFFIX;
+
+@AutoService(Factory.class)
+public class TableRenameTransformFactory implements TableTransformFactory {
+    @Override
+    public String factoryIdentifier() {
+        return TableRenameTransform.PLUGIN_NAME;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .optional(CONVERT_CASE, PREFIX, SUFFIX, 
REPLACEMENTS_WITH_REGEX)
+                .optional(TransformCommonOptions.MULTI_TABLES)
+                .optional(TransformCommonOptions.TABLE_MATCH_REGEX)
+                .build();
+    }
+
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        return () ->
+                new TableRenameMultiCatalogTransform(
+                        context.getCatalogTables(), context.getOptions());
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java
new file mode 100644
index 0000000000..02a8b01bcf
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+public class FieldRenameTransformTest {
+
+    private static final CatalogTable DEFAULT_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("test", "Database-x", "Schema-x", 
"Table-x"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f1",
+                                            BasicType.LONG_TYPE,
+                                            null,
+                                            null,
+                                            false,
+                                            null,
+                                            null))
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f2",
+                                            BasicType.LONG_TYPE,
+                                            null,
+                                            null,
+                                            true,
+                                            null,
+                                            null))
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f3",
+                                            BasicType.LONG_TYPE,
+                                            null,
+                                            null,
+                                            true,
+                                            null,
+                                            null))
+                            .primaryKey(PrimaryKey.of("pk1", 
Arrays.asList("f1")))
+                            .constraintKey(
+                                    ConstraintKey.of(
+                                            
ConstraintKey.ConstraintType.UNIQUE_KEY,
+                                            "uk1",
+                                            Arrays.asList(
+                                                    
ConstraintKey.ConstraintKeyColumn.of(
+                                                            "f2", 
ConstraintKey.ColumnSortType.ASC),
+                                                    
ConstraintKey.ConstraintKeyColumn.of(
+                                                            "f3",
+                                                            
ConstraintKey.ColumnSortType.ASC))))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.singletonList("f2"),
+                    null);
+
+    @Test
+    public void testRename() {
+        AlterTableAddColumnEvent addColumnEvent =
+                AlterTableAddColumnEvent.add(
+                        DEFAULT_TABLE.getTableId(),
+                        PhysicalColumn.of("f4", BasicType.LONG_TYPE, null, 
null, true, null, null));
+        AlterTableModifyColumnEvent modifyColumnEvent =
+                AlterTableModifyColumnEvent.modify(
+                        DEFAULT_TABLE.getTableId(),
+                        PhysicalColumn.of("f4", BasicType.INT_TYPE, null, 
null, true, null, null));
+        AlterTableChangeColumnEvent changeColumnEvent =
+                AlterTableChangeColumnEvent.change(
+                        DEFAULT_TABLE.getTableId(),
+                        "f4",
+                        PhysicalColumn.of("f5", BasicType.INT_TYPE, null, 
null, true, null, null));
+        AlterTableDropColumnEvent dropColumnEvent =
+                new AlterTableDropColumnEvent(DEFAULT_TABLE.getTableId(), 
"f5");
+
+        FieldRenameConfig config = new 
FieldRenameConfig().setConvertCase(ConvertCase.LOWER);
+        FieldRenameTransform transform = new FieldRenameTransform(config, 
DEFAULT_TABLE);
+        CatalogTable outputCatalogTable = transform.getProducedCatalogTable();
+        AlterTableAddColumnEvent outputAddEvent =
+                (AlterTableAddColumnEvent) 
transform.mapSchemaChangeEvent(addColumnEvent);
+        AlterTableModifyColumnEvent outputModifyEvent =
+                (AlterTableModifyColumnEvent) 
transform.mapSchemaChangeEvent(modifyColumnEvent);
+        AlterTableChangeColumnEvent outputChangeEvent =
+                (AlterTableChangeColumnEvent) 
transform.mapSchemaChangeEvent(changeColumnEvent);
+        AlterTableDropColumnEvent outputDropEvent =
+                (AlterTableDropColumnEvent) 
transform.mapSchemaChangeEvent(dropColumnEvent);
+
+        Assertions.assertIterableEquals(
+                Arrays.asList("f1", "f2", "f3"),
+                
Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames()));
+        Assertions.assertIterableEquals(
+                Arrays.asList("f1"),
+                
outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames());
+        outputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                .forEach(
+                        key ->
+                                Assertions.assertIterableEquals(
+                                        Arrays.asList("f2", "f3"),
+                                        key.getColumnNames().stream()
+                                                .map(
+                                                        
ConstraintKey.ConstraintKeyColumn
+                                                                
::getColumnName)
+                                                
.collect(Collectors.toList())));
+        Assertions.assertEquals("f4", outputAddEvent.getColumn().getName());
+        Assertions.assertEquals("f4", outputModifyEvent.getColumn().getName());
+        Assertions.assertEquals("f4", outputChangeEvent.getOldColumn());
+        Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName());
+        Assertions.assertEquals("f5", outputDropEvent.getColumn());
+
+        config = new FieldRenameConfig().setConvertCase(ConvertCase.UPPER);
+        transform = new FieldRenameTransform(config, DEFAULT_TABLE);
+        outputCatalogTable = transform.getProducedCatalogTable();
+        outputAddEvent = (AlterTableAddColumnEvent) 
transform.mapSchemaChangeEvent(addColumnEvent);
+        outputModifyEvent =
+                (AlterTableModifyColumnEvent) 
transform.mapSchemaChangeEvent(modifyColumnEvent);
+        outputChangeEvent =
+                (AlterTableChangeColumnEvent) 
transform.mapSchemaChangeEvent(changeColumnEvent);
+        outputDropEvent =
+                (AlterTableDropColumnEvent) 
transform.mapSchemaChangeEvent(dropColumnEvent);
+        Assertions.assertIterableEquals(
+                Arrays.asList("F1", "F2", "F3"),
+                
Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames()));
+        Assertions.assertIterableEquals(
+                Arrays.asList("F1"),
+                
outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames());
+        outputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                .forEach(
+                        key ->
+                                Assertions.assertIterableEquals(
+                                        Arrays.asList("F2", "F3"),
+                                        key.getColumnNames().stream()
+                                                .map(
+                                                        
ConstraintKey.ConstraintKeyColumn
+                                                                
::getColumnName)
+                                                
.collect(Collectors.toList())));
+        Assertions.assertEquals("F4", outputAddEvent.getColumn().getName());
+        Assertions.assertEquals("F4", outputModifyEvent.getColumn().getName());
+        Assertions.assertEquals("f4", outputChangeEvent.getOldColumn());
+        Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName());
+        Assertions.assertEquals("F5", outputDropEvent.getColumn());
+
+        config = new FieldRenameConfig().setPrefix("p-").setSuffix("-s");
+        transform = new FieldRenameTransform(config, DEFAULT_TABLE);
+        outputCatalogTable = transform.getProducedCatalogTable();
+        outputAddEvent = (AlterTableAddColumnEvent) 
transform.mapSchemaChangeEvent(addColumnEvent);
+        outputModifyEvent =
+                (AlterTableModifyColumnEvent) 
transform.mapSchemaChangeEvent(modifyColumnEvent);
+        outputChangeEvent =
+                (AlterTableChangeColumnEvent) 
transform.mapSchemaChangeEvent(changeColumnEvent);
+        outputDropEvent =
+                (AlterTableDropColumnEvent) 
transform.mapSchemaChangeEvent(dropColumnEvent);
+        Assertions.assertIterableEquals(
+                Arrays.asList("p-f1-s", "p-f2-s", "p-f3-s"),
+                
Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames()));
+        Assertions.assertIterableEquals(
+                Arrays.asList("p-f1-s"),
+                
outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames());
+        outputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                .forEach(
+                        key ->
+                                Assertions.assertIterableEquals(
+                                        Arrays.asList("p-f2-s", "p-f3-s"),
+                                        key.getColumnNames().stream()
+                                                .map(
+                                                        
ConstraintKey.ConstraintKeyColumn
+                                                                
::getColumnName)
+                                                
.collect(Collectors.toList())));
+        Assertions.assertEquals("p-f4-s", 
outputAddEvent.getColumn().getName());
+        Assertions.assertEquals("p-f4-s", 
outputModifyEvent.getColumn().getName());
+        Assertions.assertEquals("f4", outputChangeEvent.getOldColumn());
+        Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName());
+        Assertions.assertEquals("p-f5-s", outputDropEvent.getColumn());
+
+        config =
+                new FieldRenameConfig()
+                        .setReplacementsWithRegex(
+                                Arrays.asList(
+                                        new 
FieldRenameConfig.ReplacementsWithRegex(
+                                                "f1", "t1", true),
+                                        new 
FieldRenameConfig.ReplacementsWithRegex(
+                                                "f1", "t2", true)));
+        transform = new FieldRenameTransform(config, DEFAULT_TABLE);
+        outputCatalogTable = transform.getProducedCatalogTable();
+        outputAddEvent = (AlterTableAddColumnEvent) 
transform.mapSchemaChangeEvent(addColumnEvent);
+        outputModifyEvent =
+                (AlterTableModifyColumnEvent) 
transform.mapSchemaChangeEvent(modifyColumnEvent);
+        outputChangeEvent =
+                (AlterTableChangeColumnEvent) 
transform.mapSchemaChangeEvent(changeColumnEvent);
+        outputDropEvent =
+                (AlterTableDropColumnEvent) 
transform.mapSchemaChangeEvent(dropColumnEvent);
+        Assertions.assertIterableEquals(
+                Arrays.asList("t2", "f2", "f3"),
+                
Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames()));
+        Assertions.assertIterableEquals(
+                Arrays.asList("t2"),
+                
outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames());
+        outputCatalogTable.getTableSchema().getConstraintKeys().stream()
+                .forEach(
+                        key ->
+                                Assertions.assertIterableEquals(
+                                        Arrays.asList("f2", "f3"),
+                                        key.getColumnNames().stream()
+                                                .map(
+                                                        
ConstraintKey.ConstraintKeyColumn
+                                                                
::getColumnName)
+                                                
.collect(Collectors.toList())));
+        Assertions.assertEquals("f4", outputAddEvent.getColumn().getName());
+        Assertions.assertEquals("f4", outputModifyEvent.getColumn().getName());
+        Assertions.assertEquals("f4", outputChangeEvent.getOldColumn());
+        Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName());
+        Assertions.assertEquals("f5", outputDropEvent.getColumn());
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java
new file mode 100644
index 0000000000..d1d756922e
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rename;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class TableRenameTransformTest {
+
+    private static final CatalogTable DEFAULT_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("test", "Database-x", "Schema-x", 
"Table-x"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f1",
+                                            BasicType.LONG_TYPE,
+                                            null,
+                                            null,
+                                            false,
+                                            null,
+                                            null))
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f2",
+                                            BasicType.LONG_TYPE,
+                                            null,
+                                            null,
+                                            true,
+                                            null,
+                                            null))
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f3",
+                                            BasicType.LONG_TYPE,
+                                            null,
+                                            null,
+                                            true,
+                                            null,
+                                            null))
+                            .primaryKey(PrimaryKey.of("pk1", 
Arrays.asList("f1")))
+                            .constraintKey(
+                                    ConstraintKey.of(
+                                            
ConstraintKey.ConstraintType.UNIQUE_KEY,
+                                            "uk1",
+                                            Arrays.asList(
+                                                    
ConstraintKey.ConstraintKeyColumn.of(
+                                                            "f2", 
ConstraintKey.ColumnSortType.ASC),
+                                                    
ConstraintKey.ConstraintKeyColumn.of(
+                                                            "f3",
+                                                            
ConstraintKey.ColumnSortType.ASC))))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.singletonList("f2"),
+                    null);
+
+    @Test
+    public void testRename() {
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {1L, 1L, 1L});
+        inputRow.setTableId(DEFAULT_TABLE.getTablePath().getFullName());
+        AlterTableAddColumnEvent inputEvent =
+                AlterTableAddColumnEvent.add(
+                        DEFAULT_TABLE.getTableId(),
+                        PhysicalColumn.of("f4", BasicType.LONG_TYPE, null, 
null, true, null, null));
+
+        TableRenameConfig config = new 
TableRenameConfig().setConvertCase(ConvertCase.LOWER);
+
+        TableRenameTransform transform = new TableRenameTransform(config, 
DEFAULT_TABLE);
+        List<CatalogTable> outputCatalogTable = 
transform.getProducedCatalogTables();
+        SeaTunnelRow outputRow = transform.map(inputRow);
+        SchemaChangeEvent outputEvent = 
transform.mapSchemaChangeEvent(inputEvent);
+        Assertions.assertEquals(
+                "database-x.schema-x.table-x",
+                
outputCatalogTable.get(0).getTableId().toTablePath().getFullName());
+        Assertions.assertEquals("database-x.schema-x.table-x", 
outputRow.getTableId());
+        Assertions.assertEquals(
+                "database-x.schema-x.table-x", 
outputEvent.tablePath().getFullName());
+
+        config = new TableRenameConfig().setConvertCase(ConvertCase.UPPER);
+        transform = new TableRenameTransform(config, DEFAULT_TABLE);
+        outputCatalogTable = transform.getProducedCatalogTables();
+        outputRow = transform.map(inputRow);
+        outputEvent = transform.mapSchemaChangeEvent(inputEvent);
+        Assertions.assertEquals(
+                "DATABASE-X.SCHEMA-X.TABLE-X",
+                
outputCatalogTable.get(0).getTableId().toTablePath().getFullName());
+        Assertions.assertEquals("DATABASE-X.SCHEMA-X.TABLE-X", 
outputRow.getTableId());
+        Assertions.assertEquals(
+                "DATABASE-X.SCHEMA-X.TABLE-X", 
outputEvent.tablePath().getFullName());
+
+        config = new 
TableRenameConfig().setPrefix("user-").setSuffix("-table");
+        transform = new TableRenameTransform(config, DEFAULT_TABLE);
+        outputCatalogTable = transform.getProducedCatalogTables();
+        outputRow = transform.map(inputRow);
+        outputEvent = transform.mapSchemaChangeEvent(inputEvent);
+        Assertions.assertEquals(
+                "Database-x.Schema-x.user-Table-x-table",
+                
outputCatalogTable.get(0).getTableId().toTablePath().getFullName());
+        Assertions.assertEquals("Database-x.Schema-x.user-Table-x-table", 
outputRow.getTableId());
+        Assertions.assertEquals(
+                "Database-x.Schema-x.user-Table-x-table", 
outputEvent.tablePath().getFullName());
+
+        config =
+                new TableRenameConfig()
+                        .setReplacementsWithRegex(
+                                Arrays.asList(
+                                        new 
TableRenameConfig.ReplacementsWithRegex("Table", "t1"),
+                                        new 
TableRenameConfig.ReplacementsWithRegex(
+                                                "Table", "t2")));
+        transform = new TableRenameTransform(config, DEFAULT_TABLE);
+        outputCatalogTable = transform.getProducedCatalogTables();
+        outputRow = transform.map(inputRow);
+        outputEvent = transform.mapSchemaChangeEvent(inputEvent);
+        Assertions.assertEquals(
+                "Database-x.Schema-x.t2-x",
+                
outputCatalogTable.get(0).getTableId().toTablePath().getFullName());
+        Assertions.assertEquals("Database-x.Schema-x.t2-x", 
outputRow.getTableId());
+        Assertions.assertEquals("Database-x.Schema-x.t2-x", 
outputEvent.tablePath().getFullName());
+    }
+}


Reply via email to