This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 06df51bcbb [Feature][Transform] Support rename table/column (#8170) 06df51bcbb is described below commit 06df51bcbbb3294f40ebb5607c70bc031c5175f8 Author: hailin0 <wanghai...@apache.org> AuthorDate: Thu Dec 5 14:03:10 2024 +0800 [Feature][Transform] Support rename table/column (#8170) --- docs/en/transform-v2/field-rename.md | 132 ++++++++ docs/en/transform-v2/table-rename.md | 132 ++++++++ plugin-mapping.properties | 2 + .../connector/TransformSpecificationCheckTest.java | 2 +- .../seatunnel/e2e/transform/TestRenameIT.java | 43 +++ .../resources/table_field_rename_multi_table.conf | 228 ++++++++++++++ .../seatunnel/transform/rename/ConvertCase.java | 23 ++ .../transform/rename/FieldRenameConfig.java | 129 ++++++++ .../rename/FieldRenameMultiCatalogTransform.java | 45 +++ .../transform/rename/FieldRenameTransform.java | 333 +++++++++++++++++++++ .../rename/FieldRenameTransformFactory.java | 58 ++++ .../transform/rename/TableRenameConfig.java | 98 ++++++ .../rename/TableRenameMultiCatalogTransform.java | 45 +++ .../transform/rename/TableRenameTransform.java | 267 +++++++++++++++++ .../rename/TableRenameTransformFactory.java | 56 ++++ .../transform/rename/FieldRenameTransformTest.java | 243 +++++++++++++++ .../transform/rename/TableRenameTransformTest.java | 150 ++++++++++ 17 files changed, 1985 insertions(+), 1 deletion(-) diff --git a/docs/en/transform-v2/field-rename.md b/docs/en/transform-v2/field-rename.md new file mode 100644 index 0000000000..faf198b695 --- /dev/null +++ b/docs/en/transform-v2/field-rename.md @@ -0,0 +1,132 @@ +# FieldRename + +> FieldRename transform plugin + +## Description + +FieldRename transform plugin for rename field name. 
+ +## Options + +| name | type | required | default value | Description | +|:-----------------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------| +| convert_case | string | no | | The case conversion type. The options can be `UPPER`, `LOWER` | +| prefix | string | no | | The prefix to be added to the field name | +| suffix | string | no | | The suffix to be added to the field name | +| replacements_with_regex | array | no | | The array of replacement rules with regex. The replacement rule is a map with `replace_from` and `replace_to` fields. | + +## Examples + +### Convert field to uppercase + +``` +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + MySQL-CDC { + plugin_output = "customers_mysql_cdc" + + username = "root" + password = "123456" + table-names = ["source.user_shop", "source.user_order"] + base-url = "jdbc:mysql://localhost:3306/source" + } +} + +transform { + FieldRename { + plugin_input = "customers_mysql_cdc" + plugin_output = "trans_result" + + convert_case = "UPPER" + prefix = "F_" + suffix = "_S" + replacements_with_regex = [ + { + replace_from = "create_time" + replace_to = "SOURCE_CREATE_TIME" + } + ] + } +} + +sink { + Jdbc { + plugin_input = "trans_result" + + driver="oracle.jdbc.OracleDriver" + url="jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" + user="myuser" + password="mypwd" + + generate_sink_sql = true + database = "ORCLCDB" + table = "${database_name}.${table_name}" + primary_keys = ["${primary_key}"] + + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode = "APPEND_DATA" + } +} +``` + +### Convert field name to lowercase + +``` +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Oracle-CDC { + plugin_output = "customers_oracle_cdc" + + base-url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB" + username = "dbzuser" + password = "dbz" + database-names = ["ORCLCDB"] + schema-names = ["DEBEZIUM"] 
+ table-names = ["SOURCE.USER_SHOP", "SOURCE.USER_ORDER"] + } +} + +transform { + FieldRename { + plugin_input = "customers_oracle_cdc" + plugin_output = "trans_result" + + convert_case = "LOWER" + prefix = "f_" + suffix = "_s" + replacements_with_regex = [ + { + replace_from = "CREATE_TIME" + replace_to = "source_create_time" + } + ] + } +} + +sink { + Jdbc { + plugin_input = "trans_result" + + url = "jdbc:mysql://localhost:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + + generate_sink_sql = true + database = "${schema_name}" + table = "${table_name}" + primary_keys = ["${primary_key}"] + + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode = "APPEND_DATA" + } +} +``` \ No newline at end of file diff --git a/docs/en/transform-v2/table-rename.md b/docs/en/transform-v2/table-rename.md new file mode 100644 index 0000000000..6cd1a60de7 --- /dev/null +++ b/docs/en/transform-v2/table-rename.md @@ -0,0 +1,132 @@ +# TableRename + +> TableRename transform plugin + +## Description + +TableRename transform plugin for rename table name. + +## Options + +| name | type | required | default value | Description | +|:-----------------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------| +| convert_case | string | no | | The case conversion type. The options can be `UPPER`, `LOWER` | +| prefix | string | no | | The prefix to be added to the table name | +| suffix | string | no | | The suffix to be added to the table name | +| replacements_with_regex | array | no | | The array of replacement rules with regex. The replacement rule is a map with `replace_from` and `replace_to` fields. 
| + +## Examples + +### Convert table name to uppercase + +``` +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + MySQL-CDC { + plugin_output = "customers_mysql_cdc" + + username = "root" + password = "123456" + table-names = ["source.user_shop", "source.user_order"] + base-url = "jdbc:mysql://localhost:3306/source" + } +} + +transform { + TableRename { + plugin_input = "customers_mysql_cdc" + plugin_output = "trans_result" + + convert_case = "UPPER" + prefix = "CDC_" + suffix = "_TABLE" + replacements_with_regex = [ + { + replace_from = "user" + replace_to = "U" + } + ] + } +} + +sink { + Jdbc { + plugin_input = "trans_result" + + driver="oracle.jdbc.OracleDriver" + url="jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" + user="myuser" + password="mypwd" + + generate_sink_sql = true + database = "ORCLCDB" + table = "${database_name}.${table_name}" + primary_keys = ["${primary_key}"] + + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode = "APPEND_DATA" + } +} +``` + +### Convert table name to lowercase + +``` +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Oracle-CDC { + plugin_output = "customers_oracle_cdc" + + base-url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB" + username = "dbzuser" + password = "dbz" + database-names = ["ORCLCDB"] + schema-names = ["DEBEZIUM"] + table-names = ["SOURCE.USER_SHOP", "SOURCE.USER_ORDER"] + } +} + +transform { + TableRename { + plugin_input = "customers_oracle_cdc" + plugin_output = "trans_result" + + convert_case = "LOWER" + prefix = "cdc_" + suffix = "_table" + replacements_with_regex = [ + { + replace_from = "USER" + replace_to = "u" + } + ] + } +} + +sink { + Jdbc { + plugin_input = "trans_result" + + url = "jdbc:mysql://localhost:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + + generate_sink_sql = true + database = "${schema_name}" + table = "${table_name}" + primary_keys = ["${primary_key}"] + + schema_save_mode = 
"CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode = "APPEND_DATA" + } +} +``` \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index c494686161..32cc214c21 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -155,3 +155,5 @@ seatunnel.transform.LLM = seatunnel-transforms-v2 seatunnel.transform.Embedding = seatunnel-transforms-v2 seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2 seatunnel.transform.Metadata = seatunnel-transforms-v2 +seatunnel.transform.FieldRename = seatunnel-transforms-v2 +seatunnel.transform.TableRename = seatunnel-transforms-v2 diff --git a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java index bb3fe1f55b..89f3ec9c48 100644 --- a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java +++ b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java @@ -43,7 +43,7 @@ public class TransformSpecificationCheckTest { FactoryUtil.discoverFactories( Thread.currentThread().getContextClassLoader(), TableTransformFactory.class); - Assertions.assertEquals(13, factories.size()); + Assertions.assertEquals(15, factories.size()); } @Test diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java new file mode 100644 index 0000000000..8b2e55551b --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.transform; + +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +public class TestRenameIT extends TestSuiteBase { + + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK}, + disabledReason = "Currently SPARK do not multi table transform") + @TestTemplate + public void testRenameMultiTable(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/table_field_rename_multi_table.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf new file mode 100644 index 0000000000..254f2032d4 --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_field_rename_multi_table.conf @@ -0,0 +1,228 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + plugin_output = "source1" + + tables_configs = [ + { + row.num = 3 + schema = { + table = "test.abc" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "name" + type = "string" + } + ] + } + }, + { + row.num = 5 + schema = { + table = "test.xyz" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "name" + type = "string" + } + ] + } + }, + { + row.num = 10 + schema = { + table = "test.www" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "name" + type = "string" + } + ] + } + } + ] + } +} +transform { + TableRename { + plugin_input = "source1" + plugin_output = "transform1" + + table_match_regex = "test.a.*" + table_transform = [{ + table_path = "test.xyz" + convert_case = "UPPER" + prefix = "P2_" + suffix = "_S2" + replacements_with_regex = [ + { + replace_from = "z" + replace_to = "ZZ" + } + ] + }] + convert_case = "UPPER" + prefix = "P1_" + suffix = "_S1" + replacements_with_regex = [ + { + replace_from 
= "c" + replace_to = "CC" + } + ] + } + + FieldRename { + plugin_input = "transform1" + plugin_output = "transform2" + + table_match_regex = "TEST.P.*" + table_transform = [{ + table_path = "TEST.P2_XYZZ_S2" + convert_case = "UPPER" + prefix = "F_P2_" + suffix = "_S2_F" + replacements_with_regex = [ + { + replace_from = "id" + replace_to = "ID_1" + } + ] + }] + convert_case = "UPPER" + prefix = "F_P1_" + suffix = "_S1_F" + replacements_with_regex = [ + { + replace_from = "name" + replace_to = "NAME_1" + } + ] + } +} +sink { + Assert { + plugin_input = "transform2" + + rules = + { + tables_configs = [ + { + table_path = "TEST.P1_ABCC_S1" + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 3 + }, + { + rule_type = MIN_ROW + rule_value = 3 + } + ], + field_rules = [ + { + field_name = F_P1_ID_S1_F + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = F_P1_NAME_1_S1_F + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + }, + { + table_path = "TEST.P2_XYZZ_S2" + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + }, + { + rule_type = MIN_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = F_P2_ID_1_S2_F + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = F_P2_NAME_S2_F + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + }, + { + table_path = "test.www" + catalog_table_rule { + table_path = "test.www" + column_rule = [ + { + name = "id" + type = "bigint" + }, + { + name = "name" + type = "string" + } + ] + } + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java new file mode 100644 index 0000000000..5dafb26cd3 --- /dev/null +++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/ConvertCase.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.rename; + +public enum ConvertCase { + LOWER, + UPPER +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java new file mode 100644 index 0000000000..7bd8ec1495 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameConfig.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.experimental.Accessors; + +import java.io.Serializable; +import java.util.List; + +@Getter +@Setter +@Accessors(chain = true) +public class FieldRenameConfig implements Serializable { + + public static final Option<ConvertCase> CONVERT_CASE = + Options.key("convert_case") + .enumType(ConvertCase.class) + .noDefaultValue() + .withDescription("Convert to uppercase or lowercase"); + + public static final Option<String> PREFIX = + Options.key("prefix") + .stringType() + .noDefaultValue() + .withDescription("Add prefix for field name"); + + public static final Option<String> SUFFIX = + Options.key("suffix") + .stringType() + .noDefaultValue() + .withDescription("Add suffix for field name"); + + public static final Option<List<ReplacementsWithRegex>> REPLACEMENTS_WITH_REGEX = + Options.key("replacements_with_regex") + .listType(ReplacementsWithRegex.class) + .noDefaultValue() + .withDescription("The regex of replace fields name to "); + + public static final Option<List<SpecificModify>> SPECIFIC = + Options.key("specific") + .listType(SpecificModify.class) + .noDefaultValue() + .withDescription("The 
specific modify field name"); + + @JsonAlias("table_match_regex") + private String tableMatchRegex; + + @JsonAlias("is_table_match_regex") + private Boolean isTableMatchRegex; + + @JsonAlias("match_tables") + private List<String> matchTables; + + @JsonAlias("convert_case") + private ConvertCase convertCase; + + @JsonAlias("prefix") + private String prefix; + + @JsonAlias("suffix") + private String suffix; + + @JsonAlias("replacements_with_regex") + private List<ReplacementsWithRegex> replacementsWithRegex; + + @JsonAlias("specific") + private List<SpecificModify> specific; + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class SpecificModify implements Serializable { + @JsonAlias("field_name") + private String fieldName; + + @JsonAlias("target_name") + private String targetName; + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class ReplacementsWithRegex implements Serializable { + @JsonAlias("replace_from") + private String replaceFrom; + + @JsonAlias("replace_to") + private String replaceTo; + + @JsonAlias("is_regex") + private Boolean isRegex; + } + + public static FieldRenameConfig of(ReadonlyConfig config) { + FieldRenameConfig renameConfig = new FieldRenameConfig(); + renameConfig.setConvertCase(config.get(CONVERT_CASE)); + renameConfig.setPrefix(config.get(PREFIX)); + renameConfig.setSuffix(config.get(SUFFIX)); + renameConfig.setReplacementsWithRegex(config.get(REPLACEMENTS_WITH_REGEX)); + renameConfig.setSpecific(config.get(SPECIFIC)); + return renameConfig; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java new file mode 100644 index 0000000000..dd0f61e4e4 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform; + +import java.util.List; + +public class FieldRenameMultiCatalogTransform extends AbstractMultiCatalogMapTransform { + + public FieldRenameMultiCatalogTransform( + List<CatalogTable> inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return FieldRenameTransform.PLUGIN_NAME; + } + + @Override + protected SeaTunnelTransform<SeaTunnelRow> buildTransform( + CatalogTable table, ReadonlyConfig config) { + return new FieldRenameTransform(FieldRenameConfig.of(config), table); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java new file mode 100644 index 
0000000000..b151b355ac --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransform.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import 
org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher; +import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import lombok.extern.slf4j.Slf4j; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@Slf4j +public class FieldRenameTransform extends AbstractCatalogSupportMapTransform { + public static String PLUGIN_NAME = "FieldRename"; + + private CatalogTable inputTable; + private final FieldRenameConfig config; + private TableSchemaChangeEventHandler tableSchemaChangeEventHandler; + + public FieldRenameTransform(FieldRenameConfig config, CatalogTable table) { + super(table); + this.config = config; + this.inputTable = table; + this.tableSchemaChangeEventHandler = new TableSchemaChangeEventDispatcher(); + } + + @Override + public String getPluginName() { + return PLUGIN_NAME; + } + + @Override + protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) { + return inputRow; + } + + @Override + public SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent event) { + TableSchema newTableSchema = + tableSchemaChangeEventHandler.reset(inputTable.getTableSchema()).apply(event); + this.inputTable = + CatalogTable.of( + inputTable.getTableId(), + newTableSchema, + inputTable.getOptions(), + inputTable.getPartitionKeys(), + inputTable.getComment()); + + if (event instanceof AlterTableColumnsEvent) { + AlterTableColumnsEvent alterTableColumnsEvent = (AlterTableColumnsEvent) event; + AlterTableColumnsEvent newEvent 
= + new AlterTableColumnsEvent( + event.tableIdentifier(), + alterTableColumnsEvent.getEvents().stream() + .map(this::convertName) + .collect(Collectors.toList())); + + newEvent.setJobId(event.getJobId()); + newEvent.setStatement(((AlterTableColumnsEvent) event).getStatement()); + newEvent.setSourceDialectName(((AlterTableColumnsEvent) event).getSourceDialectName()); + if (event.getChangeAfter() != null) { + newEvent.setChangeAfter( + CatalogTable.of( + event.getChangeAfter().getTableId(), event.getChangeAfter())); + } + return newEvent; + } + if (event instanceof AlterTableColumnEvent) { + return convertName((AlterTableColumnEvent) event); + } + return event; + } + + @Override + protected TableSchema transformTableSchema() { + return convertTableSchema(inputTable.getTableSchema()); + } + + @Override + protected TableIdentifier transformTableIdentifier() { + return inputTable.getTableId(); + } + + @VisibleForTesting + public String convertName(String name) { + if (name == null) { + return null; + } + + Optional<FieldRenameConfig.SpecificModify> specificValue = getSpecificModify(name); + if (specificValue.isPresent()) { + return specificValue.get().getTargetName(); + } + String replaceTo = null; + Map<Integer, Integer> replaceIndex = new LinkedHashMap<>(); + + if (CollectionUtils.isNotEmpty(config.getReplacementsWithRegex())) { + for (FieldRenameConfig.ReplacementsWithRegex replacementsWithRegex : + config.getReplacementsWithRegex()) { + Boolean isRegex = replacementsWithRegex.getIsRegex(); + String replacement = replacementsWithRegex.getReplaceFrom(); + if (StringUtils.isNotEmpty(replacement)) { + Map<Integer, Integer> matched = new LinkedHashMap<>(); + if (BooleanUtils.isNotTrue(isRegex)) { + if (StringUtils.equals(replacement, name)) { + matched.put(0, name.length()); + } + } else { + Matcher matcher = Pattern.compile(replacement).matcher(name); + while (matcher.find()) { + matched.put(matcher.start(), matcher.end()); + } + } + if (!matched.isEmpty()) { + 
replaceTo = replacementsWithRegex.getReplaceTo(); + replaceIndex = matched; + } + } + } + } + + if (config.getConvertCase() != null) { + switch (config.getConvertCase()) { + case UPPER: + name = name.toUpperCase(); + break; + case LOWER: + name = name.toLowerCase(); + break; + default: + throw new UnsupportedOperationException( + "Unsupported convert case: " + config.getConvertCase()); + } + } + int offset = 0; + for (Map.Entry<Integer, Integer> index : replaceIndex.entrySet()) { + int indexStart = index.getKey(); + int indexEnd = index.getValue(); + name = + name.substring(0, indexStart + offset) + + replaceTo.trim() + + name.substring(indexEnd + offset); + offset += replaceTo.trim().length() - (indexEnd - indexStart); + } + if (StringUtils.isNotBlank(config.getPrefix())) { + name = config.getPrefix().trim() + name; + } + if (StringUtils.isNotBlank(config.getSuffix())) { + name = name + config.getSuffix().trim(); + } + return name; + } + + private Optional<FieldRenameConfig.SpecificModify> getSpecificModify(String oldColumnName) { + if (config.getSpecific() == null) { + return Optional.empty(); + } + return config.getSpecific().stream() + .filter(specific -> specific.getFieldName().equals(oldColumnName)) + .findFirst(); + } + + @VisibleForTesting + public AlterTableColumnEvent convertName(AlterTableColumnEvent event) { + AlterTableColumnEvent newEvent = event; + switch (event.getEventType()) { + case SCHEMA_CHANGE_ADD_COLUMN: + AlterTableAddColumnEvent addColumnEvent = (AlterTableAddColumnEvent) event; + newEvent = + new AlterTableAddColumnEvent( + event.tableIdentifier(), + convertName(addColumnEvent.getColumn()), + addColumnEvent.isFirst(), + convertName(addColumnEvent.getAfterColumn())); + break; + case SCHEMA_CHANGE_DROP_COLUMN: + AlterTableDropColumnEvent dropColumnEvent = (AlterTableDropColumnEvent) event; + newEvent = + new AlterTableDropColumnEvent( + event.tableIdentifier(), convertName(dropColumnEvent.getColumn())); + break; + case 
SCHEMA_CHANGE_MODIFY_COLUMN: + AlterTableModifyColumnEvent modifyColumnEvent = (AlterTableModifyColumnEvent) event; + newEvent = + new AlterTableModifyColumnEvent( + event.tableIdentifier(), + convertName(modifyColumnEvent.getColumn()), + modifyColumnEvent.isFirst(), + convertName(modifyColumnEvent.getAfterColumn())); + break; + case SCHEMA_CHANGE_CHANGE_COLUMN: + AlterTableChangeColumnEvent changeColumnEvent = (AlterTableChangeColumnEvent) event; + boolean nameChanged = + !changeColumnEvent + .getOldColumn() + .equals(changeColumnEvent.getColumn().getName()); + if (nameChanged) { + log.warn( + "FieldRenameTransform does not support changing column name, " + + "old column name: {}, new column name: {}", + changeColumnEvent.getOldColumn(), + changeColumnEvent.getColumn().getName()); + return changeColumnEvent; + } + + newEvent = + new AlterTableChangeColumnEvent( + event.tableIdentifier(), + convertName(changeColumnEvent.getOldColumn()), + convertName(changeColumnEvent.getColumn()), + changeColumnEvent.isFirst(), + convertName(changeColumnEvent.getAfterColumn())); + break; + default: + log.warn("Unsupported event: {}", event); + return event; + } + + newEvent.setJobId(event.getJobId()); + newEvent.setStatement(event.getStatement()); + newEvent.setSourceDialectName(event.getSourceDialectName()); + if (event.getChangeAfter() != null) { + CatalogTable newChangeAfter = + CatalogTable.of( + event.getChangeAfter().getTableId(), + convertTableSchema(event.getChangeAfter().getTableSchema()), + event.getChangeAfter().getOptions(), + event.getChangeAfter().getPartitionKeys(), + event.getChangeAfter().getComment()); + newEvent.setChangeAfter(newChangeAfter); + } + return newEvent; + } + + private Column convertName(Column column) { + return column.rename(convertName(column.getName())); + } + + private TableSchema convertTableSchema(TableSchema tableSchema) { + List<Column> columns = + tableSchema.getColumns().stream() + .map( + column -> { + String newColumnName = 
convertName(column.getName()); + return column.rename(newColumnName); + }) + .collect(Collectors.toList()); + PrimaryKey primaryKey = + Optional.ofNullable(tableSchema.getPrimaryKey()) + .map( + pk -> + PrimaryKey.of( + pk.getPrimaryKey(), + pk.getColumnNames().stream() + .map(this::convertName) + .collect(Collectors.toList()), + pk.getEnableAutoId())) + .orElse(null); + List<ConstraintKey> constraintKeys = + Optional.ofNullable(tableSchema.getConstraintKeys()) + .map( + keyList -> + keyList.stream() + .map( + key -> + ConstraintKey.of( + key.getConstraintType(), + key.getConstraintName(), + key.getColumnNames() + .stream() + .map( + column -> + ConstraintKey + .ConstraintKeyColumn + .of( + convertName( + column + .getColumnName()), + column + .getSortType())) + .collect( + Collectors + .toList()))) + .collect(Collectors.toList())) + .orElse(null); + return TableSchema.builder() + .columns(columns) + .primaryKey(primaryKey) + .constraintKey(constraintKeys) + .build(); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java new file mode 100644 index 0000000000..299bbb5255 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/FieldRenameTransformFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableTransform; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.transform.rename.FieldRenameConfig.CONVERT_CASE; +import static org.apache.seatunnel.transform.rename.FieldRenameConfig.PREFIX; +import static org.apache.seatunnel.transform.rename.FieldRenameConfig.REPLACEMENTS_WITH_REGEX; +import static org.apache.seatunnel.transform.rename.FieldRenameConfig.SPECIFIC; +import static org.apache.seatunnel.transform.rename.FieldRenameConfig.SUFFIX; + +@AutoService(Factory.class) +public class FieldRenameTransformFactory implements TableTransformFactory { + + @Override + public String factoryIdentifier() { + return FieldRenameTransform.PLUGIN_NAME; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .optional(CONVERT_CASE, PREFIX, SUFFIX, REPLACEMENTS_WITH_REGEX, SPECIFIC) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) + .build(); + } + + @Override + public TableTransform createTransform(TableTransformFactoryContext context) { + return () -> + new FieldRenameMultiCatalogTransform( + 
context.getCatalogTables(), context.getOptions()); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java new file mode 100644 index 0000000000..be2d6a25e2 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameConfig.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.experimental.Accessors; + +import java.io.Serializable; +import java.util.List; + +@Getter +@Setter +@Accessors(chain = true) +public class TableRenameConfig implements Serializable { + + public static final Option<ConvertCase> CONVERT_CASE = + Options.key("convert_case") + .enumType(ConvertCase.class) + .noDefaultValue() + .withDescription("Convert to uppercase or lowercase"); + + public static final Option<String> PREFIX = + Options.key("prefix") + .stringType() + .noDefaultValue() + .withDescription("Add prefix for table name"); + + public static final Option<String> SUFFIX = + Options.key("suffix") + .stringType() + .noDefaultValue() + .withDescription("Add suffix for table name"); + + public static final Option<List<ReplacementsWithRegex>> REPLACEMENTS_WITH_REGEX = + Options.key("replacements_with_regex") + .listType(ReplacementsWithRegex.class) + .noDefaultValue() + .withDescription("The regex of replace table name to "); + + @JsonAlias("convert_case") + private ConvertCase convertCase; + + @JsonAlias("prefix") + private String prefix; + + @JsonAlias("suffix") + private String suffix; + + @JsonAlias("replacements_with_regex") + private List<ReplacementsWithRegex> replacementsWithRegex; + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class ReplacementsWithRegex implements Serializable { + @JsonAlias("replace_from") + private String replaceFrom; + + @JsonAlias("replace_to") + private String replaceTo; + + private final Boolean isRegex = true; + } + + public static TableRenameConfig 
of(ReadonlyConfig config) { + TableRenameConfig renameConfig = new TableRenameConfig(); + renameConfig.setConvertCase(config.get(CONVERT_CASE)); + renameConfig.setPrefix(config.get(PREFIX)); + renameConfig.setSuffix(config.get(SUFFIX)); + renameConfig.setReplacementsWithRegex(config.get(REPLACEMENTS_WITH_REGEX)); + return renameConfig; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java new file mode 100644 index 0000000000..67cff881da --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform; + +import java.util.List; + +public class TableRenameMultiCatalogTransform extends AbstractMultiCatalogMapTransform { + + public TableRenameMultiCatalogTransform( + List<CatalogTable> inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return TableRenameTransform.PLUGIN_NAME; + } + + @Override + protected SeaTunnelTransform<SeaTunnelRow> buildTransform( + CatalogTable table, ReadonlyConfig config) { + return new TableRenameTransform(TableRenameConfig.of(config), table); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java new file mode 100644 index 0000000000..d1a3156115 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransform.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import lombok.extern.slf4j.Slf4j; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@Slf4j +public class TableRenameTransform extends AbstractCatalogSupportMapTransform { + public static String PLUGIN_NAME = "TableRename"; + + private final 
CatalogTable inputTable; + private final TableRenameConfig config; + + private TablePath outputTablePath; + private String outputTableId; + + public TableRenameTransform(TableRenameConfig config, CatalogTable table) { + super(table); + this.inputTable = table; + this.config = config; + } + + @Override + public String getPluginName() { + return PLUGIN_NAME; + } + + @Override + protected TableSchema transformTableSchema() { + return TableSchema.builder() + .columns(inputTable.getTableSchema().getColumns()) + .constraintKey(inputTable.getTableSchema().getConstraintKeys()) + .primaryKey(inputTable.getTableSchema().getPrimaryKey()) + .build(); + } + + @Override + protected TableIdentifier transformTableIdentifier() { + TablePath inputTablePath = inputTable.getTablePath(); + String inputDatabaseName = inputTablePath.getDatabaseName(); + String inputSchemaName = inputTablePath.getSchemaName(); + String inputTableName = inputTablePath.getTableName(); + + String outputDatabaseName = + Optional.ofNullable(inputDatabaseName).map(this::convertCase).orElse(null); + String outputSchemaName = + Optional.ofNullable(inputSchemaName).map(this::convertCase).orElse(null); + String outputTableName = convertName(inputTableName); + TablePath outputTablePath = + TablePath.of(outputDatabaseName, outputSchemaName, outputTableName); + this.outputTablePath = outputTablePath; + this.outputTableId = outputTablePath.getFullName(); + return TableIdentifier.of(inputTable.getCatalogName(), outputTablePath); + } + + @Override + protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) { + if (inputRow.getTableId() == null) { + log.debug("Table id is null, skip renaming"); + return inputRow; + } + if (outputTableId.equals(inputRow.getTableId())) { + return inputRow; + } + + SeaTunnelRow outputRow = inputRow.copy(); + outputRow.setTableId(outputTableId); + return outputRow; + } + + @Override + public SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent event) { + TablePath inputTablePath = 
event.tablePath(); + if (inputTablePath == null) { + return event; + } + if (outputTablePath.equals(inputTablePath)) { + return event; + } + + if (event instanceof AlterTableColumnsEvent) { + TableIdentifier newTableIdentifier = + TableIdentifier.of(event.tableIdentifier().getCatalogName(), outputTablePath); + AlterTableColumnsEvent alterTableColumnsEvent = (AlterTableColumnsEvent) event; + AlterTableColumnsEvent newEvent = + new AlterTableColumnsEvent( + newTableIdentifier, + alterTableColumnsEvent.getEvents().stream() + .map(this::convertName) + .collect(Collectors.toList())); + + newEvent.setJobId(event.getJobId()); + newEvent.setStatement(((AlterTableColumnsEvent) event).getStatement()); + newEvent.setSourceDialectName(((AlterTableColumnsEvent) event).getSourceDialectName()); + if (event.getChangeAfter() != null) { + newEvent.setChangeAfter( + CatalogTable.of(newTableIdentifier, event.getChangeAfter())); + } + return newEvent; + } + if (event instanceof AlterTableColumnEvent) { + return convertName((AlterTableColumnEvent) event); + } + return event; + } + + public String convertCase(String name) { + if (config.getConvertCase() != null) { + switch (config.getConvertCase()) { + case UPPER: + return name.toUpperCase(); + case LOWER: + return name.toLowerCase(); + default: + throw new UnsupportedOperationException( + "Unsupported convert case: " + config.getConvertCase()); + } + } + return name; + } + + @VisibleForTesting + public String convertName(String tableName) { + String replaceTo = null; + Map<Integer, Integer> replaceIndex = new LinkedHashMap<>(); + + if (CollectionUtils.isNotEmpty(config.getReplacementsWithRegex())) { + for (TableRenameConfig.ReplacementsWithRegex replacementsWithRegex : + config.getReplacementsWithRegex()) { + Boolean isRegex = replacementsWithRegex.getIsRegex(); + String replacement = replacementsWithRegex.getReplaceFrom(); + if (StringUtils.isNotEmpty(replacement)) { + Map<Integer, Integer> matched = new LinkedHashMap<>(); + if 
(BooleanUtils.isNotTrue(isRegex)) { + if (StringUtils.equals(replacement, tableName)) { + matched.put(0, tableName.length()); + } + } else { + Matcher matcher = Pattern.compile(replacement).matcher(tableName); + while (matcher.find()) { + matched.put(matcher.start(), matcher.end()); + } + } + if (!matched.isEmpty()) { + replaceTo = replacementsWithRegex.getReplaceTo(); + replaceIndex = matched; + } + } + } + } + + tableName = convertCase(tableName); + + int offset = 0; + for (Map.Entry<Integer, Integer> index : replaceIndex.entrySet()) { + int indexStart = index.getKey(); + int indexEnd = index.getValue(); + tableName = + tableName.substring(0, indexStart + offset) + + replaceTo.trim() + + tableName.substring(indexEnd + offset); + offset += replaceTo.trim().length() - (indexEnd - indexStart); + } + if (StringUtils.isNotBlank(config.getPrefix())) { + tableName = config.getPrefix().trim() + tableName; + } + if (StringUtils.isNotBlank(config.getSuffix())) { + tableName = tableName + config.getSuffix().trim(); + } + return tableName; + } + + @VisibleForTesting + public AlterTableColumnEvent convertName(AlterTableColumnEvent event) { + TableIdentifier newTableIdentifier = + TableIdentifier.of(event.tableIdentifier().getCatalogName(), outputTablePath); + AlterTableColumnEvent newEvent = event; + switch (event.getEventType()) { + case SCHEMA_CHANGE_ADD_COLUMN: + AlterTableAddColumnEvent addColumnEvent = (AlterTableAddColumnEvent) event; + newEvent = + new AlterTableAddColumnEvent( + newTableIdentifier, + addColumnEvent.getColumn(), + addColumnEvent.isFirst(), + addColumnEvent.getAfterColumn()); + break; + case SCHEMA_CHANGE_DROP_COLUMN: + AlterTableDropColumnEvent dropColumnEvent = (AlterTableDropColumnEvent) event; + newEvent = + new AlterTableDropColumnEvent( + newTableIdentifier, dropColumnEvent.getColumn()); + break; + case SCHEMA_CHANGE_MODIFY_COLUMN: + AlterTableModifyColumnEvent modifyColumnEvent = (AlterTableModifyColumnEvent) event; + newEvent = + new 
AlterTableModifyColumnEvent( + newTableIdentifier, + modifyColumnEvent.getColumn(), + modifyColumnEvent.isFirst(), + modifyColumnEvent.getAfterColumn()); + break; + case SCHEMA_CHANGE_CHANGE_COLUMN: + AlterTableChangeColumnEvent changeColumnEvent = (AlterTableChangeColumnEvent) event; + newEvent = + new AlterTableChangeColumnEvent( + newTableIdentifier, + changeColumnEvent.getOldColumn(), + changeColumnEvent.getColumn(), + changeColumnEvent.isFirst(), + changeColumnEvent.getAfterColumn()); + break; + default: + log.warn("Unsupported event: {}", event); + return event; + } + + newEvent.setJobId(event.getJobId()); + newEvent.setStatement(event.getStatement()); + newEvent.setSourceDialectName(event.getSourceDialectName()); + if (event.getChangeAfter() != null) { + newEvent.setChangeAfter(CatalogTable.of(newTableIdentifier, event.getChangeAfter())); + } + return newEvent; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java new file mode 100644 index 0000000000..840bfff209 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rename/TableRenameTransformFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableTransform; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.transform.rename.TableRenameConfig.CONVERT_CASE; +import static org.apache.seatunnel.transform.rename.TableRenameConfig.PREFIX; +import static org.apache.seatunnel.transform.rename.TableRenameConfig.REPLACEMENTS_WITH_REGEX; +import static org.apache.seatunnel.transform.rename.TableRenameConfig.SUFFIX; + +@AutoService(Factory.class) +public class TableRenameTransformFactory implements TableTransformFactory { + @Override + public String factoryIdentifier() { + return TableRenameTransform.PLUGIN_NAME; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .optional(CONVERT_CASE, PREFIX, SUFFIX, REPLACEMENTS_WITH_REGEX) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) + .build(); + } + + @Override + public TableTransform createTransform(TableTransformFactoryContext context) { + return () -> + new TableRenameMultiCatalogTransform( + context.getCatalogTables(), context.getOptions()); + } +} diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java new file mode 100644 index 0000000000..02a8b01bcf --- /dev/null +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/FieldRenameTransformTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.type.BasicType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.stream.Collectors; + +public class FieldRenameTransformTest { + + private static final CatalogTable DEFAULT_TABLE = + CatalogTable.of( + TableIdentifier.of("test", "Database-x", "Schema-x", "Table-x"), + TableSchema.builder() + .column( + PhysicalColumn.of( + "f1", + BasicType.LONG_TYPE, + null, + null, + false, + null, + null)) + .column( + PhysicalColumn.of( + "f2", + BasicType.LONG_TYPE, + null, + null, + true, + null, + null)) + .column( + PhysicalColumn.of( + "f3", + BasicType.LONG_TYPE, + null, + null, + true, + null, + null)) + .primaryKey(PrimaryKey.of("pk1", Arrays.asList("f1"))) + .constraintKey( + ConstraintKey.of( + ConstraintKey.ConstraintType.UNIQUE_KEY, + "uk1", + Arrays.asList( + ConstraintKey.ConstraintKeyColumn.of( + "f2", ConstraintKey.ColumnSortType.ASC), + ConstraintKey.ConstraintKeyColumn.of( + "f3", + ConstraintKey.ColumnSortType.ASC)))) + .build(), + Collections.emptyMap(), + Collections.singletonList("f2"), + null); + + @Test + public void testRename() { + AlterTableAddColumnEvent addColumnEvent = + 
AlterTableAddColumnEvent.add( + DEFAULT_TABLE.getTableId(), + PhysicalColumn.of("f4", BasicType.LONG_TYPE, null, null, true, null, null)); + AlterTableModifyColumnEvent modifyColumnEvent = + AlterTableModifyColumnEvent.modify( + DEFAULT_TABLE.getTableId(), + PhysicalColumn.of("f4", BasicType.INT_TYPE, null, null, true, null, null)); + AlterTableChangeColumnEvent changeColumnEvent = + AlterTableChangeColumnEvent.change( + DEFAULT_TABLE.getTableId(), + "f4", + PhysicalColumn.of("f5", BasicType.INT_TYPE, null, null, true, null, null)); + AlterTableDropColumnEvent dropColumnEvent = + new AlterTableDropColumnEvent(DEFAULT_TABLE.getTableId(), "f5"); + + FieldRenameConfig config = new FieldRenameConfig().setConvertCase(ConvertCase.LOWER); + FieldRenameTransform transform = new FieldRenameTransform(config, DEFAULT_TABLE); + CatalogTable outputCatalogTable = transform.getProducedCatalogTable(); + AlterTableAddColumnEvent outputAddEvent = + (AlterTableAddColumnEvent) transform.mapSchemaChangeEvent(addColumnEvent); + AlterTableModifyColumnEvent outputModifyEvent = + (AlterTableModifyColumnEvent) transform.mapSchemaChangeEvent(modifyColumnEvent); + AlterTableChangeColumnEvent outputChangeEvent = + (AlterTableChangeColumnEvent) transform.mapSchemaChangeEvent(changeColumnEvent); + AlterTableDropColumnEvent outputDropEvent = + (AlterTableDropColumnEvent) transform.mapSchemaChangeEvent(dropColumnEvent); + + Assertions.assertIterableEquals( + Arrays.asList("f1", "f2", "f3"), + Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames())); + Assertions.assertIterableEquals( + Arrays.asList("f1"), + outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()); + outputCatalogTable.getTableSchema().getConstraintKeys().stream() + .forEach( + key -> + Assertions.assertIterableEquals( + Arrays.asList("f2", "f3"), + key.getColumnNames().stream() + .map( + ConstraintKey.ConstraintKeyColumn + ::getColumnName) + .collect(Collectors.toList()))); + Assertions.assertEquals("f4", 
outputAddEvent.getColumn().getName()); + Assertions.assertEquals("f4", outputModifyEvent.getColumn().getName()); + Assertions.assertEquals("f4", outputChangeEvent.getOldColumn()); + Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName()); + Assertions.assertEquals("f5", outputDropEvent.getColumn()); + + config = new FieldRenameConfig().setConvertCase(ConvertCase.UPPER); + transform = new FieldRenameTransform(config, DEFAULT_TABLE); + outputCatalogTable = transform.getProducedCatalogTable(); + outputAddEvent = (AlterTableAddColumnEvent) transform.mapSchemaChangeEvent(addColumnEvent); + outputModifyEvent = + (AlterTableModifyColumnEvent) transform.mapSchemaChangeEvent(modifyColumnEvent); + outputChangeEvent = + (AlterTableChangeColumnEvent) transform.mapSchemaChangeEvent(changeColumnEvent); + outputDropEvent = + (AlterTableDropColumnEvent) transform.mapSchemaChangeEvent(dropColumnEvent); + Assertions.assertIterableEquals( + Arrays.asList("F1", "F2", "F3"), + Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames())); + Assertions.assertIterableEquals( + Arrays.asList("F1"), + outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()); + outputCatalogTable.getTableSchema().getConstraintKeys().stream() + .forEach( + key -> + Assertions.assertIterableEquals( + Arrays.asList("F2", "F3"), + key.getColumnNames().stream() + .map( + ConstraintKey.ConstraintKeyColumn + ::getColumnName) + .collect(Collectors.toList()))); + Assertions.assertEquals("F4", outputAddEvent.getColumn().getName()); + Assertions.assertEquals("F4", outputModifyEvent.getColumn().getName()); + Assertions.assertEquals("f4", outputChangeEvent.getOldColumn()); + Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName()); + Assertions.assertEquals("F5", outputDropEvent.getColumn()); + + config = new FieldRenameConfig().setPrefix("p-").setSuffix("-s"); + transform = new FieldRenameTransform(config, DEFAULT_TABLE); + outputCatalogTable = 
transform.getProducedCatalogTable(); + outputAddEvent = (AlterTableAddColumnEvent) transform.mapSchemaChangeEvent(addColumnEvent); + outputModifyEvent = + (AlterTableModifyColumnEvent) transform.mapSchemaChangeEvent(modifyColumnEvent); + outputChangeEvent = + (AlterTableChangeColumnEvent) transform.mapSchemaChangeEvent(changeColumnEvent); + outputDropEvent = + (AlterTableDropColumnEvent) transform.mapSchemaChangeEvent(dropColumnEvent); + Assertions.assertIterableEquals( + Arrays.asList("p-f1-s", "p-f2-s", "p-f3-s"), + Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames())); + Assertions.assertIterableEquals( + Arrays.asList("p-f1-s"), + outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()); + outputCatalogTable.getTableSchema().getConstraintKeys().stream() + .forEach( + key -> + Assertions.assertIterableEquals( + Arrays.asList("p-f2-s", "p-f3-s"), + key.getColumnNames().stream() + .map( + ConstraintKey.ConstraintKeyColumn + ::getColumnName) + .collect(Collectors.toList()))); + Assertions.assertEquals("p-f4-s", outputAddEvent.getColumn().getName()); + Assertions.assertEquals("p-f4-s", outputModifyEvent.getColumn().getName()); + Assertions.assertEquals("f4", outputChangeEvent.getOldColumn()); + Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName()); + Assertions.assertEquals("p-f5-s", outputDropEvent.getColumn()); + + config = + new FieldRenameConfig() + .setReplacementsWithRegex( + Arrays.asList( + new FieldRenameConfig.ReplacementsWithRegex( + "f1", "t1", true), + new FieldRenameConfig.ReplacementsWithRegex( + "f1", "t2", true))); + transform = new FieldRenameTransform(config, DEFAULT_TABLE); + outputCatalogTable = transform.getProducedCatalogTable(); + outputAddEvent = (AlterTableAddColumnEvent) transform.mapSchemaChangeEvent(addColumnEvent); + outputModifyEvent = + (AlterTableModifyColumnEvent) transform.mapSchemaChangeEvent(modifyColumnEvent); + outputChangeEvent = + (AlterTableChangeColumnEvent) 
transform.mapSchemaChangeEvent(changeColumnEvent); + outputDropEvent = + (AlterTableDropColumnEvent) transform.mapSchemaChangeEvent(dropColumnEvent); + Assertions.assertIterableEquals( + Arrays.asList("t2", "f2", "f3"), + Arrays.asList(outputCatalogTable.getTableSchema().getFieldNames())); + Assertions.assertIterableEquals( + Arrays.asList("t2"), + outputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()); + outputCatalogTable.getTableSchema().getConstraintKeys().stream() + .forEach( + key -> + Assertions.assertIterableEquals( + Arrays.asList("f2", "f3"), + key.getColumnNames().stream() + .map( + ConstraintKey.ConstraintKeyColumn + ::getColumnName) + .collect(Collectors.toList()))); + Assertions.assertEquals("f4", outputAddEvent.getColumn().getName()); + Assertions.assertEquals("f4", outputModifyEvent.getColumn().getName()); + Assertions.assertEquals("f4", outputChangeEvent.getOldColumn()); + Assertions.assertEquals("f5", outputChangeEvent.getColumn().getName()); + Assertions.assertEquals("f5", outputDropEvent.getColumn()); + } +} diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java new file mode 100644 index 0000000000..d1d756922e --- /dev/null +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rename/TableRenameTransformTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.rename; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** Unit test for TableRenameTransform: for each rename option it checks the renamed table path in the produced catalog tables, in a mapped row, and in a mapped schema-change event. */ public class TableRenameTransformTest { + + /** Shared input table: identifier ("test", "Database-x", "Schema-x", "Table-x"); LONG_TYPE columns f1, f2, f3; primary key pk1 on f1; unique key uk1 on f2 and f3, both ASC. */ private static final CatalogTable DEFAULT_TABLE = + CatalogTable.of( + TableIdentifier.of("test", "Database-x", "Schema-x", "Table-x"), + TableSchema.builder() + .column( + PhysicalColumn.of( + "f1", + BasicType.LONG_TYPE, + null, + null, + false, + null, + null)) + .column( + PhysicalColumn.of( + "f2", + BasicType.LONG_TYPE, + null, + null, + true, + null, + null)) + .column( + PhysicalColumn.of( + "f3", + BasicType.LONG_TYPE, + null, + null, + true, + null, + null)) + .primaryKey(PrimaryKey.of("pk1", Arrays.asList("f1"))) + .constraintKey( + ConstraintKey.of( + ConstraintKey.ConstraintType.UNIQUE_KEY, + "uk1", + Arrays.asList( + ConstraintKey.ConstraintKeyColumn.of( + 
"f2", ConstraintKey.ColumnSortType.ASC), + ConstraintKey.ConstraintKeyColumn.of( + "f3", + ConstraintKey.ColumnSortType.ASC)))) + .build(), + Collections.emptyMap(), + /* NOTE(review): presumably the partition keys argument; confirm against CatalogTable.of. */ Collections.singletonList("f2"), + null); + + /** Applies four rename configurations to DEFAULT_TABLE in turn; after each one, the first produced catalog table, the mapped row's table id, and the mapped event's table path must all equal the same expected full name. */ @Test + public void testRename() { + /* Inputs reused by every case: a three-value row tagged with DEFAULT_TABLE's full path name, and an add-column event (f4) on DEFAULT_TABLE's table id. */ SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {1L, 1L, 1L}); + inputRow.setTableId(DEFAULT_TABLE.getTablePath().getFullName()); + AlterTableAddColumnEvent inputEvent = + AlterTableAddColumnEvent.add( + DEFAULT_TABLE.getTableId(), + PhysicalColumn.of("f4", BasicType.LONG_TYPE, null, null, true, null, null)); + + /* Case 1: ConvertCase.LOWER; the database, schema and table parts are all expected in lower case. */ TableRenameConfig config = new TableRenameConfig().setConvertCase(ConvertCase.LOWER); + + TableRenameTransform transform = new TableRenameTransform(config, DEFAULT_TABLE); + List<CatalogTable> outputCatalogTable = transform.getProducedCatalogTables(); + SeaTunnelRow outputRow = transform.map(inputRow); + SchemaChangeEvent outputEvent = transform.mapSchemaChangeEvent(inputEvent); + Assertions.assertEquals( + "database-x.schema-x.table-x", + outputCatalogTable.get(0).getTableId().toTablePath().getFullName()); + Assertions.assertEquals("database-x.schema-x.table-x", outputRow.getTableId()); + Assertions.assertEquals( + "database-x.schema-x.table-x", outputEvent.tablePath().getFullName()); + + /* Case 2: ConvertCase.UPPER; all three parts are expected in upper case. */ config = new TableRenameConfig().setConvertCase(ConvertCase.UPPER); + transform = new TableRenameTransform(config, DEFAULT_TABLE); + outputCatalogTable = transform.getProducedCatalogTables(); + outputRow = transform.map(inputRow); + outputEvent = transform.mapSchemaChangeEvent(inputEvent); + Assertions.assertEquals( + "DATABASE-X.SCHEMA-X.TABLE-X", + outputCatalogTable.get(0).getTableId().toTablePath().getFullName()); + Assertions.assertEquals("DATABASE-X.SCHEMA-X.TABLE-X", outputRow.getTableId()); + Assertions.assertEquals( + "DATABASE-X.SCHEMA-X.TABLE-X", outputEvent.tablePath().getFullName()); + + /* Case 3: prefix "user-" and suffix "-table"; the expected name changes only the table part, with database and schema left as they were. */ config = new TableRenameConfig().setPrefix("user-").setSuffix("-table"); + transform = new TableRenameTransform(config, 
DEFAULT_TABLE); + outputCatalogTable = transform.getProducedCatalogTables(); + outputRow = transform.map(inputRow); + outputEvent = transform.mapSchemaChangeEvent(inputEvent); + Assertions.assertEquals( + "Database-x.Schema-x.user-Table-x-table", + outputCatalogTable.get(0).getTableId().toTablePath().getFullName()); + Assertions.assertEquals("Database-x.Schema-x.user-Table-x-table", outputRow.getTableId()); + Assertions.assertEquals( + "Database-x.Schema-x.user-Table-x-table", outputEvent.tablePath().getFullName()); + + /* Case 4: two replacement rules with the same pattern "Table" (to "t1", then to "t2"); the expected name contains "t2", so the second rule's replacement is the one asserted. */ config = + new TableRenameConfig() + .setReplacementsWithRegex( + Arrays.asList( + new TableRenameConfig.ReplacementsWithRegex("Table", "t1"), + new TableRenameConfig.ReplacementsWithRegex( + "Table", "t2"))); + transform = new TableRenameTransform(config, DEFAULT_TABLE); + outputCatalogTable = transform.getProducedCatalogTables(); + outputRow = transform.map(inputRow); + outputEvent = transform.mapSchemaChangeEvent(inputEvent); + Assertions.assertEquals( + "Database-x.Schema-x.t2-x", + outputCatalogTable.get(0).getTableId().toTablePath().getFullName()); + Assertions.assertEquals("Database-x.Schema-x.t2-x", outputRow.getTableId()); + Assertions.assertEquals("Database-x.Schema-x.t2-x", outputEvent.tablePath().getFullName()); + } +}