This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0b7becd4b71 [fix](executor)Fix memtracker not set to task group #27699
0b7becd4b71 is described below
commit 0b7becd4b713ff7ad014f6923d372490c3352902
Author: wangbo <[email protected]>
AuthorDate: Thu Nov 30 22:35:51 2023 +0800
[fix](executor)Fix memtracker not set to task group #27699
---
be/src/agent/cgroup_cpu_ctl.cpp | 5 ++--
be/src/agent/cgroup_cpu_ctl.h | 4 +--
be/src/agent/workload_group_listener.cpp | 10 ++++---
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/runtime/fragment_mgr.cpp | 36 ++++++++++++++----------
be/src/runtime/query_context.h | 2 ++
be/src/runtime/task_group/task_group.h | 2 +-
be/src/runtime/task_group/task_group_manager.cpp | 6 ++--
be/src/runtime/task_group/task_group_manager.h | 8 ++++--
9 files changed, 45 insertions(+), 30 deletions(-)
diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index ac09725cb23..6fa234e6ee2 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -41,7 +41,7 @@ Status CgroupCpuCtl::init() {
return Status::OK();
}
-void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t*
cpu_hard_limit) {
+void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, int*
cpu_hard_limit) {
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
*cpu_shares = this->_cpu_shares;
*cpu_hard_limit = this->_cpu_hard_limit;
@@ -137,7 +137,8 @@ Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int
cpu_shares) {
}
Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
- int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
+ int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num *
cpu_hard_limit / 100)
+ : CPU_HARD_LIMIT_DEFAULT_VALUE;
std::string msg = "modify cpu quota value to " + std::to_string(val);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val,
msg, false);
}
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 4d78ca82ab7..2a7cdc5719b 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -48,7 +48,7 @@ public:
void update_cpu_soft_limit(int cpu_shares);
// for log
- void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit);
+ void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);
protected:
Status write_cg_sys_file(std::string file_path, int value, std::string
msg, bool is_append);
@@ -60,7 +60,7 @@ protected:
std::string _doris_cgroup_cpu_path;
uint64_t _cpu_core_num = CpuInfo::num_cores();
uint64_t _cpu_cfs_period_us = 100000;
- uint64_t _cpu_hard_limit = 0;
+ int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
uint64_t _tg_id; // workload group id
diff --git a/be/src/agent/workload_group_listener.cpp
b/be/src/agent/workload_group_listener.cpp
index bc1d3294f6c..6d7dfb9a3a0 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -50,8 +50,8 @@ void WorkloadGroupListener::handle_topic_info(const
std::vector<TopicInfo>& topi
task_group_info.enable_cpu_hard_limit);
// 4 create and update task scheduler
- Status ret2 =
-
_exec_env->task_group_manager()->upsert_task_scheduler(&task_group_info,
_exec_env);
+ Status ret2 =
_exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info,
+
_exec_env);
if (!ret2.ok()) {
LOG(WARNING) << "upsert task sche failed, tg_id=" <<
task_group_info.id
<< ", reason=" << ret2.to_string();
@@ -59,9 +59,11 @@ void WorkloadGroupListener::handle_topic_info(const
std::vector<TopicInfo>& topi
LOG(INFO) << "update task group success, tg info=" <<
tg->debug_string()
<< ", enable_cpu_hard_limit="
- << _exec_env->task_group_manager()->enable_cpu_hard_limit()
+ << (_exec_env->task_group_manager()->enable_cpu_hard_limit()
? "true" : "false")
<< ", cgroup cpu_shares=" <<
task_group_info.cgroup_cpu_shares
- << ", cgroup cpu_hard_limit=" <<
task_group_info.cgroup_cpu_hard_limit;
+ << ", cgroup cpu_hard_limit=" <<
task_group_info.cgroup_cpu_hard_limit
+ << ", enable_cgroup_cpu_soft_limit="
+ << (config::enable_cgroup_cpu_soft_limit ? "true" : "false");
}
_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 44c9645fcc4..ce3d943af70 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -696,7 +696,7 @@ Status PipelineFragmentContext::submit() {
auto* scheduler = _exec_env->pipeline_task_scheduler();
if (_query_ctx->get_task_scheduler()) {
scheduler = _query_ctx->get_task_scheduler();
- } else if (_task_group_entity) {
+ } else if (_task_group_entity &&
_query_ctx->use_task_group_for_cpu_limit.load()) {
scheduler = _exec_env->pipeline_task_group_scheduler();
}
for (auto& task : _tasks) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 247be409d65..d4f11d25e2f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -669,25 +669,31 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
uint64_t tg_id = params.workload_groups[0].id;
auto* tg_mgr = _exec_env->task_group_manager();
if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id))
{
- std::stringstream ss;
- ss << "Query/load id: " << print_id(query_ctx->query_id());
- ss << " use task group " << task_group_ptr->debug_string();
- if (tg_mgr->enable_cpu_soft_limit() &&
!config::enable_cgroup_cpu_soft_limit) {
- query_ctx->set_task_group(task_group_ptr);
- ss << ", cpu soft limit based doris sche";
- } else {
- bool ret = tg_mgr->set_task_sche_for_query_ctx(tg_id,
query_ctx.get());
- if (tg_mgr->enable_cpu_hard_limit()) {
- ss << ", cpu hard limit based cgroup";
+
task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
+ // set task group on the query ctx so that the memory tracker can be
removed; see QueryContext's destructor
+ query_ctx->set_task_group(task_group_ptr);
+ stringstream ss;
+ ss << "Query/load id: " << print_id(query_ctx->query_id())
+ << ", use task group:" << task_group_ptr->debug_string()
+ << ", enable cpu hard limit:"
+ << (tg_mgr->enable_cpu_hard_limit() ? "true" : "false");
+ bool ret = false;
+ if (tg_mgr->enable_cgroup()) {
+ ret = tg_mgr->set_cg_task_sche_for_query_ctx(tg_id,
query_ctx.get());
+ if (ret) {
+ ss << ", use cgroup for cpu limit.";
} else {
- ss << ", cpu soft limit based cgroup";
- }
- if (!ret) {
- ss << ", but cgroup init failed, scan or exec
fallback to no group";
+ ss << ", not found cgroup sche, no limit for cpu.";
}
+ } else {
+ ss << ", use doris sche for cpu limit.";
+ query_ctx->use_task_group_for_cpu_limit.store(true);
}
LOG(INFO) << ss.str();
- } // else, query run with no group
+ } else {
+ VLOG_DEBUG << "Query/load id: " <<
print_id(query_ctx->query_id())
+ << " no task group found, does not use task
group.";
+ }
} else {
VLOG_DEBUG << "Query/load id: " <<
print_id(query_ctx->query_id())
<< " does not use task group.";
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 5810b06cfc6..0e3a04f8998 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -228,6 +228,8 @@ public:
// only for file scan node
std::map<int, TFileScanRangeParams> file_scan_range_params_map;
+ std::atomic<bool> use_task_group_for_cpu_limit = false;
+
private:
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
diff --git a/be/src/runtime/task_group/task_group.h
b/be/src/runtime/task_group/task_group.h
index 95a329757a2..f1c8523664e 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -173,7 +173,7 @@ struct TaskGroupInfo {
bool enable_cpu_hard_limit;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
- uint64_t cgroup_cpu_hard_limit = 0;
+ int cgroup_cpu_hard_limit = 0;
static Status parse_topic_info(const TWorkloadGroupInfo& topic_info,
taskgroup::TaskGroupInfo* task_group_info);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index f37ed7ff711..da6294045f8 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -67,7 +67,7 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t
tg_id) {
return nullptr;
}
-bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t tg_id,
QueryContext* query_ctx_ptr) {
+bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id,
QueryContext* query_ctx_ptr) {
std::lock_guard<std::mutex> lock(_task_scheduler_lock);
if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
@@ -83,8 +83,8 @@ bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t
tg_id, QueryContext*
return true;
}
-Status TaskGroupManager::upsert_task_scheduler(taskgroup::TaskGroupInfo*
tg_info,
- ExecEnv* exec_env) {
+Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo*
tg_info,
+ ExecEnv* exec_env) {
uint64_t tg_id = tg_info->id;
std::string tg_name = tg_info->name;
int cpu_hard_limit = tg_info->cpu_hard_limit;
diff --git a/be/src/runtime/task_group/task_group_manager.h
b/be/src/runtime/task_group/task_group_manager.h
index 552cddfe9dc..91156237f40 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -51,7 +51,7 @@ public:
void get_resource_groups(const std::function<bool(const TaskGroupPtr&
ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups);
- Status upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv*
exec_env);
+ Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
ExecEnv* exec_env);
void delete_task_group_by_ids(std::set<uint64_t> id_set);
@@ -65,7 +65,11 @@ public:
bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
- bool set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext*
query_ctx_ptr);
+ bool set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext*
query_ctx_ptr);
+
+ // currently cgroup supports both cpu soft limit and cpu hard limit;
+ // doris task group only supports cpu soft limit
+ bool enable_cgroup() { return enable_cpu_hard_limit() ||
config::enable_cgroup_cpu_soft_limit; }
private:
std::shared_mutex _group_mutex;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org