This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 4c570c34b7f [Fix](executor)Release resource correctly when drop workload group (#30279)
4c570c34b7f is described below

commit 4c570c34b7f4b2f6ff3e8b19633f26e3e679d6f6
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jan 25 09:47:57 2024 +0800

    [Fix](executor)Release resource correctly when drop workload group (#30279)
---
 be/src/runtime/fragment_mgr.cpp                  |  8 +-
 be/src/runtime/query_context.cpp                 | 16 ++--
 be/src/runtime/query_context.h                   |  4 +-
 be/src/runtime/task_group/task_group.h           | 17 ++++
 be/src/runtime/task_group/task_group_manager.cpp | 98 ++++++++++++++++--------
 be/src/runtime/task_group/task_group_manager.h   |  4 +
 6 files changed, 106 insertions(+), 41 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a76c7687f02..403c2463c2a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -685,7 +685,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
     if (params.__isset.workload_groups && !params.workload_groups.empty()) {
         uint64_t tg_id = params.workload_groups[0].id;
         auto* tg_mgr = _exec_env->task_group_manager();
-        if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) {
+        taskgroup::TaskGroupPtr task_group_ptr = nullptr;
+        Status ret = tg_mgr->add_query_to_group(tg_id, query_ctx->query_id(), &task_group_ptr);
+        if (ret.ok()) {
             task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
             // set task group to query ctx so that the memory tracker can be removed, see QueryContext's destructor
             query_ctx->set_task_group(task_group_ptr);
@@ -698,6 +700,10 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
                       << ", is pipeline: " << ((int)is_pipeline)
                       << ", enable cgroup soft limit: "
                       << ((int)config::enable_cgroup_cpu_soft_limit);
+        } else {
+            LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
+                      << " carried group info but can not find group in be, reason: "
+                      << ret.to_string();
         }
     }
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a70bf6695ac..5b2de639d47 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -64,8 +64,10 @@ QueryContext::~QueryContext() {
     }
     if (_task_group) {
         _task_group->remove_mem_tracker_limiter(query_mem_tracker);
+        _exec_env->task_group_manager()->remove_query_from_group(_task_group->id(), _query_id);
     }
 
+    _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
     LOG_INFO("Query {} deconstructed, {}", print_id(_query_id), mem_tracker_msg);
     // Not releasing the thread token in query context's destructor method, because the query
     // context may be destructed in the thread token itself. It is very dangerous and may core.
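
Taken together, the fragment_mgr.cpp hunks and the destructor hunk above establish one invariant: every successful add_query_to_group is matched by exactly one remove_query_from_group when the QueryContext is destroyed; otherwise the group's query count never drains and the group can never be released. As an illustration only (the commit wires this pairing up by hand across FragmentMgr and ~QueryContext; no such guard class exists in the patch), the invariant has the classic RAII shape:

    // Hypothetical sketch, not part of this commit: an RAII guard that
    // expresses the add/remove pairing which the patch maintains manually.
    // TaskGroupManager, TaskGroupPtr and TUniqueId are the types used above.
    class ScopedGroupMembership {
    public:
        ScopedGroupMembership(taskgroup::TaskGroupManager* mgr, uint64_t tg_id,
                              TUniqueId query_id, taskgroup::TaskGroupPtr* out)
                : _mgr(mgr), _tg_id(tg_id), _query_id(query_id),
                  _ok(_mgr->add_query_to_group(_tg_id, _query_id, out).ok()) {}
        ~ScopedGroupMembership() {
            if (_ok) {
                // Without this call, query_num() never reaches zero and
                // delete_task_group_by_ids() keeps skipping the group.
                _mgr->remove_query_from_group(_tg_id, _query_id);
            }
        }
    private:
        taskgroup::TaskGroupManager* _mgr;
        uint64_t _tg_id;
        TUniqueId _query_id;
        bool _ok;
    };

In the patch itself, registration happens in FragmentMgr::_get_query_ctx while release belongs to ~QueryContext, so the pairing is kept by hand rather than by a guard object.
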
@@ -75,7 +77,6 @@ QueryContext::~QueryContext() {
         static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
                 std::make_shared<DelayReleaseToken>(std::move(_thread_token))));
     }
-    _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
 }
 
 void QueryContext::set_ready_to_execute(bool is_cancelled) {
@@ -160,13 +161,14 @@ void QueryContext::set_query_scheduler(uint64_t tg_id) {
 }
 
 doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
-    if (!config::enable_cgroup_cpu_soft_limit) {
-        return _exec_env->pipeline_task_group_scheduler();
-    } else if (_task_scheduler) {
-        return _task_scheduler;
-    } else {
-        return _exec_env->pipeline_task_scheduler();
+    if (_task_group) {
+        if (!config::enable_cgroup_cpu_soft_limit) {
+            return _exec_env->pipeline_task_group_scheduler();
+        } else if (_task_scheduler) {
+            return _task_scheduler;
+        }
     }
+    return _exec_env->pipeline_task_scheduler();
 }
 
 ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d5a8f12cee1..1c5d98b0472 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -153,7 +153,9 @@ public:
 
     void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
 
-    taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
+    taskgroup::TaskGroup* get_task_group() const {
+        return _task_group == nullptr ? nullptr : _task_group.get();
+    }
 
     int execution_timeout() const {
         return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 04dbf518f0d..3440bead9df 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -29,6 +29,7 @@
 #include <unordered_set>
 
 #include "common/status.h"
+#include "util/hash_util.hpp"
 
 namespace doris {
 
@@ -141,6 +142,16 @@ public:
         return _memory_limit > 0;
     }
 
+    void add_query(TUniqueId query_id) { _query_id_set.insert(query_id); }
+
+    void remove_query(TUniqueId query_id) { _query_id_set.erase(query_id); }
+
+    void shutdown() { _is_shutdown = true; }
+
+    int query_num() { return _query_id_set.size(); }
+
+    bool is_shutdown() { return _is_shutdown; }
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
@@ -152,6 +163,12 @@ private:
     TaskGroupPipelineTaskEntity _task_entity;
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
     std::atomic<int> _cpu_hard_limit;
+
+    // means the task group is marked dropped;
+    // new queries can not be submitted,
+    // waiting for running queries to be cancelled or to finish
+    bool _is_shutdown = false;
+    std::unordered_set<TUniqueId> _query_id_set;
 };
 
 using TaskGroupPtr = std::shared_ptr<TaskGroup>;
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 74694baa9fc..068f5eced37 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -180,68 +180,72 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
 }
 
 void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
-    // stop task sche may cost some time, so it should not be locked
-    std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
-    std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
-    std::set<ThreadPool*> non_pip_thread_pool_to_del;
+    int64_t begin_time = MonotonicMillis();
+    // 1 get deleted groups without running queries
     std::set<uint64_t> deleted_tg_ids;
     {
-        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
-        for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end(); iter++) {
+        std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+        for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) {
             uint64_t tg_id = iter->first;
+            auto* task_group_ptr = iter->second.get();
             if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                task_sche_to_del.insert(_tg_sche_map[tg_id].get());
-                deleted_tg_ids.insert(tg_id);
+                task_group_ptr->shutdown();
+                // only when no query is running in the task group can its resource be released in BE
+                if (task_group_ptr->query_num() == 0) {
+                    deleted_tg_ids.insert(tg_id);
+                }
             }
         }
+    }
 
-        for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end(); iter++) {
-            uint64_t tg_id = iter->first;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
+    // 2 stop active threads
+    std::vector<doris::pipeline::TaskScheduler*> task_sched_to_stop;
+    std::vector<vectorized::SimplifiedScanScheduler*> scan_task_sched_to_stop;
+    std::vector<ThreadPool*> non_pip_thread_pool_to_stop;
+    {
+        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
+        for (uint64_t tg_id : deleted_tg_ids) {
+            if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
+                task_sched_to_stop.emplace_back(_tg_sche_map.at(tg_id).get());
             }
-        }
-        for (auto iter = _non_pipe_thread_pool_map.begin(); iter != _non_pipe_thread_pool_map.end();
-             iter++) {
-            uint64_t tg_id = iter->first;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                non_pip_thread_pool_to_del.insert(_non_pipe_thread_pool_map[tg_id].get());
+            if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
+                scan_task_sched_to_stop.emplace_back(_tg_scan_sche_map.at(tg_id).get());
+            }
+            if (_non_pipe_thread_pool_map.find(tg_id) != _non_pipe_thread_pool_map.end()) {
+                non_pip_thread_pool_to_stop.emplace_back(_non_pipe_thread_pool_map.at(tg_id).get());
             }
         }
     }
-    // 1 stop all threads
-    for (auto* ptr1 : task_sche_to_del) {
+    for (auto* ptr1 : task_sched_to_stop) {
         ptr1->stop();
     }
-    for (auto* ptr2 : scan_task_sche_to_del) {
+    for (auto* ptr2 : scan_task_sched_to_stop) {
         ptr2->stop();
     }
-    for (auto& ptr3 : non_pip_thread_pool_to_del) {
+    for (auto& ptr3 : non_pip_thread_pool_to_stop) {
         ptr3->shutdown();
+        ptr3->wait();
     }
-    // 2 release resource in memory
+
+    // 3 release resource in memory
     {
         std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
         for (uint64_t tg_id : deleted_tg_ids) {
             _tg_sche_map.erase(tg_id);
             _tg_scan_sche_map.erase(tg_id);
             _cgroup_ctl_map.erase(tg_id);
+            _non_pipe_thread_pool_map.erase(tg_id);
         }
     }
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
-        for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
-            uint64_t tg_id = iter->first;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                iter = _task_groups.erase(iter);
-            } else {
-                iter++;
-            }
+        for (uint64_t tg_id : deleted_tg_ids) {
+            _task_groups.erase(tg_id);
         }
     }
-    // 3 clear cgroup dir
+    // 4 clear cgroup dir
     // NOTE(wb) currently we use rmdir to delete the cgroup path,
     // this action may fail until the task file is cleared, which means all threads are stopped.
     // So the first attempt to rmdir a cgroup path may fail.
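
The rewritten delete_task_group_by_ids() above enforces a strict order: mark dropped groups as shut down, collect only those whose query count has already drained to zero, stop their worker threads outside the map locks, and only then erase the in-memory state and the cgroup directory. A condensed sketch of that order, with locking elided and stop_schedulers(), erase_group_state() and remove_cgroup_dirs() as hypothetical stand-ins for the map and cgroup manipulation shown in the diff:

    #include <set>
    #include <unordered_map>

    // Sketch only: the four-step drop order. A group that still has running
    // queries is merely marked shutdown here; a later sweep reclaims it once
    // ~QueryContext has removed the last query.
    void sweep_dropped_groups(const std::set<uint64_t>& used_wg_id,
                              std::unordered_map<uint64_t, taskgroup::TaskGroupPtr>& groups) {
        std::set<uint64_t> deleted_tg_ids;
        for (auto& [tg_id, group] : groups) {
            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
                group->shutdown();                // 1. reject new queries immediately
                if (group->query_num() == 0) {    // 2. fully drained, safe to reclaim
                    deleted_tg_ids.insert(tg_id);
                }
            }
        }
        stop_schedulers(deleted_tg_ids);    // 3. stop and join threads before freeing state
        erase_group_state(deleted_tg_ids);  // 4a. drop schedulers, pools, group objects
        remove_cgroup_dirs(deleted_tg_ids); // 4b. rmdir only succeeds once all threads exited
    }

Stopping and waiting on the thread pools before erasing them is also what makes the later rmdir reliable: a cgroup directory can only be removed once no task thread is attached to it, which is presumably why ptr3->wait() was added after shutdown().
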
@@ -266,7 +270,37 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
             }
         }
     }
-    LOG(INFO) << "finish clear unused cgroup path";
+    int64_t time_cost_ms = MonotonicMillis() - begin_time;
+    LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms
+              << "ms, deleted group size:" << deleted_tg_ids.size();
+}
+
+Status TaskGroupManager::add_query_to_group(uint64_t tg_id, TUniqueId query_id,
+                                            TaskGroupPtr* tg_ptr) {
+    std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+    auto tg_iter = _task_groups.find(tg_id);
+    if (tg_iter != _task_groups.end()) {
+        if (tg_iter->second->is_shutdown()) {
+            return Status::InternalError<false>("workload group {} is shutdown.", tg_id);
+        }
+        tg_iter->second->add_query(query_id);
+        *tg_ptr = tg_iter->second;
+        return Status::OK();
+    } else {
+        return Status::InternalError<false>("can not find workload group {}.", tg_id);
+    }
+}
+
+void TaskGroupManager::remove_query_from_group(uint64_t tg_id, TUniqueId query_id) {
+    std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+    auto tg_iter = _task_groups.find(tg_id);
+    if (tg_iter != _task_groups.end()) {
+        tg_iter->second->remove_query(query_id);
+    } else {
+        // NOTE: this should never happen
+        LOG(INFO) << "can not find task group when remove query, tg:" << tg_id
+                  << ", query_id:" << print_id(query_id);
+    }
 }
 
 void TaskGroupManager::stop() {
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index a7ccb52f00e..78610b4efc3 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -69,6 +69,10 @@ public:
                                  vectorized::SimplifiedScanScheduler** scan_sched,
                                  ThreadPool** non_pipe_thread_pool);
 
+    Status add_query_to_group(uint64_t tg_id, TUniqueId query_id, TaskGroupPtr* tg_ptr);
+
+    void remove_query_from_group(uint64_t tg_id, TUniqueId query_id);
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
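
Note that add_query_to_group() takes _group_mutex exclusively, so the is_shutdown() check and the add_query() registration form one atomic step with respect to the shutdown sweep in delete_task_group_by_ids(); with a narrower lock, a query could slip into a group after it was marked shutdown but before it was counted, and the group could be torn down under a live query. A sketch of that admission path in isolation (a hypothetical free function; groups and mu stand in for _task_groups and _group_mutex from the patch):

    #include <mutex>
    #include <shared_mutex>
    #include <unordered_map>

    // Sketch only: atomic check-then-register, mirroring add_query_to_group().
    Status try_admit(std::shared_mutex& mu,
                     std::unordered_map<uint64_t, taskgroup::TaskGroupPtr>& groups,
                     uint64_t tg_id, TUniqueId query_id, taskgroup::TaskGroupPtr* out) {
        // Exclusive lock: pairs with the write lock taken by the shutdown sweep.
        std::lock_guard<std::shared_mutex> lock(mu);
        auto it = groups.find(tg_id);
        if (it == groups.end()) {
            return Status::InternalError<false>("can not find workload group {}.", tg_id);
        }
        if (it->second->is_shutdown()) {
            return Status::InternalError<false>("workload group {} is shutdown.", tg_id);
        }
        it->second->add_query(query_id); // counted before the lock is released
        *out = it->second;
        return Status::OK();
    }

remove_query_from_group() mirrors this under the same mutex, so the query_num() observed by the sweep can only decrease once a group has been shut down.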