This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3af6b2f2c6 [Feature][Zeta] Add tag to node used to filter worker when 
submit job (#7045)
3af6b2f2c6 is described below

commit 3af6b2f2c6be41364661bd6872bd99e139986877
Author: Jarvis <liunaijie1...@163.com>
AuthorDate: Wed Jun 26 20:39:14 2024 +0800

    [Feature][Zeta] Add tag to node used to filter worker when submit job 
(#7045)
---
 README.md                                          |   2 +-
 docs/en/about.md                                   |   2 +-
 docs/en/faq.md                                     |   6 +-
 docs/en/seatunnel-engine/resource-isolation.md     |  83 ++++++++++++++
 docs/{en => }/images/architecture_diagram.png      | Bin
 docs/{en => }/images/azkaban.png                   | Bin
 docs/{en => }/images/checkstyle.png                | Bin
 docs/{en => }/images/kafka.png                     | Bin
 docs/images/resource-isolation.png                 | Bin 0 -> 69956 bytes
 docs/{en => }/images/seatunnel-workflow.svg        |   0
 docs/{en => }/images/seatunnel_architecture.png    | Bin
 docs/{en => }/images/seatunnel_starter.png         | Bin
 docs/{en => }/images/workflow.png                  | Bin
 docs/sidebars.js                                   |   3 +-
 docs/zh/about.md                                   |   2 +-
 docs/zh/faq.md                                     |   6 +-
 docs/zh/images/architecture_diagram.png            | Bin 77929 -> 0 bytes
 docs/zh/images/azkaban.png                         | Bin 732486 -> 0 bytes
 docs/zh/images/checkstyle.png                      | Bin 479660 -> 0 bytes
 docs/zh/images/kafka.png                           | Bin 32151 -> 0 bytes
 docs/zh/images/seatunnel-workflow.svg              |   4 -
 docs/zh/images/seatunnel_architecture.png          | Bin 778394 -> 0 bytes
 docs/zh/images/seatunnel_starter.png               | Bin 423840 -> 0 bytes
 docs/zh/images/workflow.png                        | Bin 258921 -> 0 bytes
 docs/zh/seatunnel-engine/resource-isolation.md     |  83 ++++++++++++++
 .../api/configuration/ReadonlyConfig.java          |   6 +-
 .../apache/seatunnel/api/env/EnvCommonOptions.java |   6 ++
 seatunnel-core/README.md                           |   2 +-
 seatunnel-core/seatunnel-core-starter/README.md    |   2 +-
 .../e2e/resourceIsolation/ResourceIsolationIT.java |  64 +++++++++++
 .../src/test/resources/hazelcast.yaml              |   7 ++
 .../resource-isolation/fakesource_to_console.conf  |  54 +++++-----
 .../fakesource_to_console_tag_not_match.conf       |  54 +++++-----
 .../src/main/resources/hazelcast.yaml              |   2 +-
 .../core/parse/MultipleTableJobConfigParser.java   |   7 +-
 .../server/dag/physical/PhysicalPlanGenerator.java |  11 +-
 .../engine/server/dag/physical/ResourceUtils.java  |  10 +-
 .../engine/server/dag/physical/SubPlan.java        |   5 +-
 .../resourcemanager/AbstractResourceManager.java   |  48 ++++++++-
 .../server/resourcemanager/ResourceManager.java    |   7 +-
 .../resourcemanager/ResourceRequestHandler.java    |  11 +-
 .../resourcemanager/worker/WorkerProfile.java      |   3 +
 .../server/service/slot/DefaultSlotService.java    |   1 +
 .../engine/server/AbstractSeaTunnelServerTest.java |  56 +++++-----
 .../resourcemanager/FakeResourceManager.java       |  13 ++-
 .../resourcemanager/FixSlotResourceTest.java       |   6 +-
 .../resourcemanager/ResourceManagerTest.java       |   8 +-
 .../server/resourcemanager/WorkerTagTest.java      | 120 +++++++++++++++++++++
 tools/documents/sync.sh                            |   2 +-
 49 files changed, 567 insertions(+), 129 deletions(-)

diff --git a/README.md b/README.md
index 6e3d5d8e0f..5fa0d25501 100644
--- a/README.md
+++ b/README.md
@@ -61,7 +61,7 @@ SeaTunnel addresses common data integration challenges:
 
 ## SeaTunnel Workflow
 
-![SeaTunnel Workflow](docs/en/images/architecture_diagram.png)
+![SeaTunnel Workflow](docs/images/architecture_diagram.png)
 
 Configure jobs, select execution engines, and parallelize data using Source 
Connectors. Easily develop and extend connectors to meet your needs.
 
diff --git a/docs/en/about.md b/docs/en/about.md
index 38a5fe9545..5164dc081c 100644
--- a/docs/en/about.md
+++ b/docs/en/about.md
@@ -34,7 +34,7 @@ SeaTunnel focuses on data integration and data 
synchronization, and is mainly de
 
 ## SeaTunnel work flowchart
 
-![SeaTunnel work flowchart](images/architecture_diagram.png)
+![SeaTunnel work flowchart](../images/architecture_diagram.png)
 
 The runtime process of SeaTunnel is shown in the figure above.
 
diff --git a/docs/en/faq.md b/docs/en/faq.md
index 7ff275f381..953cc2a956 100644
--- a/docs/en/faq.md
+++ b/docs/en/faq.md
@@ -65,9 +65,9 @@ Refer to: 
[lightbend/config#456](https://github.com/lightbend/config/issues/456)
 
 Of course! See the screenshot below:
 
-![workflow.png](images/workflow.png)
+![workflow.png](../images/workflow.png)
 
-![azkaban.png](images/azkaban.png)
+![azkaban.png](../images/azkaban.png)
 
 ## Does SeaTunnel have a case for configuring multiple sources, such as 
configuring elasticsearch and hdfs in source at the same time?
 
@@ -184,7 +184,7 @@ The following conclusions can be drawn:
 
 3. In general, both M and N are determined, and the conclusion can be drawn 
from 2: The size of `spark.streaming.kafka.maxRatePerPartition` is positively 
correlated with the size of `spark.executor.cores` * 
`spark.executor.instances`, and it can be increased while increasing the 
resource `maxRatePerPartition` to speed up consumption.
 
-![kafka](images/kafka.png)
+![kafka](../images/kafka.png)
 
 ## How can I solve the Error `Exception in thread "main" 
java.lang.NoSuchFieldError: INSTANCE`?
 
diff --git a/docs/en/seatunnel-engine/resource-isolation.md 
b/docs/en/seatunnel-engine/resource-isolation.md
new file mode 100644
index 0000000000..f123e80982
--- /dev/null
+++ b/docs/en/seatunnel-engine/resource-isolation.md
@@ -0,0 +1,83 @@
+---
+
+sidebar_position: 9
+-------------------
+
+After version 2.3.6. SeaTunnel can add `tag` to each worker node, when you 
submit job you can use `tag_filter` to filter the node you want run this job.
+
+# How to archive this:
+
+1. update the config in `hazelcast.yaml`,
+
+```yaml
+hazelcast:
+  cluster-name: seatunnel
+  network:
+    rest-api:
+      enabled: true
+      endpoint-groups:
+        CLUSTER_WRITE:
+          enabled: true
+        DATA:
+          enabled: true
+    join:
+      tcp-ip:
+        enabled: true
+        member-list:
+          - localhost
+    port:
+      auto-increment: false
+      port: 5801
+  properties:
+    hazelcast.invocation.max.retry.count: 20
+    hazelcast.tcp.join.port.try.count: 30
+    hazelcast.logging.type: log4j2
+    hazelcast.operation.generic.thread.count: 50
+  member-attributes:
+    group:
+      type: string
+      value: platform
+    team:
+      type: string
+      value: team1
+```
+
+In this config, we specify the tag by `member-attributes`, the node has 
`group=platform, team=team1` tags.
+
+2. add `tag_filter` to your job config
+
+```hacon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  tag_filter {
+    group = "platform"
+    team = "team1"
+  }
+}
+source {
+  FakeSource {
+    result_table_name = "fake"
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+      }
+    }
+  }
+}
+transform {
+}
+sink {
+  console {
+    source_table_name="fake"
+  }
+}
+```
+
+**Notice:**
+- If not set `tag_filter` in job config, it will random choose the node in all 
active nodes.
+- When you add multiple tag in `tag_filter`, it need all key exist and value 
match. if all node not match, you will get `NoEnoughResourceException` 
exception.
+
+![img.png](../../images/resource-isolation.png)
+
diff --git a/docs/en/images/architecture_diagram.png 
b/docs/images/architecture_diagram.png
similarity index 100%
rename from docs/en/images/architecture_diagram.png
rename to docs/images/architecture_diagram.png
diff --git a/docs/en/images/azkaban.png b/docs/images/azkaban.png
similarity index 100%
rename from docs/en/images/azkaban.png
rename to docs/images/azkaban.png
diff --git a/docs/en/images/checkstyle.png b/docs/images/checkstyle.png
similarity index 100%
rename from docs/en/images/checkstyle.png
rename to docs/images/checkstyle.png
diff --git a/docs/en/images/kafka.png b/docs/images/kafka.png
similarity index 100%
rename from docs/en/images/kafka.png
rename to docs/images/kafka.png
diff --git a/docs/images/resource-isolation.png 
b/docs/images/resource-isolation.png
new file mode 100644
index 0000000000..3986cbfb59
Binary files /dev/null and b/docs/images/resource-isolation.png differ
diff --git a/docs/en/images/seatunnel-workflow.svg 
b/docs/images/seatunnel-workflow.svg
similarity index 100%
rename from docs/en/images/seatunnel-workflow.svg
rename to docs/images/seatunnel-workflow.svg
diff --git a/docs/en/images/seatunnel_architecture.png 
b/docs/images/seatunnel_architecture.png
similarity index 100%
rename from docs/en/images/seatunnel_architecture.png
rename to docs/images/seatunnel_architecture.png
diff --git a/docs/en/images/seatunnel_starter.png 
b/docs/images/seatunnel_starter.png
similarity index 100%
rename from docs/en/images/seatunnel_starter.png
rename to docs/images/seatunnel_starter.png
diff --git a/docs/en/images/workflow.png b/docs/images/workflow.png
similarity index 100%
rename from docs/en/images/workflow.png
rename to docs/images/workflow.png
diff --git a/docs/sidebars.js b/docs/sidebars.js
index ad1093689f..f07b198557 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -178,7 +178,8 @@ const sidebars = {
                 "seatunnel-engine/checkpoint-storage",
                 "seatunnel-engine/rest-api",
                 "seatunnel-engine/tcp",
-                "seatunnel-engine/engine-jar-storage-mode"
+                "seatunnel-engine/engine-jar-storage-mode",
+                "seatunnel-engine/resource-isolation",
             ]
         },
         {
diff --git a/docs/zh/about.md b/docs/zh/about.md
index 024c520852..ae789d4d7f 100644
--- a/docs/zh/about.md
+++ b/docs/zh/about.md
@@ -32,7 +32,7 @@ SeaTunnel专注于数据集成和数据同步,主要旨在解决数据集成
 
 ## SeaTunnel work flowchart
 
-![SeaTunnel work flowchart](images/architecture_diagram.png)
+![SeaTunnel work flowchart](../images/architecture_diagram.png)
 
 SeaTunnel的运行流程如上图所示。
 
diff --git a/docs/zh/faq.md b/docs/zh/faq.md
index 8c836a3612..5fdb06c280 100644
--- a/docs/zh/faq.md
+++ b/docs/zh/faq.md
@@ -65,9 +65,9 @@ your string 1
 
 当然! 请参阅下面的屏幕截图:
 
-![工作流程.png](images/workflow.png)
+![工作流程.png](../images/workflow.png)
 
-![azkaban.png](images/azkaban.png)
+![azkaban.png](../images/azkaban.png)
 
 ## SeaTunnel是否有配置多个源的情况,例如同时在源中配置elasticsearch和hdfs?
 
@@ -185,7 +185,7 @@ sink {
 
 
3、一般来说,M和N都确定了,从2可以得出结论:`spark.streaming.kafka.maxRatePerPartition`的大小与`spark.executor.cores`
 * `spark的大小正相关 .executor.instances`,可以在增加资源`maxRatePerPartition`的同时增加,以加快消耗。
 
-![kafka](images/kafka.png)
+![kafka](../images/kafka.png)
 
 ## 如何解决错误 `Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE`?
 
diff --git a/docs/zh/images/architecture_diagram.png 
b/docs/zh/images/architecture_diagram.png
deleted file mode 100644
index ce72254694..0000000000
Binary files a/docs/zh/images/architecture_diagram.png and /dev/null differ
diff --git a/docs/zh/images/azkaban.png b/docs/zh/images/azkaban.png
deleted file mode 100644
index 78780dce2d..0000000000
Binary files a/docs/zh/images/azkaban.png and /dev/null differ
diff --git a/docs/zh/images/checkstyle.png b/docs/zh/images/checkstyle.png
deleted file mode 100644
index 4cf8303e71..0000000000
Binary files a/docs/zh/images/checkstyle.png and /dev/null differ
diff --git a/docs/zh/images/kafka.png b/docs/zh/images/kafka.png
deleted file mode 100644
index 14b22ebcbe..0000000000
Binary files a/docs/zh/images/kafka.png and /dev/null differ
diff --git a/docs/zh/images/seatunnel-workflow.svg 
b/docs/zh/images/seatunnel-workflow.svg
deleted file mode 100644
index 7280e4a4c4..0000000000
--- a/docs/zh/images/seatunnel-workflow.svg
+++ /dev/null
@@ -1,4 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Do not edit this file with editors other than diagrams.net -->
-<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" 
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd";>
-<svg xmlns="http://www.w3.org/2000/svg"; 
xmlns:xlink="http://www.w3.org/1999/xlink"; version="1.1" width="622px" 
height="718px" viewBox="-0.5 -0.5 622 718" content="&lt;mxfile 
host=&quot;Electron&quot; modified=&quot;2021-12-30T15:17:57.852Z&quot; 
agent=&quot;5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, 
like Gecko) draw.io/15.4.0 Chrome/91.0.4472.164 Electron/13.5.0 
Safari/537.36&quot; etag=&quot;y11mgoacIhryQ4lqCp5C&quot; 
version=&quot;15.4.0&quot; type=&quot;device& [...]
\ No newline at end of file
diff --git a/docs/zh/images/seatunnel_architecture.png 
b/docs/zh/images/seatunnel_architecture.png
deleted file mode 100644
index c96cb272e5..0000000000
Binary files a/docs/zh/images/seatunnel_architecture.png and /dev/null differ
diff --git a/docs/zh/images/seatunnel_starter.png 
b/docs/zh/images/seatunnel_starter.png
deleted file mode 100644
index 4d9700899a..0000000000
Binary files a/docs/zh/images/seatunnel_starter.png and /dev/null differ
diff --git a/docs/zh/images/workflow.png b/docs/zh/images/workflow.png
deleted file mode 100644
index 9ce48b8bfc..0000000000
Binary files a/docs/zh/images/workflow.png and /dev/null differ
diff --git a/docs/zh/seatunnel-engine/resource-isolation.md 
b/docs/zh/seatunnel-engine/resource-isolation.md
new file mode 100644
index 0000000000..ea09d6a892
--- /dev/null
+++ b/docs/zh/seatunnel-engine/resource-isolation.md
@@ -0,0 +1,83 @@
+---
+
+sidebar_position: 9
+-------------------
+
+在2.3.6版本之后, SeaTunnel支持对每个实例添加`tag`, 
然后在提交任务时可以在配置文件中使用`tag_filter`来选择任务将要运行的节点.
+
+# 如何实现改功能
+
+1. 更新`hazelcast.yaml`文件
+
+```yaml
+hazelcast:
+  cluster-name: seatunnel
+  network:
+    rest-api:
+      enabled: true
+      endpoint-groups:
+        CLUSTER_WRITE:
+          enabled: true
+        DATA:
+          enabled: true
+    join:
+      tcp-ip:
+        enabled: true
+        member-list:
+          - localhost
+    port:
+      auto-increment: false
+      port: 5801
+  properties:
+    hazelcast.invocation.max.retry.count: 20
+    hazelcast.tcp.join.port.try.count: 30
+    hazelcast.logging.type: log4j2
+    hazelcast.operation.generic.thread.count: 50
+  member-attributes:
+    group:
+      type: string
+      value: platform
+    team:
+      type: string
+      value: team1
+```
+
+在这个配置中, 我们通过`member-attributes`设置了`group=platform, team=team1`这样两个`tag`
+
+2. 在任务的配置中添加`tag_filter`来选择你需要运行该任务的节点
+
+```hacon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  tag_filter {
+    group = "platform"
+    team = "team1"
+  }
+}
+source {
+  FakeSource {
+    result_table_name = "fake"
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+      }
+    }
+  }
+}
+transform {
+}
+sink {
+  console {
+    source_table_name="fake"
+  }
+}
+```
+
+**注意:**
+- 当在任务的配置中, 没有添加`tag_filter`时, 会从所有节点中随机选择节点来运行任务.
+- 当`tag_filter`中存在多个过滤条件时, 会根据key存在以及value相等的全部匹配的节点, 当没有找到匹配的节点时, 会抛出 
`NoEnoughResourceException`异常.
+
+![img.png](../../images/resource-isolation.png)
+
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index 81d64500dc..0ac179ae76 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -69,8 +69,8 @@ public class ReadonlyConfig implements Serializable {
     /**
      * Transform to Config todo: This method should be removed after we remove 
Config
      *
-     * @deprecated Please use ReadonlyConfig directly
      * @return Config
+     * @deprecated Please use ReadonlyConfig directly
      */
     @Deprecated
     public Config toConfig() {
@@ -96,6 +96,10 @@ public class ReadonlyConfig implements Serializable {
         }
     }
 
+    public Map<String, Object> getSourceMap() {
+        return confData;
+    }
+
     public <T> Optional<T> getOptional(Option<T> option) {
         if (option == null) {
             throw new NullPointerException("Option not be null.");
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index cabf0856dc..1054b91e97 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -94,4 +94,10 @@ public interface EnvCommonOptions {
                     .mapType()
                     .noDefaultValue()
                     .withDescription("custom parameters for run engine");
+
+    Option<Map<String, String>> NODE_TAG_FILTER =
+            Options.key("tag_filter")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription("Define the worker where the job runs by 
tag");
 }
diff --git a/seatunnel-core/README.md b/seatunnel-core/README.md
index 5f666dcebc..ec9d027a65 100644
--- a/seatunnel-core/README.md
+++ b/seatunnel-core/README.md
@@ -1,7 +1,7 @@
 # Introduction
 
 This module is the seatunnel job entrypoint. SeaTunnel jobs are started by the 
below process.
-![seatunnel-workflow.svg](../docs/en/images/seatunnel_starter.png)
+![seatunnel-workflow.svg](../docs/images/seatunnel_starter.png)
 
 - seatunnel-core-flink: The flink job starter.
 - seatunnel-core-flink-sql: The flink sql job starter.
diff --git a/seatunnel-core/seatunnel-core-starter/README.md 
b/seatunnel-core/seatunnel-core-starter/README.md
index 835a5b1992..7420c941b9 100644
--- a/seatunnel-core/seatunnel-core-starter/README.md
+++ b/seatunnel-core/seatunnel-core-starter/README.md
@@ -2,7 +2,7 @@
 
 This module is the base start module for SeaTunnel new connector API.
 
-![seatunnel_architecture.png](../../docs/en/images/seatunnel_architecture.png)
+![seatunnel_architecture.png](../../docs/images/seatunnel_architecture.png)
 
 # SeaTunnel Job Execute Process
 
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java
new file mode 100644
index 0000000000..266d4ff018
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e.resourceIsolation;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class ResourceIsolationIT extends TestSuiteBase {
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "only work on Zeta")
+    public void testTagMatch(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/resource-isolation/fakesource_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "only work on Zeta")
+    public void testTagNotMatch(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/resource-isolation/fakesource_to_console_tag_not_match.conf");
+        Assertions.assertNotEquals(0, execResult.getExitCode());
+        Assertions.assertTrue(
+                StringUtils.isNotBlank(execResult.getStderr())
+                        && execResult
+                                .getStderr()
+                                .contains(
+                                        
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException"));
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
index c28e9c94d5..7bd3fdf8f2 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
@@ -41,3 +41,10 @@ hazelcast:
     hazelcast.slow.operation.detector.stacktrace.logging.enabled: true
     hazelcast.logging.type: log4j2
     hazelcast.operation.generic.thread.count: 200
+  member-attributes:
+    group:
+      type: string
+      value: platform
+    team:
+      type: string
+      value: team1
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf
similarity index 59%
copy from 
seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf
index 64376c9a44..c960427899 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf
@@ -14,28 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
 
-hazelcast:
-  cluster-name: seatunnel
-  network:
-    rest-api:
-      enabled: true
-      endpoint-groups:
-        CLUSTER_WRITE:
-          enabled: true
-        DATA:
-          enabled: true
-    join:
-      tcp-ip:
-        enabled: true
-        member-list:
-          - localhost
-    port:
-      auto-increment: true
-      port-count: 100
-      port: 5801
-  properties:
-    hazelcast.invocation.max.retry.count: 20
-    hazelcast.tcp.join.port.try.count: 30
-    hazelcast.logging.type: log4j2
-    hazelcast.operation.generic.thread.count: 50
+env {
+  job.mode = "BATCH"
+  tag_filter {
+    group = "platform"
+    team = "team1"
+  }
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    schema {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  console {
+  }
+
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf
similarity index 59%
copy from 
seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf
index 64376c9a44..9952fde5d4 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf
@@ -14,28 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
 
-hazelcast:
-  cluster-name: seatunnel
-  network:
-    rest-api:
-      enabled: true
-      endpoint-groups:
-        CLUSTER_WRITE:
-          enabled: true
-        DATA:
-          enabled: true
-    join:
-      tcp-ip:
-        enabled: true
-        member-list:
-          - localhost
-    port:
-      auto-increment: true
-      port-count: 100
-      port: 5801
-  properties:
-    hazelcast.invocation.max.retry.count: 20
-    hazelcast.tcp.join.port.try.count: 30
-    hazelcast.logging.type: log4j2
-    hazelcast.operation.generic.thread.count: 50
+env {
+  job.mode = "BATCH"
+  tag_filter {
+    group = "error_tag"
+    team = "error_tag"
+  }
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    schema {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  console {
+  }
+
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml 
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
index 64376c9a44..0b48069c3e 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
@@ -38,4 +38,4 @@ hazelcast:
     hazelcast.invocation.max.retry.count: 20
     hazelcast.tcp.join.port.try.count: 30
     hazelcast.logging.type: log4j2
-    hazelcast.operation.generic.thread.count: 50
+    hazelcast.operation.generic.thread.count: 50
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index c1ff66c0d3..172bbbff5f 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -276,12 +276,7 @@ public class MultipleTableJobConfigParser {
                 || 
jobConfig.getName().equals(EnvCommonOptions.JOB_NAME.defaultValue())) {
             jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
         }
-        envOptions
-                .toMap()
-                .forEach(
-                        (k, v) -> {
-                            jobConfig.getEnvOptions().put(k, v);
-                        });
+        jobConfig.getEnvOptions().putAll(envOptions.getSourceMap());
     }
 
     private static <T extends Factory> boolean isFallback(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 62f83b53b9..2a18984a95 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -150,7 +151,12 @@ public class PhysicalPlanGenerator {
     }
 
     public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
-
+        Map<String, String> tagFilter =
+                (Map<String, String>)
+                        jobImmutableInformation
+                                .getJobConfig()
+                                .getEnvOptions()
+                                .get(EnvCommonOptions.NODE_TAG_FILTER.key());
         // TODO Determine which tasks do not need to be restored according to 
state
         CopyOnWriteArrayList<PassiveCompletableFuture<PipelineStatus>>
                 waitForCompleteBySubPlanList = new CopyOnWriteArrayList<>();
@@ -205,7 +211,8 @@ public class PhysicalPlanGenerator {
                                             jobImmutableInformation,
                                             executorService,
                                             runningJobStateIMap,
-                                            runningJobStateTimestampsIMap);
+                                            runningJobStateTimestampsIMap,
+                                            tagFilter);
                                 });
 
         PhysicalPlan physicalPlan =
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
index c6bbc48b89..cab3e8fa99 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
@@ -42,14 +42,16 @@ public class ResourceUtils {
                         coordinator ->
                                 futures.put(
                                         coordinator.getTaskGroupLocation(),
-                                        applyResourceForTask(resourceManager, 
coordinator)));
+                                        applyResourceForTask(
+                                                resourceManager, coordinator, 
subPlan.getTags())));
 
         subPlan.getPhysicalVertexList()
                 .forEach(
                         task ->
                                 futures.put(
                                         task.getTaskGroupLocation(),
-                                        applyResourceForTask(resourceManager, 
task)));
+                                        applyResourceForTask(
+                                                resourceManager, task, 
subPlan.getTags())));
 
         futures.forEach(
                 (key, value) -> {
@@ -68,9 +70,9 @@ public class ResourceUtils {
     }
 
     public static CompletableFuture<SlotProfile> applyResourceForTask(
-            ResourceManager resourceManager, PhysicalVertex task) {
+            ResourceManager resourceManager, PhysicalVertex task, Map<String, 
String> tags) {
         // TODO custom resource size
         return resourceManager.applyResource(
-                task.getTaskGroupLocation().getJobId(), new ResourceProfile());
+                task.getTaskGroupLocation().getJobId(), new ResourceProfile(), 
tags);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index f845294bbb..54f795db92 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -69,6 +69,7 @@ public class SubPlan {
     private final String pipelineFullName;
 
     private final IMap<Object, Object> runningJobStateIMap;
+    private final Map<String, String> tags;
 
     /**
      * Timestamps (in milliseconds) as returned by {@code 
System.currentTimeMillis()} when the
@@ -114,7 +115,8 @@ public class SubPlan {
             @NonNull JobImmutableInformation jobImmutableInformation,
             @NonNull ExecutorService executorService,
             @NonNull IMap runningJobStateIMap,
-            @NonNull IMap runningJobStateTimestampsIMap) {
+            @NonNull IMap runningJobStateTimestampsIMap,
+            Map<String, String> tags) {
         this.pipelineId = pipelineId;
         this.pipelineLocation =
                 new PipelineLocation(jobImmutableInformation.getJobId(), 
pipelineId);
@@ -158,6 +160,7 @@ public class SubPlan {
         this.runningJobStateIMap = runningJobStateIMap;
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
         this.executorService = executorService;
+        this.tags = tags;
     }
 
     public synchronized PassiveCompletableFuture<PipelineExecutionState> 
initStateFuture() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 9d9b9d9a76..e7d709750e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -89,10 +90,11 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
     }
 
     @Override
-    public CompletableFuture<SlotProfile> applyResource(long jobId, 
ResourceProfile resourceProfile)
+    public CompletableFuture<SlotProfile> applyResource(
+            long jobId, ResourceProfile resourceProfile, Map<String, String> 
tagFilter)
             throws NoEnoughResourceException {
         CompletableFuture<SlotProfile> completableFuture = new 
CompletableFuture<>();
-        applyResources(jobId, Collections.singletonList(resourceProfile))
+        applyResources(jobId, Collections.singletonList(resourceProfile), 
tagFilter)
                 .whenComplete(
                         (profile, error) -> {
                             if (error != null) {
@@ -129,9 +131,44 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
 
     @Override
     public CompletableFuture<List<SlotProfile>> applyResources(
-            long jobId, List<ResourceProfile> resourceProfile) throws 
NoEnoughResourceException {
+            long jobId, List<ResourceProfile> resourceProfile, Map<String, 
String> tagFilter)
+            throws NoEnoughResourceException {
         waitingWorkerRegister();
-        return new ResourceRequestHandler(jobId, resourceProfile, 
registerWorker, this).request();
+        ConcurrentMap<Address, WorkerProfile> matchedWorker;
+        if (tagFilter == null || tagFilter.isEmpty()) {
+            matchedWorker = registerWorker;
+        } else {
+            matchedWorker =
+                    registerWorker.entrySet().stream()
+                            .filter(
+                                    e -> {
+                                        Map<String, String> workerAttr =
+                                                e.getValue().getAttributes();
+                                        if (workerAttr == null || 
workerAttr.isEmpty()) {
+                                            return false;
+                                        }
+                                        boolean match = true;
+                                        for (Map.Entry<String, String> entry :
+                                                tagFilter.entrySet()) {
+                                            if 
(!workerAttr.containsKey(entry.getKey())
+                                                    || !workerAttr
+                                                            
.get(entry.getKey())
+                                                            
.equals(entry.getValue())) {
+                                                return false;
+                                            }
+                                        }
+                                        return match;
+                                    })
+                            .collect(
+                                    Collectors.toConcurrentMap(
+                                            Map.Entry::getKey, 
Map.Entry::getValue));
+        }
+        if (matchedWorker.isEmpty()) {
+            log.error("No matched worker with tag filter {}.", tagFilter);
+            throw new NoEnoughResourceException();
+        }
+        return new ResourceRequestHandler(jobId, resourceProfile, 
matchedWorker, this)
+                .request(tagFilter);
     }
 
     protected boolean supportDynamicWorker() {
@@ -143,7 +180,8 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
      *
      * @param resourceProfiles the worker should have resource profile list
      */
-    protected void findNewWorker(List<ResourceProfile> resourceProfiles) {
+    protected void findNewWorker(
+            List<ResourceProfile> resourceProfiles, Map<String, String> 
tagFilter) {
         throw new UnsupportedOperationException(
                 "Unsupported operation to find new worker in " + 
this.getClass().getName());
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 8a04b21e4b..8e222b0682 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -24,16 +24,19 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 import com.hazelcast.internal.services.MembershipServiceEvent;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 public interface ResourceManager {
     void init();
 
-    CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile 
resourceProfile)
+    CompletableFuture<SlotProfile> applyResource(
+            long jobId, ResourceProfile resourceProfile, Map<String, String> 
tagFilter)
             throws NoEnoughResourceException;
 
     CompletableFuture<List<SlotProfile>> applyResources(
-            long jobId, List<ResourceProfile> resourceProfile) throws 
NoEnoughResourceException;
+            long jobId, List<ResourceProfile> resourceProfile, Map<String, 
String> tagFilter)
+            throws NoEnoughResourceException;
 
     CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> 
profiles);
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index 680aa1c07c..0af3738a4a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -72,7 +73,7 @@ public class ResourceRequestHandler {
         this.resourceManager = resourceManager;
     }
 
-    public CompletableFuture<List<SlotProfile>> request() {
+    public CompletableFuture<List<SlotProfile>> request(Map<String, String> 
tags) {
         List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new 
ArrayList<>();
         for (int i = 0; i < resourceProfile.size(); i++) {
             ResourceProfile r = resourceProfile.get(i);
@@ -96,7 +97,7 @@ public class ResourceRequestHandler {
                                     if (resultSlotProfiles.size() < 
resourceProfile.size()) {
                                         // meaning have some slot not request 
success
                                         if 
(resourceManager.supportDynamicWorker()) {
-                                            applyByDynamicWorker();
+                                            applyByDynamicWorker(tags);
                                         } else {
                                             completeRequestWithException(
                                                     new 
NoEnoughResourceException(
@@ -188,7 +189,7 @@ public class ResourceRequestHandler {
      * third-party resource management to create a new worker, and then 
complete the resource
      * application
      */
-    private void applyByDynamicWorker() {
+    private void applyByDynamicWorker(Map<String, String> tags) {
         List<ResourceProfile> needApplyResource = new ArrayList<>();
         List<Integer> needApplyIndex = new ArrayList<>();
         for (int i = 0; i < resultSlotProfiles.size(); i++) {
@@ -197,9 +198,9 @@ public class ResourceRequestHandler {
                 needApplyIndex.add(i);
             }
         }
-        resourceManager.findNewWorker(needApplyResource);
+        resourceManager.findNewWorker(needApplyResource, tags);
         resourceManager
-                .applyResources(jobId, needApplyResource)
+                .applyResources(jobId, needApplyResource, tags)
                 .whenComplete(
                         withTryCatch(
                                 LOGGER,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index 836b25201e..291df1f1f8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -29,6 +29,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Used to describe the status of the current Worker, including address and 
resource assign status
@@ -47,6 +48,8 @@ public class WorkerProfile implements 
IdentifiedDataSerializable {
 
     private SlotProfile[] unassignedSlots;
 
+    private Map<String, String> attributes;
+
     public WorkerProfile(Address address) {
         this.address = address;
         this.unassignedResource = new ResourceProfile();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index a01ddfd79b..250a6f2eb4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -258,6 +258,7 @@ public class DefaultSlotService implements SlotService {
         workerProfile.setAssignedSlots(assignedSlots.values().toArray(new 
SlotProfile[0]));
         workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new 
SlotProfile[0]));
         workerProfile.setUnassignedResource(unassignedResource.get());
+        
workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes());
         return workerProfile;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 234fd20c8b..434e76132d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -52,33 +52,7 @@ public abstract class AbstractSeaTunnelServerTest<T extends 
AbstractSeaTunnelSer
     @BeforeAll
     public void before() {
         String name = ((T) this).getClass().getName();
-        String yaml =
-                "hazelcast:\n"
-                        + "  cluster-name: seatunnel\n"
-                        + "  network:\n"
-                        + "    rest-api:\n"
-                        + "      enabled: true\n"
-                        + "      endpoint-groups:\n"
-                        + "        CLUSTER_WRITE:\n"
-                        + "          enabled: true\n"
-                        + "    join:\n"
-                        + "      tcp-ip:\n"
-                        + "        enabled: true\n"
-                        + "        member-list:\n"
-                        + "          - localhost\n"
-                        + "    port:\n"
-                        + "      auto-increment: true\n"
-                        + "      port-count: 100\n"
-                        + "      port: 5801\n"
-                        + "\n"
-                        + "  properties:\n"
-                        + "    hazelcast.invocation.max.retry.count: 200\n"
-                        + "    hazelcast.tcp.join.port.try.count: 30\n"
-                        + "    hazelcast.invocation.retry.pause.millis: 2000\n"
-                        + "    
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
-                        + "    hazelcast.logging.type: log4j2\n"
-                        + "    hazelcast.operation.generic.thread.count: 
200\n";
-        Config hazelcastConfig = Config.loadFromString(yaml);
+        Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
         hazelcastConfig.setClusterName(
                 TestUtils.getClusterName("AbstractSeaTunnelServerTest_" + 
name));
         SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
@@ -89,6 +63,34 @@ public abstract class AbstractSeaTunnelServerTest<T extends 
AbstractSeaTunnelSer
         LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
     }
 
+    protected String getHazelcastConfig() {
+        return "hazelcast:\n"
+                + "  cluster-name: seatunnel\n"
+                + "  network:\n"
+                + "    rest-api:\n"
+                + "      enabled: true\n"
+                + "      endpoint-groups:\n"
+                + "        CLUSTER_WRITE:\n"
+                + "          enabled: true\n"
+                + "    join:\n"
+                + "      tcp-ip:\n"
+                + "        enabled: true\n"
+                + "        member-list:\n"
+                + "          - localhost\n"
+                + "    port:\n"
+                + "      auto-increment: true\n"
+                + "      port-count: 100\n"
+                + "      port: 5801\n"
+                + "\n"
+                + "  properties:\n"
+                + "    hazelcast.invocation.max.retry.count: 200\n"
+                + "    hazelcast.tcp.join.port.try.count: 30\n"
+                + "    hazelcast.invocation.retry.pause.millis: 2000\n"
+                + "    
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+                + "    hazelcast.logging.type: log4j2\n"
+                + "    hazelcast.operation.generic.thread.count: 200\n";
+    }
+
     public SeaTunnelConfig loadSeaTunnelConfig() {
         return ConfigProvider.locateAndGetSeaTunnelConfig();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
index 9c8595e167..e8ba7ed94a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
@@ -28,6 +28,7 @@ import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 /** Used to test ResourceManager, override init method to register more 
workers. */
@@ -47,7 +48,8 @@ public class FakeResourceManager extends 
AbstractResourceManager {
                             new ResourceProfile(),
                             new ResourceProfile(),
                             new SlotProfile[] {},
-                            new SlotProfile[] {});
+                            new SlotProfile[] {},
+                            Collections.emptyMap());
             this.registerWorker.put(address1, workerProfile1);
 
             Address address2 = new Address("localhost", 5802);
@@ -57,7 +59,8 @@ public class FakeResourceManager extends 
AbstractResourceManager {
                             new ResourceProfile(),
                             new ResourceProfile(),
                             new SlotProfile[] {},
-                            new SlotProfile[] {});
+                            new SlotProfile[] {},
+                            Collections.emptyMap());
             this.registerWorker.put(address2, workerProfile2);
             Address address3 = new Address("localhost", 5803);
             WorkerProfile workerProfile3 =
@@ -66,7 +69,8 @@ public class FakeResourceManager extends 
AbstractResourceManager {
                             new ResourceProfile(),
                             new ResourceProfile(),
                             new SlotProfile[] {},
-                            new SlotProfile[] {});
+                            new SlotProfile[] {},
+                            Collections.emptyMap());
             this.registerWorker.put(address3, workerProfile3);
         } catch (UnknownHostException e) {
             throw new RuntimeException(e);
@@ -84,7 +88,8 @@ public class FakeResourceManager extends 
AbstractResourceManager {
                                             new ResourceProfile(),
                                             new ResourceProfile(),
                                             new SlotProfile[] {},
-                                            new SlotProfile[] {}),
+                                            new SlotProfile[] {},
+                                            Collections.emptyMap()),
                                     new SlotProfile(address, 1, new 
ResourceProfile(), "")));
         } else {
             return super.sendToMember(operation, address);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
index cf67ec8de0..cbba82dda8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
@@ -56,7 +56,7 @@ public class FixSlotResourceTest extends 
AbstractSeaTunnelServerTest<FixSlotReso
         List<SlotProfile> slotProfiles =
                 server.getCoordinatorService()
                         .getResourceManager()
-                        .applyResources(jobId, resourceProfiles)
+                        .applyResources(jobId, resourceProfiles, null)
                         .get();
         Assertions.assertEquals(slotProfiles.size(), 3);
         
server.getCoordinatorService().getResourceManager().releaseResources(jobId, 
slotProfiles);
@@ -73,7 +73,7 @@ public class FixSlotResourceTest extends 
AbstractSeaTunnelServerTest<FixSlotReso
         try {
             server.getCoordinatorService()
                     .getResourceManager()
-                    .applyResources(jobId, resourceProfiles)
+                    .applyResources(jobId, resourceProfiles, null)
                     .get();
         } catch (ExecutionException e) {
             
Assertions.assertTrue(e.getMessage().contains("NoEnoughResourceException"));
@@ -93,7 +93,7 @@ public class FixSlotResourceTest extends 
AbstractSeaTunnelServerTest<FixSlotReso
         List<SlotProfile> slotProfiles =
                 server.getCoordinatorService()
                         .getResourceManager()
-                        .applyResources(jobId, resourceProfiles)
+                        .applyResources(jobId, resourceProfiles, null)
                         .get();
         Assertions.assertEquals(slotProfiles.size(), 3);
         
server.getCoordinatorService().getResourceManager().releaseResources(jobId, 
slotProfiles);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
index 9244283889..2589e6530c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -55,7 +55,7 @@ public class ResourceManagerTest extends 
AbstractSeaTunnelServerTest<ResourceMan
         resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(200)));
         resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(300)));
         List<SlotProfile> slotProfiles =
-                resourceManager.applyResources(jobId, resourceProfiles).get();
+                resourceManager.applyResources(jobId, resourceProfiles, 
null).get();
 
         Assertions.assertEquals(
                 resourceProfiles.get(0).getHeapMemory().getBytes(),
@@ -78,7 +78,8 @@ public class ResourceManagerTest extends 
AbstractSeaTunnelServerTest<ResourceMan
                         resourceManager
                                 .applyResource(
                                         jobId,
-                                        new ResourceProfile(CPU.of(0), 
Memory.of(Long.MAX_VALUE)))
+                                        new ResourceProfile(CPU.of(0), 
Memory.of(Long.MAX_VALUE)),
+                                        null)
                                 .get());
     }
 
@@ -93,7 +94,8 @@ public class ResourceManagerTest extends 
AbstractSeaTunnelServerTest<ResourceMan
         resourceProfiles.add(new ResourceProfile());
         resourceProfiles.add(new ResourceProfile());
         resourceProfiles.add(new ResourceProfile());
-        List<SlotProfile> slotProfiles = resourceManager.applyResources(1L, 
resourceProfiles).get();
+        List<SlotProfile> slotProfiles =
+                resourceManager.applyResources(1L, resourceProfiles, 
null).get();
         Assertions.assertEquals(slotProfiles.size(), 5);
 
         boolean hasDifferentWorker = false;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java
new file mode 100644
index 0000000000..3eb27f2451
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class WorkerTagTest extends AbstractSeaTunnelServerTest<WorkerTagTest> {
+
+    private ResourceManager resourceManager;
+
+    private final long jobId = 5;
+
+    @BeforeAll
+    public void before() {
+        super.before();
+        resourceManager = server.getCoordinatorService().getResourceManager();
+        server.getSlotService();
+    }
+
+    @Override
+    protected String getHazelcastConfig() {
+        // for the use case not set node attribute, it tested in 
ResourceManagerTest and
+        // FixSlotResourceTest
+        return "hazelcast:\n"
+                + "  cluster-name: seatunnel\n"
+                + "  network:\n"
+                + "    rest-api:\n"
+                + "      enabled: true\n"
+                + "      endpoint-groups:\n"
+                + "        CLUSTER_WRITE:\n"
+                + "          enabled: true\n"
+                + "    join:\n"
+                + "      tcp-ip:\n"
+                + "        enabled: true\n"
+                + "        member-list:\n"
+                + "          - localhost\n"
+                + "    port:\n"
+                + "      auto-increment: true\n"
+                + "      port-count: 100\n"
+                + "      port: 5801\n"
+                + "\n"
+                + "  properties:\n"
+                + "    hazelcast.invocation.max.retry.count: 200\n"
+                + "    hazelcast.tcp.join.port.try.count: 30\n"
+                + "    hazelcast.invocation.retry.pause.millis: 2000\n"
+                + "    
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+                + "    hazelcast.logging.type: log4j2\n"
+                + "    hazelcast.operation.generic.thread.count: 200\n"
+                + "  member-attributes:\n"
+                + "    group:\n"
+                + "      type: string\n"
+                + "      value: platform\n"
+                + "    team:\n"
+                + "      type: string\n"
+                + "      value: team1";
+    }
+
+    @Test
+    public void testTagMatch() {
+        Map<String, String> tag = new HashMap<>();
+        tag.put("group", "platform");
+        tag.put("team", "team1");
+        Assertions.assertDoesNotThrow(() -> testApplyResourceByTag(tag));
+    }
+
+    @Test
+    public void testNullTag() {
+        Assertions.assertDoesNotThrow(() -> testApplyResourceByTag(null));
+    }
+
+    @Test
+    public void testTagNotMatch() {
+        Map<String, String> tag = new HashMap<>();
+        tag.put("group", "platform");
+        tag.put("team", "team2");
+        Assertions.assertThrows(NoEnoughResourceException.class, () -> 
testApplyResourceByTag(tag));
+    }
+
+    private void testApplyResourceByTag(Map<String, String> tag)
+            throws ExecutionException, InterruptedException {
+        List<ResourceProfile> resourceProfiles = new ArrayList<>();
+        resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(100)));
+        List<SlotProfile> slotProfiles =
+                resourceManager.applyResources(jobId, resourceProfiles, 
tag).get();
+
+        Assertions.assertEquals(
+                resourceProfiles.get(0).getHeapMemory().getBytes(),
+                
slotProfiles.get(0).getResourceProfile().getHeapMemory().getBytes());
+
+        resourceManager.releaseResources(jobId, slotProfiles).get();
+    }
+}
diff --git a/tools/documents/sync.sh b/tools/documents/sync.sh
index f8549e6023..cd67231a81 100644
--- a/tools/documents/sync.sh
+++ b/tools/documents/sync.sh
@@ -20,7 +20,7 @@
 set -euo pipefail
 
 PR_DIR=$1
-PR_IMG_DIR="${PR_DIR}/docs/en/images"
+PR_IMG_DIR="${PR_DIR}/docs/images"
 PR_DOC_DIR="${PR_DIR}/docs/en"
 PR_SIDEBAR_PATH="${PR_DIR}/docs/sidebars.js"
 

Reply via email to