This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9c16d7c7ea [Feature][Transform] Add table filter transform (#9189)
9c16d7c7ea is described below

commit 9c16d7c7ea5f4b334d653164633687e1f8c0cd94
Author: hailin0 <[email protected]>
AuthorDate: Sun Apr 27 09:36:44 2025 +0800

    [Feature][Transform] Add table filter transform (#9189)
---
 docs/en/transform-v2/table-filter.md               |  51 +++++
 docs/zh/transform-v2/table-filter.md               |  51 +++++
 plugin-mapping.properties                          |   1 +
 .../seatunnel/e2e/transform/TestTableFilterIT.java |  41 ++++
 .../test/resources/table_filter_multi_table.conf   | 208 +++++++++++++++++++++
 .../engine/core/dag/actions/ShuffleAction.java     |  35 ----
 .../engine/core/dag/actions/ShuffleConfig.java     |  42 -----
 .../dag/actions/ShuffleMultipleRowStrategy.java    | 117 ------------
 .../core/dag/actions/ShufflePartitionStrategy.java | 116 ------------
 .../engine/core/dag/actions/ShuffleStrategy.java   |  70 -------
 .../dag/execution/ExecutionPlanGenerator.java      | 103 +---------
 .../server/dag/execution/PipelineGenerator.java    |  10 +-
 .../server/dag/physical/PhysicalPlanGenerator.java | 154 +--------------
 .../engine/server/task/SeaTunnelTask.java          |  25 ---
 .../server/task/flow/ShuffleSinkFlowLifeCycle.java | 164 ----------------
 .../task/flow/ShuffleSourceFlowLifeCycle.java      | 159 ----------------
 .../transform/table/TableFilterConfig.java         | 137 ++++++++++++++
 .../table/TableFilterMultiCatalogTransform.java    |  95 ++++++++++
 .../transform/table/TableFilterTransform.java      |  60 ++++++
 .../table/TableFilterTransformFactory.java         |  55 ++++++
 20 files changed, 707 insertions(+), 987 deletions(-)

diff --git a/docs/en/transform-v2/table-filter.md 
b/docs/en/transform-v2/table-filter.md
new file mode 100644
index 0000000000..9223703759
--- /dev/null
+++ b/docs/en/transform-v2/table-filter.md
@@ -0,0 +1,51 @@
+# TableMerge
+
+> TableFilter transform plugin
+
+## Description
+
+TableFilter transform plugin for filter tables.
+
+## Options
+
+|       name       | type   | required | default value | Description           
                                                                                
                                                                |
+|:----------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| database_pattern | string | no       |               | Specify database 
filter pattern, the default value is null, which means no filtering. If you 
want to filter the database name, please set it to a regular expression. |
+|  schema_pattern  | string | no       |               | Specify schema filter 
pattern, the default value is null, which means no filtering. If you want to 
filter the schema name, please set it to a regular expression.     |
+|  table_pattern   | string | no       |               | Specify table filter 
pattern, the default value is null, which means no filtering. If you want to 
filter the table name, please set it to a regular expression.       |
+|   pattern_mode   | string | no       | INCLUDE       | Specify pattern mode, 
the default value is INCLUDE, which means include the matched table. If you 
want to exclude the matched table, please set it to EXCLUDE.        |
+
+## Examples
+
+### Include filter tables
+
+Include filter tables with the name matching the regular expression `user_\d+` 
in the database `test`.
+
+```hocon
+transform {
+    TableFilter {
+        plugin_input = "source1"
+        plugin_output = "transform_a_1"
+    
+        database_pattern = "test"
+        table_pattern = "user_\\d+"
+    }
+}
+```
+
+### Exclude filter tables
+
+Exclude filter tables with the name matching the regular expression `user_\d+` 
in the database `test`.
+
+```hocon
+transform {
+    TableFilter {
+        plugin_input = "source1"
+        plugin_output = "transform_a_1"
+    
+        database_pattern = "test"
+        table_pattern = "user_\\d+"
+        pattern_mode = "EXCLUDE"
+    }
+}
+```
\ No newline at end of file
diff --git a/docs/zh/transform-v2/table-filter.md 
b/docs/zh/transform-v2/table-filter.md
new file mode 100644
index 0000000000..031bfd66e1
--- /dev/null
+++ b/docs/zh/transform-v2/table-filter.md
@@ -0,0 +1,51 @@
+# TableMerge
+
+> TableFilter transform plugin
+
+## Description
+
+表过滤 transform,用于正向或者反向过滤部分表
+
+## Options
+
+|       name       | type   | required | default value | Description           
                                 |
+|:----------------:|--------|----------|---------------|--------------------------------------------------------|
+| database_pattern | string | no       |               | 指定数据库过滤模式,默认值为 
null,表示不过滤。如果要过滤数据库名称,请将其设置为正则表达式。      |
+|  schema_pattern  | string | no       |               | 指定 schema 过滤模式,默认值为 
null,表示不过滤。如果要过滤架构名称,请将其设置为正则表达式。  |
+|  table_pattern   | string | no       |               | 指定表过滤模式,默认值为 
null,表示不过滤。如果要过滤表名称,请将其设置为正则表达式。          |
+|   pattern_mode   | string | no       | INCLUDE       | 指定过滤模式,默认值为 
INCLUDE,表示包含匹配的表。如果要排除匹配的表,请将其设置为 EXCLUDE。 |
+
+## Examples
+
+### 包含表过滤
+
+在数据库 "test" 中包含名称与正则表达式 "user_\d+" 匹配的过滤表。
+
+```hocon
+transform {
+    TableFilter {
+        plugin_input = "source1"
+        plugin_output = "transform_a_1"
+    
+        database_pattern = "test"
+        table_pattern = "user_\\d+"
+    }
+}
+```
+
+### 排除表过滤
+
+排除数据库 "test" 中名称与正则表达式 "user_\d+" 匹配的过滤表。
+
+```hocon
+transform {
+    TableFilter {
+        plugin_input = "source1"
+        plugin_output = "transform_a_1"
+    
+        database_pattern = "test"
+        table_pattern = "user_\\d+"
+        pattern_mode = "EXCLUDE"
+    }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index aa1f390634..1c3b5e7ab7 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -161,3 +161,4 @@ seatunnel.transform.Metadata = seatunnel-transforms-v2
 seatunnel.transform.FieldRename = seatunnel-transforms-v2
 seatunnel.transform.TableRename = seatunnel-transforms-v2
 seatunnel.transform.TableMerge = seatunnel-transforms-v2
+seatunnel.transform.TableFilter = seatunnel-transforms-v2
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
new file mode 100644
index 0000000000..f4d9216f50
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestTableFilterIT extends TestSuiteBase {
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Only support for seatunnel")
+    @TestTemplate
+    public void testFilterMultiTable(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/table_filter_multi_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table.conf
new file mode 100644
index 0000000000..f032f722e4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table.conf
@@ -0,0 +1,208 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "source1"
+
+    tables_configs = [
+      {
+        row.num = 3
+        schema = {
+          table = "test.user_1"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            }
+          ]
+        }
+      },
+      {
+        row.num = 3
+        schema = {
+          table = "test.user_2"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "name"
+              type = "string"
+            }
+          ]
+        }
+      },
+      {
+        row.num = 5
+        schema = {
+          table = "test.xyz"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "age"
+              type = "int"
+            }
+          ]
+        }
+      }
+    ]
+  }
+}
+transform {
+  TableFilter {
+    plugin_input = "source1"
+    plugin_output = "transform_a_1"
+
+    database_pattern = "test"
+    table_pattern = "user_\\d+"
+  }
+  TableRename {
+      plugin_input = "transform_a_1"
+      plugin_output = "transform_a_2"
+
+      prefix = "table_a_"
+    }
+
+
+
+  TableFilter {
+    plugin_input = "source1"
+    plugin_output = "transform_b_1"
+
+    database_pattern = "test"
+    table_pattern = "xyz"
+  }
+    TableRename {
+        plugin_input = "transform_b_1"
+        plugin_output = "transform_b_2"
+
+        prefix = "table_b_"
+      }
+}
+sink {
+  Assert {
+    plugin_input = "transform_a_2"
+
+    rules =
+      {
+        tables_configs = [
+          {
+            table_path = "test.table_a_user_1"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 3
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 3
+              }
+            ]
+          },
+          {
+              table_path = "test.table_a_user_2"
+              row_rules = [
+                {
+                  rule_type = MAX_ROW
+                  rule_value = 3
+                },
+                {
+                  rule_type = MIN_ROW
+                  rule_value = 3
+                }
+              ]
+          },
+        {
+          table_path = "test.table_a_xyz"
+          row_rules = [
+            {
+              rule_type = MAX_ROW
+              rule_value = 0
+            },
+            {
+              rule_type = MIN_ROW
+              rule_value = 0
+            }
+          ]
+        }
+        ]
+      }
+  }
+
+  Assert {
+      plugin_input = "transform_b_2"
+
+      rules =
+        {
+          tables_configs = [
+            {
+              table_path = "test.table_b_user_1"
+              row_rules = [
+                {
+                  rule_type = MAX_ROW
+                  rule_value = 0
+                },
+                {
+                  rule_type = MIN_ROW
+                  rule_value = 0
+                }
+              ]
+            },
+            {
+                table_path = "test.table_b_user_2"
+                row_rules = [
+                  {
+                    rule_type = MAX_ROW
+                    rule_value = 0
+                  },
+                  {
+                    rule_type = MIN_ROW
+                    rule_value = 0
+                  }
+                ]
+            },
+          {
+            table_path = "test.table_b_xyz"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 5
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 5
+              }
+            ]
+          }
+          ]
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
deleted file mode 100644
index cbfa44a5eb..0000000000
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import lombok.NonNull;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-
-public class ShuffleAction extends AbstractAction {
-
-    public ShuffleAction(long id, @NonNull String name, @NonNull ShuffleConfig 
shuffleConfig) {
-        super(id, name, new ArrayList<>(), new HashSet<>(), new HashSet<>(), 
shuffleConfig);
-    }
-
-    @Override
-    public ShuffleConfig getConfig() {
-        return (ShuffleConfig) super.getConfig();
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
deleted file mode 100644
index 908bf80780..0000000000
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.Tolerate;
-
-import java.util.concurrent.TimeUnit;
-
-@Getter
-@Setter
-@ToString
-@Builder(toBuilder = true)
-public class ShuffleConfig implements Config {
-    public static final int DEFAULT_BATCH_SIZE = 1024;
-    public static final long DEFAULT_BATCH_FLUSH_INTERVAL = 
TimeUnit.SECONDS.toMillis(3);
-
-    @Builder.Default private int batchSize = DEFAULT_BATCH_SIZE;
-    @Builder.Default private long batchFlushInterval = 
DEFAULT_BATCH_FLUSH_INTERVAL;
-    private ShuffleStrategy shuffleStrategy;
-
-    @Tolerate
-    public ShuffleConfig() {}
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
deleted file mode 100644
index 707fe82499..0000000000
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.SuperBuilder;
-import lombok.experimental.Tolerate;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-@SuperBuilder(toBuilder = true)
-@Getter
-@Setter
-@ToString
-public class ShuffleMultipleRowStrategy extends ShuffleStrategy {
-    private List<CatalogTable> catalogTables;
-    private String targetTableId;
-
-    @Tolerate
-    public ShuffleMultipleRowStrategy() {}
-
-    @Override
-    public Map<String, IQueue<Record<?>>> createShuffles(
-            HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
-        Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
-        for (CatalogTable entry : catalogTables) {
-            String tableId = entry.getTableId().toTablePath().toString();
-            String queueName = generateQueueName(pipelineId, inputIndex, 
tableId);
-            IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
-            // clear old data when job restore
-            queue.clear();
-            shuffleMap.put(queueName, queue);
-        }
-
-        log.info(
-                "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
-                pipelineId,
-                inputIndex,
-                shuffleMap.keySet());
-
-        return shuffleMap;
-    }
-
-    @Override
-    public String createShuffleKey(Record<?> record, int pipelineId, int 
inputIndex) {
-        String tableId;
-        if (record.getData() instanceof SeaTunnelRow) {
-            tableId = ((SeaTunnelRow) record.getData()).getTableId();
-        } else if (record.getData() instanceof SchemaChangeEvent) {
-            tableId = ((SchemaChangeEvent) 
record.getData()).tablePath().toString();
-        } else {
-            throw new UnsupportedOperationException("Unsupported record: " + 
record);
-        }
-        return generateQueueName(pipelineId, inputIndex, tableId);
-    }
-
-    @Override
-    public IQueue<Record<?>>[] getShuffles(
-            HazelcastInstance hazelcast, int pipelineId, int targetIndex) {
-        IQueue<Record<?>>[] queues = new IQueue[getInputPartitions()];
-        for (int inputIndex = 0; inputIndex < getInputPartitions(); 
inputIndex++) {
-            Objects.requireNonNull(targetTableId);
-            String queueName = generateQueueName(pipelineId, inputIndex, 
targetTableId);
-            queues[inputIndex] = getIQueue(hazelcast, queueName);
-        }
-
-        log.info(
-                "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
-                pipelineId,
-                targetIndex,
-                Stream.of(queues).map(e -> 
e.getName()).collect(Collectors.toList()));
-
-        return queues;
-    }
-
-    private String generateQueueName(int pipelineId, int inputIndex, String 
tableId) {
-        return "ShuffleMultipleRow-Queue_"
-                + getJobId()
-                + "_"
-                + pipelineId
-                + "_"
-                + inputIndex
-                + "_"
-                + tableId;
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
deleted file mode 100644
index 45144d210f..0000000000
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import org.apache.seatunnel.api.table.type.Record;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.SuperBuilder;
-import lombok.experimental.Tolerate;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
-
-@Slf4j
-@SuperBuilder
-@Getter
-@Setter
-@ToString
-public class ShufflePartitionStrategy extends ShuffleStrategy {
-    private final Map<Integer, String[]> inputQueueMapping = new HashMap<>();
-    private int targetPartitions;
-
-    @Tolerate
-    public ShufflePartitionStrategy() {}
-
-    @Override
-    public Map<String, IQueue<Record<?>>> createShuffles(
-            HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
-        checkArgument(inputIndex >= 0 && inputIndex < getInputPartitions());
-        Map<String, IQueue<Record<?>>> shuffleMap = new LinkedHashMap<>();
-        for (int targetIndex = 0; targetIndex < targetPartitions; 
targetIndex++) {
-            String queueName = generateQueueName(pipelineId, inputIndex, 
targetIndex);
-            IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
-            // clear old data when job restore
-            queue.clear();
-            shuffleMap.put(queueName, queue);
-        }
-
-        log.info(
-                "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
-                pipelineId,
-                inputIndex,
-                shuffleMap.keySet());
-
-        return shuffleMap;
-    }
-
-    @Override
-    public String createShuffleKey(Record<?> record, int pipelineId, int 
inputIndex) {
-        String[] inputQueueNames =
-                inputQueueMapping.computeIfAbsent(
-                        inputIndex,
-                        key -> {
-                            String[] queueNames = new String[targetPartitions];
-                            for (int targetIndex = 0;
-                                    targetIndex < targetPartitions;
-                                    targetIndex++) {
-                                queueNames[targetIndex] =
-                                        generateQueueName(pipelineId, key, 
targetIndex);
-                            }
-                            return queueNames;
-                        });
-        return 
inputQueueNames[ThreadLocalRandom.current().nextInt(targetPartitions)];
-    }
-
-    @Override
-    public IQueue<Record<?>>[] getShuffles(
-            HazelcastInstance hazelcast, int pipelineId, int targetIndex) {
-        checkArgument(targetIndex >= 0 && targetIndex < targetPartitions);
-        IQueue<Record<?>>[] shuffles = new IQueue[getInputPartitions()];
-        for (int inputIndex = 0; inputIndex < getInputPartitions(); 
inputIndex++) {
-            String queueName = generateQueueName(pipelineId, inputIndex, 
targetIndex);
-            shuffles[inputIndex] = getIQueue(hazelcast, queueName);
-        }
-
-        log.info(
-                "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
-                pipelineId,
-                targetIndex,
-                Stream.of(shuffles).map(e -> 
e.getName()).collect(Collectors.toList()));
-
-        return shuffles;
-    }
-
-    private String generateQueueName(int pipelineId, int inputIndex, int 
targetIndex) {
-        return String.format(
-                "ShufflePartition-Queue_%s_%s_%s_%s",
-                getJobId(), pipelineId, inputIndex, targetIndex);
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
deleted file mode 100644
index 43feac3572..0000000000
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import org.apache.seatunnel.api.table.type.Record;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.config.QueueConfig;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.SuperBuilder;
-import lombok.experimental.Tolerate;
-
-import java.io.Serializable;
-import java.util.Map;
-
-@SuperBuilder(toBuilder = true)
-@Getter
-@Setter
-@ToString
-public abstract class ShuffleStrategy implements Serializable {
-    private static final int DEFAULT_QUEUE_SIZE = 2048;
-    private static final int DEFAULT_QUEUE_BACKUP_COUNT = 0;
-    private static final int DEFAULT_QUEUE_ASYNC_BACKUP_COUNT = 0;
-
-    protected long jobId;
-    protected int inputPartitions;
-    @Builder.Default protected int queueMaxSize = DEFAULT_QUEUE_SIZE;
-    @Builder.Default protected int queueBackupCount = 
DEFAULT_QUEUE_BACKUP_COUNT;
-    @Builder.Default protected int queueAsyncBackupCount = 
DEFAULT_QUEUE_ASYNC_BACKUP_COUNT;
-    protected int queueEmptyQueueTtl;
-
-    @Tolerate
-    public ShuffleStrategy() {}
-
-    public abstract Map<String, IQueue<Record<?>>> createShuffles(
-            HazelcastInstance hazelcast, int pipelineId, int inputIndex);
-
-    public abstract String createShuffleKey(Record<?> record, int pipelineId, 
int inputIndex);
-
-    public abstract IQueue<Record<?>>[] getShuffles(
-            HazelcastInstance hazelcast, int pipelineId, int targetIndex);
-
-    protected IQueue<Record<?>> getIQueue(HazelcastInstance hazelcast, String 
queueName) {
-        QueueConfig targetQueueConfig = 
hazelcast.getConfig().getQueueConfig(queueName);
-        targetQueueConfig.setMaxSize(queueMaxSize);
-        targetQueueConfig.setBackupCount(queueBackupCount);
-        targetQueueConfig.setAsyncBackupCount(queueAsyncBackupCount);
-        targetQueueConfig.setEmptyQueueTtl(queueEmptyQueueTtl);
-        return hazelcast.getQueue(queueName);
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 866af5bedb..0ebef5e050 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -17,16 +17,10 @@
 
 package org.apache.seatunnel.engine.server.dag.execution;
 
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -47,11 +41,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -80,27 +72,21 @@ public class ExecutionPlanGenerator {
         Set<ExecutionEdge> executionEdges = 
generateExecutionEdges(logicalPlan.getEdges());
         log.debug("Phase 1: generate execution edge list {}", executionEdges);
 
-        executionEdges = generateShuffleEdges(executionEdges);
-        log.debug("Phase 2: generate shuffle edge list {}", executionEdges);
-
         executionEdges = generateTransformChainEdges(executionEdges);
-        log.debug("Phase 3: generate transform chain edge list {}", 
executionEdges);
+        log.debug("Phase 2: generate transform chain edge list {}", 
executionEdges);
 
         List<Pipeline> pipelines = generatePipelines(executionEdges);
-        log.debug("Phase 4: generate pipeline list {}", pipelines);
+        log.debug("Phase 3: generate pipeline list {}", pipelines);
 
         ExecutionPlan executionPlan = new ExecutionPlan(pipelines, 
jobImmutableInformation);
-        log.debug("Phase 5: generate execution plan: {}", executionPlan);
+        log.debug("Phase 4: generate execution plan: {}", executionPlan);
 
         return executionPlan;
     }
 
     public static Action recreateAction(Action action, Long id, int 
parallelism) {
         Action newAction;
-        if (action instanceof ShuffleAction) {
-            newAction =
-                    new ShuffleAction(id, action.getName(), ((ShuffleAction) 
action).getConfig());
-        } else if (action instanceof SinkAction) {
+        if (action instanceof SinkAction) {
             newAction =
                     new SinkAction<>(
                             id,
@@ -202,87 +188,6 @@ public class ExecutionPlanGenerator {
         return executionEdges;
     }
 
-    @SuppressWarnings("MagicNumber")
-    private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge> 
executionEdges) {
-        Map<Long, List<ExecutionVertex>> targetVerticesMap = new 
LinkedHashMap<>();
-        Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
-        executionEdges.forEach(
-                edge -> {
-                    ExecutionVertex leftVertex = edge.getLeftVertex();
-                    ExecutionVertex rightVertex = edge.getRightVertex();
-                    if (leftVertex.getAction() instanceof SourceAction) {
-                        sourceExecutionVertices.add(leftVertex);
-                    }
-                    targetVerticesMap
-                            .computeIfAbsent(leftVertex.getVertexId(), id -> 
new ArrayList<>())
-                            .add(rightVertex);
-                });
-        if (sourceExecutionVertices.size() != 1) {
-            return executionEdges;
-        }
-        ExecutionVertex sourceExecutionVertex = 
sourceExecutionVertices.stream().findFirst().get();
-        Action sourceAction = sourceExecutionVertex.getAction();
-        List<CatalogTable> producedCatalogTables = new ArrayList<>();
-        if (sourceAction instanceof SourceAction) {
-            try {
-                producedCatalogTables =
-                        ((SourceAction<?, ?, ?>) sourceAction)
-                                .getSource()
-                                .getProducedCatalogTables();
-            } catch (UnsupportedOperationException e) {
-            }
-        } else if (sourceAction instanceof TransformChainAction) {
-            return executionEdges;
-        } else {
-            throw new SeaTunnelException(
-                    "source action must be SourceAction or 
TransformChainAction");
-        }
-        if (producedCatalogTables.size() <= 1
-                || 
targetVerticesMap.get(sourceExecutionVertex.getVertexId()).size() <= 1) {
-            return executionEdges;
-        }
-
-        List<ExecutionVertex> sinkVertices =
-                targetVerticesMap.get(sourceExecutionVertex.getVertexId());
-        Optional<ExecutionVertex> hasOtherAction =
-                sinkVertices.stream()
-                        .filter(vertex -> !(vertex.getAction() instanceof 
SinkAction))
-                        .findFirst();
-        checkArgument(!hasOtherAction.isPresent());
-
-        Set<ExecutionEdge> newExecutionEdges = new LinkedHashSet<>();
-        ShuffleStrategy shuffleStrategy =
-                ShuffleMultipleRowStrategy.builder()
-                        .jobId(jobImmutableInformation.getJobId())
-                        .inputPartitions(sourceAction.getParallelism())
-                        .catalogTables(producedCatalogTables)
-                        .queueEmptyQueueTtl(
-                                (int)
-                                        
(engineConfig.getCheckpointConfig().getCheckpointInterval()
-                                                * 3))
-                        .build();
-        ShuffleConfig shuffleConfig =
-                
ShuffleConfig.builder().shuffleStrategy(shuffleStrategy).build();
-
-        long shuffleVertexId = idGenerator.getNextId();
-        String shuffleActionName = String.format("Shuffle [%s]", 
sourceAction.getName());
-        ShuffleAction shuffleAction =
-                new ShuffleAction(shuffleVertexId, shuffleActionName, 
shuffleConfig);
-        shuffleAction.setParallelism(sourceAction.getParallelism());
-        ExecutionVertex shuffleVertex =
-                new ExecutionVertex(shuffleVertexId, shuffleAction, 
shuffleAction.getParallelism());
-        ExecutionEdge sourceToShuffleEdge = new 
ExecutionEdge(sourceExecutionVertex, shuffleVertex);
-        newExecutionEdges.add(sourceToShuffleEdge);
-
-        for (ExecutionVertex sinkVertex : sinkVertices) {
-            sinkVertex.setParallelism(1);
-            sinkVertex.getAction().setParallelism(1);
-            ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex, 
sinkVertex);
-            newExecutionEdges.add(shuffleToSinkEdge);
-        }
-        return newExecutionEdges;
-    }
-
     private Set<ExecutionEdge> generateTransformChainEdges(Set<ExecutionEdge> 
executionEdges) {
         Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>();
         Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index f0a92a6691..e63138dad7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.server.dag.execution;
 
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -135,10 +134,7 @@ public class PipelineGenerator {
 
     /** If this execution vertex have partition transform, can't be spilt */
     private boolean checkCanSplit(List<ExecutionEdge> edges) {
-        return edges.stream()
-                        .noneMatch(e -> e.getRightVertex().getAction() 
instanceof ShuffleAction)
-                && edges.stream()
-                        .anyMatch(e -> 
inputVerticesMap.get(e.getRightVertexId()).size() > 1);
+        return edges.stream().anyMatch(e -> 
inputVerticesMap.get(e.getRightVertexId()).size() > 1);
     }
 
     private void splitUnionVertex(
@@ -184,9 +180,7 @@ public class PipelineGenerator {
         long id = idGenerator.getNextId();
         Action action = vertex.getAction();
         return new ExecutionVertex(
-                id,
-                ExecutionPlanGenerator.recreateAction(action, id, parallelism),
-                action instanceof ShuffleAction ? vertex.getParallelism() : 
parallelism);
+                id, ExecutionPlanGenerator.recreateAction(action, id, 
parallelism), parallelism);
     }
 
     private void fillVerticesMap(List<ExecutionEdge> edges) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 0bc7e46d5f..b413be0cd0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -27,10 +27,6 @@ import 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
@@ -191,9 +187,6 @@ public class PhysicalPlanGenerator {
                                             getSourceTask(
                                                     edges, sources, 
pipelineId, totalPipelineNum);
 
-                                    physicalVertexList.addAll(
-                                            getShuffleTask(edges, pipelineId, 
totalPipelineNum));
-
                                     CompletableFuture<PipelineStatus> 
pipelineFuture =
                                             new CompletableFuture<>();
                                     waitForCompleteBySubPlanList.add(
@@ -323,149 +316,6 @@ public class PhysicalPlanGenerator {
                 .collect(Collectors.toList());
     }
 
-    private List<PhysicalVertex> getShuffleTask(
-            List<ExecutionEdge> edges, int pipelineIndex, int 
totalPipelineNum) {
-        return edges.stream()
-                .filter(s -> s.getLeftVertex().getAction() instanceof 
ShuffleAction)
-                .map(q -> (ShuffleAction) q.getLeftVertex().getAction())
-                .collect(Collectors.toSet())
-                .stream()
-                .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, 
q)))
-                .flatMap(
-                        flow -> {
-                            List<PhysicalVertex> physicalVertices = new 
ArrayList<>();
-
-                            ShuffleAction shuffleAction = (ShuffleAction) 
flow.getAction();
-                            ShuffleConfig shuffleConfig = 
shuffleAction.getConfig();
-                            ShuffleStrategy shuffleStrategy = 
shuffleConfig.getShuffleStrategy();
-                            if (shuffleStrategy instanceof 
ShuffleMultipleRowStrategy) {
-                                ShuffleMultipleRowStrategy 
shuffleMultipleRowStrategy =
-                                        (ShuffleMultipleRowStrategy) 
shuffleStrategy;
-                                AtomicInteger atomicInteger = new 
AtomicInteger(0);
-                                for (Flow nextFlow : flow.getNext()) {
-                                    PhysicalExecutionFlow sinkFlow =
-                                            (PhysicalExecutionFlow) nextFlow;
-                                    SinkAction sinkAction = (SinkAction) 
sinkFlow.getAction();
-                                    String sinkTableId =
-                                            
sinkAction.getConfig().getTablePath().toString();
-
-                                    int parallelismIndex = 
atomicInteger.getAndIncrement();
-                                    ShuffleStrategy shuffleStrategyOfSinkFlow =
-                                            shuffleMultipleRowStrategy
-                                                    .toBuilder()
-                                                    .targetTableId(sinkTableId)
-                                                    .build();
-                                    ShuffleConfig shuffleConfigOfSinkFlow =
-                                            shuffleConfig
-                                                    .toBuilder()
-                                                    
.shuffleStrategy(shuffleStrategyOfSinkFlow)
-                                                    .build();
-                                    String shuffleActionName =
-                                            String.format(
-                                                    "%s -> %s -> %s",
-                                                    shuffleAction.getName(),
-                                                    sinkTableId,
-                                                    sinkAction.getName());
-                                    ShuffleAction shuffleActionOfSinkFlow =
-                                            new ShuffleAction(
-                                                    parallelismIndex,
-                                                    shuffleActionName,
-                                                    shuffleConfigOfSinkFlow);
-                                    shuffleActionOfSinkFlow.setParallelism(1);
-                                    PhysicalExecutionFlow shuffleFlow =
-                                            new PhysicalExecutionFlow(
-                                                    shuffleActionOfSinkFlow,
-                                                    
Collections.singletonList(sinkFlow));
-                                    setFlowConfig(shuffleFlow);
-
-                                    long taskGroupID = 
taskGroupIdGenerator.getNextId();
-                                    TaskGroupLocation taskGroupLocation =
-                                            new TaskGroupLocation(
-                                                    
jobImmutableInformation.getJobId(),
-                                                    pipelineIndex,
-                                                    taskGroupID);
-                                    TaskLocation taskLocation =
-                                            new TaskLocation(
-                                                    taskGroupLocation, 0, 
parallelismIndex);
-                                    SeaTunnelTask seaTunnelTask =
-                                            new TransformSeaTunnelTask(
-                                                    
jobImmutableInformation.getJobId(),
-                                                    taskLocation,
-                                                    parallelismIndex,
-                                                    shuffleFlow);
-
-                                    // checkpoint
-                                    fillCheckpointPlan(seaTunnelTask);
-                                    physicalVertices.add(
-                                            new PhysicalVertex(
-                                                    parallelismIndex,
-                                                    
shuffleFlow.getAction().getParallelism(),
-                                                    new TaskGroupDefaultImpl(
-                                                            taskGroupLocation,
-                                                            
shuffleFlow.getAction().getName()
-                                                                    + 
"-ShuffleTask",
-                                                            
Collections.singletonList(
-                                                                    
seaTunnelTask)),
-                                                    flakeIdGenerator,
-                                                    pipelineIndex,
-                                                    totalPipelineNum,
-                                                    Collections.singletonList(
-                                                            
seaTunnelTask.getJarsUrl()),
-                                                    Collections.singletonList(
-                                                            
seaTunnelTask.getConnectorPluginJars()),
-                                                    jobImmutableInformation,
-                                                    initializationTimestamp,
-                                                    nodeEngine,
-                                                    runningJobStateIMap,
-                                                    
runningJobStateTimestampsIMap));
-                                }
-                            } else {
-                                for (int i = 0; i < 
flow.getAction().getParallelism(); i++) {
-                                    long taskGroupID = 
taskGroupIdGenerator.getNextId();
-                                    TaskGroupLocation taskGroupLocation =
-                                            new TaskGroupLocation(
-                                                    
jobImmutableInformation.getJobId(),
-                                                    pipelineIndex,
-                                                    taskGroupID);
-                                    TaskLocation taskLocation =
-                                            new 
TaskLocation(taskGroupLocation, 0, i);
-                                    setFlowConfig(flow);
-                                    SeaTunnelTask seaTunnelTask =
-                                            new TransformSeaTunnelTask(
-                                                    
jobImmutableInformation.getJobId(),
-                                                    taskLocation,
-                                                    i,
-                                                    flow);
-                                    // checkpoint
-                                    fillCheckpointPlan(seaTunnelTask);
-                                    physicalVertices.add(
-                                            new PhysicalVertex(
-                                                    i,
-                                                    
flow.getAction().getParallelism(),
-                                                    new TaskGroupDefaultImpl(
-                                                            taskGroupLocation,
-                                                            
flow.getAction().getName()
-                                                                    + 
"-ShuffleTask",
-                                                            
Lists.newArrayList(seaTunnelTask)),
-                                                    flakeIdGenerator,
-                                                    pipelineIndex,
-                                                    totalPipelineNum,
-                                                    Collections.singletonList(
-                                                            
seaTunnelTask.getJarsUrl()),
-                                                    Collections.singletonList(
-                                                            
seaTunnelTask.getConnectorPluginJars()),
-                                                    jobImmutableInformation,
-                                                    initializationTimestamp,
-                                                    nodeEngine,
-                                                    runningJobStateIMap,
-                                                    
runningJobStateTimestampsIMap));
-                                }
-                            }
-                            return physicalVertices.stream();
-                        })
-                .collect(Collectors.toList());
-    }
-
     private List<PhysicalVertex> getEnumeratorTask(
             List<SourceAction<?, ?, ?>> sources, int pipelineIndex, int 
totalPipelineNum) {
         AtomicInteger atomicInteger = new AtomicInteger(-1);
@@ -757,12 +607,12 @@ public class PhysicalPlanGenerator {
                         .collect(Collectors.toList());
         List<Flow> wrappers =
                 actions.stream()
-                        .filter(a -> a instanceof ShuffleAction || a 
instanceof SinkAction)
+                        .filter(a -> a instanceof SinkAction)
                         .map(PhysicalExecutionFlow::new)
                         .collect(Collectors.toList());
         wrappers.addAll(
                 actions.stream()
-                        .filter(a -> !(a instanceof ShuffleAction || a 
instanceof SinkAction))
+                        .filter(a -> !(a instanceof SinkAction))
                         .map(a -> new PhysicalExecutionFlow<>(a, 
getNextWrapper(edges, a)))
                         .collect(Collectors.toList()));
         return wrappers;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index e04e12d69f..ec00bbf58f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -26,7 +26,6 @@ import 
org.apache.seatunnel.common.utils.function.ConsumerWithException;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -52,8 +51,6 @@ import 
org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
 import 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
-import org.apache.seatunnel.engine.server.task.flow.ShuffleSinkFlowLifeCycle;
-import org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle;
@@ -61,7 +58,6 @@ import 
org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithInterm
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
-import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.internal.metrics.MetricDescriptor;
 import com.hazelcast.internal.metrics.MetricsCollectionContext;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -238,27 +234,6 @@ public abstract class SeaTunnelTask extends AbstractTask {
                                 this,
                                 new 
SeaTunnelTransformCollector(flowLifeCycles),
                                 completableFuture);
-            } else if (f.getAction() instanceof ShuffleAction) {
-                ShuffleAction shuffleAction = (ShuffleAction) f.getAction();
-                HazelcastInstance hazelcastInstance = 
getExecutionContext().getInstance();
-                if (flow.getNext().isEmpty()) {
-                    lifeCycle =
-                            new ShuffleSinkFlowLifeCycle(
-                                    this,
-                                    indexID,
-                                    shuffleAction,
-                                    hazelcastInstance,
-                                    completableFuture);
-                } else {
-                    lifeCycle =
-                            new ShuffleSourceFlowLifeCycle(
-                                    this,
-                                    indexID,
-                                    shuffleAction,
-                                    hazelcastInstance,
-                                    completableFuture);
-                }
-                outputs = flowLifeCycles;
             } else {
                 throw new UnknownActionException(f.getAction());
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
deleted file mode 100644
index cba7537c68..0000000000
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
-import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-
-@SuppressWarnings("MagicNumber")
-@Slf4j
-public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle
-        implements OneInputFlowLifeCycle<Record<?>> {
-    private final int pipelineId;
-    private final int taskIndex;
-    private final ShuffleAction shuffleAction;
-    private final Map<String, IQueue<Record<?>>> shuffles;
-    private final int shuffleBatchSize;
-    private final long shuffleBatchFlushInterval;
-    private final Map<String, Queue<Record<?>>> shuffleBuffer;
-    private final ShuffleStrategy shuffleStrategy;
-    private int shuffleBufferSize;
-    private long lastModify;
-
-    public ShuffleSinkFlowLifeCycle(
-            SeaTunnelTask runningTask,
-            int taskIndex,
-            ShuffleAction shuffleAction,
-            HazelcastInstance hazelcastInstance,
-            CompletableFuture<Void> completableFuture) {
-        super(runningTask, completableFuture);
-        this.pipelineId = 
runningTask.getTaskLocation().getTaskGroupLocation().getPipelineId();
-        this.taskIndex = taskIndex;
-        this.shuffleAction = shuffleAction;
-        this.shuffleStrategy = shuffleAction.getConfig().getShuffleStrategy();
-        this.shuffles = shuffleStrategy.createShuffles(hazelcastInstance, 
pipelineId, taskIndex);
-        this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
-        this.shuffleBatchFlushInterval = 
shuffleAction.getConfig().getBatchFlushInterval();
-        this.shuffleBuffer = new HashMap<>();
-    }
-
-    @Override
-    public void received(Record<?> record) throws IOException {
-        if (record.getData() instanceof Barrier) {
-            long startTime = System.currentTimeMillis();
-
-            // flush shuffle buffer
-            shuffleFlush();
-
-            Barrier barrier = (Barrier) record.getData();
-            if (barrier.prepareClose(runningTask.getTaskLocation())) {
-                prepareClose = true;
-            }
-            if (barrier.snapshot()) {
-                runningTask.addState(
-                        barrier, ActionStateKey.of(shuffleAction), 
Collections.emptyList());
-            }
-            runningTask.ack(barrier);
-
-            // The barrier needs to be replicated to all channels
-            for (Map.Entry<String, IQueue<Record<?>>> shuffle : 
shuffles.entrySet()) {
-                IQueue<Record<?>> shuffleQueue = shuffle.getValue();
-                try {
-                    shuffleQueue.put(record);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            log.debug(
-                    "trigger barrier [{}] finished, cost: {}ms. taskLocation: 
[{}]",
-                    barrier.getId(),
-                    System.currentTimeMillis() - startTime,
-                    runningTask.getTaskLocation());
-        } else if (record.getData() instanceof SchemaChangeEvent) {
-            if (prepareClose) {
-                return;
-            }
-
-            shuffleItem(record);
-        } else {
-            if (prepareClose) {
-                return;
-            }
-
-            shuffleItem(record);
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        super.close();
-        for (Map.Entry<String, IQueue<Record<?>>> shuffleItem : 
shuffles.entrySet()) {
-            log.info("destroy shuffle queue: {}", shuffleItem.getKey());
-            shuffleItem.getValue().destroy();
-        }
-    }
-
-    private synchronized void shuffleItem(Record<?> record) {
-        String shuffleKey = shuffleStrategy.createShuffleKey(record, 
pipelineId, taskIndex);
-        shuffleBuffer.computeIfAbsent(shuffleKey, key -> new 
LinkedList<>()).add(record);
-        shuffleBufferSize++;
-
-        if (shuffleBufferSize >= shuffleBatchSize
-                || (shuffleBufferSize > 1
-                        && System.currentTimeMillis() - lastModify > 
shuffleBatchFlushInterval)) {
-            shuffleFlush();
-        }
-    }
-
-    private synchronized void shuffleFlush() {
-        for (Map.Entry<String, Queue<Record<?>>> shuffleBatch : 
shuffleBuffer.entrySet()) {
-            IQueue<Record<?>> shuffleQueue = 
shuffles.get(shuffleBatch.getKey());
-            Queue<Record<?>> shuffleQueueBatch = shuffleBatch.getValue();
-            if (shuffleQueue.remainingCapacity() <= 0
-                    || !shuffleQueue.addAll(shuffleBatch.getValue())) {
-                for (; ; ) {
-                    Record<?> shuffleItem = shuffleQueueBatch.poll();
-                    if (shuffleItem == null) {
-                        break;
-                    }
-                    try {
-                        shuffleQueue.put(shuffleItem);
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }
-            shuffleQueueBatch.clear();
-        }
-        shuffleBufferSize = 0;
-        lastModify = System.currentTimeMillis();
-    }
-}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
deleted file mode 100644
index 654cfff67b..0000000000
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-@Slf4j
-@SuppressWarnings("MagicNumber")
-public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle
-        implements OneOutputFlowLifeCycle<Record<?>> {
-    private final ShuffleAction shuffleAction;
-    private final int shuffleBatchSize;
-    private final IQueue<Record<?>>[] shuffles;
-    private Map<Integer, List<Record<?>>> unsentBufferMap = new HashMap<>();
-    private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
-    private long currentCheckpointId = Long.MAX_VALUE;
-    private int alignedBarriersCounter = 0;
-
-    public ShuffleSourceFlowLifeCycle(
-            SeaTunnelTask runningTask,
-            int taskIndex,
-            ShuffleAction shuffleAction,
-            HazelcastInstance hazelcastInstance,
-            CompletableFuture<Void> completableFuture) {
-        super(runningTask, completableFuture);
-        int pipelineId = runningTask.getTaskLocation().getPipelineId();
-        this.shuffleAction = shuffleAction;
-        this.shuffles =
-                shuffleAction
-                        .getConfig()
-                        .getShuffleStrategy()
-                        .getShuffles(hazelcastInstance, pipelineId, taskIndex);
-        this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
-    }
-
-    @Override
-    public void collect(Collector<Record<?>> collector) throws Exception {
-        int emptyShuffleQueueCount = 0;
-
-        for (int i = 0; i < shuffles.length; i++) {
-            IQueue<Record<?>> shuffleQueue = shuffles[i];
-            List<Record<?>> unsentBuffer =
-                    unsentBufferMap.computeIfAbsent(i, k -> new 
LinkedList<>());
-            if (shuffleQueue.size() == 0) {
-                emptyShuffleQueueCount++;
-                continue;
-            }
-            // aligned barrier
-            if (alignedBarriers.get(i) != null
-                    && alignedBarriers.get(i).getId() == currentCheckpointId) {
-                continue;
-            }
-
-            List<Record<?>> shuffleBatch = new LinkedList<>();
-            if (alignedBarriersCounter > 0) {
-                shuffleBatch.add(shuffleQueue.take());
-            } else if (!unsentBuffer.isEmpty()) {
-                shuffleBatch.addAll(unsentBuffer);
-                unsentBuffer.clear();
-            }
-
-            shuffleQueue.drainTo(shuffleBatch, shuffleBatchSize);
-
-            for (int recordIndex = 0; recordIndex < shuffleBatch.size(); 
recordIndex++) {
-                Record<?> record = shuffleBatch.get(recordIndex);
-                if (record.getData() instanceof Barrier) {
-                    long startTime = System.currentTimeMillis();
-
-                    Barrier barrier = (Barrier) record.getData();
-
-                    // mark queue barrier
-                    alignedBarriers.put(i, barrier);
-                    alignedBarriersCounter++;
-                    currentCheckpointId = barrier.getId();
-
-                    // publish barrier
-                    if (alignedBarriersCounter == shuffles.length) {
-                        if 
(barrier.prepareClose(runningTask.getTaskLocation())) {
-                            prepareClose = true;
-                        }
-                        if (barrier.snapshot()) {
-                            runningTask.addState(
-                                    barrier,
-                                    ActionStateKey.of(shuffleAction),
-                                    Collections.emptyList());
-                        }
-                        runningTask.ack(barrier);
-
-                        collector.collect(record);
-                        log.debug(
-                                "trigger barrier [{}] finished, cost: {}ms. 
taskLocation: [{}]",
-                                barrier.getId(),
-                                System.currentTimeMillis() - startTime,
-                                runningTask.getTaskLocation());
-
-                        alignedBarriersCounter = 0;
-                        alignedBarriers.clear();
-                    }
-
-                    if (recordIndex + 1 < shuffleBatch.size()) {
-                        unsentBuffer.addAll(
-                                shuffleBatch.subList(recordIndex + 1, 
shuffleBatch.size()));
-                    }
-                    break;
-                } else {
-                    if (prepareClose) {
-                        return;
-                    }
-                    collector.collect(record);
-                }
-            }
-        }
-
-        if (emptyShuffleQueueCount == shuffles.length) {
-            Thread.sleep(100);
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        super.close();
-        for (IQueue<Record<?>> shuffleQueue : shuffles) {
-            log.info("destroy shuffle queue: {}", shuffleQueue.getName());
-            shuffleQueue.destroy();
-        }
-    }
-}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
new file mode 100644
index 0000000000..5a8a15396c
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.table;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+@ToString
+public class TableFilterConfig implements Serializable {
+
+    public static final String PLUGIN_NAME = "TableFilter";
+
+    public static final Option<String> DATABASE_PATTERN =
+            Options.key("database_pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify database filter pattern"
+                                    + "The default value is null, which means 
no filtering. "
+                                    + "If you want to filter the database 
name, please set it to a regular expression.");
+
+    public static final Option<String> SCHEMA_PATTERN =
+            Options.key("schema_pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify schema filter pattern"
+                                    + "The default value is null, which means 
no filtering. "
+                                    + "If you want to filter the schema name, 
please set it to a regular expression.");
+
+    public static final Option<String> TABLE_PATTERN =
+            Options.key("table_pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify table filter pattern"
+                                    + "The default value is null, which means 
no filtering. "
+                                    + "If you want to filter the table name, 
please set it to a regular expression.");
+
+    public static final Option<PatternMode> PATTERN_MODE =
+            Options.key("pattern_mode")
+                    .enumType(PatternMode.class)
+                    .defaultValue(PatternMode.INCLUDE)
+                    .withDescription(
+                            "Specify pattern mode"
+                                    + "The default value is INCLUDE, which 
means include the matched table."
+                                    + "If you want to exclude the matched 
table, please set it to EXCLUDE.");
+
+    @JsonAlias("database_pattern")
+    private String databasePattern;
+
+    @JsonAlias("schema_pattern")
+    private String schemaPattern;
+
+    @JsonAlias("table_pattern")
+    private String tablePattern;
+
+    @JsonAlias("pattern_mode")
+    private PatternMode patternMode;
+
+    public boolean isMatch(TablePath tablePath) {
+        if (PatternMode.INCLUDE.equals(patternMode)) {
+            if (databasePattern != null && 
!tablePath.getDatabaseName().matches(databasePattern)) {
+                return false;
+            }
+            if (schemaPattern != null && 
!tablePath.getSchemaName().matches(schemaPattern)) {
+                return false;
+            }
+            if (tablePattern != null && 
!tablePath.getTableName().matches(tablePattern)) {
+                return false;
+            }
+            return true;
+        }
+
+        if (databasePattern != null && 
tablePath.getDatabaseName().matches(databasePattern)) {
+            return false;
+        }
+        if (schemaPattern != null && 
tablePath.getSchemaName().matches(schemaPattern)) {
+            return false;
+        }
+        if (tablePattern != null && 
tablePath.getTableName().matches(tablePattern)) {
+            return false;
+        }
+        return true;
+    }
+
+    public static TableFilterConfig of(ReadonlyConfig config) {
+        TableFilterConfig filterConfig = new TableFilterConfig();
+        filterConfig.setDatabasePattern(config.get(DATABASE_PATTERN));
+        filterConfig.setSchemaPattern(config.get(SCHEMA_PATTERN));
+        filterConfig.setTablePattern(config.get(TABLE_PATTERN));
+        filterConfig.setPatternMode(config.get(PATTERN_MODE));
+
+        Preconditions.checkArgument(
+                filterConfig.getDatabasePattern() != null
+                        || filterConfig.getSchemaPattern() != null
+                        || filterConfig.getTablePattern() != null
+                        || filterConfig.getPatternMode() != null,
+                "At least one of database_pattern, schema_pattern, 
table_pattern or pattern_mode must be specified.");
+        return filterConfig;
+    }
+
+    public enum PatternMode {
+        INCLUDE,
+        EXCLUDE;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
new file mode 100644
index 0000000000..ab920c8d0c
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.table;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.transform.table.TableFilterConfig.PLUGIN_NAME;
+
+@Slf4j
+public class TableFilterMultiCatalogTransform extends 
AbstractMultiCatalogMapTransform {
+
+    public TableFilterMultiCatalogTransform(
+            List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+        super(inputCatalogTables, config);
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
+            CatalogTable table, ReadonlyConfig config) {
+        TableFilterConfig tableFilterConfig = TableFilterConfig.of(config);
+        boolean include;
+        if (tableFilterConfig.getDatabasePattern() == null
+                && tableFilterConfig.getSchemaPattern() == null
+                && tableFilterConfig.getTablePattern() == null) {
+            include =
+                    TableFilterConfig.PatternMode.INCLUDE.equals(
+                            tableFilterConfig.getPatternMode());
+        } else {
+            include = tableFilterConfig.isMatch(table.getTablePath());
+        }
+        return new TableFilterTransform(include, table);
+    }
+
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        List<CatalogTable> outputTables = new ArrayList<>();
+        for (CatalogTable catalogTable : inputCatalogTables) {
+            String tableId = 
catalogTable.getTableId().toTablePath().toString();
+            SeaTunnelTransform<SeaTunnelRow> tableTransform = 
transformMap.get(tableId);
+
+            if (tableTransform instanceof TableFilterTransform) {
+                TableFilterTransform tableFilterTransform = 
(TableFilterTransform) tableTransform;
+                if (tableFilterTransform.isInclude()) {
+                    outputTables.add(catalogTable);
+                } else {
+                    log.info("Table {} is filtered out", tableId);
+                }
+            }
+        }
+
+        log.info(
+                "Input tables: {}",
+                inputCatalogTables.stream()
+                        .map(e -> e.getTablePath().getFullName())
+                        .collect(Collectors.toList()));
+        log.info(
+                "Output tables: {}",
+                outputTables.stream()
+                        .map(e -> e.getTablePath().getFullName())
+                        .collect(Collectors.toList()));
+
+        outputCatalogTables = outputTables;
+        return outputTables;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransform.java
new file mode 100644
index 0000000000..10442e87df
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransform.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.table;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+
+import lombok.Getter;
+
+import static 
org.apache.seatunnel.transform.table.TableFilterConfig.PLUGIN_NAME;
+
+public class TableFilterTransform extends AbstractCatalogSupportMapTransform {
+
+    private final CatalogTable inputTable;
+    @Getter private final boolean include;
+
+    public TableFilterTransform(boolean include, CatalogTable table) {
+        super(table);
+        this.inputTable = table;
+        this.include = include;
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    protected TableSchema transformTableSchema() {
+        return inputTable.getTableSchema();
+    }
+
+    @Override
+    protected TableIdentifier transformTableIdentifier() {
+        return inputTable.getTableId();
+    }
+
+    @Override
+    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+        return include ? inputRow : null;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransformFactory.java
new file mode 100644
index 0000000000..987168d9cd
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransformFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.table;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class TableFilterTransformFactory implements TableTransformFactory {
+    @Override
+    public String factoryIdentifier() {
+        return TableFilterConfig.PLUGIN_NAME;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .optional(
+                        TableFilterConfig.DATABASE_PATTERN,
+                        TableFilterConfig.SCHEMA_PATTERN,
+                        TableFilterConfig.TABLE_PATTERN)
+                .optional(TableFilterConfig.PATTERN_MODE)
+                .optional(TransformCommonOptions.MULTI_TABLES)
+                .optional(TransformCommonOptions.TABLE_MATCH_REGEX)
+                .build();
+    }
+
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        return () ->
+                new TableFilterMultiCatalogTransform(
+                        context.getCatalogTables(), context.getOptions());
+    }
+}


Reply via email to