This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c570c34b7f [Fix](executor)Release resource correctly when drop workload group (#30279)
4c570c34b7f is described below

commit 4c570c34b7f4b2f6ff3e8b19633f26e3e679d6f6
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jan 25 09:47:57 2024 +0800

    [Fix](executor)Release resource correctly when drop workload group (#30279)
---
 be/src/runtime/fragment_mgr.cpp                  |  8 +-
 be/src/runtime/query_context.cpp                 | 16 ++--
 be/src/runtime/query_context.h                   |  4 +-
 be/src/runtime/task_group/task_group.h           | 17 ++++
 be/src/runtime/task_group/task_group_manager.cpp | 98 ++++++++++++++++--------
 be/src/runtime/task_group/task_group_manager.h   |  4 +
 6 files changed, 106 insertions(+), 41 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a76c7687f02..403c2463c2a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -685,7 +685,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         if (params.__isset.workload_groups && !params.workload_groups.empty()) {
             uint64_t tg_id = params.workload_groups[0].id;
             auto* tg_mgr = _exec_env->task_group_manager();
-            if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) {
+            taskgroup::TaskGroupPtr task_group_ptr = nullptr;
+            Status ret = tg_mgr->add_query_to_group(tg_id, query_ctx->query_id(), &task_group_ptr);
+            if (ret.ok()) {
                 task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
                 // set the task group on the query context so the memory tracker can be removed, see QueryContext's destructor
                 query_ctx->set_task_group(task_group_ptr);
@@ -698,6 +700,10 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
                           << ", is pipeline: " << ((int)is_pipeline)
                           << ", enable cgroup soft limit: "
                           << ((int)config::enable_cgroup_cpu_soft_limit);
+            } else {
+                LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
+                          << " carried group info but can not find group in be, reason: "
+                          << ret.to_string();
             }
         }
 
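The hunk above swaps a bare lookup (get_task_group_by_id) for add_query_to_group, so finding the group and registering the query happen as one atomic step, and a group that has been marked shutdown can refuse new queries. Below is a minimal sketch of that admission pattern; the Status struct, map, and mutex are simplified stand-ins, not the Doris definitions:

    #include <cstdint>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <unordered_map>

    // Stand-ins for doris::Status and taskgroup::TaskGroup, simplified.
    struct Status {
        bool ok;
        std::string msg;
    };
    struct TaskGroup {
        bool is_shutdown = false;
        int query_num = 0;
    };
    using TaskGroupPtr = std::shared_ptr<TaskGroup>;

    std::mutex group_mutex;                                 // plays the role of _group_mutex
    std::unordered_map<uint64_t, TaskGroupPtr> task_groups; // plays the role of _task_groups

    // Find the group and register the query under one lock, so a group being
    // dropped cannot admit a query between the lookup and the join.
    Status add_query_to_group(uint64_t tg_id, TaskGroupPtr* out) {
        std::lock_guard<std::mutex> lock(group_mutex);
        auto it = task_groups.find(tg_id);
        if (it == task_groups.end()) {
            return {false, "can not find workload group"};
        }
        if (it->second->is_shutdown) {
            return {false, "workload group is shutdown"};
        }
        it->second->query_num++;
        *out = it->second;
        return {true, ""};
    }

When the call fails, the caller above only logs and continues, so the query simply runs without a workload group rather than erroring out.
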
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a70bf6695ac..5b2de639d47 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -64,8 +64,10 @@ QueryContext::~QueryContext() {
     }
     if (_task_group) {
         _task_group->remove_mem_tracker_limiter(query_mem_tracker);
+        _exec_env->task_group_manager()->remove_query_from_group(_task_group->id(), _query_id);
     }
 
+    _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
     LOG_INFO("Query {} deconstructed, {}", print_id(_query_id), mem_tracker_msg);
     // Do not release the thread token in the query context's destructor, because the query
     // context may be destructed inside the thread token itself. That is very dangerous and may core.
@@ -75,7 +77,6 @@ QueryContext::~QueryContext() {
         static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
                 std::make_shared<DelayReleaseToken>(std::move(_thread_token))));
     }
-    _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
 }
 
 void QueryContext::set_ready_to_execute(bool is_cancelled) {
@@ -160,13 +161,14 @@ void QueryContext::set_query_scheduler(uint64_t tg_id) {
 }
 
 doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
-    if (!config::enable_cgroup_cpu_soft_limit) {
-        return _exec_env->pipeline_task_group_scheduler();
-    } else if (_task_scheduler) {
-        return _task_scheduler;
-    } else {
-        return _exec_env->pipeline_task_scheduler();
+    if (_task_group) {
+        if (!config::enable_cgroup_cpu_soft_limit) {
+            return _exec_env->pipeline_task_group_scheduler();
+        } else if (_task_scheduler) {
+            return _task_scheduler;
+        }
     }
+    return _exec_env->pipeline_task_scheduler();
 }
 
 ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
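
Two behaviors change in this file: the destructor now deregisters the query from its group and reports query statistics before the final log line, and get_pipe_exec_scheduler consults workload-group scheduling only when a group is actually attached. The selection order of the latter, restated as a small self-contained sketch (TaskScheduler is an empty stand-in here, not the Doris class):

    // Empty stand-in for doris::pipeline::TaskScheduler.
    struct TaskScheduler {};

    // After the patch, per-group scheduling is considered only when the query
    // belongs to a task group; every other query gets the default scheduler.
    TaskScheduler* pick_scheduler(bool has_task_group, bool cgroup_soft_limit,
                                  TaskScheduler* group_sched,        // per-query group scheduler (may be null)
                                  TaskScheduler* shared_group_sched, // shared group scheduler
                                  TaskScheduler* default_sched) {    // default pipeline scheduler
        if (has_task_group) {
            if (!cgroup_soft_limit) {
                return shared_group_sched;
            }
            if (group_sched != nullptr) {
                return group_sched;
            }
        }
        return default_sched;
    }
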
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d5a8f12cee1..1c5d98b0472 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -153,7 +153,9 @@ public:
 
     void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
 
-    taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
+    taskgroup::TaskGroup* get_task_group() const {
+        return _task_group == nullptr ? nullptr : _task_group.get();
+    }
 
     int execution_timeout() const {
         return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 04dbf518f0d..3440bead9df 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -29,6 +29,7 @@
 #include <unordered_set>
 
 #include "common/status.h"
+#include "util/hash_util.hpp"
 
 namespace doris {
 
@@ -141,6 +142,16 @@ public:
         return _memory_limit > 0;
     }
 
+    void add_query(TUniqueId query_id) { _query_id_set.insert(query_id); }
+
+    void remove_query(TUniqueId query_id) { _query_id_set.erase(query_id); }
+
+    void shutdown() { _is_shutdown = true; }
+
+    int query_num() { return _query_id_set.size(); }
+
+    bool is_shutdown() { return _is_shutdown; }
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
@@ -152,6 +163,12 @@ private:
     TaskGroupPipelineTaskEntity _task_entity;
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
     std::atomic<int> _cpu_hard_limit;
+
+    // means the task group is marked dropped:
+    // no new query can be submitted,
+    // and we wait for running queries to be cancelled or to finish
+    bool _is_shutdown = false;
+    std::unordered_set<TUniqueId> _query_id_set;
 };
 
 using TaskGroupPtr = std::shared_ptr<TaskGroup>;
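
The new members implement a drain-before-delete lifecycle: shutdown() marks the group dropped, the query-id set tracks what is still running, and the group becomes releasable only once both conditions hold. A standalone sketch of that invariant follows; note that the real class leaves locking to TaskGroupManager::_group_mutex (see the next file), while this version carries its own mutex, and int64_t stands in for TUniqueId, both simplifications:

    #include <cstdint>
    #include <mutex>
    #include <unordered_set>

    class DrainableGroup {
    public:
        // Returns false once the group is marked dropped, so no new query joins.
        bool try_add_query(int64_t query_id) {
            std::lock_guard<std::mutex> l(_mutex);
            if (_is_shutdown) return false;
            _query_ids.insert(query_id);
            return true;
        }
        void remove_query(int64_t query_id) {
            std::lock_guard<std::mutex> l(_mutex);
            _query_ids.erase(query_id);
        }
        void shutdown() {
            std::lock_guard<std::mutex> l(_mutex);
            _is_shutdown = true;
        }
        // Resources may be released only when shutdown was requested and the
        // last registered query has finished or been cancelled.
        bool can_be_released() {
            std::lock_guard<std::mutex> l(_mutex);
            return _is_shutdown && _query_ids.empty();
        }

    private:
        std::mutex _mutex;
        bool _is_shutdown = false;
        std::unordered_set<int64_t> _query_ids;
    };
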
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 74694baa9fc..068f5eced37 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -180,68 +180,72 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
 }
 
 void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
-    // stop task sche may cost some time, so it should not be locked
-    std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
-    std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
-    std::set<ThreadPool*> non_pip_thread_pool_to_del;
+    int64_t begin_time = MonotonicMillis();
+    // 1. collect dropped groups that have no running queries
     std::set<uint64_t> deleted_tg_ids;
     {
-        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
-        for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end(); iter++) {
+        std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+        for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) {
             uint64_t tg_id = iter->first;
+            auto* task_group_ptr = iter->second.get();
             if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                task_sche_to_del.insert(_tg_sche_map[tg_id].get());
-                deleted_tg_ids.insert(tg_id);
+                task_group_ptr->shutdown();
+                // only when no query is running in the task group can its resources be released in BE
+                if (task_group_ptr->query_num() == 0) {
+                    deleted_tg_ids.insert(tg_id);
+                }
             }
         }
+    }
 
-        for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end(); iter++) {
-            uint64_t tg_id = iter->first;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
+    // 2. stop the active threads
+    std::vector<doris::pipeline::TaskScheduler*> task_sched_to_stop;
+    std::vector<vectorized::SimplifiedScanScheduler*> scan_task_sched_to_stop;
+    std::vector<ThreadPool*> non_pip_thread_pool_to_stop;
+    {
+        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
+        for (uint64_t tg_id : deleted_tg_ids) {
+            if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
+                task_sched_to_stop.emplace_back(_tg_sche_map.at(tg_id).get());
             }
-        }
-        for (auto iter = _non_pipe_thread_pool_map.begin(); iter != _non_pipe_thread_pool_map.end();
-             iter++) {
-            uint64_t tg_id = iter->first;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                non_pip_thread_pool_to_del.insert(_non_pipe_thread_pool_map[tg_id].get());
+            if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
+                scan_task_sched_to_stop.emplace_back(_tg_scan_sche_map.at(tg_id).get());
+            }
+            if (_non_pipe_thread_pool_map.find(tg_id) != _non_pipe_thread_pool_map.end()) {
+                non_pip_thread_pool_to_stop.emplace_back(_non_pipe_thread_pool_map.at(tg_id).get());
             }
         }
     }
-    // 1 stop all threads
-    for (auto* ptr1 : task_sche_to_del) {
+    for (auto* ptr1 : task_sched_to_stop) {
         ptr1->stop();
     }
-    for (auto* ptr2 : scan_task_sche_to_del) {
+    for (auto* ptr2 : scan_task_sched_to_stop) {
         ptr2->stop();
     }
-    for (auto& ptr3 : non_pip_thread_pool_to_del) {
+    for (auto& ptr3 : non_pip_thread_pool_to_stop) {
         ptr3->shutdown();
+        ptr3->wait();
     }
-    // 2 release resource in memory
+
+    // 3. release the in-memory resources
     {
         std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
         for (uint64_t tg_id : deleted_tg_ids) {
             _tg_sche_map.erase(tg_id);
             _tg_scan_sche_map.erase(tg_id);
             _cgroup_ctl_map.erase(tg_id);
+            _non_pipe_thread_pool_map.erase(tg_id);
         }
     }
 
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
-        for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
-            uint64_t tg_id = iter->first;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                iter = _task_groups.erase(iter);
-            } else {
-                iter++;
-            }
+        for (uint64_t tg_id : deleted_tg_ids) {
+            _task_groups.erase(tg_id);
         }
     }
 
-    // 3 clear cgroup dir
+    // 4. clear the cgroup dir
     // NOTE(wb) currently we use rmdir to delete the cgroup path;
     // this action may fail until the task file is cleared, which means all threads are stopped,
     // so the first attempt to rmdir a cgroup path may fail.
@@ -266,7 +270,37 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
             }
         }
     }
-    LOG(INFO) << "finish clear unused cgroup path";
+    int64_t time_cost_ms = MonotonicMillis() - begin_time;
+    LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms
+              << "ms, deleted group size:" << deleted_tg_ids.size();
+}
+
+Status TaskGroupManager::add_query_to_group(uint64_t tg_id, TUniqueId query_id,
+                                            TaskGroupPtr* tg_ptr) {
+    std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+    auto tg_iter = _task_groups.find(tg_id);
+    if (tg_iter != _task_groups.end()) {
+        if (tg_iter->second->is_shutdown()) {
+            return Status::InternalError<false>("workload group {} is 
shutdown.", tg_id);
+        }
+        tg_iter->second->add_query(query_id);
+        *tg_ptr = tg_iter->second;
+        return Status::OK();
+    } else {
+        return Status::InternalError<false>("can not find workload group {}.", 
tg_id);
+    }
+}
+
+void TaskGroupManager::remove_query_from_group(uint64_t tg_id, TUniqueId query_id) {
+    std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+    auto tg_iter = _task_groups.find(tg_id);
+    if (tg_iter != _task_groups.end()) {
+        tg_iter->second->remove_query(query_id);
+    } else {
+        // NOTE: this should never happen
+        LOG(INFO) << "can not find task group when remove query, tg:" << tg_id
+                  << ", query_id:" << print_id(query_id);
+    }
 }
 
 void TaskGroupManager::stop() {
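
delete_task_group_by_ids above is a staged teardown: mark and collect under the group lock, stop scheduler threads with no lock held (stop() and wait() can block), then erase the map entries under the locks again. A compact sketch of the same shape, with hypothetical simplified types in place of the Doris ones; groups that still have queries stay marked shutdown and are collected by a later sweep:

    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <set>
    #include <vector>

    struct Scheduler { void stop() {} };  // stand-in for a task scheduler
    struct Group { bool shutdown = false; int query_num = 0; };

    std::mutex group_mutex;               // guards groups
    std::mutex sched_mutex;               // guards schedulers
    std::map<uint64_t, Group> groups;
    std::map<uint64_t, Scheduler> schedulers;

    void delete_unused_groups(const std::set<uint64_t>& used_ids) {
        // Phase 1: under the group lock, mark every unused group shut down
        // and collect only those already drained (no running queries).
        std::set<uint64_t> to_delete;
        {
            std::lock_guard<std::mutex> l(group_mutex);
            for (auto& [id, g] : groups) {
                if (used_ids.count(id) > 0) continue;
                g.shutdown = true;  // no new query can join from now on
                if (g.query_num == 0) to_delete.insert(id);
            }
        }
        // Phase 2: stopping threads may block, so copy pointers out under a
        // short lock and call stop() with no lock held.
        std::vector<Scheduler*> to_stop;
        {
            std::lock_guard<std::mutex> l(sched_mutex);
            for (uint64_t id : to_delete) {
                auto it = schedulers.find(id);
                if (it != schedulers.end()) to_stop.push_back(&it->second);
            }
        }
        for (Scheduler* s : to_stop) s->stop();
        // Phase 3: only now erase the map entries.
        std::lock_guard<std::mutex> lg(group_mutex);
        std::lock_guard<std::mutex> ls(sched_mutex);
        for (uint64_t id : to_delete) {
            schedulers.erase(id);
            groups.erase(id);
        }
    }
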
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index a7ccb52f00e..78610b4efc3 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -69,6 +69,10 @@ public:
                              vectorized::SimplifiedScanScheduler** scan_sched,
                              ThreadPool** non_pipe_thread_pool);
 
+    Status add_query_to_group(uint64_t tg_id, TUniqueId query_id, TaskGroupPtr* tg_ptr);
+
+    void remove_query_from_group(uint64_t tg_id, TUniqueId query_id);
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
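
The two declarations form a paired protocol: a query joins its group before execution (in FragmentMgr::_get_query_ctx) and leaves it in QueryContext's destructor. A hypothetical RAII guard, not part of this patch, makes that pairing explicit; it assumes only the declarations above and would compile only inside the Doris tree:

    // Hypothetical guard (not in the patch): removal is tied to object
    // lifetime, mirroring how ~QueryContext calls remove_query_from_group.
    class ScopedGroupMembership {
    public:
        ScopedGroupMembership(TaskGroupManager* mgr, uint64_t tg_id, TUniqueId query_id)
                : _mgr(mgr), _tg_id(tg_id), _query_id(query_id) {}
        ~ScopedGroupMembership() { _mgr->remove_query_from_group(_tg_id, _query_id); }
        // Non-copyable: exactly one removal per registration.
        ScopedGroupMembership(const ScopedGroupMembership&) = delete;
        ScopedGroupMembership& operator=(const ScopedGroupMembership&) = delete;

    private:
        TaskGroupManager* _mgr;
        uint64_t _tg_id;
        TUniqueId _query_id;
    };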


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
