This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new be7273da83e [refactor](executor)Refactor workload meta update to be 
#26710
be7273da83e is described below

commit be7273da83e12bdc2d2c2d3d2acf9e05f20268dc
Author: wangbo <[email protected]>
AuthorDate: Sat Nov 18 11:19:38 2023 +0800

    [refactor](executor)Refactor workload meta update to be #26710
---
 be/src/agent/cgroup_cpu_ctl.cpp                    | 26 +++---
 be/src/agent/cgroup_cpu_ctl.h                      |  3 +
 be/src/agent/topic_listener.h                      |  2 +-
 be/src/agent/topic_subscriber.cpp                  |  7 +-
 be/src/agent/workload_group_listener.cpp           | 40 +++++++--
 be/src/agent/workload_group_listener.h             |  2 +-
 be/src/common/config.cpp                           |  4 +-
 be/src/common/config.h                             |  4 +-
 be/src/runtime/fragment_mgr.cpp                    | 53 ++++--------
 be/src/runtime/task_group/task_group.cpp           | 97 +++++++++++++++-------
 be/src/runtime/task_group/task_group.h             | 12 ++-
 be/src/runtime/task_group/task_group_manager.cpp   | 73 +++++++++++-----
 be/src/runtime/task_group/task_group_manager.h     | 14 +++-
 .../common/publish/WorkloadGroupPublisher.java     |  8 +-
 .../resource/workloadgroup/WorkloadGroup.java      | 47 ++++++++---
 .../resource/workloadgroup/WorkloadGroupMgr.java   |  7 --
 .../workloadgroup/WorkloadGroupMgrTest.java        | 16 ++--
 gensrc/thrift/BackendService.thrift                | 17 +++-
 18 files changed, 277 insertions(+), 155 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index d16a32b7be5..ac09725cb23 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -25,14 +25,14 @@ Status CgroupCpuCtl::init() {
     _doris_cgroup_cpu_path = config::doris_cgroup_cpu_path;
     if (_doris_cgroup_cpu_path.empty()) {
         LOG(INFO) << "doris cgroup cpu path is not specify, path=" << 
_doris_cgroup_cpu_path;
-        return Status::InternalError("doris cgroup cpu path {} is not 
specify.",
-                                     _doris_cgroup_cpu_path);
+        return Status::InternalError<false>("doris cgroup cpu path {} is not 
specify.",
+                                            _doris_cgroup_cpu_path);
     }
 
     if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) {
         LOG(ERROR) << "doris cgroup cpu path not exists, path=" << 
_doris_cgroup_cpu_path;
-        return Status::InternalError("doris cgroup cpu path {} not exists.",
-                                     _doris_cgroup_cpu_path);
+        return Status::InternalError<false>("doris cgroup cpu path {} not 
exists.",
+                                            _doris_cgroup_cpu_path);
     }
 
     if (_doris_cgroup_cpu_path.back() != '/') {
@@ -41,6 +41,12 @@ Status CgroupCpuCtl::init() {
     return Status::OK();
 }
 
+void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* 
cpu_hard_limit) {
+    std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
+    *cpu_shares = this->_cpu_shares;
+    *cpu_hard_limit = this->_cpu_hard_limit;
+}
+
 void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) {
     if (!_init_succ) {
         return;
@@ -72,14 +78,14 @@ Status CgroupCpuCtl::write_cg_sys_file(std::string 
file_path, int value, std::st
     int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
     if (fd == -1) {
         LOG(ERROR) << "open path failed, path=" << file_path;
-        return Status::InternalError("open path failed, path={}", file_path);
+        return Status::InternalError<false>("open path failed, path={}", 
file_path);
     }
 
     auto str = fmt::format("{}\n", value);
     int ret = write(fd, str.c_str(), str.size());
     if (ret == -1) {
         LOG(ERROR) << msg << " write sys file failed";
-        return Status::InternalError("{} write sys file failed", msg);
+        return Status::InternalError<false>("{} write sys file failed", msg);
     }
     LOG(INFO) << msg << " success";
     return Status::OK();
@@ -94,8 +100,8 @@ Status CgroupV1CpuCtl::init() {
         int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
         if (ret != 0) {
             LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << 
_cgroup_v1_cpu_query_path;
-            return Status::InternalError("cgroup v1 mkdir query failed, path=",
-                                         _cgroup_v1_cpu_query_path);
+            return Status::InternalError<false>("cgroup v1 mkdir query failed, 
path=",
+                                                _cgroup_v1_cpu_query_path);
         }
     }
 
@@ -105,8 +111,8 @@ Status CgroupV1CpuCtl::init() {
         int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
         if (ret != 0) {
             LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << 
_cgroup_v1_cpu_tg_path;
-            return Status::InternalError("cgroup v1 mkdir workload group 
failed, path=",
-                                         _cgroup_v1_cpu_tg_path);
+            return Status::InternalError<false>("cgroup v1 mkdir workload 
group failed, path=",
+                                                _cgroup_v1_cpu_tg_path);
         }
     }
 
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index b98e268da09..4d78ca82ab7 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -47,6 +47,9 @@ public:
 
     void update_cpu_soft_limit(int cpu_shares);
 
+    // for log
+    void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit);
+
 protected:
     Status write_cg_sys_file(std::string file_path, int value, std::string 
msg, bool is_append);
 
diff --git a/be/src/agent/topic_listener.h b/be/src/agent/topic_listener.h
index af99a78545b..40cf0ba2a21 100644
--- a/be/src/agent/topic_listener.h
+++ b/be/src/agent/topic_listener.h
@@ -25,6 +25,6 @@ class TopicListener {
 public:
     virtual ~TopicListener() {}
 
-    virtual void handle_topic_info(const TPublishTopicRequest& topic_request) 
= 0;
+    virtual void handle_topic_info(const std::vector<TopicInfo>& 
topic_info_list) = 0;
 };
 } // namespace doris
diff --git a/be/src/agent/topic_subscriber.cpp 
b/be/src/agent/topic_subscriber.cpp
index c3bcc29c623..c29533bf617 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -42,8 +42,11 @@ void TopicSubscriber::handle_topic_info(const 
TPublishTopicRequest& topic_reques
     std::shared_lock lock(_listener_mtx);
     LOG(INFO) << "begin handle topic info";
     for (auto& listener_pair : _registered_listeners) {
-        listener_pair.second->handle_topic_info(topic_request);
-        LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+        if (topic_request.topic_map.find(listener_pair.first) != 
topic_request.topic_map.end()) {
+            listener_pair.second->handle_topic_info(
+                    topic_request.topic_map.at(listener_pair.first));
+            LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+        }
     }
 }
 } // namespace doris
diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
index bf27861c284..bc1d3294f6c 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -24,18 +24,44 @@
 
 namespace doris {
 
-void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& 
topic_request) {
+void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& 
topic_info_list) {
     std::set<uint64_t> current_wg_ids;
-    for (const TopicInfo& topic_info : topic_request.topic_list) {
-        if (topic_info.topic_type != 
doris::TTopicInfoType::type::WORKLOAD_GROUP) {
+    for (const TopicInfo& topic_info : topic_info_list) {
+        if (!topic_info.__isset.workload_group_info) {
             continue;
         }
 
-        int wg_id = 0;
-        auto iter2 = topic_info.info_map.find("id");
-        std::from_chars(iter2->second.c_str(), iter2->second.c_str() + 
iter2->second.size(), wg_id);
+        // 1 parse topicinfo to group info
+        taskgroup::TaskGroupInfo task_group_info;
+        Status ret = 
taskgroup::TaskGroupInfo::parse_topic_info(topic_info.workload_group_info,
+                                                                
&task_group_info);
+        if (!ret.ok()) {
+            LOG(INFO) << "parse topic info failed, tg_id=" << 
task_group_info.id
+                      << ", reason:" << ret.to_string();
+            continue;
+        }
+        current_wg_ids.insert(task_group_info.id);
+
+        // 2 update task group
+        auto tg = 
_exec_env->task_group_manager()->get_or_create_task_group(task_group_info);
+
+        // 3 set cpu soft hard limit switch
+        _exec_env->task_group_manager()->_enable_cpu_hard_limit.store(
+                task_group_info.enable_cpu_hard_limit);
+
+        // 4 create and update task scheduler
+        Status ret2 =
+                
_exec_env->task_group_manager()->upsert_task_scheduler(&task_group_info, 
_exec_env);
+        if (!ret2.ok()) {
+            LOG(WARNING) << "upsert task sche failed, tg_id=" << 
task_group_info.id
+                         << ", reason=" << ret2.to_string();
+        }
 
-        current_wg_ids.insert(wg_id);
+        LOG(INFO) << "update task group success, tg info=" << 
tg->debug_string()
+                  << ", enable_cpu_hard_limit="
+                  << _exec_env->task_group_manager()->enable_cpu_hard_limit()
+                  << ", cgroup cpu_shares=" << 
task_group_info.cgroup_cpu_shares
+                  << ", cgroup cpu_hard_limit=" << 
task_group_info.cgroup_cpu_hard_limit;
     }
 
     _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
diff --git a/be/src/agent/workload_group_listener.h 
b/be/src/agent/workload_group_listener.h
index d31b1c4ef65..732f5752e44 100644
--- a/be/src/agent/workload_group_listener.h
+++ b/be/src/agent/workload_group_listener.h
@@ -29,7 +29,7 @@ public:
     ~WorkloadGroupListener() {}
     WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
 
-    void handle_topic_info(const TPublishTopicRequest& topic_request) override;
+    void handle_topic_info(const std::vector<TopicInfo>& topic_info_list) 
override;
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 870ba5c9b0a..5f867429bff 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1090,8 +1090,8 @@ DEFINE_Bool(exit_on_exception, "false");
 DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
-DEFINE_String(doris_cgroup_cpu_path, "");
-DEFINE_Bool(enable_cgroup_cpu_soft_limit, "false");
+DEFINE_mString(doris_cgroup_cpu_path, "");
+DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
 
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ac560d1f1c6..6b76d37387e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1163,8 +1163,8 @@ DECLARE_mInt32(tablet_schema_cache_recycle_interval);
 DECLARE_mBool(exit_on_exception);
 
 // cgroup
-DECLARE_String(doris_cgroup_cpu_path);
-DECLARE_Bool(enable_cgroup_cpu_soft_limit);
+DECLARE_mString(doris_cgroup_cpu_path);
+DECLARE_mBool(enable_cgroup_cpu_soft_limit);
 
 // This config controls whether the s3 file writer would flush cache 
asynchronously
 DECLARE_Bool(enable_flush_file_cache_async);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a7a89bfd421..210de93a7ec 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -654,45 +654,28 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
 
         if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
             if (params.__isset.workload_groups && 
!params.workload_groups.empty()) {
-                taskgroup::TaskGroupInfo task_group_info;
-                auto status = 
taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
-                                                                         
&task_group_info);
-                if (status.ok()) {
-                    auto tg = 
_exec_env->task_group_manager()->get_or_create_task_group(
-                            task_group_info);
-                    tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
-                    uint64_t tg_id = tg->id();
-                    std::string tg_name = tg->name();
-                    LOG(INFO) << "Query/load id: " << 
print_id(query_ctx->query_id())
-                              << " use task group: " << tg->debug_string()
-                              << " cpu_hard_limit: " << 
task_group_info.cpu_hard_limit
-                              << " cpu_share:" << task_group_info.cpu_share
-                              << " enable cgroup soft cpu:" << 
config::enable_cgroup_cpu_soft_limit;
-                    if (task_group_info.cpu_hard_limit > 0) {
-                        Status ret = 
_exec_env->task_group_manager()->create_and_get_task_scheduler(
-                                tg_id, tg_name, task_group_info.cpu_hard_limit,
-                                task_group_info.cpu_share, _exec_env, 
query_ctx.get());
-                        if (!ret.ok()) {
-                            LOG(INFO) << "workload group init failed "
-                                      << ", name=" << tg_name << ", id=" << 
tg_id
-                                      << ", reason=" << ret.to_string();
-                        }
+                uint64_t tg_id = params.workload_groups[0].id;
+                auto* tg_mgr = _exec_env->task_group_manager();
+                if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) 
{
+                    std::stringstream ss;
+                    ss << "Query/load id: " << print_id(query_ctx->query_id());
+                    ss << " use task group " << task_group_ptr->debug_string();
+                    if (tg_mgr->enable_cpu_soft_limit() && 
!config::enable_cgroup_cpu_soft_limit) {
+                        query_ctx->set_task_group(task_group_ptr);
+                        ss << ", cpu soft limit based doris sche";
                     } else {
-                        if (!config::enable_cgroup_cpu_soft_limit) {
-                            query_ctx->set_task_group(tg);
+                        bool ret = tg_mgr->set_task_sche_for_query_ctx(tg_id, 
query_ctx.get());
+                        if (tg_mgr->enable_cpu_hard_limit()) {
+                            ss << ", cpu hard limit based cgroup";
                         } else {
-                            Status ret =
-                                    
_exec_env->task_group_manager()->create_and_get_task_scheduler(
-                                            tg_id, tg_name, 
task_group_info.cpu_hard_limit,
-                                            task_group_info.cpu_share, 
_exec_env, query_ctx.get());
-                            if (!ret.ok()) {
-                                LOG(INFO) << "workload group cpu soft limit 
init failed "
-                                          << ", name=" << tg_name << ", id=" 
<< tg_id
-                                          << ", reason=" << ret.to_string();
-                            }
+                            ss << ", cpu soft limit based cgroup";
+                        }
+                        if (!ret) {
+                            ss << ", but cgroup init failed, scan or exec 
fallback to no group";
                         }
                     }
-                }
+                    LOG(INFO) << ss.str();
+                } // else, query run with no group
             } else {
                 VLOG_DEBUG << "Query/load id: " << 
print_id(query_ctx->query_id())
                            << " does not use task group.";
diff --git a/be/src/runtime/task_group/task_group.cpp 
b/be/src/runtime/task_group/task_group.cpp
index 37e4b9ae597..5878e38ed38 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -113,15 +113,16 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
           _cpu_share(tg_info.cpu_share),
           _task_entity(this, "pipeline task entity"),
           _local_scan_entity(this, "local scan entity"),
-          _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
+          _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
+          _cpu_hard_limit(tg_info.cpu_hard_limit) {}
 
 std::string TaskGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
     return fmt::format(
             "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, 
enable_memory_overcommit = "
-            "{}, version = {}]",
+            "{}, version = {}, cpu_hard_limit = {}]",
             _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, 
TUnit::BYTES),
-            _enable_memory_overcommit ? "true" : "false", _version);
+            _enable_memory_overcommit ? "true" : "false", _version, 
cpu_hard_limit());
 }
 
 void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
@@ -142,6 +143,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo& 
tg_info) {
             _memory_limit = tg_info.memory_limit;
             _enable_memory_overcommit = tg_info.enable_memory_overcommit;
             _cpu_share = tg_info.cpu_share;
+            _cpu_hard_limit = tg_info.cpu_hard_limit;
         } else {
             return;
         }
@@ -185,49 +187,80 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info) 
const {
     tg_info->version = _version;
 }
 
-Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& 
resource_group,
-                                       TaskGroupInfo* task_group_info) {
-    if (UNLIKELY(!check_group_info(resource_group))) {
-        std::stringstream ss;
-        ss << "incomplete resource group parameters: ";
-        resource_group.printTo(ss);
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& 
workload_group_info,
+                                       taskgroup::TaskGroupInfo* 
task_group_info) {
+    // 1 id
+    int tg_id = 0;
+    if (workload_group_info.__isset.id) {
+        tg_id = workload_group_info.id;
+    } else {
+        return Status::InternalError<false>("workload group id is required");
     }
+    task_group_info->id = tg_id;
 
-    auto iter = resource_group.properties.find(CPU_SHARE);
-    uint64_t share = 0;
-    std::from_chars(iter->second.c_str(), iter->second.c_str() + 
iter->second.size(), share);
+    // 2 name
+    std::string name = "INVALID_NAME";
+    if (workload_group_info.__isset.name) {
+        name = workload_group_info.name;
+    }
+    task_group_info->name = name;
+
+    // 3 version
+    int version = 0;
+    if (workload_group_info.__isset.version) {
+        version = workload_group_info.version;
+    } else {
+        return Status::InternalError<false>("workload group version is 
required");
+    }
+    task_group_info->version = version;
 
-    int cpu_hard_limit = 0;
-    auto iter2 = resource_group.properties.find(CPU_HARD_LIMIT);
-    std::from_chars(iter2->second.c_str(), iter2->second.c_str() + 
iter2->second.size(),
-                    cpu_hard_limit);
+    // 4 cpu_share
+    uint64_t cpu_share = 1024;
+    if (workload_group_info.__isset.cpu_share) {
+        cpu_share = workload_group_info.cpu_share;
+    }
+    task_group_info->cpu_share = cpu_share;
 
-    task_group_info->id = resource_group.id;
-    task_group_info->name = resource_group.name;
-    task_group_info->version = resource_group.version;
-    task_group_info->cpu_share = share;
+    // 5 cpu hard limit
+    int cpu_hard_limit = -1;
+    if (workload_group_info.__isset.cpu_hard_limit) {
+        cpu_hard_limit = workload_group_info.cpu_hard_limit;
+    }
     task_group_info->cpu_hard_limit = cpu_hard_limit;
 
+    // 6 mem_limit
     bool is_percent = true;
-    auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second;
-    auto mem_limit =
+    std::string mem_limit_str;
+    if (workload_group_info.__isset.mem_limit) {
+        mem_limit_str = workload_group_info.mem_limit;
+    } else {
+        return Status::InternalError<false>("workload group mem_limit is 
required");
+    }
+    int64_t mem_limit =
             ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), 
&is_percent);
     if (UNLIKELY(mem_limit <= 0)) {
         std::stringstream ss;
-        ss << "parse memory limit from TPipelineWorkloadGroup error, " << 
MEMORY_LIMIT << ": "
-           << mem_limit_str;
+        ss << "parse memory limit error, " << MEMORY_LIMIT << ": " << 
mem_limit_str;
         LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+        return Status::InternalError<false>("invalid value for {}, val={}", 
MEMORY_LIMIT,
+                                            mem_limit);
     }
     task_group_info->memory_limit = mem_limit;
 
-    auto enable_memory_overcommit_iter = 
resource_group.properties.find(ENABLE_MEMORY_OVERCOMMIT);
-    task_group_info->enable_memory_overcommit =
-            enable_memory_overcommit_iter != resource_group.properties.end() &&
-            enable_memory_overcommit_iter->second ==
-                    "true" /* fe guarantees it is 'true' or 'false' */;
+    // 7 mem overcommit
+    bool enable_memory_overcommit = true;
+    if (workload_group_info.__isset.enable_memory_overcommit) {
+        enable_memory_overcommit = 
workload_group_info.enable_memory_overcommit;
+    }
+    task_group_info->enable_memory_overcommit = enable_memory_overcommit;
+
+    // 8 cpu soft limit or hard limit
+    bool enable_cpu_hard_limit = false;
+    if (workload_group_info.__isset.enable_cpu_hard_limit) {
+        enable_cpu_hard_limit = workload_group_info.enable_cpu_hard_limit;
+    }
+    task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
+
     return Status::OK();
 }
 
diff --git a/be/src/runtime/task_group/task_group.h 
b/be/src/runtime/task_group/task_group.h
index a948bf53ec0..b3daf582575 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/BackendService_types.h>
 #include <stddef.h>
 #include <stdint.h>
 
@@ -106,6 +107,8 @@ public:
 
     uint64_t cpu_share() const { return _cpu_share.load(); }
 
+    int cpu_hard_limit() const { return _cpu_hard_limit.load(); }
+
     uint64_t id() const { return _id; }
 
     std::string name() const { return _name; };
@@ -147,6 +150,7 @@ private:
     TaskGroupPipelineTaskEntity _task_entity;
     TaskGroupScanTaskEntity _local_scan_entity;
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
+    std::atomic<int> _cpu_hard_limit;
 };
 
 using TaskGroupPtr = std::shared_ptr<TaskGroup>;
@@ -159,9 +163,13 @@ struct TaskGroupInfo {
     bool enable_memory_overcommit;
     int64_t version;
     int cpu_hard_limit;
+    bool enable_cpu_hard_limit;
+    // log cgroup cpu info
+    uint64_t cgroup_cpu_shares = 0;
+    uint64_t cgroup_cpu_hard_limit = 0;
 
-    static Status parse_group_info(const TPipelineWorkloadGroup& 
resource_group,
-                                   TaskGroupInfo* task_group_info);
+    static Status parse_topic_info(const TWorkloadGroupInfo& topic_info,
+                                   taskgroup::TaskGroupInfo* task_group_info);
 
 private:
     static bool check_group_info(const TPipelineWorkloadGroup& resource_group);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index fb940069787..f37ed7ff711 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -59,10 +59,38 @@ void TaskGroupManager::get_resource_groups(const 
std::function<bool(const TaskGr
     }
 }
 
-Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, 
std::string tg_name,
-                                                       int cpu_hard_limit, int 
cpu_shares,
-                                                       ExecEnv* exec_env,
-                                                       QueryContext* 
query_ctx_ptr) {
+TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    if (_task_groups.find(tg_id) != _task_groups.end()) {
+        return _task_groups.at(tg_id);
+    }
+    return nullptr;
+}
+
+bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t tg_id, 
QueryContext* query_ctx_ptr) {
+    std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+    if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
+        query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
+    } else {
+        return false;
+    }
+
+    if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
+        
query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_map.at(tg_id).get());
+    } else {
+        return false;
+    }
+    return true;
+}
+
+Status TaskGroupManager::upsert_task_scheduler(taskgroup::TaskGroupInfo* 
tg_info,
+                                               ExecEnv* exec_env) {
+    uint64_t tg_id = tg_info->id;
+    std::string tg_name = tg_info->name;
+    int cpu_hard_limit = tg_info->cpu_hard_limit;
+    uint64_t cpu_shares = tg_info->cpu_share;
+    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+
     std::lock_guard<std::mutex> lock(_task_scheduler_lock);
     // step 1: init cgroup cpu controller
     CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
@@ -73,7 +101,7 @@ Status 
TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
             cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
             _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
         } else {
-            return Status::Error<INTERNAL_ERROR, false>("cgroup init failed, 
gid={}", tg_id);
+            return Status::InternalError<false>("cgroup init failed, gid={}", 
tg_id);
         }
     }
 
@@ -92,8 +120,7 @@ Status 
TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
         if (ret.ok()) {
             _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
         } else {
-            return Status::Error<INTERNAL_ERROR, false>("task scheduler start 
failed, gid={}",
-                                                        tg_id);
+            return Status::InternalError<false>("task scheduler start failed, 
gid={}", tg_id);
         }
     }
 
@@ -105,27 +132,27 @@ Status 
TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
         if (ret.ok()) {
             _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
         } else {
-            return Status::Error<INTERNAL_ERROR, false>("scan scheduler start 
failed, gid={}",
-                                                        tg_id);
+            return Status::InternalError<false>("scan scheduler start failed, 
gid={}", tg_id);
         }
     }
 
-    // step 4 set exec/scan task scheudler to query ctx
-    pipeline::TaskScheduler* task_sche = _tg_sche_map.at(tg_id).get();
-    query_ctx_ptr->set_task_scheduler(task_sche);
-
-    vectorized::SimplifiedScanScheduler* scan_task_sche = 
_tg_scan_sche_map.at(tg_id).get();
-    query_ctx_ptr->set_scan_task_scheduler(scan_task_sche);
-
-    // step 5 update cgroup cpu if needed
-    if (cpu_hard_limit > 0) {
-        _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
-        
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+    // step 4 update cgroup cpu if needed
+    if (enable_cpu_hard_limit) {
+        if (cpu_hard_limit > 0) {
+            _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+            
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+        } else {
+            return Status::InternalError<false>("enable cpu hard limit but 
value is illegal");
+        }
     } else {
-        _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
-        _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
-                CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+        if (config::enable_cgroup_cpu_soft_limit) {
+            _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
+            _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
+                    CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+        }
     }
+    
_cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
+                                                   
&(tg_info->cgroup_cpu_hard_limit));
 
     return Status::OK();
 }
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index cf44f535440..552cddfe9dc 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -51,14 +51,22 @@ public:
     void get_resource_groups(const std::function<bool(const TaskGroupPtr& 
ptr)>& pred,
                              std::vector<TaskGroupPtr>* task_groups);
 
-    Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, 
int cpu_hard_limit,
-                                         int cpu_shares, ExecEnv* exec_env,
-                                         QueryContext* query_ctx_ptr);
+    Status upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* 
exec_env);
 
     void delete_task_group_by_ids(std::set<uint64_t> id_set);
 
+    TaskGroupPtr get_task_group_by_id(uint64_t tg_id);
+
     void stop();
 
+    std::atomic<bool> _enable_cpu_hard_limit = false;
+
+    bool enable_cpu_soft_limit() { return !_enable_cpu_hard_limit.load(); }
+
+    bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
+
+    bool set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* 
query_ctx_ptr);
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
index 2330700ce7b..6c5ce9e4c11 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -19,7 +19,7 @@ package org.apache.doris.common.publish;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TopicInfo;
+import org.apache.doris.thrift.TTopicInfoType;
 
 public class WorkloadGroupPublisher implements TopicPublisher {
 
@@ -31,9 +31,7 @@ public class WorkloadGroupPublisher implements TopicPublisher 
{
 
     @Override
     public void getTopicInfo(TPublishTopicRequest req) {
-        for (TopicInfo topicInfo : env.getWorkloadGroupMgr()
-                .getPublishTopicInfo()) {
-            req.addToTopicList(topicInfo);
-        }
+        req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
+                env.getWorkloadGroupMgr().getPublishTopicInfo());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 84a526aa7f0..53faa1a9133 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -18,6 +18,7 @@
 package org.apache.doris.resource.workloadgroup;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
@@ -25,7 +26,7 @@ import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
-import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TWorkloadGroupInfo;
 import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Strings;
@@ -311,20 +312,44 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
     }
 
     public TPipelineWorkloadGroup toThrift() {
-        //note(wb) we need add a new key-value to properties and then transfer 
it to be, so need a copy here
-        // see WorkloadGroupMgr.getWorkloadGroup
-        HashMap<String, String> clonedHashMap = new HashMap<>();
-        clonedHashMap.putAll(properties);
-        return new 
TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version);
+        return new TPipelineWorkloadGroup().setId(id);
     }
 
     public TopicInfo toTopicInfo() {
-        HashMap<String, String> newHashMap = new HashMap<>();
-        newHashMap.put("id", String.valueOf(id));
+        TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
+        tWorkloadGroupInfo.setId(id);
+        tWorkloadGroupInfo.setName(name);
+        tWorkloadGroupInfo.setVersion(version);
+
+        String cpuShareStr = properties.get(CPU_SHARE);
+        if (cpuShareStr != null) {
+            tWorkloadGroupInfo.setCpuShare(Long.valueOf(cpuShareStr));
+        }
+
+        String cpuHardLimitStr = properties.get(CPU_HARD_LIMIT);
+        if (cpuHardLimitStr != null) {
+            
tWorkloadGroupInfo.setCpuHardLimit(Integer.valueOf(cpuHardLimitStr));
+        }
+
+        String memLimitStr = properties.get(MEMORY_LIMIT);
+        if (memLimitStr != null) {
+            tWorkloadGroupInfo.setMemLimit(memLimitStr);
+        }
+        String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
+        if (memOvercommitStr != null) {
+            
tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
+        }
+        // enable_cpu_hard_limit = true, using cpu hard limit
+        // enable_cpu_hard_limit = false, using cpu soft limit
+        tWorkloadGroupInfo.setEnableCpuHardLimit(Config.enable_cpu_hard_limit);
+
+        if (Config.enable_cpu_hard_limit && cpuHardLimit <= 0) {
+            LOG.warn("enable_cpu_hard_limit=true but cpuHardLimit value not 
illegal,"
+                    + "id=" + id + ",name=" + name);
+        }
+
         TopicInfo topicInfo = new TopicInfo();
-        topicInfo.setTopicType(TTopicInfoType.WORKLOAD_GROUP);
-        topicInfo.setInfoMap(newHashMap);
-        topicInfo.setTopicKey(name);
+        topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
         return topicInfo;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 44c43c19fad..26b11f0cb85 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -125,13 +125,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
                 throw new UserException("Workload group " + groupName + " does 
not exist");
             }
             workloadGroups.add(workloadGroup.toThrift());
-            // note(wb) -1 to tell be no need to not use cpu hard limit
-            int cpuHardLimitThriftVal = -1;
-            if (Config.enable_cpu_hard_limit && 
workloadGroup.getCpuHardLimit() > 0) {
-                cpuHardLimitThriftVal = workloadGroup.getCpuHardLimit();
-            }
-            
workloadGroups.get(0).getProperties().put(WorkloadGroup.CPU_HARD_LIMIT,
-                    String.valueOf(cpuHardLimitThriftVal));
             context.setWorkloadGroupName(groupName);
         } finally {
             readUnlock();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 89062d87cda..e4501fd6463 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -30,7 +30,7 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.collect.Maps;
 import mockit.Delegate;
@@ -169,11 +169,11 @@ public class WorkloadGroupMgrTest {
         CreateWorkloadGroupStmt stmt1 = new CreateWorkloadGroupStmt(false, 
name1, properties1);
         workloadGroupMgr.createWorkloadGroup(stmt1);
         context.getSessionVariable().setWorkloadGroup(name1);
-        List<TPipelineWorkloadGroup> tWorkloadGroups1 = 
workloadGroupMgr.getWorkloadGroup(context);
+        List<TopicInfo> tWorkloadGroups1 = 
workloadGroupMgr.getPublishTopicInfo();
         Assert.assertEquals(1, tWorkloadGroups1.size());
-        TPipelineWorkloadGroup tWorkloadGroup1 = tWorkloadGroups1.get(0);
-        Assert.assertEquals(name1, tWorkloadGroup1.getName());
-        
Assert.assertTrue(tWorkloadGroup1.getProperties().containsKey(WorkloadGroup.CPU_SHARE));
+        TopicInfo tWorkloadGroup1 = tWorkloadGroups1.get(0);
+        Assert.assertEquals(name1, 
tWorkloadGroup1.getWorkloadGroupInfo().getName());
+        Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() 
== 10);
 
         try {
             context.getSessionVariable().setWorkloadGroup("g2");
@@ -242,9 +242,9 @@ public class WorkloadGroupMgrTest {
         workloadGroupMgr.alterWorkloadGroup(stmt2);
 
         context.getSessionVariable().setWorkloadGroup(name);
-        List<TPipelineWorkloadGroup> tWorkloadGroups = 
workloadGroupMgr.getWorkloadGroup(context);
+        List<TopicInfo> tWorkloadGroups = 
workloadGroupMgr.getPublishTopicInfo();
         Assert.assertEquals(1, tWorkloadGroups.size());
-        TPipelineWorkloadGroup tWorkloadGroup1 = tWorkloadGroups.get(0);
-        
Assert.assertEquals(tWorkloadGroup1.getProperties().get(WorkloadGroup.CPU_SHARE),
 "5");
+        TopicInfo tWorkloadGroup1 = tWorkloadGroups.get(0);
+        Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() 
== 5);
     }
 }
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index d35d6166d3b..1f2e8185a66 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -164,14 +164,23 @@ enum TTopicInfoType {
     WORKLOAD_GROUP
 }
 
+struct TWorkloadGroupInfo {
+  1: optional i64 id
+  2: optional string name
+  3: optional i64 version
+  4: optional i64 cpu_share
+  5: optional i32 cpu_hard_limit
+  6: optional string mem_limit
+  7: optional bool enable_memory_overcommit
+  8: optional bool enable_cpu_hard_limit
+}
+
 struct TopicInfo {
-    1: optional string topic_key
-    2: required TTopicInfoType topic_type
-    3: optional map<string, string> info_map
+    1: optional TWorkloadGroupInfo workload_group_info
 }
 
 struct TPublishTopicRequest {
-    1: required list<TopicInfo> topic_list
+    1: required map<TTopicInfoType, list<TopicInfo>> topic_map
 }
 
 struct TPublishTopicResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to