This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6637f9c15f9 Add enable_cgroup_cpu_soft_limit (#26510)
6637f9c15f9 is described below

commit 6637f9c15f9346abbd00feb41eca9624a7bc1c1e
Author: wangbo <wan...@apache.org>
AuthorDate: Wed Nov 8 15:52:13 2023 +0800

    Add enable_cgroup_cpu_soft_limit (#26510)
---
 be/src/agent/cgroup_cpu_ctl.cpp                  | 24 ++++++++++++++++++++++--
 be/src/agent/cgroup_cpu_ctl.h                    | 22 +++++++++++++++++++---
 be/src/common/config.cpp                         |  1 +
 be/src/common/config.h                           |  2 ++
 be/src/runtime/fragment_mgr.cpp                  | 21 +++++++++++++++++----
 be/src/runtime/task_group/task_group_manager.cpp | 12 ++++++++++--
 be/src/runtime/task_group/task_group_manager.h   |  3 ++-
 7 files changed, 73 insertions(+), 12 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index ee24b34e167..d16a32b7be5 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -54,6 +54,19 @@ void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) {
     }
 }
 
+void CgroupCpuCtl::update_cpu_soft_limit(int cpu_shares) { // Write cpu_shares as this group's cgroup cpu soft limit; no-op while _init_succ is false.
+    if (!_init_succ) {
+        return;
+    }
+    std::lock_guard<std::shared_mutex> w_lock(_lock_mutex); // exclusive lock held across the _no_lock writer below
+    if (_cpu_shares != cpu_shares) { // skip unchanged values; NOTE(review): int vs uint64_t _cpu_shares compare, a negative cpu_shares converts to a huge unsigned value
+        Status ret = modify_cg_cpu_soft_limit_no_lock(cpu_shares);
+        if (ret.ok()) { // cache only on success, so a failed write is retried by the next call with the same value
+            _cpu_shares = cpu_shares;
+        }
+    }
+}
+
 Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::string msg,
                                        bool is_append) {
     int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
@@ -97,9 +110,11 @@ Status CgroupV1CpuCtl::init() {
         }
     }
 
-    // quota path
+    // quota file
     _cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us";
-    // task path
+    // cpu.shares file
+    _cgroup_v1_cpu_tg_shares_file = _cgroup_v1_cpu_tg_path + "/cpu.shares";
+    // task file
     _cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
     LOG(INFO) << "cgroup v1 cpu path init success"
               << ", query tg path=" << _cgroup_v1_cpu_tg_path
@@ -110,6 +125,11 @@ Status CgroupV1CpuCtl::init() {
     return Status::OK();
 }
 
+Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) { // cgroup v1: write cpu_shares to {workload group path}/cpu.shares; caller is expected to hold _lock_mutex, as update_cpu_soft_limit does
+    std::string msg = "modify cpu shares to " + std::to_string(cpu_shares);
+    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_shares_file, cpu_shares, msg, false); // is_append=false: opened without O_APPEND
+}
+
 Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
     int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
     std::string msg = "modify cpu quota value to " + std::to_string(val);
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index c3a30660147..b98e268da09 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -28,6 +28,12 @@
 
 namespace doris {
 
+// Value for cgroup cpu.cfs_quota_us that disables the cpu hard limit; applied to groups whose cpu_hard_limit <= 0.
+const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1; // NOTE(review): modify_cg_cpu_hard_limit_no_lock computes period * cores * limit / 100 with uint64_t members, so -1 likely is not written as -1 -- verify
+
+// Default value for cgroup cpu.shares; applied to groups that do have a cpu hard limit (cpu_hard_limit > 0).
+const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024; // NOTE(review): uint64_t, but update_cpu_soft_limit() takes int
+
 class CgroupCpuCtl {
 public:
     virtual ~CgroupCpuCtl() = default;
@@ -35,15 +41,19 @@ public:
 
     virtual Status init();
 
-    virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
-
     virtual Status add_thread_to_cgroup() = 0;
 
     void update_cpu_hard_limit(int cpu_hard_limit);
 
+    void update_cpu_soft_limit(int cpu_shares);
+
 protected:
     Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
 
+    virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
+
+    virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;
+
     std::string _doris_cgroup_cpu_path;
     uint64_t _cpu_core_num = CpuInfo::num_cores();
     uint64_t _cpu_cfs_period_us = 100000;
@@ -51,6 +61,7 @@ protected:
     std::shared_mutex _lock_mutex;
     bool _init_succ = false;
     uint64_t _tg_id; // workload group id
+    uint64_t _cpu_shares = 0;
 };
 
 /*
@@ -73,20 +84,25 @@ protected:
     6 workload group quota file:
         /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.cfs_quota_us
     
-     7 workload group tasks file:
+    7 workload group tasks file:
         /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/tasks
+    
+    8 workload group cpu.shares file:
+    /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.shares
 */
 class CgroupV1CpuCtl : public CgroupCpuCtl {
 public:
     CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
     Status init() override;
     Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
+    Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override; // writes cpu_shares to _cgroup_v1_cpu_tg_shares_file; NOTE(review): public here, protected in CgroupCpuCtl
     Status add_thread_to_cgroup() override;
 
 private:
     std::string _cgroup_v1_cpu_query_path;
     std::string _cgroup_v1_cpu_tg_path; // workload group path
     std::string _cgroup_v1_cpu_tg_quota_file;
+    std::string _cgroup_v1_cpu_tg_shares_file; // {workload group path}/cpu.shares, built in init()
     std::string _cgroup_v1_cpu_tg_task_file;
 };
 
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e858a5b6acb..17f69c3d360 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1113,6 +1113,7 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
 DEFINE_String(doris_cgroup_cpu_path, "");
+DEFINE_Bool(enable_cgroup_cpu_soft_limit, "false"); // true: groups with cpu_hard_limit <= 0 also go through create_and_get_task_scheduler and get cgroup cpu.shares = cpu_share
 
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 493f9342aa7..cfcd09c1984 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1183,6 +1183,8 @@ DECLARE_mBool(exit_on_exception);
 
 // cgroup
 DECLARE_String(doris_cgroup_cpu_path);
+DECLARE_Bool(enable_cgroup_cpu_soft_limit); // when true, groups without a cpu hard limit are bound to a cgroup whose cpu.shares follows cpu_share
+
 // This config controls whether the s3 file writer would flush cache asynchronously
 DECLARE_Bool(enable_flush_file_cache_async);
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 83b57c28fd7..266c652ca7f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -671,18 +671,31 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
                     LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
                               << " use task group: " << tg->debug_string()
                               << " cpu_hard_limit: " << task_group_info.cpu_hard_limit
-                              << " cpu_share:" << task_group_info.cpu_share;
+                              << " cpu_share:" << task_group_info.cpu_share
+                              << " enable cgroup soft cpu:" << config::enable_cgroup_cpu_soft_limit;
                     if (task_group_info.cpu_hard_limit > 0) {
                         Status ret = _exec_env->task_group_manager()->create_and_get_task_scheduler(
-                                tg_id, tg_name, task_group_info.cpu_hard_limit, _exec_env,
-                                query_ctx.get());
+                                tg_id, tg_name, task_group_info.cpu_hard_limit,
+                                task_group_info.cpu_share, _exec_env, query_ctx.get());
                         if (!ret.ok()) {
                             LOG(INFO) << "workload group init failed "
                                       << ", name=" << tg_name << ", id=" << tg_id
                                       << ", reason=" << ret.to_string();
                         }
                     } else {
-                        query_ctx->set_task_group(tg);
+                        if (!config::enable_cgroup_cpu_soft_limit) {
+                            query_ctx->set_task_group(tg);
+                        } else {
+                            Status ret =
+                                    _exec_env->task_group_manager()->create_and_get_task_scheduler(
+                                            tg_id, tg_name, task_group_info.cpu_hard_limit,
+                                            task_group_info.cpu_share, _exec_env, query_ctx.get());
+                            if (!ret.ok()) {
+                                LOG(INFO) << "workload group cpu soft limit init failed "
+                                          << ", name=" << tg_name << ", id=" << tg_id
+                                          << ", reason=" << ret.to_string();
+                            }
+                        }
                     }
                 }
             } else {
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index b3c24fa96e7..fb940069787 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -60,7 +60,8 @@ void TaskGroupManager::get_resource_groups(const std::function<bool(const TaskGr
 }
 
 Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::string tg_name,
-                                                       int cpu_hard_limit, ExecEnv* exec_env,
+                                                       int cpu_hard_limit, int cpu_shares,
+                                                       ExecEnv* exec_env,
                                                        QueryContext* query_ctx_ptr) {
     std::lock_guard<std::mutex> lock(_task_scheduler_lock);
     // step 1: init cgroup cpu controller
@@ -117,7 +118,14 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
     query_ctx_ptr->set_scan_task_scheduler(scan_task_sche);
 
     // step 5 update cgroup cpu if needed
-    _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+    if (cpu_hard_limit > 0) {
+        _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+        _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+    } else {
+        _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
+        _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
+                CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+    }
 
     return Status::OK();
 }
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index ae501e93f3e..cf44f535440 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -52,7 +52,8 @@ public:
                              std::vector<TaskGroupPtr>* task_groups);
 
     Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, int cpu_hard_limit,
-                                         ExecEnv* exec_env, QueryContext* query_ctx_ptr);
+                                         int cpu_shares, ExecEnv* exec_env,
+                                         QueryContext* query_ctx_ptr);
 
     void delete_task_group_by_ids(std::set<uint64_t> id_set);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to