This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9c16d7c7ea [Feature][Transform] Add table filter transform (#9189)
9c16d7c7ea is described below
commit 9c16d7c7ea5f4b334d653164633687e1f8c0cd94
Author: hailin0 <[email protected]>
AuthorDate: Sun Apr 27 09:36:44 2025 +0800
[Feature][Transform] Add table filter transform (#9189)
---
docs/en/transform-v2/table-filter.md | 51 +++++
docs/zh/transform-v2/table-filter.md | 51 +++++
plugin-mapping.properties | 1 +
.../seatunnel/e2e/transform/TestTableFilterIT.java | 41 ++++
.../test/resources/table_filter_multi_table.conf | 208 +++++++++++++++++++++
.../engine/core/dag/actions/ShuffleAction.java | 35 ----
.../engine/core/dag/actions/ShuffleConfig.java | 42 -----
.../dag/actions/ShuffleMultipleRowStrategy.java | 117 ------------
.../core/dag/actions/ShufflePartitionStrategy.java | 116 ------------
.../engine/core/dag/actions/ShuffleStrategy.java | 70 -------
.../dag/execution/ExecutionPlanGenerator.java | 103 +---------
.../server/dag/execution/PipelineGenerator.java | 10 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 154 +--------------
.../engine/server/task/SeaTunnelTask.java | 25 ---
.../server/task/flow/ShuffleSinkFlowLifeCycle.java | 164 ----------------
.../task/flow/ShuffleSourceFlowLifeCycle.java | 159 ----------------
.../transform/table/TableFilterConfig.java | 137 ++++++++++++++
.../table/TableFilterMultiCatalogTransform.java | 95 ++++++++++
.../transform/table/TableFilterTransform.java | 60 ++++++
.../table/TableFilterTransformFactory.java | 55 ++++++
20 files changed, 707 insertions(+), 987 deletions(-)
diff --git a/docs/en/transform-v2/table-filter.md
b/docs/en/transform-v2/table-filter.md
new file mode 100644
index 0000000000..9223703759
--- /dev/null
+++ b/docs/en/transform-v2/table-filter.md
@@ -0,0 +1,51 @@
+# TableFilter
+
+> TableFilter transform plugin
+
+## Description
+
+TableFilter transform plugin for filtering tables.
+
+## Options
+
+| name | type | required | default value | Description
|
+|:----------------:|--------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| database_pattern | string | no | | Specify database
filter pattern, the default value is null, which means no filtering. If you
want to filter the database name, please set it to a regular expression. |
+| schema_pattern | string | no | | Specify schema filter
pattern, the default value is null, which means no filtering. If you want to
filter the schema name, please set it to a regular expression. |
+| table_pattern | string | no | | Specify table filter
pattern, the default value is null, which means no filtering. If you want to
filter the table name, please set it to a regular expression. |
+| pattern_mode | string | no | INCLUDE | Specify pattern mode,
the default value is INCLUDE, which means include the matched table. If you
want to exclude the matched table, please set it to EXCLUDE. |
+
+## Examples
+
+### Include filter tables
+
+Include filter tables with the name matching the regular expression `user_\d+`
in the database `test`.
+
+```hocon
+transform {
+ TableFilter {
+ plugin_input = "source1"
+ plugin_output = "transform_a_1"
+
+ database_pattern = "test"
+ table_pattern = "user_\\d+"
+ }
+}
+```
+
+### Exclude filter tables
+
+Exclude filter tables with the name matching the regular expression `user_\d+`
in the database `test`.
+
+```hocon
+transform {
+ TableFilter {
+ plugin_input = "source1"
+ plugin_output = "transform_a_1"
+
+ database_pattern = "test"
+ table_pattern = "user_\\d+"
+ pattern_mode = "EXCLUDE"
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/zh/transform-v2/table-filter.md
b/docs/zh/transform-v2/table-filter.md
new file mode 100644
index 0000000000..031bfd66e1
--- /dev/null
+++ b/docs/zh/transform-v2/table-filter.md
@@ -0,0 +1,51 @@
+# TableFilter
+
+> TableFilter transform plugin
+
+## Description
+
+表过滤 transform,用于正向或者反向过滤部分表
+
+## Options
+
+| name | type | required | default value | Description
|
+|:----------------:|--------|----------|---------------|--------------------------------------------------------|
+| database_pattern | string | no | | 指定数据库过滤模式,默认值为
null,表示不过滤。如果要过滤数据库名称,请将其设置为正则表达式。 |
+| schema_pattern | string | no | | 指定 schema 过滤模式,默认值为
null,表示不过滤。如果要过滤架构名称,请将其设置为正则表达式。 |
+| table_pattern | string | no | | 指定表过滤模式,默认值为
null,表示不过滤。如果要过滤表名称,请将其设置为正则表达式。 |
+| pattern_mode | string | no | INCLUDE | 指定过滤模式,默认值为
INCLUDE,表示包含匹配的表。如果要排除匹配的表,请将其设置为 EXCLUDE。 |
+
+## Examples
+
+### 包含表过滤
+
+在数据库 "test" 中包含名称与正则表达式 "user_\d+" 匹配的过滤表。
+
+```hocon
+transform {
+ TableFilter {
+ plugin_input = "source1"
+ plugin_output = "transform_a_1"
+
+ database_pattern = "test"
+ table_pattern = "user_\\d+"
+ }
+}
+```
+
+### 排除表过滤
+
+排除数据库 "test" 中名称与正则表达式 "user_\d+" 匹配的过滤表。
+
+```hocon
+transform {
+ TableFilter {
+ plugin_input = "source1"
+ plugin_output = "transform_a_1"
+
+ database_pattern = "test"
+ table_pattern = "user_\\d+"
+ pattern_mode = "EXCLUDE"
+ }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index aa1f390634..1c3b5e7ab7 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -161,3 +161,4 @@ seatunnel.transform.Metadata = seatunnel-transforms-v2
seatunnel.transform.FieldRename = seatunnel-transforms-v2
seatunnel.transform.TableRename = seatunnel-transforms-v2
seatunnel.transform.TableMerge = seatunnel-transforms-v2
+seatunnel.transform.TableFilter = seatunnel-transforms-v2
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
new file mode 100644
index 0000000000..f4d9216f50
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestTableFilterIT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestTableFilterIT extends TestSuiteBase {
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Only support for seatunnel")
+ @TestTemplate
+ public void testFilterMultiTable(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/table_filter_multi_table.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table.conf
new file mode 100644
index 0000000000..f032f722e4
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/table_filter_multi_table.conf
@@ -0,0 +1,208 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "source1"
+
+ tables_configs = [
+ {
+ row.num = 3
+ schema = {
+ table = "test.user_1"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ }
+ ]
+ }
+ },
+ {
+ row.num = 3
+ schema = {
+ table = "test.user_2"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ }
+ ]
+ }
+ },
+ {
+ row.num = 5
+ schema = {
+ table = "test.xyz"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "age"
+ type = "int"
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
+transform {
+ TableFilter {
+ plugin_input = "source1"
+ plugin_output = "transform_a_1"
+
+ database_pattern = "test"
+ table_pattern = "user_\\d+"
+ }
+ TableRename {
+ plugin_input = "transform_a_1"
+ plugin_output = "transform_a_2"
+
+ prefix = "table_a_"
+ }
+
+
+
+ TableFilter {
+ plugin_input = "source1"
+ plugin_output = "transform_b_1"
+
+ database_pattern = "test"
+ table_pattern = "xyz"
+ }
+ TableRename {
+ plugin_input = "transform_b_1"
+ plugin_output = "transform_b_2"
+
+ prefix = "table_b_"
+ }
+}
+sink {
+ Assert {
+ plugin_input = "transform_a_2"
+
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.table_a_user_1"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ]
+ },
+ {
+ table_path = "test.table_a_user_2"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ]
+ },
+ {
+ table_path = "test.table_a_xyz"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 0
+ }
+ ]
+ }
+ ]
+ }
+ }
+
+ Assert {
+ plugin_input = "transform_b_2"
+
+ rules =
+ {
+ tables_configs = [
+ {
+ table_path = "test.table_b_user_1"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 0
+ }
+ ]
+ },
+ {
+ table_path = "test.table_b_user_2"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 0
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 0
+ }
+ ]
+ },
+ {
+ table_path = "test.table_b_xyz"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
deleted file mode 100644
index cbfa44a5eb..0000000000
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import lombok.NonNull;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-
-public class ShuffleAction extends AbstractAction {
-
- public ShuffleAction(long id, @NonNull String name, @NonNull ShuffleConfig
shuffleConfig) {
- super(id, name, new ArrayList<>(), new HashSet<>(), new HashSet<>(),
shuffleConfig);
- }
-
- @Override
- public ShuffleConfig getConfig() {
- return (ShuffleConfig) super.getConfig();
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
deleted file mode 100644
index 908bf80780..0000000000
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.Tolerate;
-
-import java.util.concurrent.TimeUnit;
-
-@Getter
-@Setter
-@ToString
-@Builder(toBuilder = true)
-public class ShuffleConfig implements Config {
- public static final int DEFAULT_BATCH_SIZE = 1024;
- public static final long DEFAULT_BATCH_FLUSH_INTERVAL =
TimeUnit.SECONDS.toMillis(3);
-
- @Builder.Default private int batchSize = DEFAULT_BATCH_SIZE;
- @Builder.Default private long batchFlushInterval =
DEFAULT_BATCH_FLUSH_INTERVAL;
- private ShuffleStrategy shuffleStrategy;
-
- @Tolerate
- public ShuffleConfig() {}
-}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
deleted file mode 100644
index 707fe82499..0000000000
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.SuperBuilder;
-import lombok.experimental.Tolerate;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-@SuperBuilder(toBuilder = true)
-@Getter
-@Setter
-@ToString
-public class ShuffleMultipleRowStrategy extends ShuffleStrategy {
- private List<CatalogTable> catalogTables;
- private String targetTableId;
-
- @Tolerate
- public ShuffleMultipleRowStrategy() {}
-
- @Override
- public Map<String, IQueue<Record<?>>> createShuffles(
- HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
- Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
- for (CatalogTable entry : catalogTables) {
- String tableId = entry.getTableId().toTablePath().toString();
- String queueName = generateQueueName(pipelineId, inputIndex,
tableId);
- IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
- // clear old data when job restore
- queue.clear();
- shuffleMap.put(queueName, queue);
- }
-
- log.info(
- "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
- pipelineId,
- inputIndex,
- shuffleMap.keySet());
-
- return shuffleMap;
- }
-
- @Override
- public String createShuffleKey(Record<?> record, int pipelineId, int
inputIndex) {
- String tableId;
- if (record.getData() instanceof SeaTunnelRow) {
- tableId = ((SeaTunnelRow) record.getData()).getTableId();
- } else if (record.getData() instanceof SchemaChangeEvent) {
- tableId = ((SchemaChangeEvent)
record.getData()).tablePath().toString();
- } else {
- throw new UnsupportedOperationException("Unsupported record: " +
record);
- }
- return generateQueueName(pipelineId, inputIndex, tableId);
- }
-
- @Override
- public IQueue<Record<?>>[] getShuffles(
- HazelcastInstance hazelcast, int pipelineId, int targetIndex) {
- IQueue<Record<?>>[] queues = new IQueue[getInputPartitions()];
- for (int inputIndex = 0; inputIndex < getInputPartitions();
inputIndex++) {
- Objects.requireNonNull(targetTableId);
- String queueName = generateQueueName(pipelineId, inputIndex,
targetTableId);
- queues[inputIndex] = getIQueue(hazelcast, queueName);
- }
-
- log.info(
- "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
- pipelineId,
- targetIndex,
- Stream.of(queues).map(e ->
e.getName()).collect(Collectors.toList()));
-
- return queues;
- }
-
- private String generateQueueName(int pipelineId, int inputIndex, String
tableId) {
- return "ShuffleMultipleRow-Queue_"
- + getJobId()
- + "_"
- + pipelineId
- + "_"
- + inputIndex
- + "_"
- + tableId;
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
deleted file mode 100644
index 45144d210f..0000000000
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import org.apache.seatunnel.api.table.type.Record;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.SuperBuilder;
-import lombok.experimental.Tolerate;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
-
-@Slf4j
-@SuperBuilder
-@Getter
-@Setter
-@ToString
-public class ShufflePartitionStrategy extends ShuffleStrategy {
- private final Map<Integer, String[]> inputQueueMapping = new HashMap<>();
- private int targetPartitions;
-
- @Tolerate
- public ShufflePartitionStrategy() {}
-
- @Override
- public Map<String, IQueue<Record<?>>> createShuffles(
- HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
- checkArgument(inputIndex >= 0 && inputIndex < getInputPartitions());
- Map<String, IQueue<Record<?>>> shuffleMap = new LinkedHashMap<>();
- for (int targetIndex = 0; targetIndex < targetPartitions;
targetIndex++) {
- String queueName = generateQueueName(pipelineId, inputIndex,
targetIndex);
- IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
- // clear old data when job restore
- queue.clear();
- shuffleMap.put(queueName, queue);
- }
-
- log.info(
- "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
- pipelineId,
- inputIndex,
- shuffleMap.keySet());
-
- return shuffleMap;
- }
-
- @Override
- public String createShuffleKey(Record<?> record, int pipelineId, int
inputIndex) {
- String[] inputQueueNames =
- inputQueueMapping.computeIfAbsent(
- inputIndex,
- key -> {
- String[] queueNames = new String[targetPartitions];
- for (int targetIndex = 0;
- targetIndex < targetPartitions;
- targetIndex++) {
- queueNames[targetIndex] =
- generateQueueName(pipelineId, key,
targetIndex);
- }
- return queueNames;
- });
- return
inputQueueNames[ThreadLocalRandom.current().nextInt(targetPartitions)];
- }
-
- @Override
- public IQueue<Record<?>>[] getShuffles(
- HazelcastInstance hazelcast, int pipelineId, int targetIndex) {
- checkArgument(targetIndex >= 0 && targetIndex < targetPartitions);
- IQueue<Record<?>>[] shuffles = new IQueue[getInputPartitions()];
- for (int inputIndex = 0; inputIndex < getInputPartitions();
inputIndex++) {
- String queueName = generateQueueName(pipelineId, inputIndex,
targetIndex);
- shuffles[inputIndex] = getIQueue(hazelcast, queueName);
- }
-
- log.info(
- "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
- pipelineId,
- targetIndex,
- Stream.of(shuffles).map(e ->
e.getName()).collect(Collectors.toList()));
-
- return shuffles;
- }
-
- private String generateQueueName(int pipelineId, int inputIndex, int
targetIndex) {
- return String.format(
- "ShufflePartition-Queue_%s_%s_%s_%s",
- getJobId(), pipelineId, inputIndex, targetIndex);
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
deleted file mode 100644
index 43feac3572..0000000000
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.core.dag.actions;
-
-import org.apache.seatunnel.api.table.type.Record;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.config.QueueConfig;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.SuperBuilder;
-import lombok.experimental.Tolerate;
-
-import java.io.Serializable;
-import java.util.Map;
-
-@SuperBuilder(toBuilder = true)
-@Getter
-@Setter
-@ToString
-public abstract class ShuffleStrategy implements Serializable {
- private static final int DEFAULT_QUEUE_SIZE = 2048;
- private static final int DEFAULT_QUEUE_BACKUP_COUNT = 0;
- private static final int DEFAULT_QUEUE_ASYNC_BACKUP_COUNT = 0;
-
- protected long jobId;
- protected int inputPartitions;
- @Builder.Default protected int queueMaxSize = DEFAULT_QUEUE_SIZE;
- @Builder.Default protected int queueBackupCount =
DEFAULT_QUEUE_BACKUP_COUNT;
- @Builder.Default protected int queueAsyncBackupCount =
DEFAULT_QUEUE_ASYNC_BACKUP_COUNT;
- protected int queueEmptyQueueTtl;
-
- @Tolerate
- public ShuffleStrategy() {}
-
- public abstract Map<String, IQueue<Record<?>>> createShuffles(
- HazelcastInstance hazelcast, int pipelineId, int inputIndex);
-
- public abstract String createShuffleKey(Record<?> record, int pipelineId,
int inputIndex);
-
- public abstract IQueue<Record<?>>[] getShuffles(
- HazelcastInstance hazelcast, int pipelineId, int targetIndex);
-
- protected IQueue<Record<?>> getIQueue(HazelcastInstance hazelcast, String
queueName) {
- QueueConfig targetQueueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
- targetQueueConfig.setMaxSize(queueMaxSize);
- targetQueueConfig.setBackupCount(queueBackupCount);
- targetQueueConfig.setAsyncBackupCount(queueAsyncBackupCount);
- targetQueueConfig.setEmptyQueueTtl(queueEmptyQueueTtl);
- return hazelcast.getQueue(queueName);
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 866af5bedb..0ebef5e050 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -17,16 +17,10 @@
package org.apache.seatunnel.engine.server.dag.execution;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -47,11 +41,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -80,27 +72,21 @@ public class ExecutionPlanGenerator {
Set<ExecutionEdge> executionEdges =
generateExecutionEdges(logicalPlan.getEdges());
log.debug("Phase 1: generate execution edge list {}", executionEdges);
- executionEdges = generateShuffleEdges(executionEdges);
- log.debug("Phase 2: generate shuffle edge list {}", executionEdges);
-
executionEdges = generateTransformChainEdges(executionEdges);
- log.debug("Phase 3: generate transform chain edge list {}",
executionEdges);
+ log.debug("Phase 2: generate transform chain edge list {}",
executionEdges);
List<Pipeline> pipelines = generatePipelines(executionEdges);
- log.debug("Phase 4: generate pipeline list {}", pipelines);
+ log.debug("Phase 3: generate pipeline list {}", pipelines);
ExecutionPlan executionPlan = new ExecutionPlan(pipelines,
jobImmutableInformation);
- log.debug("Phase 5: generate execution plan: {}", executionPlan);
+ log.debug("Phase 4: generate execution plan: {}", executionPlan);
return executionPlan;
}
public static Action recreateAction(Action action, Long id, int
parallelism) {
Action newAction;
- if (action instanceof ShuffleAction) {
- newAction =
- new ShuffleAction(id, action.getName(), ((ShuffleAction)
action).getConfig());
- } else if (action instanceof SinkAction) {
+ if (action instanceof SinkAction) {
newAction =
new SinkAction<>(
id,
@@ -202,87 +188,6 @@ public class ExecutionPlanGenerator {
return executionEdges;
}
- @SuppressWarnings("MagicNumber")
- private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge>
executionEdges) {
- Map<Long, List<ExecutionVertex>> targetVerticesMap = new
LinkedHashMap<>();
- Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
- executionEdges.forEach(
- edge -> {
- ExecutionVertex leftVertex = edge.getLeftVertex();
- ExecutionVertex rightVertex = edge.getRightVertex();
- if (leftVertex.getAction() instanceof SourceAction) {
- sourceExecutionVertices.add(leftVertex);
- }
- targetVerticesMap
- .computeIfAbsent(leftVertex.getVertexId(), id ->
new ArrayList<>())
- .add(rightVertex);
- });
- if (sourceExecutionVertices.size() != 1) {
- return executionEdges;
- }
- ExecutionVertex sourceExecutionVertex =
sourceExecutionVertices.stream().findFirst().get();
- Action sourceAction = sourceExecutionVertex.getAction();
- List<CatalogTable> producedCatalogTables = new ArrayList<>();
- if (sourceAction instanceof SourceAction) {
- try {
- producedCatalogTables =
- ((SourceAction<?, ?, ?>) sourceAction)
- .getSource()
- .getProducedCatalogTables();
- } catch (UnsupportedOperationException e) {
- }
- } else if (sourceAction instanceof TransformChainAction) {
- return executionEdges;
- } else {
- throw new SeaTunnelException(
- "source action must be SourceAction or
TransformChainAction");
- }
- if (producedCatalogTables.size() <= 1
- ||
targetVerticesMap.get(sourceExecutionVertex.getVertexId()).size() <= 1) {
- return executionEdges;
- }
-
- List<ExecutionVertex> sinkVertices =
- targetVerticesMap.get(sourceExecutionVertex.getVertexId());
- Optional<ExecutionVertex> hasOtherAction =
- sinkVertices.stream()
- .filter(vertex -> !(vertex.getAction() instanceof
SinkAction))
- .findFirst();
- checkArgument(!hasOtherAction.isPresent());
-
- Set<ExecutionEdge> newExecutionEdges = new LinkedHashSet<>();
- ShuffleStrategy shuffleStrategy =
- ShuffleMultipleRowStrategy.builder()
- .jobId(jobImmutableInformation.getJobId())
- .inputPartitions(sourceAction.getParallelism())
- .catalogTables(producedCatalogTables)
- .queueEmptyQueueTtl(
- (int)
-
(engineConfig.getCheckpointConfig().getCheckpointInterval()
- * 3))
- .build();
- ShuffleConfig shuffleConfig =
-
ShuffleConfig.builder().shuffleStrategy(shuffleStrategy).build();
-
- long shuffleVertexId = idGenerator.getNextId();
- String shuffleActionName = String.format("Shuffle [%s]",
sourceAction.getName());
- ShuffleAction shuffleAction =
- new ShuffleAction(shuffleVertexId, shuffleActionName,
shuffleConfig);
- shuffleAction.setParallelism(sourceAction.getParallelism());
- ExecutionVertex shuffleVertex =
- new ExecutionVertex(shuffleVertexId, shuffleAction,
shuffleAction.getParallelism());
- ExecutionEdge sourceToShuffleEdge = new
ExecutionEdge(sourceExecutionVertex, shuffleVertex);
- newExecutionEdges.add(sourceToShuffleEdge);
-
- for (ExecutionVertex sinkVertex : sinkVertices) {
- sinkVertex.setParallelism(1);
- sinkVertex.getAction().setParallelism(1);
- ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex,
sinkVertex);
- newExecutionEdges.add(shuffleToSinkEdge);
- }
- return newExecutionEdges;
- }
-
private Set<ExecutionEdge> generateTransformChainEdges(Set<ExecutionEdge>
executionEdges) {
Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>();
Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index f0a92a6691..e63138dad7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.server.dag.execution;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -135,10 +134,7 @@ public class PipelineGenerator {
/** If this execution vertex have partition transform, can't be spilt */
private boolean checkCanSplit(List<ExecutionEdge> edges) {
- return edges.stream()
- .noneMatch(e -> e.getRightVertex().getAction()
instanceof ShuffleAction)
- && edges.stream()
- .anyMatch(e ->
inputVerticesMap.get(e.getRightVertexId()).size() > 1);
+ return edges.stream().anyMatch(e ->
inputVerticesMap.get(e.getRightVertexId()).size() > 1);
}
private void splitUnionVertex(
@@ -184,9 +180,7 @@ public class PipelineGenerator {
long id = idGenerator.getNextId();
Action action = vertex.getAction();
return new ExecutionVertex(
- id,
- ExecutionPlanGenerator.recreateAction(action, id, parallelism),
- action instanceof ShuffleAction ? vertex.getParallelism() :
parallelism);
+ id, ExecutionPlanGenerator.recreateAction(action, id,
parallelism), parallelism);
}
private void fillVerticesMap(List<ExecutionEdge> edges) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 0bc7e46d5f..b413be0cd0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -27,10 +27,6 @@ import
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
@@ -191,9 +187,6 @@ public class PhysicalPlanGenerator {
getSourceTask(
edges, sources,
pipelineId, totalPipelineNum);
- physicalVertexList.addAll(
- getShuffleTask(edges, pipelineId,
totalPipelineNum));
-
CompletableFuture<PipelineStatus>
pipelineFuture =
new CompletableFuture<>();
waitForCompleteBySubPlanList.add(
@@ -323,149 +316,6 @@ public class PhysicalPlanGenerator {
.collect(Collectors.toList());
}
- private List<PhysicalVertex> getShuffleTask(
- List<ExecutionEdge> edges, int pipelineIndex, int
totalPipelineNum) {
- return edges.stream()
- .filter(s -> s.getLeftVertex().getAction() instanceof
ShuffleAction)
- .map(q -> (ShuffleAction) q.getLeftVertex().getAction())
- .collect(Collectors.toSet())
- .stream()
- .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges,
q)))
- .flatMap(
- flow -> {
- List<PhysicalVertex> physicalVertices = new
ArrayList<>();
-
- ShuffleAction shuffleAction = (ShuffleAction)
flow.getAction();
- ShuffleConfig shuffleConfig =
shuffleAction.getConfig();
- ShuffleStrategy shuffleStrategy =
shuffleConfig.getShuffleStrategy();
- if (shuffleStrategy instanceof
ShuffleMultipleRowStrategy) {
- ShuffleMultipleRowStrategy
shuffleMultipleRowStrategy =
- (ShuffleMultipleRowStrategy)
shuffleStrategy;
- AtomicInteger atomicInteger = new
AtomicInteger(0);
- for (Flow nextFlow : flow.getNext()) {
- PhysicalExecutionFlow sinkFlow =
- (PhysicalExecutionFlow) nextFlow;
- SinkAction sinkAction = (SinkAction)
sinkFlow.getAction();
- String sinkTableId =
-
sinkAction.getConfig().getTablePath().toString();
-
- int parallelismIndex =
atomicInteger.getAndIncrement();
- ShuffleStrategy shuffleStrategyOfSinkFlow =
- shuffleMultipleRowStrategy
- .toBuilder()
- .targetTableId(sinkTableId)
- .build();
- ShuffleConfig shuffleConfigOfSinkFlow =
- shuffleConfig
- .toBuilder()
-
.shuffleStrategy(shuffleStrategyOfSinkFlow)
- .build();
- String shuffleActionName =
- String.format(
- "%s -> %s -> %s",
- shuffleAction.getName(),
- sinkTableId,
- sinkAction.getName());
- ShuffleAction shuffleActionOfSinkFlow =
- new ShuffleAction(
- parallelismIndex,
- shuffleActionName,
- shuffleConfigOfSinkFlow);
- shuffleActionOfSinkFlow.setParallelism(1);
- PhysicalExecutionFlow shuffleFlow =
- new PhysicalExecutionFlow(
- shuffleActionOfSinkFlow,
-
Collections.singletonList(sinkFlow));
- setFlowConfig(shuffleFlow);
-
- long taskGroupID =
taskGroupIdGenerator.getNextId();
- TaskGroupLocation taskGroupLocation =
- new TaskGroupLocation(
-
jobImmutableInformation.getJobId(),
- pipelineIndex,
- taskGroupID);
- TaskLocation taskLocation =
- new TaskLocation(
- taskGroupLocation, 0,
parallelismIndex);
- SeaTunnelTask seaTunnelTask =
- new TransformSeaTunnelTask(
-
jobImmutableInformation.getJobId(),
- taskLocation,
- parallelismIndex,
- shuffleFlow);
-
- // checkpoint
- fillCheckpointPlan(seaTunnelTask);
- physicalVertices.add(
- new PhysicalVertex(
- parallelismIndex,
-
shuffleFlow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(
- taskGroupLocation,
-
shuffleFlow.getAction().getName()
- +
"-ShuffleTask",
-
Collections.singletonList(
-
seaTunnelTask)),
- flakeIdGenerator,
- pipelineIndex,
- totalPipelineNum,
- Collections.singletonList(
-
seaTunnelTask.getJarsUrl()),
- Collections.singletonList(
-
seaTunnelTask.getConnectorPluginJars()),
- jobImmutableInformation,
- initializationTimestamp,
- nodeEngine,
- runningJobStateIMap,
-
runningJobStateTimestampsIMap));
- }
- } else {
- for (int i = 0; i <
flow.getAction().getParallelism(); i++) {
- long taskGroupID =
taskGroupIdGenerator.getNextId();
- TaskGroupLocation taskGroupLocation =
- new TaskGroupLocation(
-
jobImmutableInformation.getJobId(),
- pipelineIndex,
- taskGroupID);
- TaskLocation taskLocation =
- new
TaskLocation(taskGroupLocation, 0, i);
- setFlowConfig(flow);
- SeaTunnelTask seaTunnelTask =
- new TransformSeaTunnelTask(
-
jobImmutableInformation.getJobId(),
- taskLocation,
- i,
- flow);
- // checkpoint
- fillCheckpointPlan(seaTunnelTask);
- physicalVertices.add(
- new PhysicalVertex(
- i,
-
flow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(
- taskGroupLocation,
-
flow.getAction().getName()
- +
"-ShuffleTask",
-
Lists.newArrayList(seaTunnelTask)),
- flakeIdGenerator,
- pipelineIndex,
- totalPipelineNum,
- Collections.singletonList(
-
seaTunnelTask.getJarsUrl()),
- Collections.singletonList(
-
seaTunnelTask.getConnectorPluginJars()),
- jobImmutableInformation,
- initializationTimestamp,
- nodeEngine,
- runningJobStateIMap,
-
runningJobStateTimestampsIMap));
- }
- }
- return physicalVertices.stream();
- })
- .collect(Collectors.toList());
- }
-
private List<PhysicalVertex> getEnumeratorTask(
List<SourceAction<?, ?, ?>> sources, int pipelineIndex, int
totalPipelineNum) {
AtomicInteger atomicInteger = new AtomicInteger(-1);
@@ -757,12 +607,12 @@ public class PhysicalPlanGenerator {
.collect(Collectors.toList());
List<Flow> wrappers =
actions.stream()
- .filter(a -> a instanceof ShuffleAction || a
instanceof SinkAction)
+ .filter(a -> a instanceof SinkAction)
.map(PhysicalExecutionFlow::new)
.collect(Collectors.toList());
wrappers.addAll(
actions.stream()
- .filter(a -> !(a instanceof ShuffleAction || a
instanceof SinkAction))
+ .filter(a -> !(a instanceof SinkAction))
.map(a -> new PhysicalExecutionFlow<>(a,
getNextWrapper(edges, a)))
.collect(Collectors.toList()));
return wrappers;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index e04e12d69f..ec00bbf58f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -26,7 +26,6 @@ import
org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -52,8 +51,6 @@ import
org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
import
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
-import org.apache.seatunnel.engine.server.task.flow.ShuffleSinkFlowLifeCycle;
-import org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle;
@@ -61,7 +58,6 @@ import
org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithInterm
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
-import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -238,27 +234,6 @@ public abstract class SeaTunnelTask extends AbstractTask {
this,
new
SeaTunnelTransformCollector(flowLifeCycles),
completableFuture);
- } else if (f.getAction() instanceof ShuffleAction) {
- ShuffleAction shuffleAction = (ShuffleAction) f.getAction();
- HazelcastInstance hazelcastInstance =
getExecutionContext().getInstance();
- if (flow.getNext().isEmpty()) {
- lifeCycle =
- new ShuffleSinkFlowLifeCycle(
- this,
- indexID,
- shuffleAction,
- hazelcastInstance,
- completableFuture);
- } else {
- lifeCycle =
- new ShuffleSourceFlowLifeCycle(
- this,
- indexID,
- shuffleAction,
- hazelcastInstance,
- completableFuture);
- }
- outputs = flowLifeCycles;
} else {
throw new UnknownActionException(f.getAction());
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
deleted file mode 100644
index cba7537c68..0000000000
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
-import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-
-@SuppressWarnings("MagicNumber")
-@Slf4j
-public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle
- implements OneInputFlowLifeCycle<Record<?>> {
- private final int pipelineId;
- private final int taskIndex;
- private final ShuffleAction shuffleAction;
- private final Map<String, IQueue<Record<?>>> shuffles;
- private final int shuffleBatchSize;
- private final long shuffleBatchFlushInterval;
- private final Map<String, Queue<Record<?>>> shuffleBuffer;
- private final ShuffleStrategy shuffleStrategy;
- private int shuffleBufferSize;
- private long lastModify;
-
- public ShuffleSinkFlowLifeCycle(
- SeaTunnelTask runningTask,
- int taskIndex,
- ShuffleAction shuffleAction,
- HazelcastInstance hazelcastInstance,
- CompletableFuture<Void> completableFuture) {
- super(runningTask, completableFuture);
- this.pipelineId =
runningTask.getTaskLocation().getTaskGroupLocation().getPipelineId();
- this.taskIndex = taskIndex;
- this.shuffleAction = shuffleAction;
- this.shuffleStrategy = shuffleAction.getConfig().getShuffleStrategy();
- this.shuffles = shuffleStrategy.createShuffles(hazelcastInstance,
pipelineId, taskIndex);
- this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
- this.shuffleBatchFlushInterval =
shuffleAction.getConfig().getBatchFlushInterval();
- this.shuffleBuffer = new HashMap<>();
- }
-
- @Override
- public void received(Record<?> record) throws IOException {
- if (record.getData() instanceof Barrier) {
- long startTime = System.currentTimeMillis();
-
- // flush shuffle buffer
- shuffleFlush();
-
- Barrier barrier = (Barrier) record.getData();
- if (barrier.prepareClose(runningTask.getTaskLocation())) {
- prepareClose = true;
- }
- if (barrier.snapshot()) {
- runningTask.addState(
- barrier, ActionStateKey.of(shuffleAction),
Collections.emptyList());
- }
- runningTask.ack(barrier);
-
- // The barrier needs to be replicated to all channels
- for (Map.Entry<String, IQueue<Record<?>>> shuffle :
shuffles.entrySet()) {
- IQueue<Record<?>> shuffleQueue = shuffle.getValue();
- try {
- shuffleQueue.put(record);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- log.debug(
- "trigger barrier [{}] finished, cost: {}ms. taskLocation:
[{}]",
- barrier.getId(),
- System.currentTimeMillis() - startTime,
- runningTask.getTaskLocation());
- } else if (record.getData() instanceof SchemaChangeEvent) {
- if (prepareClose) {
- return;
- }
-
- shuffleItem(record);
- } else {
- if (prepareClose) {
- return;
- }
-
- shuffleItem(record);
- }
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- for (Map.Entry<String, IQueue<Record<?>>> shuffleItem :
shuffles.entrySet()) {
- log.info("destroy shuffle queue: {}", shuffleItem.getKey());
- shuffleItem.getValue().destroy();
- }
- }
-
- private synchronized void shuffleItem(Record<?> record) {
- String shuffleKey = shuffleStrategy.createShuffleKey(record,
pipelineId, taskIndex);
- shuffleBuffer.computeIfAbsent(shuffleKey, key -> new
LinkedList<>()).add(record);
- shuffleBufferSize++;
-
- if (shuffleBufferSize >= shuffleBatchSize
- || (shuffleBufferSize > 1
- && System.currentTimeMillis() - lastModify >
shuffleBatchFlushInterval)) {
- shuffleFlush();
- }
- }
-
- private synchronized void shuffleFlush() {
- for (Map.Entry<String, Queue<Record<?>>> shuffleBatch :
shuffleBuffer.entrySet()) {
- IQueue<Record<?>> shuffleQueue =
shuffles.get(shuffleBatch.getKey());
- Queue<Record<?>> shuffleQueueBatch = shuffleBatch.getValue();
- if (shuffleQueue.remainingCapacity() <= 0
- || !shuffleQueue.addAll(shuffleBatch.getValue())) {
- for (; ; ) {
- Record<?> shuffleItem = shuffleQueueBatch.poll();
- if (shuffleItem == null) {
- break;
- }
- try {
- shuffleQueue.put(shuffleItem);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- shuffleQueueBatch.clear();
- }
- shuffleBufferSize = 0;
- lastModify = System.currentTimeMillis();
- }
-}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
deleted file mode 100644
index 654cfff67b..0000000000
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
-import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
-import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
-
-import com.hazelcast.collection.IQueue;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-@Slf4j
-@SuppressWarnings("MagicNumber")
-public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle
- implements OneOutputFlowLifeCycle<Record<?>> {
- private final ShuffleAction shuffleAction;
- private final int shuffleBatchSize;
- private final IQueue<Record<?>>[] shuffles;
- private Map<Integer, List<Record<?>>> unsentBufferMap = new HashMap<>();
- private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
- private long currentCheckpointId = Long.MAX_VALUE;
- private int alignedBarriersCounter = 0;
-
- public ShuffleSourceFlowLifeCycle(
- SeaTunnelTask runningTask,
- int taskIndex,
- ShuffleAction shuffleAction,
- HazelcastInstance hazelcastInstance,
- CompletableFuture<Void> completableFuture) {
- super(runningTask, completableFuture);
- int pipelineId = runningTask.getTaskLocation().getPipelineId();
- this.shuffleAction = shuffleAction;
- this.shuffles =
- shuffleAction
- .getConfig()
- .getShuffleStrategy()
- .getShuffles(hazelcastInstance, pipelineId, taskIndex);
- this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
- }
-
- @Override
- public void collect(Collector<Record<?>> collector) throws Exception {
- int emptyShuffleQueueCount = 0;
-
- for (int i = 0; i < shuffles.length; i++) {
- IQueue<Record<?>> shuffleQueue = shuffles[i];
- List<Record<?>> unsentBuffer =
- unsentBufferMap.computeIfAbsent(i, k -> new
LinkedList<>());
- if (shuffleQueue.size() == 0) {
- emptyShuffleQueueCount++;
- continue;
- }
- // aligned barrier
- if (alignedBarriers.get(i) != null
- && alignedBarriers.get(i).getId() == currentCheckpointId) {
- continue;
- }
-
- List<Record<?>> shuffleBatch = new LinkedList<>();
- if (alignedBarriersCounter > 0) {
- shuffleBatch.add(shuffleQueue.take());
- } else if (!unsentBuffer.isEmpty()) {
- shuffleBatch.addAll(unsentBuffer);
- unsentBuffer.clear();
- }
-
- shuffleQueue.drainTo(shuffleBatch, shuffleBatchSize);
-
- for (int recordIndex = 0; recordIndex < shuffleBatch.size();
recordIndex++) {
- Record<?> record = shuffleBatch.get(recordIndex);
- if (record.getData() instanceof Barrier) {
- long startTime = System.currentTimeMillis();
-
- Barrier barrier = (Barrier) record.getData();
-
- // mark queue barrier
- alignedBarriers.put(i, barrier);
- alignedBarriersCounter++;
- currentCheckpointId = barrier.getId();
-
- // publish barrier
- if (alignedBarriersCounter == shuffles.length) {
- if
(barrier.prepareClose(runningTask.getTaskLocation())) {
- prepareClose = true;
- }
- if (barrier.snapshot()) {
- runningTask.addState(
- barrier,
- ActionStateKey.of(shuffleAction),
- Collections.emptyList());
- }
- runningTask.ack(barrier);
-
- collector.collect(record);
- log.debug(
- "trigger barrier [{}] finished, cost: {}ms.
taskLocation: [{}]",
- barrier.getId(),
- System.currentTimeMillis() - startTime,
- runningTask.getTaskLocation());
-
- alignedBarriersCounter = 0;
- alignedBarriers.clear();
- }
-
- if (recordIndex + 1 < shuffleBatch.size()) {
- unsentBuffer.addAll(
- shuffleBatch.subList(recordIndex + 1,
shuffleBatch.size()));
- }
- break;
- } else {
- if (prepareClose) {
- return;
- }
- collector.collect(record);
- }
- }
- }
-
- if (emptyShuffleQueueCount == shuffles.length) {
- Thread.sleep(100);
- }
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- for (IQueue<Record<?>> shuffleQueue : shuffles) {
- log.info("destroy shuffle queue: {}", shuffleQueue.getName());
- shuffleQueue.destroy();
- }
- }
-}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
new file mode 100644
index 0000000000..5a8a15396c
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterConfig.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.table;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias;
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+
+@Getter
+@Setter
+@Accessors(chain = true)
+@ToString
+public class TableFilterConfig implements Serializable {
+
+ public static final String PLUGIN_NAME = "TableFilter";
+
+ public static final Option<String> DATABASE_PATTERN =
+ Options.key("database_pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify database filter pattern"
+ + "The default value is null, which means
no filtering. "
+ + "If you want to filter the database
name, please set it to a regular expression.");
+
+ public static final Option<String> SCHEMA_PATTERN =
+ Options.key("schema_pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify schema filter pattern"
+ + "The default value is null, which means
no filtering. "
+ + "If you want to filter the schema name,
please set it to a regular expression.");
+
+ public static final Option<String> TABLE_PATTERN =
+ Options.key("table_pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify table filter pattern"
+ + "The default value is null, which means
no filtering. "
+ + "If you want to filter the table name,
please set it to a regular expression.");
+
+ public static final Option<PatternMode> PATTERN_MODE =
+ Options.key("pattern_mode")
+ .enumType(PatternMode.class)
+ .defaultValue(PatternMode.INCLUDE)
+ .withDescription(
+ "Specify pattern mode"
+ + "The default value is INCLUDE, which
means include the matched table."
+ + "If you want to exclude the matched
table, please set it to EXCLUDE.");
+
+ @JsonAlias("database_pattern")
+ private String databasePattern;
+
+ @JsonAlias("schema_pattern")
+ private String schemaPattern;
+
+ @JsonAlias("table_pattern")
+ private String tablePattern;
+
+ @JsonAlias("pattern_mode")
+ private PatternMode patternMode;
+
+ public boolean isMatch(TablePath tablePath) {
+ if (PatternMode.INCLUDE.equals(patternMode)) {
+ if (databasePattern != null &&
!tablePath.getDatabaseName().matches(databasePattern)) {
+ return false;
+ }
+ if (schemaPattern != null &&
!tablePath.getSchemaName().matches(schemaPattern)) {
+ return false;
+ }
+ if (tablePattern != null &&
!tablePath.getTableName().matches(tablePattern)) {
+ return false;
+ }
+ return true;
+ }
+
+ if (databasePattern != null &&
tablePath.getDatabaseName().matches(databasePattern)) {
+ return false;
+ }
+ if (schemaPattern != null &&
tablePath.getSchemaName().matches(schemaPattern)) {
+ return false;
+ }
+ if (tablePattern != null &&
tablePath.getTableName().matches(tablePattern)) {
+ return false;
+ }
+ return true;
+ }
+
+ public static TableFilterConfig of(ReadonlyConfig config) {
+ TableFilterConfig filterConfig = new TableFilterConfig();
+ filterConfig.setDatabasePattern(config.get(DATABASE_PATTERN));
+ filterConfig.setSchemaPattern(config.get(SCHEMA_PATTERN));
+ filterConfig.setTablePattern(config.get(TABLE_PATTERN));
+ filterConfig.setPatternMode(config.get(PATTERN_MODE));
+
+ Preconditions.checkArgument(
+ filterConfig.getDatabasePattern() != null
+ || filterConfig.getSchemaPattern() != null
+ || filterConfig.getTablePattern() != null
+ || filterConfig.getPatternMode() != null,
+ "At least one of database_pattern, schema_pattern,
table_pattern or pattern_mode must be specified.");
+ return filterConfig;
+ }
+
+ public enum PatternMode {
+ INCLUDE,
+ EXCLUDE;
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
new file mode 100644
index 0000000000..ab920c8d0c
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterMultiCatalogTransform.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.table;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.transform.table.TableFilterConfig.PLUGIN_NAME;
+
+@Slf4j
+public class TableFilterMultiCatalogTransform extends
AbstractMultiCatalogMapTransform {
+
+ public TableFilterMultiCatalogTransform(
+ List<CatalogTable> inputCatalogTables, ReadonlyConfig config) {
+ super(inputCatalogTables, config);
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
+ CatalogTable table, ReadonlyConfig config) {
+ TableFilterConfig tableFilterConfig = TableFilterConfig.of(config);
+ boolean include;
+ if (tableFilterConfig.getDatabasePattern() == null
+ && tableFilterConfig.getSchemaPattern() == null
+ && tableFilterConfig.getTablePattern() == null) {
+ include =
+ TableFilterConfig.PatternMode.INCLUDE.equals(
+ tableFilterConfig.getPatternMode());
+ } else {
+ include = tableFilterConfig.isMatch(table.getTablePath());
+ }
+ return new TableFilterTransform(include, table);
+ }
+
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ List<CatalogTable> outputTables = new ArrayList<>();
+ for (CatalogTable catalogTable : inputCatalogTables) {
+ String tableId =
catalogTable.getTableId().toTablePath().toString();
+ SeaTunnelTransform<SeaTunnelRow> tableTransform =
transformMap.get(tableId);
+
+ if (tableTransform instanceof TableFilterTransform) {
+ TableFilterTransform tableFilterTransform =
(TableFilterTransform) tableTransform;
+ if (tableFilterTransform.isInclude()) {
+ outputTables.add(catalogTable);
+ } else {
+ log.info("Table {} is filtered out", tableId);
+ }
+ }
+ }
+
+ log.info(
+ "Input tables: {}",
+ inputCatalogTables.stream()
+ .map(e -> e.getTablePath().getFullName())
+ .collect(Collectors.toList()));
+ log.info(
+ "Output tables: {}",
+ outputTables.stream()
+ .map(e -> e.getTablePath().getFullName())
+ .collect(Collectors.toList()));
+
+ outputCatalogTables = outputTables;
+ return outputTables;
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransform.java
new file mode 100644
index 0000000000..10442e87df
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransform.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.table;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform;
+
+import lombok.Getter;
+
+import static
org.apache.seatunnel.transform.table.TableFilterConfig.PLUGIN_NAME;
+
+public class TableFilterTransform extends AbstractCatalogSupportMapTransform {
+
+ private final CatalogTable inputTable;
+ @Getter private final boolean include;
+
+ public TableFilterTransform(boolean include, CatalogTable table) {
+ super(table);
+ this.inputTable = table;
+ this.include = include;
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ @Override
+ protected TableSchema transformTableSchema() {
+ return inputTable.getTableSchema();
+ }
+
+ @Override
+ protected TableIdentifier transformTableIdentifier() {
+ return inputTable.getTableId();
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ return include ? inputRow : null;
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransformFactory.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransformFactory.java
new file mode 100644
index 0000000000..987168d9cd
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/table/TableFilterTransformFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.table;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.TransformCommonOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class TableFilterTransformFactory implements TableTransformFactory {
+ @Override
+ public String factoryIdentifier() {
+ return TableFilterConfig.PLUGIN_NAME;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .optional(
+ TableFilterConfig.DATABASE_PATTERN,
+ TableFilterConfig.SCHEMA_PATTERN,
+ TableFilterConfig.TABLE_PATTERN)
+ .optional(TableFilterConfig.PATTERN_MODE)
+ .optional(TransformCommonOptions.MULTI_TABLES)
+ .optional(TransformCommonOptions.TABLE_MATCH_REGEX)
+ .build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
+ return () ->
+ new TableFilterMultiCatalogTransform(
+ context.getCatalogTables(), context.getOptions());
+ }
+}