This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c41fe2087f [Feature][Transform-V2] Support RegexExtract Transform (#9829)
c41fe2087f is described below

commit c41fe2087ff9ed7ba4da8bc1ea0224e81fa35db6
Author: xiaochen <[email protected]>
AuthorDate: Tue Sep 9 16:55:41 2025 +0800

    [Feature][Transform-V2] Support RegexExtract Transform (#9829)
---
 docs/en/transform-v2/regexextract.md               | 122 +++++++++++++++++
 docs/zh/transform-v2/regexextract.md               | 122 +++++++++++++++++
 .../connector/TransformSpecificationCheckTest.java |   2 +-
 .../e2e/transform/TestRegexExtractIT.java          |  45 ++++++
 .../regexextract/regex_extract_transform.conf      | 101 ++++++++++++++
 .../regex_extract_transform_multi_table.conf       | 152 +++++++++++++++++++++
 .../RegexExtractMultiCatalogTransform.java         |  45 ++++++
 .../regexextract/RegexExtractTransform.java        | 126 +++++++++++++++++
 .../regexextract/RegexExtractTransformConfig.java  |  83 +++++++++++
 .../RegexExtractTransformErrorCode.java            |  41 ++++++
 .../regexextract/RegexExtractTransformFactory.java |  56 ++++++++
 .../RegexExtractTransformFactoryTest.java          |  32 +++++
 .../regexextract/RegexExtractTransformTest.java    |  78 +++++++++++
 13 files changed, 1004 insertions(+), 1 deletion(-)

diff --git a/docs/en/transform-v2/regexextract.md b/docs/en/transform-v2/regexextract.md
new file mode 100644
index 0000000000..995e9b3e0d
--- /dev/null
+++ b/docs/en/transform-v2/regexextract.md
@@ -0,0 +1,122 @@
+# RegexExtract
+
+> RegexExtract transform plugin
+
+## Description
+
+The `RegexExtract` transform plugin applies a regular expression to a specified field and writes the values of its capture groups to new fields, one output field per capture group. The pattern is searched for anywhere in the field value and the first match is used, so it does not have to match the whole value. A default value can be set for each output field for rows where the pattern does not match.
+
+## Options
+
+| name           | type    | required | default value |
+|----------------|---------|----------|---------------|
+| source_field   | string  | yes      |               |
+| regex_pattern  | string  | yes      |               |
+| output_fields  | array   | yes      |               |
+| default_values | array   | no       |               |
+
+### source_field [string]
+
+The source field name to extract data from.
+
+### regex_pattern [string]
+
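+The regular expression pattern with capture groups. The number of capture groups must equal the number of output fields, otherwise the transform fails to initialize. Inside a quoted HOCON string, backslashes must be escaped, so `\.` is written as `\\.`.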
+
+### output_fields [array]
+
+The names of the output fields for the extracted values. The size must equal the number of capture groups in the regex pattern. Each output field is added as a nullable `STRING` column.
+
+### default_values [array]
+
+Default values for the output fields, used when the regex pattern does not match or the source field is null. If provided, the size must equal the number of output fields. If omitted, the output fields are set to null in those cases.
+
+
+## Example
+
+The data read from the source is a table like this:
+
+| id | email             | log_entry                                            |
+|----|-------------------|------------------------------------------------------|
+| 1  | user1@example.com | 2023-12-01 10:30:45 INFO User login successful       |
+| 2  | admin@test.org    | 2023-12-01 11:15:22 ERROR Database connection failed |
+| 3  | guest@domain.net  | 2023-12-01 12:00:00 WARN Memory usage high           |
+
+We want to extract the username, domain, and top-level domain from the `email` field:
+
+```
+transform {
+  RegexExtract {
+    plugin_input = "fake"
+    plugin_output = "regex_result"
+    source_field = "email"
+    regex_pattern = "([^@]+)@([^.]+)\\.(.+)"
+    output_fields = ["username", "domain", "tld"]
+    default_values = ["unknown", "unknown", "unknown"]
+  }
+}
+```
+
+Then the data in the result table `regex_result` will be:
+
+| id | email             | log_entry                                            | username | domain  | tld |
+|----|-------------------|------------------------------------------------------|----------|---------|-----|
+| 1  | user1@example.com | 2023-12-01 10:30:45 INFO User login successful       | user1    | example | com |
+| 2  | admin@test.org    | 2023-12-01 11:15:22 ERROR Database connection failed | admin    | test    | org |
+| 3  | guest@domain.net  | 2023-12-01 12:00:00 WARN Memory usage high           | guest    | domain  | net |
+
+## Job Config Example
+
+```
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        email = "string"
+        log_entry = "string"
+      }
+    }
+    rows = [
+      {
+        kind = INSERT,
+        fields = [1, "user1@example.com", "2023-12-01 10:30:45 INFO User login successful"]
+      },
+      {
+        kind = INSERT,
+        fields = [2, "admin@test.org", "2023-12-01 11:15:22 ERROR Database connection failed"]
+      },
+      {
+        kind = INSERT,
+        fields = [3, "guest@domain.net", "2023-12-01 12:00:00 WARN Memory usage high"]
+      }
+    ]
+  }
+}
+
+transform {
+  RegexExtract {
+    plugin_input = "fake"
+    plugin_output = "regex_result"
+    source_field = "email"
+    regex_pattern = "([^@]+)@([^.]+)\\.(.+)"
+    output_fields = ["username", "domain", "tld"]
+    default_values = ["unknown", "unknown", "unknown"]
+  }
+}
+
+sink {
+  Console {
+    plugin_input = "regex_result"
+  }
+}
+```
+
+## Changelog
+
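The per-row behaviour implemented by `getOutputFieldValues` in this commit (first match via `Matcher.find()`, one output value per capture group, defaults when the source is null or nothing matches) can be illustrated with a minimal standalone sketch. The class `RegexExtractSketch` and its `extract` helper are hypothetical and use only `java.util.regex`; they are not part of the SeaTunnel API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexExtractSketch {

    // Hypothetical helper mirroring the transform's per-row logic:
    // find() looks for the first match anywhere in the value, and each
    // capture group becomes one output value.
    public static String[] extract(
            String source, Pattern pattern, int outputCount, List<String> defaults) {
        String[] result = new String[outputCount];
        Matcher matcher = source == null ? null : pattern.matcher(source);
        if (matcher == null || !matcher.find()) {
            for (int i = 0; i < outputCount; i++) {
                // Absent or empty default_values yields null for every output field.
                result[i] = (defaults == null || defaults.isEmpty()) ? null : defaults.get(i);
            }
            return result;
        }
        for (int i = 0; i < outputCount; i++) {
            result[i] = matcher.group(i + 1); // capture groups are numbered from 1
        }
        return result;
    }

    public static void main(String[] args) {
        // The HOCON string "([^@]+)@([^.]+)\\.(.+)" has the same escaping as this Java literal.
        Pattern pattern = Pattern.compile("([^@]+)@([^.]+)\\.(.+)");
        List<String> defaults = Arrays.asList("unknown", "unknown", "unknown");
        // Prints: [user1, example, com]
        System.out.println(Arrays.toString(extract("user1@example.com", pattern, 3, defaults)));
        // Prints: [unknown, unknown, unknown]
        System.out.println(Arrays.toString(extract("not-an-email", pattern, 3, defaults)));
        // Prints: [unknown, unknown, unknown]
        System.out.println(Arrays.toString(extract(null, pattern, 3, defaults)));
    }
}
```

One consequence of this pattern: `([^.]+)` stops at the first dot, so an address such as `a@mail.example.com` yields `mail` as the domain and `example.com` as the tld.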
diff --git a/docs/zh/transform-v2/regexextract.md b/docs/zh/transform-v2/regexextract.md
new file mode 100644
index 0000000000..6ffc5a27fe
--- /dev/null
+++ b/docs/zh/transform-v2/regexextract.md
@@ -0,0 +1,122 @@
+# Regex Extraction
+
+> Regex extraction transform plugin
+
+## Description
+
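+The `RegexExtract` transform plugin uses regular expressions to extract data from a specified field and outputs the extracted values to new fields. It supports capture groups in the regular expression and allows setting a default value for each output field when the pattern does not match.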
+
+## Options
+
+| name           | type    | required | default value |
+|----------------|---------|----------|---------------|
+| source_field   | string  | yes      |               |
+| regex_pattern  | string  | yes      |               |
+| output_fields  | array   | yes      |               |
+| default_values | array   | no       |               |
+
+### source_field [string]
+
+The name of the source field to extract data from.
+
+### regex_pattern [string]
+
+A regular expression pattern with capture groups. The number of capture groups must match the number of output fields.
+
+### output_fields [array]
+
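+The names of the output fields for the extracted values. The size must match the number of capture groups in the regular expression pattern.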
+
+### default_values [array]
+
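+Default values for the output fields when the regular expression pattern does not match or the source field is null. If provided, the size must match the number of output fields.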
+
+
+## Example
+
+The data read from the source is a table like this:
+
+| id | email             | log_entry                                            |
+|----|-------------------|------------------------------------------------------|
+| 1  | user1@example.com | 2023-12-01 10:30:45 INFO User login successful       |
+| 2  | admin@test.org    | 2023-12-01 11:15:22 ERROR Database connection failed |
+| 3  | guest@domain.net  | 2023-12-01 12:00:00 WARN Memory usage high           |
+
+We want to extract the username, domain, and top-level domain from the `email` field:
+
+```
+transform {
+  RegexExtract {
+    plugin_input = "fake"
+    plugin_output = "regex_result"
+    source_field = "email"
+    regex_pattern = "([^@]+)@([^.]+)\\.(.+)"
+    output_fields = ["username", "domain", "tld"]
+    default_values = ["unknown", "unknown", "unknown"]
+  }
+}
+```
+
+Then the data in the result table `regex_result` will be:
+
+| id | email             | log_entry                                            | username | domain  | tld |
+|----|-------------------|------------------------------------------------------|----------|---------|-----|
+| 1  | user1@example.com | 2023-12-01 10:30:45 INFO User login successful       | user1    | example | com |
+| 2  | admin@test.org    | 2023-12-01 11:15:22 ERROR Database connection failed | admin    | test    | org |
+| 3  | guest@domain.net  | 2023-12-01 12:00:00 WARN Memory usage high           | guest    | domain  | net |
+
+## Job Config Example
+
+```
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        email = "string"
+        log_entry = "string"
+      }
+    }
+    rows = [
+      {
+        kind = INSERT,
+        fields = [1, "user1@example.com", "2023-12-01 10:30:45 INFO User login successful"]
+      },
+      {
+        kind = INSERT,
+        fields = [2, "admin@test.org", "2023-12-01 11:15:22 ERROR Database connection failed"]
+      },
+      {
+        kind = INSERT,
+        fields = [3, "guest@domain.net", "2023-12-01 12:00:00 WARN Memory usage high"]
+      }
+    ]
+  }
+}
+
+transform {
+  RegexExtract {
+    plugin_input = "fake"
+    plugin_output = "regex_result"
+    source_field = "email"
+    regex_pattern = "([^@]+)@([^.]+)\\.(.+)"
+    output_fields = ["username", "domain", "tld"]
+    default_values = ["unknown", "unknown", "unknown"]
+  }
+}
+
+sink {
+  Console {
+    plugin_input = "regex_result"
+  }
+}
+```
+
+## Changelog
+
diff --git a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
index 26e8ac55b7..d58e8f083d 100644
--- a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
+++ b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/TransformSpecificationCheckTest.java
@@ -43,7 +43,7 @@ public class TransformSpecificationCheckTest {
                 FactoryUtil.discoverFactories(
                         Thread.currentThread().getContextClassLoader(),
                         TableTransformFactory.class);
-        Assertions.assertEquals(19, factories.size());
+        Assertions.assertEquals(20, factories.size());
     }
 
     @Test
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRegexExtractIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRegexExtractIT.java
new file mode 100644
index 0000000000..21c2697198
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRegexExtractIT.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestRegexExtractIT extends TestSuiteBase {
+
+    @TestTemplate
+    public void testRegexExtract(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/regexextract/regex_extract_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testRegexExtractMultiTable(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/regexextract/regex_extract_transform_multi_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/regexextract/regex_extract_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/regexextract/regex_extract_transform.conf
new file mode 100644
index 0000000000..5c75b2758b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/regexextract/regex_extract_transform.conf
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        email = "string"
+        log_entry = "string"
+      }
+    }
+    rows = [
+      {
+        kind = INSERT,
+        fields = [1, "[email protected]", "2023-12-01 10:30:45 INFO User login successful"]
+      },
+      {
+        kind = INSERT,
+        fields = [2, "[email protected]", "2023-12-01 11:15:22 ERROR Database connection failed"]
+      },
+      {
+        kind = INSERT,
+        fields = [3, "[email protected]", "2023-12-01 12:00:00 WARN Memory usage high"]
+      }
+    ]
+  }
+}
+
+transform {
+  RegexExtract {
+    plugin_input = "fake"
+    plugin_output = "regex_result"
+    source_field = "email"
+    regex_pattern = "([^@]+)@([^.]+)\\.(.+)"
+    output_fields = ["username", "domain", "tld"]
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "regex_result"
+    rules = {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 3
+        }
+      ],
+      field_rules = [
+        {
+          field_name = username
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = domain
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = tld
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/regexextract/regex_extract_transform_multi_table.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/regexextract/regex_extract_transform_multi_table.conf
new file mode 100644
index 0000000000..71ae5e4504
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/regexextract/regex_extract_transform_multi_table.conf
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    tables_configs = [
+      {
+        row.num = 20
+        schema = {
+          table = "test.user_logs"
+          fields {
+            id = "int"
+            access_info = "string"
+          }
+        }
+        rows = [
+          {
+            kind = INSERT,
+            fields = [1, "2023-12-01 10:30:45 user:[email protected] login"]
+          },
+          {
+            kind = INSERT,
+            fields = [2, "2023-12-01 11:15:22 user:[email protected] error"]
+          }
+        ]
+      },
+      {
+        row.num = 30
+        schema = {
+          table = "test.access_logs"
+          fields {
+            id = "int"
+            access_info = "string"
+          }
+        }
+        rows = [
+          {
+            kind = INSERT,
+            fields = [1, "2023-12-01 11:15:22 user:[email protected]"]
+          },
+          {
+            kind = INSERT,
+            fields = [2, "2023-12-01 11:15:22 user:[email protected]"]
+          }
+        ]
+      }
+    ]
+  }
+}
+
+transform {
+  RegexExtract {
+    plugin_input = "fake"
+    plugin_output = "regex_result"
+    source_field = "access_info"
+    regex_pattern = "(\\d{4}-\\d{2}-\\d{2})\\s+(\\d{2}:\\d{2}:\\d{2})\\s+([^@]+@[^\\s]+)"
+    output_fields = ["date", "time", "email"]
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "regex_result"
+    rules = {
+      tables_configs = [
+        {
+          table_path = "test.user_logs"
+          field_rules = [
+            {
+              field_name = date
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = time
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = email
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }
+          ]
+        },
+        {
+          table_path = "test.access_logs"
+          field_rules = [
+            {
+              field_name = date
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = time
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+            {
+              field_name = email
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
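In the multi-table job above, the third capture group `([^@]+@[^\\s]+)` starts right after the time, so `[^@]+` also absorbs the `user:` prefix and the `email` output keeps it. The hypothetical sketch below applies the same pattern, written as a Java string literal, to an `access_info` value whose address is invented for illustration; `AccessInfoRegexSketch` is not part of SeaTunnel.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AccessInfoRegexSketch {

    // Java string-literal form of the multi-table job's regex_pattern
    // (HOCON and Java escape backslashes the same way here).
    static final Pattern ACCESS_INFO =
            Pattern.compile("(\\d{4}-\\d{2}-\\d{2})\\s+(\\d{2}:\\d{2}:\\d{2})\\s+([^@]+@[^\\s]+)");

    // Returns {date, time, email}, or null when the pattern is not found
    // (the transform itself would fall back to default values or nulls).
    public static String[] extract(String accessInfo) {
        Matcher m = ACCESS_INFO.matcher(accessInfo);
        if (!m.find()) {
            return null;
        }
        return new String[] {m.group(1), m.group(2), m.group(3)};
    }

    public static void main(String[] args) {
        // Hypothetical input value, invented for illustration.
        String[] parts = extract("2023-12-01 10:30:45 user:alice@example.com login");
        // Prints: 2023-12-01 | 10:30:45 | user:alice@example.com
        System.out.println(String.join(" | ", parts));
    }
}
```

The Assert sink in this test only checks `NOT_NULL`, so the prefix does not cause a failure. If only the bare address were wanted, the `user:` literal would need to sit outside the third group; that is a suggestion, not something this commit does.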
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
new file mode 100644
index 0000000000..937f69f725
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractMultiCatalogTransform.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.regexextract;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+
+import java.util.List;
+
+public class RegexExtractMultiCatalogTransform extends AbstractMultiCatalogMapTransform {
+
+    public RegexExtractMultiCatalogTransform(
+            List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+        super(inputCatalogTables, config);
+    }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
+            CatalogTable inputCatalogTable, ReadonlyConfig config) {
+        return new RegexExtractTransform(RegexExtractTransformConfig.of(config), inputCatalogTable);
+    }
+
+    @Override
+    public String getPluginName() {
+        return RegexExtractTransform.PLUGIN_NAME;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransform.java
new file mode 100644
index 0000000000..809ee5bff3
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransform.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.regexextract;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Slf4j
+public class RegexExtractTransform extends MultipleFieldOutputTransform {
+    public static final String PLUGIN_NAME = "RegexExtract";
+
+    private final RegexExtractTransformConfig config;
+    private final Pattern pattern;
+    private final int sourceFieldIndex;
+
+    public RegexExtractTransform(
+            @NonNull RegexExtractTransformConfig config, @NonNull CatalogTable catalogTable) {
+        super(catalogTable);
+        this.config = config;
+        this.pattern = Pattern.compile(config.getRegexPattern());
+
+        try {
+            sourceFieldIndex = catalogTable.getTableSchema().indexOf(config.getSourceField());
+        } catch (IllegalArgumentException e) {
+            throw TransformCommonError.cannotFindInputFieldError(
+                    getPluginName(), config.getSourceField());
+        }
+        int groupCount = pattern.matcher("").groupCount();
+        int outputFieldsSize = config.getOutputFields().size();
+        if (groupCount != outputFieldsSize) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Regex group count (%d) must equal output fields size (%d)",
+                            groupCount, outputFieldsSize));
+        }
+
+        List<String> defaultValues = config.getDefaultValues();
+        if (defaultValues != null
+                && !defaultValues.isEmpty()
+                && defaultValues.size() != outputFieldsSize) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Default values size (%d) must equal output fields size (%d)",
+                            defaultValues.size(), outputFieldsSize));
+        }
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+        Object sourceValue = inputRow.getField(sourceFieldIndex);
+
+        if (sourceValue == null) {
+            Object[] result = new Object[config.getOutputFields().size()];
+            fillWithDefaultValues(result);
+            return result;
+        }
+
+        Matcher sourceFieldMatcher = pattern.matcher(sourceValue.toString());
+        Object[] result = new Object[config.getOutputFields().size()];
+        if (!sourceFieldMatcher.find()) {
+            fillWithDefaultValues(result);
+            return result;
+        }
+
+        for (int i = 0; i < result.length; i++) {
+            result[i] = sourceFieldMatcher.group(i + 1);
+        }
+        return result;
+    }
+
+    @Override
+    protected Column[] getOutputColumns() {
+        return config.getOutputFields().stream()
+                .map(
+                        fieldName ->
+                                PhysicalColumn.of(
+                                        fieldName, BasicType.STRING_TYPE, 200, true, "", ""))
+                .toArray(Column[]::new);
+    }
+
+    private void fillWithDefaultValues(Object[] result) {
+        for (int i = 0; i < result.length; i++) {
+            result[i] = getDefaultValue(i);
+        }
+    }
+
+    private String getDefaultValue(int index) {
+        List<String> defaultValues = config.getDefaultValues();
+        if (defaultValues == null || defaultValues.isEmpty()) {
+            return null;
+        }
+        return defaultValues.get(index);
+    }
+}
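The constructor above runs two sanity checks before any row is processed. The sketch below (hypothetical class `RegexExtractValidationSketch`, not part of SeaTunnel) repeats them in isolation. It relies on `Matcher.groupCount()`, which counts only capturing groups: group 0 (the whole match) and non-capturing `(?:...)` groups are excluded, so a pattern that uses a non-capturing group needs one fewer output field.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class RegexExtractValidationSketch {

    // Same two checks as the transform's constructor, lifted into a standalone helper.
    public static void validate(String regex, List<String> outputFields, List<String> defaultValues) {
        // Pattern.compile throws PatternSyntaxException (an IllegalArgumentException subclass)
        // for an invalid pattern; groupCount() counts capturing groups only.
        int groupCount = Pattern.compile(regex).matcher("").groupCount();
        if (groupCount != outputFields.size()) {
            throw new IllegalArgumentException(String.format(
                    "Regex group count (%d) must equal output fields size (%d)",
                    groupCount, outputFields.size()));
        }
        // A null or empty default_values list is allowed; otherwise sizes must agree.
        if (defaultValues != null
                && !defaultValues.isEmpty()
                && defaultValues.size() != outputFields.size()) {
            throw new IllegalArgumentException(String.format(
                    "Default values size (%d) must equal output fields size (%d)",
                    defaultValues.size(), outputFields.size()));
        }
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("username", "domain", "tld");
        validate("([^@]+)@([^.]+)\\.(.+)", fields, null); // passes: three capturing groups
        try {
            // The middle group is non-capturing, so only two groups are counted.
            validate("([^@]+)@(?:[^.]+)\\.(.+)", fields, null);
        } catch (IllegalArgumentException e) {
            // Prints: Regex group count (2) must equal output fields size (3)
            System.out.println(e.getMessage());
        }
    }
}
```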
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformConfig.java
new file mode 100644
index 0000000000..5cde8ef0fa
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.regexextract;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@Setter
+public class RegexExtractTransformConfig implements Serializable {
+    public static final String PLUGIN_NAME = "RegexExtract";
+
+    public static final Option<String> KEY_REGEX_PATTERN =
+            Options.key("regex_pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Regex pattern with capture groups");
+
+    public static final Option<String> KEY_SOURCE_FIELD =
+            Options.key("source_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Source field to extract from");
+
+    public static final Option<List<String>> KEY_OUTPUT_FIELDS =
+            Options.key("output_fields")
+                    .listType(String.class)
+                    .noDefaultValue()
+                    .withDescription("Output field names for extracted groups");
+
+    public static final Option<List<String>> KEY_DEFAULT_VALUES =
+            Options.key("default_values")
+                    .listType(String.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "Default values for output fields when regex pattern does not match");
+
+    private String regexPattern;
+    private String sourceField;
+    private List<String> outputFields;
+    private final List<String> defaultValues;
+
+    public RegexExtractTransformConfig(
+            String sourceField,
+            String regexPattern,
+            List<String> outputFields,
+            List<String> defaultValues) {
+        this.sourceField = sourceField;
+        this.regexPattern = regexPattern;
+        this.outputFields = outputFields;
+        this.defaultValues = defaultValues;
+    }
+
+    public static RegexExtractTransformConfig of(ReadonlyConfig config) {
+        return new RegexExtractTransformConfig(
+                config.get(KEY_SOURCE_FIELD),
+                config.get(KEY_REGEX_PATTERN),
+                config.get(KEY_OUTPUT_FIELDS),
+                config.get(KEY_DEFAULT_VALUES));
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformErrorCode.java
new file mode 100644
index 0000000000..29dbcde272
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformErrorCode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.regexextract;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum RegexExtractTransformErrorCode implements SeaTunnelErrorCode {
+    REGEX_EXTRACT_ERROR(
+            "REGEX_EXTRACT_ERROR_CODE-01", "RegexExtractTransform regex extraction failed");
+
+    private final String code;
+    private final String description;
+
+    RegexExtractTransformErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformFactory.java
new file mode 100644
index 0000000000..d6c6fd3a49
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.regexextract;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RegexExtractTransformFactory implements TableTransformFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "RegexExtract";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        RegexExtractTransformConfig.KEY_SOURCE_FIELD,
+                        RegexExtractTransformConfig.KEY_REGEX_PATTERN,
+                        RegexExtractTransformConfig.KEY_OUTPUT_FIELDS)
+                .optional(
+                        RegexExtractTransformConfig.KEY_DEFAULT_VALUES,
+                        TransformCommonOptions.MULTI_TABLES)
+                .build();
+    }
+
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext context) {
+        return () ->
+                new RegexExtractMultiCatalogTransform(
+                        context.getCatalogTables(), context.getOptions());
+    }
+}
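`optionRule()` marks `source_field`, `regex_pattern` and `output_fields` as required, and `default_values` as optional. The hypothetical helper below gives a rough picture of what that rule implies for a user config. It checks a plain `Map` for the required keys.

It also adds one rule that is not declared in the option rule above: `default_values`, when set, should have one entry per output field. SeaTunnel performs the real validation through `OptionRule`, and this sketch does not use that API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java check mirroring the option rule; SeaTunnel itself
// validates configs through OptionRule, which this sketch does not use.
public class RegexExtractOptionCheck {

    static final List<String> REQUIRED =
            Arrays.asList("source_field", "regex_pattern", "output_fields");

    /** Returns the required keys missing from the config, in declaration order. */
    static List<String> missingRequired(Map<String, Object> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!config.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Assumed extra rule: default_values, when set, needs one entry per output field. */
    static boolean defaultsMatchOutputs(Map<String, Object> config) {
        Object defaults = config.get("default_values");
        if (defaults == null) {
            return true; // optional key not set, nothing to compare
        }
        Object outputs = config.get("output_fields");
        return defaults instanceof List
                && outputs instanceof List
                && ((List<?>) defaults).size() == ((List<?>) outputs).size();
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("source_field", "text");
        config.put("output_fields", Arrays.asList("username", "domain"));
        // regex_pattern is deliberately left out.
        System.out.println(missingRequired(config));
        System.out.println(defaultsMatchOutputs(config));
    }
}
```

Running `main` prints `[regex_pattern]` followed by `true`, because `default_values` is not set.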
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RegexExtractTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RegexExtractTransformFactoryTest.java
new file mode 100644
index 0000000000..a1acfdea7c
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RegexExtractTransformFactoryTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.transform.regexextract.RegexExtractTransformFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RegexExtractTransformFactoryTest {
+    @Test
+    public void testOptionRule() throws Exception {
+        RegexExtractTransformFactory regexExtractTransformFactory =
+                new RegexExtractTransformFactory();
+        Assertions.assertNotNull(regexExtractTransformFactory.optionRule());
+    }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformTest.java
new file mode 100644
index 0000000000..7ed5a29b93
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/regexextract/RegexExtractTransformTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.regexextract;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RegexExtractTransformTest {
+
+    private CatalogTable catalogTable;
+
+    @BeforeEach
+    void setUp() {
+        catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("default", "default", "default", "test"),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "text", BasicType.STRING_TYPE, 1000, true, "", ""))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "id", BasicType.INT_TYPE, 0, true, "", ""))
+                                .build(),
+                        new HashMap<>(),
+                        Arrays.asList(),
+                        "");
+    }
+
+    @Test
+    void testGetProducedCatalogTable() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("source_field", "text");
+        configMap.put("regex_pattern", "(\\w+)@(\\w+\\.\\w+)");
+        configMap.put("output_fields", Arrays.asList("username", "domain"));
+
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        RegexExtractTransformConfig transformConfig = RegexExtractTransformConfig.of(config);
+        RegexExtractTransform transform = new RegexExtractTransform(transformConfig, catalogTable);
+
+        CatalogTable outputTable = transform.getProducedCatalogTable();
+        Column usernameColumn = outputTable.getTableSchema().getColumn("username");
+        Column domainColumn = outputTable.getTableSchema().getColumn("domain");
+
+        Assertions.assertEquals(BasicType.STRING_TYPE, usernameColumn.getDataType());
+        Assertions.assertEquals(BasicType.STRING_TYPE, domainColumn.getDataType());
+        Assertions.assertEquals(200, usernameColumn.getColumnLength());
+        Assertions.assertEquals(200, domainColumn.getColumnLength());
+    }
+}
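`testGetProducedCatalogTable` only checks the schema of the produced table: two STRING columns of length 200. It does not exercise row values. The sketch below runs the same pattern, `(\w+)@(\w+\.\w+)`, through `java.util.regex` on made-up inputs to show what the two capture groups would yield. It assumes find-anywhere matching; the matching mode of the actual transform is not visible in this part of the diff.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: applies the unit test's pattern to sample strings.
public class EmailPatternDemo {

    private static final Pattern EMAIL = Pattern.compile("(\\w+)@(\\w+\\.\\w+)");

    /** Returns {username, domain}, or null when the pattern finds no match. */
    static String[] usernameAndDomain(String text) {
        Matcher m = EMAIL.matcher(text);
        return m.find() ? new String[] {m.group(1), m.group(2)} : null;
    }

    public static void main(String[] args) {
        String[] a = usernameAndDomain("contact: alice@example.com");
        System.out.println(a[0] + " / " + a[1]); // alice / example.com
        String[] b = usernameAndDomain("bob@mail.example.com");
        System.out.println(b[0] + " / " + b[1]); // bob / mail.example
        System.out.println(usernameAndDomain("no address") == null); // true
    }
}
```

The second group captures only two dot-separated labels. A multi-level domain such as `mail.example.com` is therefore truncated to `mail.example`.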

