This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dda7ef188ec [improvement] Log workload group's thread num when publish (#37159)
dda7ef188ec is described below

commit dda7ef188ecccbf2af6726bfd4773861768c4cb9
Author: wangbo <wan...@apache.org>
AuthorDate: Wed Jul 3 18:54:36 2024 +0800

    [improvement] Log workload group's thread num when publish (#37159)
---
 be/src/agent/cgroup_cpu_ctl.cpp                  |  7 +++---
 be/src/agent/cgroup_cpu_ctl.h                    |  4 ++--
 be/src/agent/workload_group_listener.cpp         | 14 +++++-------
 be/src/common/config.cpp                         |  1 -
 be/src/common/config.h                           |  1 -
 be/src/pipeline/task_scheduler.h                 |  2 ++
 be/src/runtime/fragment_mgr.cpp                  |  4 +---
 be/src/runtime/workload_group/workload_group.cpp | 28 ++++++++++++++++++------
 be/src/runtime/workload_group/workload_group.h   |  6 +++--
 be/src/util/threadpool.h                         |  7 ++++++
 be/src/vec/exec/scan/scanner_scheduler.h         |  2 ++
 11 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index 69d2580aebc..e1bdd1c7207 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -130,7 +130,7 @@ Status CgroupV1CpuCtl::init() {
         return Status::InternalError<false>("invalid cgroup path, not find cpu 
quota file");
     }
 
-    if (_tg_id == -1) {
+    if (_wg_id == -1) {
         // means current cgroup cpu ctl is just used to clear dir,
         // it does not contains workload group.
         // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl
@@ -140,7 +140,7 @@ Status CgroupV1CpuCtl::init() {
     }
 
     // workload group path
-    _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + 
std::to_string(_tg_id);
+    _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + 
std::to_string(_wg_id);
     if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
         int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
         if (ret != 0) {
@@ -186,7 +186,8 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
     return Status::OK();
 #else
     int tid = static_cast<int>(syscall(SYS_gettid));
-    std::string msg = "add thread " + std::to_string(tid) + " to group";
+    std::string msg =
+            "add thread " + std::to_string(tid) + " to group" + " " + 
std::to_string(_wg_id);
     std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
     return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, 
msg, true);
 #endif
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 1289f26307b..b5f8d2d5d80 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -35,7 +35,7 @@ class CgroupCpuCtl {
 public:
     virtual ~CgroupCpuCtl() = default;
     CgroupCpuCtl() = default;
-    CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
+    CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }
 
     virtual Status init();
 
@@ -63,7 +63,7 @@ protected:
     int _cpu_hard_limit = 0;
     std::shared_mutex _lock_mutex;
     bool _init_succ = false;
-    uint64_t _tg_id = -1; // workload group id
+    uint64_t _wg_id = -1; // workload group id
     uint64_t _cpu_shares = 0;
 };
 
diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
index 5ba95a36784..15c61be5156 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -43,13 +43,13 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
             current_wg_ids.insert(workload_group_info.id);
         }
         if (!ret.ok()) {
-            LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id="
+            LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id="
                       << workload_group_info.id << ", reason:" << 
ret.to_string();
             continue;
         }
 
         // 2 update workload group
-        auto tg =
+        auto wg =
                 
_exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info);
 
         // 3 set cpu soft hard limit switch
@@ -57,17 +57,15 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
                 workload_group_info.enable_cpu_hard_limit);
 
         // 4 create and update task scheduler
-        tg->upsert_task_scheduler(&workload_group_info, _exec_env);
+        wg->upsert_task_scheduler(&workload_group_info, _exec_env);
 
-        LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info="
-                  << tg->debug_string() << ", enable_cpu_hard_limit="
+        LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
+                  << wg->debug_string() << ", enable_cpu_hard_limit="
                   << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() 
? "true" : "false")
                   << ", cgroup cpu_shares=" << 
workload_group_info.cgroup_cpu_shares
                   << ", cgroup cpu_hard_limit=" << 
workload_group_info.cgroup_cpu_hard_limit
-                  << ", enable_cgroup_cpu_soft_limit="
-                  << (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
                   << ", cgroup home path=" << config::doris_cgroup_cpu_path
-                  << ", list size=" << list_size;
+                  << ", list size=" << list_size << ", thread info=" << 
wg->thread_debug_info();
     }
 
     // NOTE(wb) when is_set_workload_group_info=false, it means FE send a 
empty workload group list
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 799427e2226..37a239e6f79 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1168,7 +1168,6 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
 DEFINE_mString(doris_cgroup_cpu_path, "");
-DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
 
 DEFINE_mBool(enable_workload_group_memory_gc, "true");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 94c2ec5b0a7..abe13ef40e3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1246,7 +1246,6 @@ DECLARE_mBool(exit_on_exception);
 
 // cgroup
 DECLARE_mString(doris_cgroup_cpu_path);
-DECLARE_mBool(enable_cgroup_cpu_soft_limit);
 
 DECLARE_mBool(enable_workload_group_memory_gc);
 
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index b2e081b910c..9a20807ea26 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -61,6 +61,8 @@ public:
 
     void stop();
 
+    std::vector<int> thread_debug_info() { return 
_fix_thread_pool->debug_info(); }
+
 private:
     std::unique_ptr<ThreadPool> _fix_thread_pool;
     std::shared_ptr<TaskQueue> _task_queue;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fe7f0d13c2b..4551ccd00b2 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -636,9 +636,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
 
                 LOG(INFO) << "Query/load id: " << 
print_id(query_ctx->query_id())
                           << ", use workload group: " << 
workload_group_ptr->debug_string()
-                          << ", is pipeline: " << ((int)is_pipeline)
-                          << ", enable cgroup soft limit: "
-                          << ((int)config::enable_cgroup_cpu_soft_limit);
+                          << ", is pipeline: " << ((int)is_pipeline);
             } else {
                 LOG(INFO) << "Query/load id: " << 
print_id(query_ctx->query_id())
                           << " carried group info but can not find group in 
be";
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 843e06440d2..540ead2567c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -367,9 +367,9 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         Status ret = cgroup_cpu_ctl->init();
         if (ret.ok()) {
             _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
-            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
+            LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" 
<< tg_id;
         } else {
-            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " 
<< tg_id
+            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " 
<< tg_id
                       << ", reason=" << ret.to_string();
         }
     }
@@ -474,11 +474,9 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                           << cpu_hard_limit << ", gid=" << tg_id;
             }
         } else {
-            if (config::enable_cgroup_cpu_soft_limit) {
-                _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
-                _cgroup_cpu_ctl->update_cpu_hard_limit(
-                        CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard 
limit
-            }
+            _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
+            _cgroup_cpu_ctl->update_cpu_hard_limit(
+                    CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
         }
         _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
                                              
&(tg_info->cgroup_cpu_hard_limit));
@@ -496,6 +494,22 @@ void 
WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sc
     *memtable_flush_pool = _memtable_flush_pool.get();
 }
 
+// Builds a one-line summary of the thread counts of this workload group's thread pools:
+// pipeline exec, local scan, remote scan and memtable flush. Every section prints, in order,
+// the four integers returned by the corresponding pool's debug_info():
+// num, real_num, min_num, max_num.
+//
+// A pool that has not been created (null pointer) is left out of the result instead of
+// being dereferenced.
+// NOTE(review): the pool pointers are read here without taking _task_sched_lock, while
+// try_stop_schedulers() does take it -- confirm callers cannot race with pool creation.
+std::string WorkloadGroup::thread_debug_info() {
+    std::string str;
+    if (_task_sched != nullptr) {
+        const std::vector<int> exec_t_info = _task_sched->thread_debug_info();
+        str += fmt::format("[exec num:{}, real_num:{}, min_num:{}, max_num:{}],", exec_t_info[0],
+                           exec_t_info[1], exec_t_info[2], exec_t_info[3]);
+    }
+
+    // Each of the following sections must query its own pool; reusing the exec pool's
+    // numbers would make all four sections of the log line identical.
+    if (_scan_task_sched != nullptr) {
+        const std::vector<int> l_scan_t_info = _scan_task_sched->thread_debug_info();
+        str += fmt::format("[l_scan num:{}, real_num:{}, min_num:{}, max_num:{}],",
+                           l_scan_t_info[0], l_scan_t_info[1], l_scan_t_info[2],
+                           l_scan_t_info[3]);
+    }
+
+    if (_remote_scan_task_sched != nullptr) {
+        const std::vector<int> r_scan_t_info = _remote_scan_task_sched->thread_debug_info();
+        str += fmt::format("[r_scan num:{}, real_num:{}, min_num:{}, max_num:{}],",
+                           r_scan_t_info[0], r_scan_t_info[1], r_scan_t_info[2],
+                           r_scan_t_info[3]);
+    }
+
+    if (_memtable_flush_pool != nullptr) {
+        // _memtable_flush_pool is a plain ThreadPool, so debug_info() is called directly.
+        const std::vector<int> mem_flush_t_info = _memtable_flush_pool->debug_info();
+        str += fmt::format("[mem_tab_flush num:{}, real_num:{}, min_num:{}, max_num:{}]",
+                           mem_flush_t_info[0], mem_flush_t_info[1], mem_flush_t_info[2],
+                           mem_flush_t_info[3]);
+    }
+    return str;
+}
+
 void WorkloadGroup::try_stop_schedulers() {
     std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
     if (_task_sched) {
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 786e297bc29..971cc1cb023 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -161,6 +161,8 @@ public:
         return _query_ctxs;
     }
 
+    std::string thread_debug_info();
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
     const uint64_t _id;
@@ -185,11 +187,11 @@ private:
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
 
     std::shared_mutex _task_sched_lock;
-    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
+    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
     std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched 
{nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> 
_remote_scan_task_sched {nullptr};
-    std::unique_ptr<ThreadPool> _memtable_flush_pool = nullptr;
+    std::unique_ptr<ThreadPool> _memtable_flush_pool {nullptr};
 };
 
 using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 526836cb09e..5ce27e2f27b 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -256,6 +256,13 @@ public:
         return _total_queued_tasks;
     }
 
+    // Returns a snapshot of the pool's thread counters, read while holding _lock.
+    // Element order: _num_threads, _threads.size(), _min_threads, _max_threads.
+    std::vector<int> debug_info() {
+        std::lock_guard<std::mutex> guard(_lock);
+        return {_num_threads, static_cast<int>(_threads.size()), _min_threads, _max_threads};
+    }
+
 private:
     friend class ThreadPoolBuilder;
     friend class ThreadPoolToken;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index f194afe4bb0..238afc15bf6 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -193,6 +193,8 @@ public:
         }
     }
 
+    std::vector<int> thread_debug_info() { return 
_scan_thread_pool->debug_info(); }
+
 private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
     std::atomic<bool> _is_stop;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to