This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 3af6b2f2c6 [Feature][Zeta] Add tag to node used to filter worker when submit job (#7045) 3af6b2f2c6 is described below commit 3af6b2f2c6be41364661bd6872bd99e139986877 Author: Jarvis <liunaijie1...@163.com> AuthorDate: Wed Jun 26 20:39:14 2024 +0800 [Feature][Zeta] Add tag to node used to filter worker when submit job (#7045) --- README.md | 2 +- docs/en/about.md | 2 +- docs/en/faq.md | 6 +- docs/en/seatunnel-engine/resource-isolation.md | 83 ++++++++++++++ docs/{en => }/images/architecture_diagram.png | Bin docs/{en => }/images/azkaban.png | Bin docs/{en => }/images/checkstyle.png | Bin docs/{en => }/images/kafka.png | Bin docs/images/resource-isolation.png | Bin 0 -> 69956 bytes docs/{en => }/images/seatunnel-workflow.svg | 0 docs/{en => }/images/seatunnel_architecture.png | Bin docs/{en => }/images/seatunnel_starter.png | Bin docs/{en => }/images/workflow.png | Bin docs/sidebars.js | 3 +- docs/zh/about.md | 2 +- docs/zh/faq.md | 6 +- docs/zh/images/architecture_diagram.png | Bin 77929 -> 0 bytes docs/zh/images/azkaban.png | Bin 732486 -> 0 bytes docs/zh/images/checkstyle.png | Bin 479660 -> 0 bytes docs/zh/images/kafka.png | Bin 32151 -> 0 bytes docs/zh/images/seatunnel-workflow.svg | 4 - docs/zh/images/seatunnel_architecture.png | Bin 778394 -> 0 bytes docs/zh/images/seatunnel_starter.png | Bin 423840 -> 0 bytes docs/zh/images/workflow.png | Bin 258921 -> 0 bytes docs/zh/seatunnel-engine/resource-isolation.md | 83 ++++++++++++++ .../api/configuration/ReadonlyConfig.java | 6 +- .../apache/seatunnel/api/env/EnvCommonOptions.java | 6 ++ seatunnel-core/README.md | 2 +- seatunnel-core/seatunnel-core-starter/README.md | 2 +- .../e2e/resourceIsolation/ResourceIsolationIT.java | 64 +++++++++++ .../src/test/resources/hazelcast.yaml | 7 ++ .../resource-isolation/fakesource_to_console.conf | 54 +++++----- .../fakesource_to_console_tag_not_match.conf | 54 +++++----- .../src/main/resources/hazelcast.yaml 
| 2 +- .../core/parse/MultipleTableJobConfigParser.java | 7 +- .../server/dag/physical/PhysicalPlanGenerator.java | 11 +- .../engine/server/dag/physical/ResourceUtils.java | 10 +- .../engine/server/dag/physical/SubPlan.java | 5 +- .../resourcemanager/AbstractResourceManager.java | 48 ++++++++- .../server/resourcemanager/ResourceManager.java | 7 +- .../resourcemanager/ResourceRequestHandler.java | 11 +- .../resourcemanager/worker/WorkerProfile.java | 3 + .../server/service/slot/DefaultSlotService.java | 1 + .../engine/server/AbstractSeaTunnelServerTest.java | 56 +++++----- .../resourcemanager/FakeResourceManager.java | 13 ++- .../resourcemanager/FixSlotResourceTest.java | 6 +- .../resourcemanager/ResourceManagerTest.java | 8 +- .../server/resourcemanager/WorkerTagTest.java | 120 +++++++++++++++++++++ tools/documents/sync.sh | 2 +- 49 files changed, 567 insertions(+), 129 deletions(-) diff --git a/README.md b/README.md index 6e3d5d8e0f..5fa0d25501 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ SeaTunnel addresses common data integration challenges: ## SeaTunnel Workflow - + Configure jobs, select execution engines, and parallelize data using Source Connectors. Easily develop and extend connectors to meet your needs. diff --git a/docs/en/about.md b/docs/en/about.md index 38a5fe9545..5164dc081c 100644 --- a/docs/en/about.md +++ b/docs/en/about.md @@ -34,7 +34,7 @@ SeaTunnel focuses on data integration and data synchronization, and is mainly de ## SeaTunnel work flowchart - + The runtime process of SeaTunnel is shown in the figure above. diff --git a/docs/en/faq.md b/docs/en/faq.md index 7ff275f381..953cc2a956 100644 --- a/docs/en/faq.md +++ b/docs/en/faq.md @@ -65,9 +65,9 @@ Refer to: [lightbend/config#456](https://github.com/lightbend/config/issues/456) Of course! See the screenshot below: - + - + ## Does SeaTunnel have a case for configuring multiple sources, such as configuring elasticsearch and hdfs in source at the same time? 
@@ -184,7 +184,7 @@ The following conclusions can be drawn: 3. In general, both M and N are determined, and the conclusion can be drawn from 2: The size of `spark.streaming.kafka.maxRatePerPartition` is positively correlated with the size of `spark.executor.cores` * `spark.executor.instances`, and it can be increased while increasing the resource `maxRatePerPartition` to speed up consumption. - + ## How can I solve the Error `Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE`? diff --git a/docs/en/seatunnel-engine/resource-isolation.md b/docs/en/seatunnel-engine/resource-isolation.md new file mode 100644 index 0000000000..f123e80982 --- /dev/null +++ b/docs/en/seatunnel-engine/resource-isolation.md @@ -0,0 +1,83 @@ +--- + +sidebar_position: 9 +------------------- + +After version 2.3.6, SeaTunnel can add a `tag` to each worker node; when you submit a job, you can use `tag_filter` to select the nodes you want the job to run on. + +# How to achieve this: + +1. Update the config in `hazelcast.yaml`: + +```yaml +hazelcast: + cluster-name: seatunnel + network: + rest-api: + enabled: true + endpoint-groups: + CLUSTER_WRITE: + enabled: true + DATA: + enabled: true + join: + tcp-ip: + enabled: true + member-list: + - localhost + port: + auto-increment: false + port: 5801 + properties: + hazelcast.invocation.max.retry.count: 20 + hazelcast.tcp.join.port.try.count: 30 + hazelcast.logging.type: log4j2 + hazelcast.operation.generic.thread.count: 50 + member-attributes: + group: + type: string + value: platform + team: + type: string + value: team1 +``` + +In this config, we specify the tags via `member-attributes`; the node has the tags `group=platform, team=team1`. + +2. 
Add `tag_filter` to your job config + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + tag_filter { + group = "platform" + team = "team1" + } +} +source { + FakeSource { + result_table_name = "fake" + parallelism = 1 + schema = { + fields { + name = "string" + } + } + } +} +transform { +} +sink { + console { + source_table_name="fake" + } +} +``` + +**Notice:** +- If `tag_filter` is not set in the job config, the job will run on nodes chosen at random from all active nodes. +- When you add multiple tags in `tag_filter`, a node matches only if every key exists and every value matches. If no node matches, you will get a `NoEnoughResourceException`. + + + diff --git a/docs/en/images/architecture_diagram.png b/docs/images/architecture_diagram.png similarity index 100% rename from docs/en/images/architecture_diagram.png rename to docs/images/architecture_diagram.png diff --git a/docs/en/images/azkaban.png b/docs/images/azkaban.png similarity index 100% rename from docs/en/images/azkaban.png rename to docs/images/azkaban.png diff --git a/docs/en/images/checkstyle.png b/docs/images/checkstyle.png similarity index 100% rename from docs/en/images/checkstyle.png rename to docs/images/checkstyle.png diff --git a/docs/en/images/kafka.png b/docs/images/kafka.png similarity index 100% rename from docs/en/images/kafka.png rename to docs/images/kafka.png diff --git a/docs/images/resource-isolation.png b/docs/images/resource-isolation.png new file mode 100644 index 0000000000..3986cbfb59 Binary files /dev/null and b/docs/images/resource-isolation.png differ diff --git a/docs/en/images/seatunnel-workflow.svg b/docs/images/seatunnel-workflow.svg similarity index 100% rename from docs/en/images/seatunnel-workflow.svg rename to docs/images/seatunnel-workflow.svg diff --git a/docs/en/images/seatunnel_architecture.png b/docs/images/seatunnel_architecture.png similarity index 100% rename from docs/en/images/seatunnel_architecture.png rename to docs/images/seatunnel_architecture.png diff --git 
a/docs/en/images/seatunnel_starter.png b/docs/images/seatunnel_starter.png similarity index 100% rename from docs/en/images/seatunnel_starter.png rename to docs/images/seatunnel_starter.png diff --git a/docs/en/images/workflow.png b/docs/images/workflow.png similarity index 100% rename from docs/en/images/workflow.png rename to docs/images/workflow.png diff --git a/docs/sidebars.js b/docs/sidebars.js index ad1093689f..f07b198557 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -178,7 +178,8 @@ const sidebars = { "seatunnel-engine/checkpoint-storage", "seatunnel-engine/rest-api", "seatunnel-engine/tcp", - "seatunnel-engine/engine-jar-storage-mode" + "seatunnel-engine/engine-jar-storage-mode", + "seatunnel-engine/resource-isolation", ] }, { diff --git a/docs/zh/about.md b/docs/zh/about.md index 024c520852..ae789d4d7f 100644 --- a/docs/zh/about.md +++ b/docs/zh/about.md @@ -32,7 +32,7 @@ SeaTunnel专注于数据集成和数据同步,主要旨在解决数据集成 ## SeaTunnel work flowchart - + SeaTunnel的运行流程如上图所示。 diff --git a/docs/zh/faq.md b/docs/zh/faq.md index 8c836a3612..5fdb06c280 100644 --- a/docs/zh/faq.md +++ b/docs/zh/faq.md @@ -65,9 +65,9 @@ your string 1 当然! 请参阅下面的屏幕截图: - + - + ## SeaTunnel是否有配置多个源的情况,例如同时在源中配置elasticsearch和hdfs? @@ -185,7 +185,7 @@ sink { 3、一般来说,M和N都确定了,从2可以得出结论:`spark.streaming.kafka.maxRatePerPartition`的大小与`spark.executor.cores` * `spark的大小正相关 .executor.instances`,可以在增加资源`maxRatePerPartition`的同时增加,以加快消耗。 - + ## 如何解决错误 `Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE`? 
diff --git a/docs/zh/images/architecture_diagram.png b/docs/zh/images/architecture_diagram.png deleted file mode 100644 index ce72254694..0000000000 Binary files a/docs/zh/images/architecture_diagram.png and /dev/null differ diff --git a/docs/zh/images/azkaban.png b/docs/zh/images/azkaban.png deleted file mode 100644 index 78780dce2d..0000000000 Binary files a/docs/zh/images/azkaban.png and /dev/null differ diff --git a/docs/zh/images/checkstyle.png b/docs/zh/images/checkstyle.png deleted file mode 100644 index 4cf8303e71..0000000000 Binary files a/docs/zh/images/checkstyle.png and /dev/null differ diff --git a/docs/zh/images/kafka.png b/docs/zh/images/kafka.png deleted file mode 100644 index 14b22ebcbe..0000000000 Binary files a/docs/zh/images/kafka.png and /dev/null differ diff --git a/docs/zh/images/seatunnel-workflow.svg b/docs/zh/images/seatunnel-workflow.svg deleted file mode 100644 index 7280e4a4c4..0000000000 --- a/docs/zh/images/seatunnel-workflow.svg +++ /dev/null @@ -1,4 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- Do not edit this file with editors other than diagrams.net --> -<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> -<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="622px" height="718px" viewBox="-0.5 -0.5 622 718" content="<mxfile host="Electron" modified="2021-12-30T15:17:57.852Z" agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/15.4.0 Chrome/91.0.4472.164 Electron/13.5.0 Safari/537.36" etag="y11mgoacIhryQ4lqCp5C" version="15.4.0" type="device& [...] 
\ No newline at end of file diff --git a/docs/zh/images/seatunnel_architecture.png b/docs/zh/images/seatunnel_architecture.png deleted file mode 100644 index c96cb272e5..0000000000 Binary files a/docs/zh/images/seatunnel_architecture.png and /dev/null differ diff --git a/docs/zh/images/seatunnel_starter.png b/docs/zh/images/seatunnel_starter.png deleted file mode 100644 index 4d9700899a..0000000000 Binary files a/docs/zh/images/seatunnel_starter.png and /dev/null differ diff --git a/docs/zh/images/workflow.png b/docs/zh/images/workflow.png deleted file mode 100644 index 9ce48b8bfc..0000000000 Binary files a/docs/zh/images/workflow.png and /dev/null differ diff --git a/docs/zh/seatunnel-engine/resource-isolation.md b/docs/zh/seatunnel-engine/resource-isolation.md new file mode 100644 index 0000000000..ea09d6a892 --- /dev/null +++ b/docs/zh/seatunnel-engine/resource-isolation.md @@ -0,0 +1,83 @@ +--- + +sidebar_position: 9 +------------------- + +在2.3.6版本之后, SeaTunnel支持对每个实例添加`tag`, 然后在提交任务时可以在配置文件中使用`tag_filter`来选择任务将要运行的节点. + +# 如何实现该功能 + +1. 更新`hazelcast.yaml`文件 + +```yaml +hazelcast: + cluster-name: seatunnel + network: + rest-api: + enabled: true + endpoint-groups: + CLUSTER_WRITE: + enabled: true + DATA: + enabled: true + join: + tcp-ip: + enabled: true + member-list: + - localhost + port: + auto-increment: false + port: 5801 + properties: + hazelcast.invocation.max.retry.count: 20 + hazelcast.tcp.join.port.try.count: 30 + hazelcast.logging.type: log4j2 + hazelcast.operation.generic.thread.count: 50 + member-attributes: + group: + type: string + value: platform + team: + type: string + value: team1 +``` + +在这个配置中, 我们通过`member-attributes`设置了`group=platform, team=team1`这样两个`tag` + +2. 
在任务的配置中添加`tag_filter`来选择你需要运行该任务的节点 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" + tag_filter { + group = "platform" + team = "team1" + } +} +source { + FakeSource { + result_table_name = "fake" + parallelism = 1 + schema = { + fields { + name = "string" + } + } + } +} +transform { +} +sink { + console { + source_table_name="fake" + } +} +``` + +**注意:** +- 当在任务的配置中, 没有添加`tag_filter`时, 会从所有节点中随机选择节点来运行任务. +- 当`tag_filter`中存在多个过滤条件时, 只有所有key都存在且value都相等的节点才会被匹配; 当没有找到匹配的节点时, 会抛出`NoEnoughResourceException`异常. + + + diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java index 81d64500dc..0ac179ae76 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java @@ -69,8 +69,8 @@ public class ReadonlyConfig implements Serializable { /** * Transform to Config todo: This method should be removed after we remove Config * - * @deprecated Please use ReadonlyConfig directly * @return Config + * @deprecated Please use ReadonlyConfig directly */ @Deprecated public Config toConfig() { @@ -96,6 +96,10 @@ public class ReadonlyConfig implements Serializable { } } + public Map<String, Object> getSourceMap() { + return confData; + } + public <T> Optional<T> getOptional(Option<T> option) { if (option == null) { throw new NullPointerException("Option not be null."); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java index cabf0856dc..1054b91e97 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java @@ -94,4 +94,10 @@ public interface EnvCommonOptions { .mapType() 
.noDefaultValue() .withDescription("custom parameters for run engine"); + + Option<Map<String, String>> NODE_TAG_FILTER = + Options.key("tag_filter") + .mapType() + .noDefaultValue() + .withDescription("Define the worker where the job runs by tag"); } diff --git a/seatunnel-core/README.md b/seatunnel-core/README.md index 5f666dcebc..ec9d027a65 100644 --- a/seatunnel-core/README.md +++ b/seatunnel-core/README.md @@ -1,7 +1,7 @@ # Introduction This module is the seatunnel job entrypoint. SeaTunnel jobs are started by the below process. - + - seatunnel-core-flink: The flink job starter. - seatunnel-core-flink-sql: The flink sql job starter. diff --git a/seatunnel-core/seatunnel-core-starter/README.md b/seatunnel-core/seatunnel-core-starter/README.md index 835a5b1992..7420c941b9 100644 --- a/seatunnel-core/seatunnel-core-starter/README.md +++ b/seatunnel-core/seatunnel-core-starter/README.md @@ -2,7 +2,7 @@ This module is the base start module for SeaTunnel new connector API. - + # SeaTunnel Job Execute Process diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java new file mode 100644 index 0000000000..266d4ff018 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e.resourceIsolation; + +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.commons.lang3.StringUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +public class ResourceIsolationIT extends TestSuiteBase { + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "only work on Zeta") + public void testTagMatch(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/resource-isolation/fakesource_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "only work on Zeta") + public void testTagNotMatch(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob( + "/resource-isolation/fakesource_to_console_tag_not_match.conf"); + Assertions.assertNotEquals(0, execResult.getExitCode()); + Assertions.assertTrue( + StringUtils.isNotBlank(execResult.getStderr()) + && execResult + .getStderr() + .contains( + 
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException")); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml index c28e9c94d5..7bd3fdf8f2 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml @@ -41,3 +41,10 @@ hazelcast: hazelcast.slow.operation.detector.stacktrace.logging.enabled: true hazelcast.logging.type: log4j2 hazelcast.operation.generic.thread.count: 200 + member-attributes: + group: + type: string + value: platform + team: + type: string + value: team1 \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf similarity index 59% copy from seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf index 64376c9a44..c960427899 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf @@ -14,28 +14,34 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### -hazelcast: - cluster-name: seatunnel - network: - rest-api: - enabled: true - endpoint-groups: - CLUSTER_WRITE: - enabled: true - DATA: - enabled: true - join: - tcp-ip: - enabled: true - member-list: - - localhost - port: - auto-increment: true - port-count: 100 - port: 5801 - properties: - hazelcast.invocation.max.retry.count: 20 - hazelcast.tcp.join.port.try.count: 30 - hazelcast.logging.type: log4j2 - hazelcast.operation.generic.thread.count: 50 +env { + job.mode = "BATCH" + tag_filter { + group = "platform" + team = "team1" + } +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + schema { + fields { + id = "int" + name = "string" + age = "int" + } + } + } +} + +sink { + console { + } + +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf similarity index 59% copy from seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf index 64376c9a44..9952fde5d4 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf @@ -14,28 +14,34 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### -hazelcast: - cluster-name: seatunnel - network: - rest-api: - enabled: true - endpoint-groups: - CLUSTER_WRITE: - enabled: true - DATA: - enabled: true - join: - tcp-ip: - enabled: true - member-list: - - localhost - port: - auto-increment: true - port-count: 100 - port: 5801 - properties: - hazelcast.invocation.max.retry.count: 20 - hazelcast.tcp.join.port.try.count: 30 - hazelcast.logging.type: log4j2 - hazelcast.operation.generic.thread.count: 50 +env { + job.mode = "BATCH" + tag_filter { + group = "error_tag" + team = "error_tag" + } +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + schema { + fields { + id = "int" + name = "string" + age = "int" + } + } + } +} + +sink { + console { + } + +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml index 64376c9a44..0b48069c3e 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml @@ -38,4 +38,4 @@ hazelcast: hazelcast.invocation.max.retry.count: 20 hazelcast.tcp.join.port.try.count: 30 hazelcast.logging.type: log4j2 - hazelcast.operation.generic.thread.count: 50 + hazelcast.operation.generic.thread.count: 50 \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index c1ff66c0d3..172bbbff5f 100644 --- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -276,12 +276,7 @@ public class MultipleTableJobConfigParser { || jobConfig.getName().equals(EnvCommonOptions.JOB_NAME.defaultValue())) { jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME)); } - envOptions - .toMap() - .forEach( - (k, v) -> { - jobConfig.getEnvOptions().put(k, v); - }); + jobConfig.getEnvOptions().putAll(envOptions.getSourceMap()); } private static <T extends Factory> boolean isFallback( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index 62f83b53b9..2a18984a95 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.engine.server.dag.physical; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.engine.common.config.server.QueueType; import org.apache.seatunnel.engine.common.utils.IdGenerator; @@ -150,7 +151,12 @@ public class PhysicalPlanGenerator { } public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() { - + Map<String, String> tagFilter = + (Map<String, String>) + jobImmutableInformation + .getJobConfig() + .getEnvOptions() + .get(EnvCommonOptions.NODE_TAG_FILTER.key()); // TODO Determine which tasks do not need to be restored according to state 
CopyOnWriteArrayList<PassiveCompletableFuture<PipelineStatus>> waitForCompleteBySubPlanList = new CopyOnWriteArrayList<>(); @@ -205,7 +211,8 @@ public class PhysicalPlanGenerator { jobImmutableInformation, executorService, runningJobStateIMap, - runningJobStateTimestampsIMap); + runningJobStateTimestampsIMap, + tagFilter); }); PhysicalPlan physicalPlan = diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java index c6bbc48b89..cab3e8fa99 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java @@ -42,14 +42,16 @@ public class ResourceUtils { coordinator -> futures.put( coordinator.getTaskGroupLocation(), - applyResourceForTask(resourceManager, coordinator))); + applyResourceForTask( + resourceManager, coordinator, subPlan.getTags()))); subPlan.getPhysicalVertexList() .forEach( task -> futures.put( task.getTaskGroupLocation(), - applyResourceForTask(resourceManager, task))); + applyResourceForTask( + resourceManager, task, subPlan.getTags()))); futures.forEach( (key, value) -> { @@ -68,9 +70,9 @@ public class ResourceUtils { } public static CompletableFuture<SlotProfile> applyResourceForTask( - ResourceManager resourceManager, PhysicalVertex task) { + ResourceManager resourceManager, PhysicalVertex task, Map<String, String> tags) { // TODO custom resource size return resourceManager.applyResource( - task.getTaskGroupLocation().getJobId(), new ResourceProfile()); + task.getTaskGroupLocation().getJobId(), new ResourceProfile(), tags); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index f845294bbb..54f795db92 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -69,6 +69,7 @@ public class SubPlan { private final String pipelineFullName; private final IMap<Object, Object> runningJobStateIMap; + private final Map<String, String> tags; /** * Timestamps (in milliseconds) as returned by {@code System.currentTimeMillis()} when the @@ -114,7 +115,8 @@ public class SubPlan { @NonNull JobImmutableInformation jobImmutableInformation, @NonNull ExecutorService executorService, @NonNull IMap runningJobStateIMap, - @NonNull IMap runningJobStateTimestampsIMap) { + @NonNull IMap runningJobStateTimestampsIMap, + Map<String, String> tags) { this.pipelineId = pipelineId; this.pipelineLocation = new PipelineLocation(jobImmutableInformation.getJobId(), pipelineId); @@ -158,6 +160,7 @@ public class SubPlan { this.runningJobStateIMap = runningJobStateIMap; this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap; this.executorService = executorService; + this.tags = tags; } public synchronized PassiveCompletableFuture<PipelineExecutionState> initStateFuture() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java index 9d9b9d9a76..e7d709750e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -89,10 +90,11 @@ public abstract class AbstractResourceManager implements ResourceManager { } @Override - public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) + public CompletableFuture<SlotProfile> applyResource( + long jobId, ResourceProfile resourceProfile, Map<String, String> tagFilter) throws NoEnoughResourceException { CompletableFuture<SlotProfile> completableFuture = new CompletableFuture<>(); - applyResources(jobId, Collections.singletonList(resourceProfile)) + applyResources(jobId, Collections.singletonList(resourceProfile), tagFilter) .whenComplete( (profile, error) -> { if (error != null) { @@ -129,9 +131,44 @@ public abstract class AbstractResourceManager implements ResourceManager { @Override public CompletableFuture<List<SlotProfile>> applyResources( - long jobId, List<ResourceProfile> resourceProfile) throws NoEnoughResourceException { + long jobId, List<ResourceProfile> resourceProfile, Map<String, String> tagFilter) + throws NoEnoughResourceException { waitingWorkerRegister(); - return new ResourceRequestHandler(jobId, resourceProfile, registerWorker, this).request(); + ConcurrentMap<Address, WorkerProfile> matchedWorker; + if (tagFilter == null || tagFilter.isEmpty()) { + matchedWorker = registerWorker; + } else { + matchedWorker = + registerWorker.entrySet().stream() + .filter( + e -> { + Map<String, String> workerAttr = + e.getValue().getAttributes(); + if (workerAttr == null || workerAttr.isEmpty()) { + return false; + } + boolean match = true; + for (Map.Entry<String, String> entry : 
+ tagFilter.entrySet()) { + if (!workerAttr.containsKey(entry.getKey()) + || !workerAttr + .get(entry.getKey()) + .equals(entry.getValue())) { + return false; + } + } + return match; + }) + .collect( + Collectors.toConcurrentMap( + Map.Entry::getKey, Map.Entry::getValue)); + } + if (matchedWorker.isEmpty()) { + log.error("No matched worker with tag filter {}.", tagFilter); + throw new NoEnoughResourceException(); + } + return new ResourceRequestHandler(jobId, resourceProfile, matchedWorker, this) + .request(tagFilter); } protected boolean supportDynamicWorker() { @@ -143,7 +180,8 @@ public abstract class AbstractResourceManager implements ResourceManager { * * @param resourceProfiles the worker should have resource profile list */ - protected void findNewWorker(List<ResourceProfile> resourceProfiles) { + protected void findNewWorker( + List<ResourceProfile> resourceProfiles, Map<String, String> tagFilter) { throw new UnsupportedOperationException( "Unsupported operation to find new worker in " + this.getClass().getName()); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java index 8a04b21e4b..8e222b0682 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java @@ -24,16 +24,19 @@ import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; import com.hazelcast.internal.services.MembershipServiceEvent; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; public interface ResourceManager { void init(); - CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile 
resourceProfile) + CompletableFuture<SlotProfile> applyResource( + long jobId, ResourceProfile resourceProfile, Map<String, String> tagFilter) throws NoEnoughResourceException; CompletableFuture<List<SlotProfile>> applyResources( - long jobId, List<ResourceProfile> resourceProfile) throws NoEnoughResourceException; + long jobId, List<ResourceProfile> resourceProfile, Map<String, String> tagFilter) + throws NoEnoughResourceException; CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> profiles); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java index 680aa1c07c..0af3738a4a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -72,7 +73,7 @@ public class ResourceRequestHandler { this.resourceManager = resourceManager; } - public CompletableFuture<List<SlotProfile>> request() { + public CompletableFuture<List<SlotProfile>> request(Map<String, String> tags) { List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new ArrayList<>(); for (int i = 0; i < resourceProfile.size(); i++) { ResourceProfile r = resourceProfile.get(i); @@ -96,7 +97,7 @@ public class ResourceRequestHandler { if (resultSlotProfiles.size() < resourceProfile.size()) { // meaning have some slot not request success if (resourceManager.supportDynamicWorker()) { - 
applyByDynamicWorker(); + applyByDynamicWorker(tags); } else { completeRequestWithException( new NoEnoughResourceException( @@ -188,7 +189,7 @@ public class ResourceRequestHandler { * third-party resource management to create a new worker, and then complete the resource * application */ - private void applyByDynamicWorker() { + private void applyByDynamicWorker(Map<String, String> tags) { List<ResourceProfile> needApplyResource = new ArrayList<>(); List<Integer> needApplyIndex = new ArrayList<>(); for (int i = 0; i < resultSlotProfiles.size(); i++) { @@ -197,9 +198,9 @@ public class ResourceRequestHandler { needApplyIndex.add(i); } } - resourceManager.findNewWorker(needApplyResource); + resourceManager.findNewWorker(needApplyResource, tags); resourceManager - .applyResources(jobId, needApplyResource) + .applyResources(jobId, needApplyResource, tags) .whenComplete( withTryCatch( LOGGER, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java index 836b25201e..291df1f1f8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java @@ -29,6 +29,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import java.io.IOException; +import java.util.Map; /** * Used to describe the status of the current Worker, including address and resource assign status @@ -47,6 +48,8 @@ public class WorkerProfile implements IdentifiedDataSerializable { private SlotProfile[] unassignedSlots; + private Map<String, String> attributes; + public WorkerProfile(Address address) { this.address = address; this.unassignedResource = new ResourceProfile(); 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java index a01ddfd79b..250a6f2eb4 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java @@ -258,6 +258,7 @@ public class DefaultSlotService implements SlotService { workerProfile.setAssignedSlots(assignedSlots.values().toArray(new SlotProfile[0])); workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0])); workerProfile.setUnassignedResource(unassignedResource.get()); + workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes()); return workerProfile; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java index 234fd20c8b..434e76132d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java @@ -52,33 +52,7 @@ public abstract class AbstractSeaTunnelServerTest<T extends AbstractSeaTunnelSer @BeforeAll public void before() { String name = ((T) this).getClass().getName(); - String yaml = - "hazelcast:\n" - + " cluster-name: seatunnel\n" - + " network:\n" - + " rest-api:\n" - + " enabled: true\n" - + " endpoint-groups:\n" - + " CLUSTER_WRITE:\n" - + " enabled: true\n" - + " join:\n" - + " tcp-ip:\n" - + " enabled: true\n" - + " member-list:\n" - + 
" - localhost\n" - + " port:\n" - + " auto-increment: true\n" - + " port-count: 100\n" - + " port: 5801\n" - + "\n" - + " properties:\n" - + " hazelcast.invocation.max.retry.count: 200\n" - + " hazelcast.tcp.join.port.try.count: 30\n" - + " hazelcast.invocation.retry.pause.millis: 2000\n" - + " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n" - + " hazelcast.logging.type: log4j2\n" - + " hazelcast.operation.generic.thread.count: 200\n"; - Config hazelcastConfig = Config.loadFromString(yaml); + Config hazelcastConfig = Config.loadFromString(getHazelcastConfig()); hazelcastConfig.setClusterName( TestUtils.getClusterName("AbstractSeaTunnelServerTest_" + name)); SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig(); @@ -89,6 +63,34 @@ public abstract class AbstractSeaTunnelServerTest<T extends AbstractSeaTunnelSer LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class); } + protected String getHazelcastConfig() { + return "hazelcast:\n" + + " cluster-name: seatunnel\n" + + " network:\n" + + " rest-api:\n" + + " enabled: true\n" + + " endpoint-groups:\n" + + " CLUSTER_WRITE:\n" + + " enabled: true\n" + + " join:\n" + + " tcp-ip:\n" + + " enabled: true\n" + + " member-list:\n" + + " - localhost\n" + + " port:\n" + + " auto-increment: true\n" + + " port-count: 100\n" + + " port: 5801\n" + + "\n" + + " properties:\n" + + " hazelcast.invocation.max.retry.count: 200\n" + + " hazelcast.tcp.join.port.try.count: 30\n" + + " hazelcast.invocation.retry.pause.millis: 2000\n" + + " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n" + + " hazelcast.logging.type: log4j2\n" + + " hazelcast.operation.generic.thread.count: 200\n"; + } + public SeaTunnelConfig loadSeaTunnelConfig() { return ConfigProvider.locateAndGetSeaTunnelConfig(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java index 9c8595e167..e8ba7ed94a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java @@ -28,6 +28,7 @@ import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.Operation; import java.net.UnknownHostException; +import java.util.Collections; import java.util.concurrent.CompletableFuture; /** Used to test ResourceManager, override init method to register more workers. */ @@ -47,7 +48,8 @@ public class FakeResourceManager extends AbstractResourceManager { new ResourceProfile(), new ResourceProfile(), new SlotProfile[] {}, - new SlotProfile[] {}); + new SlotProfile[] {}, + Collections.emptyMap()); this.registerWorker.put(address1, workerProfile1); Address address2 = new Address("localhost", 5802); @@ -57,7 +59,8 @@ public class FakeResourceManager extends AbstractResourceManager { new ResourceProfile(), new ResourceProfile(), new SlotProfile[] {}, - new SlotProfile[] {}); + new SlotProfile[] {}, + Collections.emptyMap()); this.registerWorker.put(address2, workerProfile2); Address address3 = new Address("localhost", 5803); WorkerProfile workerProfile3 = @@ -66,7 +69,8 @@ public class FakeResourceManager extends AbstractResourceManager { new ResourceProfile(), new ResourceProfile(), new SlotProfile[] {}, - new SlotProfile[] {}); + new SlotProfile[] {}, + Collections.emptyMap()); this.registerWorker.put(address3, workerProfile3); } catch (UnknownHostException e) { throw new RuntimeException(e); @@ -84,7 +88,8 @@ public class FakeResourceManager extends AbstractResourceManager { new ResourceProfile(), new ResourceProfile(), new SlotProfile[] {}, - new SlotProfile[] {}), + new SlotProfile[] 
{}, + Collections.emptyMap()), new SlotProfile(address, 1, new ResourceProfile(), ""))); } else { return super.sendToMember(operation, address); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java index cf67ec8de0..cbba82dda8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java @@ -56,7 +56,7 @@ public class FixSlotResourceTest extends AbstractSeaTunnelServerTest<FixSlotReso List<SlotProfile> slotProfiles = server.getCoordinatorService() .getResourceManager() - .applyResources(jobId, resourceProfiles) + .applyResources(jobId, resourceProfiles, null) .get(); Assertions.assertEquals(slotProfiles.size(), 3); server.getCoordinatorService().getResourceManager().releaseResources(jobId, slotProfiles); @@ -73,7 +73,7 @@ public class FixSlotResourceTest extends AbstractSeaTunnelServerTest<FixSlotReso try { server.getCoordinatorService() .getResourceManager() - .applyResources(jobId, resourceProfiles) + .applyResources(jobId, resourceProfiles, null) .get(); } catch (ExecutionException e) { Assertions.assertTrue(e.getMessage().contains("NoEnoughResourceException")); @@ -93,7 +93,7 @@ public class FixSlotResourceTest extends AbstractSeaTunnelServerTest<FixSlotReso List<SlotProfile> slotProfiles = server.getCoordinatorService() .getResourceManager() - .applyResources(jobId, resourceProfiles) + .applyResources(jobId, resourceProfiles, null) .get(); Assertions.assertEquals(slotProfiles.size(), 3); server.getCoordinatorService().getResourceManager().releaseResources(jobId, slotProfiles); diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java index 9244283889..2589e6530c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java @@ -55,7 +55,7 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest<ResourceMan resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(200))); resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(300))); List<SlotProfile> slotProfiles = - resourceManager.applyResources(jobId, resourceProfiles).get(); + resourceManager.applyResources(jobId, resourceProfiles, null).get(); Assertions.assertEquals( resourceProfiles.get(0).getHeapMemory().getBytes(), @@ -78,7 +78,8 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest<ResourceMan resourceManager .applyResource( jobId, - new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE))) + new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE)), + null) .get()); } @@ -93,7 +94,8 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest<ResourceMan resourceProfiles.add(new ResourceProfile()); resourceProfiles.add(new ResourceProfile()); resourceProfiles.add(new ResourceProfile()); - List<SlotProfile> slotProfiles = resourceManager.applyResources(1L, resourceProfiles).get(); + List<SlotProfile> slotProfiles = + resourceManager.applyResources(1L, resourceProfiles, null).get(); Assertions.assertEquals(slotProfiles.size(), 5); boolean hasDifferentWorker = false; diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java new file mode 100644 index 0000000000..3eb27f2451 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.engine.server.resourcemanager; + +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU; +import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory; +import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +public class WorkerTagTest extends AbstractSeaTunnelServerTest<WorkerTagTest> { + + private ResourceManager resourceManager; + + private final long jobId = 5; + + @BeforeAll + public void before() { + super.before(); + resourceManager = server.getCoordinatorService().getResourceManager(); + server.getSlotService(); + } + + @Override + protected String getHazelcastConfig() { + // The case where node attributes are not set is covered by ResourceManagerTest and + // FixSlotResourceTest. + return "hazelcast:\n" + + " cluster-name: seatunnel\n" + + " network:\n" + + " rest-api:\n" + + " enabled: true\n" + + " endpoint-groups:\n" + + " CLUSTER_WRITE:\n" + + " enabled: true\n" + + " join:\n" + + " tcp-ip:\n" + + " enabled: true\n" + + " member-list:\n" + + " - localhost\n" + + " port:\n" + + " auto-increment: true\n" + + " port-count: 100\n" + + " port: 5801\n" + + "\n" + + " properties:\n" + + " hazelcast.invocation.max.retry.count: 200\n" + + " hazelcast.tcp.join.port.try.count: 30\n" + + " hazelcast.invocation.retry.pause.millis: 2000\n" + + " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n" + + " hazelcast.logging.type: log4j2\n" + + " hazelcast.operation.generic.thread.count: 200\n" + + " member-attributes:\n" + + " group:\n" + + " 
type: string\n" + + " value: platform\n" + + " team:\n" + + " type: string\n" + + " value: team1"; + } + + @Test + public void testTagMatch() { + Map<String, String> tag = new HashMap<>(); + tag.put("group", "platform"); + tag.put("team", "team1"); + Assertions.assertDoesNotThrow(() -> testApplyResourceByTag(tag)); + } + + @Test + public void testNullTag() { + Assertions.assertDoesNotThrow(() -> testApplyResourceByTag(null)); + } + + @Test + public void testTagNotMatch() { + Map<String, String> tag = new HashMap<>(); + tag.put("group", "platform"); + tag.put("team", "team2"); + Assertions.assertThrows(NoEnoughResourceException.class, () -> testApplyResourceByTag(tag)); + } + + private void testApplyResourceByTag(Map<String, String> tag) + throws ExecutionException, InterruptedException { + List<ResourceProfile> resourceProfiles = new ArrayList<>(); + resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(100))); + List<SlotProfile> slotProfiles = + resourceManager.applyResources(jobId, resourceProfiles, tag).get(); + + Assertions.assertEquals( + resourceProfiles.get(0).getHeapMemory().getBytes(), + slotProfiles.get(0).getResourceProfile().getHeapMemory().getBytes()); + + resourceManager.releaseResources(jobId, slotProfiles).get(); + } +} diff --git a/tools/documents/sync.sh b/tools/documents/sync.sh index f8549e6023..cd67231a81 100644 --- a/tools/documents/sync.sh +++ b/tools/documents/sync.sh @@ -20,7 +20,7 @@ set -euo pipefail PR_DIR=$1 -PR_IMG_DIR="${PR_DIR}/docs/en/images" +PR_IMG_DIR="${PR_DIR}/docs/images" PR_DOC_DIR="${PR_DIR}/docs/en" PR_SIDEBAR_PATH="${PR_DIR}/docs/sidebars.js"