This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dda7ef188ec [improvement] Log workload group's thread num when publish (#37159) dda7ef188ec is described below commit dda7ef188ecccbf2af6726bfd4773861768c4cb9 Author: wangbo <wan...@apache.org> AuthorDate: Wed Jul 3 18:54:36 2024 +0800 [improvement] Log workload group's thread num when publish (#37159) --- be/src/agent/cgroup_cpu_ctl.cpp | 7 +++--- be/src/agent/cgroup_cpu_ctl.h | 4 ++-- be/src/agent/workload_group_listener.cpp | 14 +++++------- be/src/common/config.cpp | 1 - be/src/common/config.h | 1 - be/src/pipeline/task_scheduler.h | 2 ++ be/src/runtime/fragment_mgr.cpp | 4 +--- be/src/runtime/workload_group/workload_group.cpp | 28 ++++++++++++++++++------ be/src/runtime/workload_group/workload_group.h | 6 +++-- be/src/util/threadpool.h | 7 ++++++ be/src/vec/exec/scan/scanner_scheduler.h | 2 ++ 11 files changed, 49 insertions(+), 27 deletions(-) diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp index 69d2580aebc..e1bdd1c7207 100644 --- a/be/src/agent/cgroup_cpu_ctl.cpp +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -130,7 +130,7 @@ Status CgroupV1CpuCtl::init() { return Status::InternalError<false>("invalid cgroup path, not find cpu quota file"); } - if (_tg_id == -1) { + if (_wg_id == -1) { // means current cgroup cpu ctl is just used to clear dir, // it does not contains workload group. // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl @@ -140,7 +140,7 @@ Status CgroupV1CpuCtl::init() { } // workload group path - _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id); + _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_wg_id); if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) { int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU); if (ret != 0) { @@ -186,7 +186,8 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() { return Status::OK(); #else int tid = static_cast<int>(syscall(SYS_gettid)); - std::string msg = "add thread " + std::to_string(tid) + " to group"; + std::string msg = + "add thread " + std::to_string(tid) + " to group" + " " + std::to_string(_wg_id); std::lock_guard<std::shared_mutex> w_lock(_lock_mutex); return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true); #endif diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h index 1289f26307b..b5f8d2d5d80 100644 --- a/be/src/agent/cgroup_cpu_ctl.h +++ b/be/src/agent/cgroup_cpu_ctl.h @@ -35,7 +35,7 @@ class CgroupCpuCtl { public: virtual ~CgroupCpuCtl() = default; CgroupCpuCtl() = default; - CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; } + CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; } virtual Status init(); @@ -63,7 +63,7 @@ protected: int _cpu_hard_limit = 0; std::shared_mutex _lock_mutex; bool _init_succ = false; - uint64_t _tg_id = -1; // workload group id + uint64_t _wg_id = -1; // workload group id uint64_t _cpu_shares = 0; }; diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 5ba95a36784..15c61be5156 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -43,13 +43,13 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi current_wg_ids.insert(workload_group_info.id); } if (!ret.ok()) { - LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id=" + LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id=" << workload_group_info.id << ", reason:" << ret.to_string(); continue; } // 2 update workload group - auto tg = + auto wg = _exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info); // 3 set cpu soft hard limit switch @@ -57,17 +57,15 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi workload_group_info.enable_cpu_hard_limit); // 4 create and update task scheduler - tg->upsert_task_scheduler(&workload_group_info, _exec_env); + wg->upsert_task_scheduler(&workload_group_info, _exec_env); - LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info=" - << tg->debug_string() << ", enable_cpu_hard_limit=" + LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info=" + << wg->debug_string() << ", enable_cpu_hard_limit=" << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false") << ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares << ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit - << ", enable_cgroup_cpu_soft_limit=" - << (config::enable_cgroup_cpu_soft_limit ? "true" : "false") << ", cgroup home path=" << config::doris_cgroup_cpu_path - << ", list size=" << list_size; + << ", list size=" << list_size << ", thread info=" << wg->thread_debug_info(); } // NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 799427e2226..37a239e6f79 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1168,7 +1168,6 @@ DEFINE_Bool(enable_flush_file_cache_async, "true"); // cgroup DEFINE_mString(doris_cgroup_cpu_path, ""); -DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true"); DEFINE_mBool(enable_workload_group_memory_gc, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 94c2ec5b0a7..abe13ef40e3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1246,7 +1246,6 @@ DECLARE_mBool(exit_on_exception); // cgroup DECLARE_mString(doris_cgroup_cpu_path); -DECLARE_mBool(enable_cgroup_cpu_soft_limit); DECLARE_mBool(enable_workload_group_memory_gc); diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index b2e081b910c..9a20807ea26 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -61,6 +61,8 @@ public: void stop(); + std::vector<int> thread_debug_info() { return _fix_thread_pool->debug_info(); } + private: std::unique_ptr<ThreadPool> _fix_thread_pool; std::shared_ptr<TaskQueue> _task_queue; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index fe7f0d13c2b..4551ccd00b2 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -636,9 +636,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) << ", use workload group: " << workload_group_ptr->debug_string() - << ", is pipeline: " << ((int)is_pipeline) - << ", enable cgroup soft limit: " - << ((int)config::enable_cgroup_cpu_soft_limit); + << ", is pipeline: " << ((int)is_pipeline); } else { LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) << " carried group info but can not find group in be"; diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 843e06440d2..540ead2567c 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -367,9 +367,9 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e Status ret = cgroup_cpu_ctl->init(); if (ret.ok()) { _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); - LOG(INFO) << "[upsert wg thread pool] cgroup init success"; + LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id; } else { - LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id + LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id << ", reason=" << ret.to_string(); } } @@ -474,11 +474,9 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e << cpu_hard_limit << ", gid=" << tg_id; } } else { - if (config::enable_cgroup_cpu_soft_limit) { - _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares); - _cgroup_cpu_ctl->update_cpu_hard_limit( - CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit - } + _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares); + _cgroup_cpu_ctl->update_cpu_hard_limit( + CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit } _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares), &(tg_info->cgroup_cpu_hard_limit)); @@ -496,6 +494,22 @@ void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sc *memtable_flush_pool = _memtable_flush_pool.get(); } +std::string WorkloadGroup::thread_debug_info() { + std::vector<int> exec_t_info = _task_sched->thread_debug_info(); + std::string str = fmt::format("[exec num:{}, real_num:{}, min_num:{}, max_num:{}],", + exec_t_info[0], exec_t_info[1], exec_t_info[2], exec_t_info[3]); + + str += fmt::format("[l_scan num:{}, real_num:{}, min_num:{}, max_num{}],", exec_t_info[0], + exec_t_info[1], exec_t_info[2], exec_t_info[3]); + + str += fmt::format("[r_scan num:{}, real_num:{}, min_num:{}, max_num:{}],", exec_t_info[0], + exec_t_info[1], exec_t_info[2], exec_t_info[3]); + + str += fmt::format("[mem_tab_flush num:{}, real_num:{}, min_num:{}, max_num:{}]", + exec_t_info[0], exec_t_info[1], exec_t_info[2], exec_t_info[3]); + return str; +} + void WorkloadGroup::try_stop_schedulers() { std::lock_guard<std::shared_mutex> wlock(_task_sched_lock); if (_task_sched) { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 786e297bc29..971cc1cb023 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -161,6 +161,8 @@ public: return _query_ctxs; } + std::string thread_debug_info(); + private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; @@ -185,11 +187,11 @@ private: std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs; std::shared_mutex _task_sched_lock; - std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr; + std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr}; std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr}; std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr}; std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr}; - std::unique_ptr<ThreadPool> _memtable_flush_pool = nullptr; + std::unique_ptr<ThreadPool> _memtable_flush_pool {nullptr}; }; using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>; diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index 526836cb09e..5ce27e2f27b 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -256,6 +256,13 @@ public: return _total_queued_tasks; } + std::vector<int> debug_info() { + std::lock_guard<std::mutex> l(_lock); + std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads, + _max_threads}; + return arr; + } + private: friend class ThreadPoolBuilder; friend class ThreadPoolToken; diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index f194afe4bb0..238afc15bf6 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -193,6 +193,8 @@ public: } } + std::vector<int> thread_debug_info() { return _scan_thread_pool->debug_info(); } + private: std::unique_ptr<ThreadPool> _scan_thread_pool; std::atomic<bool> _is_stop; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org