This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7229aeac549 [refactor](wg&memtracker) using weak ptr to delete memtracker and query context automatically (#41549)
7229aeac549 is described below

commit 7229aeac5493601b09f28ea15e1a898ca59565b6
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Oct 9 23:37:05 2024 +0800

    [refactor](wg&memtracker) using weak ptr to delete memtracker and query context automatically (#41549)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/common/daemon.cpp                           |  1 +
 be/src/runtime/load_channel.cpp                    |  7 --
 be/src/runtime/load_channel.h                      |  1 -
 be/src/runtime/memory/mem_tracker_limiter.h        |  3 -
 be/src/runtime/query_context.cpp                   |  2 -
 be/src/runtime/workload_group/workload_group.cpp   | 84 +++++++++++++---------
 be/src/runtime/workload_group/workload_group.h     | 15 +---
 .../workload_group/workload_group_manager.cpp      |  7 ++
 .../workload_group/workload_group_manager.h        |  2 +
 9 files changed, 63 insertions(+), 59 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 5da49758865..27fbfb71d7f 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -296,6 +296,7 @@ void Daemon::memory_maintenance_thread() {
         // TODO replace memory_gc_thread.
 
         // step 6. Refresh weighted memory ratio of workload groups.
+        doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
         doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
 
         // step 7. Analyze blocking queries.
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index f8c11639719..1ac7753b197 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -64,7 +64,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
             if (workload_group_ptr) {
                 wg_ptr = workload_group_ptr;
                 wg_ptr->add_mem_tracker_limiter(mem_tracker);
-                _need_release_memtracker = true;
             }
         }
     }
@@ -85,12 +84,6 @@ LoadChannel::~LoadChannel() {
         rows_str << ", index id: " << entry.first << ", total_received_rows: " 
<< entry.second.first
                  << ", num_rows_filtered: " << entry.second.second;
     }
-    if (_need_release_memtracker) {
-        WorkloadGroupPtr wg_ptr = _query_thread_context.get_workload_group_ptr();
-        if (wg_ptr) {
-            wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker());
-        }
-    }
     LOG(INFO) << "load channel removed"
               << " load_id=" << _load_id << ", is high priority=" << 
_is_high_priority
               << ", sender_ip=" << _sender_ip << rows_str.str();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 6fad8c536ec..6c150ed74d9 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -127,7 +127,6 @@ private:
     int64_t _backend_id;
 
     bool _enable_profile;
-    bool _need_release_memtracker = false;
 };
 
 inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index faf354cca4c..251a7c25a74 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -123,9 +123,6 @@ public:
     bool is_query_cancelled() { return _is_query_cancelled; }
     void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }
 
-    // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
-    std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it;
-
     /*
     * Part 3, Memory tracking method (use carefully!)
     *
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 8931854897e..c602dc683fe 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -159,8 +159,6 @@ QueryContext::~QueryContext() {
     uint64_t group_id = 0;
     if (_workload_group) {
         group_id = _workload_group->id(); // before remove
-        _workload_group->remove_mem_tracker_limiter(query_mem_tracker);
-        _workload_group->remove_query(_query_id);
     }
 
     _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 6f3b51f09fd..0488e9ec83c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -144,21 +144,32 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
     }
 }
 
+// MemtrackerLimiter is not removed during query context release, so that should remove it here.
 int64_t WorkloadGroup::make_memory_tracker_snapshots(
         std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
     int64_t used_memory = 0;
     for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
         std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
-        for (const auto& trackerWptr : mem_tracker_group.trackers) {
-            auto tracker = trackerWptr.lock();
-            CHECK(tracker != nullptr);
-            if (tracker_snapshots != nullptr) {
-                tracker_snapshots->insert(tracker_snapshots->end(), tracker);
+        for (auto trackerWptr = mem_tracker_group.trackers.begin();
+             trackerWptr != mem_tracker_group.trackers.end();) {
+            auto tracker = trackerWptr->lock();
+            if (tracker == nullptr) {
+                trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
+            } else {
+                if (tracker_snapshots != nullptr) {
+                    tracker_snapshots->insert(tracker_snapshots->end(), tracker);
+                }
+                used_memory += tracker->consumption();
+                ++trackerWptr;
             }
-            used_memory += tracker->consumption();
         }
     }
-    refresh_memory(used_memory);
+    // refresh total memory used.
+    _total_mem_used = used_memory;
+    // reserve memory is recorded in the query mem tracker
+    // and _total_mem_used already contains all the current reserve memory.
+    // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
+    _wg_refresh_interval_memory_growth.store(0.0);
     _mem_used_status->set_value(used_memory);
     return used_memory;
 }
@@ -167,35 +178,38 @@ int64_t WorkloadGroup::memory_used() {
     return make_memory_tracker_snapshots(nullptr);
 }
 
-void WorkloadGroup::refresh_memory(int64_t used_memory) {
-    // refresh total memory used.
-    _total_mem_used = used_memory;
-    // reserve memory is recorded in the query mem tracker
-    // and _total_mem_used already contains all the current reserve memory.
-    // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
-    _wg_refresh_interval_memory_growth.store(0.0);
-}
+void WorkloadGroup::do_sweep() {
+    // Clear memtracker limiter that is registered during query or load.
+    for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
+        std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
+        for (auto trackerWptr = mem_tracker_group.trackers.begin();
+             trackerWptr != mem_tracker_group.trackers.end();) {
+            auto tracker = trackerWptr->lock();
+            if (tracker == nullptr) {
+                trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
+            } else {
+                ++trackerWptr;
+            }
+        }
+    }
 
-void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
+    // Clear query context that is registered during query context ctor
     std::unique_lock<std::shared_mutex> wlock(_mutex);
-    auto group_num = mem_tracker_ptr->group_num();
-    std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
-    mem_tracker_ptr->wg_tracker_limiter_group_it =
-            _mem_tracker_limiter_pool[group_num].trackers.insert(
-                    _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
+    for (auto iter = _query_ctxs.begin(); iter != _query_ctxs.end();) {
+        if (iter->second.lock() == nullptr) {
+            iter = _query_ctxs.erase(iter);
+        } else {
+            iter++;
+        }
+    }
 }
 
-void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
+void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
     std::unique_lock<std::shared_mutex> wlock(_mutex);
     auto group_num = mem_tracker_ptr->group_num();
     std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
-    if (mem_tracker_ptr->wg_tracker_limiter_group_it !=
-        _mem_tracker_limiter_pool[group_num].trackers.end()) {
-        _mem_tracker_limiter_pool[group_num].trackers.erase(
-                mem_tracker_ptr->wg_tracker_limiter_group_it);
-        mem_tracker_ptr->wg_tracker_limiter_group_it =
-                _mem_tracker_limiter_pool[group_num].trackers.end();
-    }
+    _mem_tracker_limiter_pool[group_num].trackers.insert(
+            _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
 }
 
 int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) {
@@ -230,14 +244,16 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
     auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
                                                   const std::string& label) {
         return fmt::format(
-                "{} cancel top memory overcommit tracker <{}> consumption {}. 
details:{}, Execute "
+                "{} cancel top memory overcommit tracker <{}> consumption {}. 
details:{}, "
+                "Execute "
                 "again after enough memory, details see be.INFO.",
                 cancel_str, label, MemCounter::print_bytes(mem_consumption),
                 GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
     };
     auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) {
         return fmt::format(
-                "{} cancel top memory used tracker <{}> consumption {}. 
details:{}, Execute again "
+                "{} cancel top memory used tracker <{}> consumption {}. 
details:{}, Execute "
+                "again "
                 "after enough memory, details see be.INFO.",
                cancel_str, label, MemCounter::print_bytes(mem_consumption),
                GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
@@ -249,7 +265,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
             _id, _name, _memory_limit, used_memory, need_free_mem);
     Defer defer {[&]() {
         LOG(INFO) << fmt::format(
-                "[MemoryGC] work load group finished gc, id:{} name:{}, memory 
limit: {}, used: "
+                "[MemoryGC] work load group finished gc, id:{} name:{}, memory 
limit: {}, "
+                "used: "
                 "{}, need_free_mem: {}, freed memory: {}.",
                _id, _name, _memory_limit, used_memory, need_free_mem, freed_mem);
     }};
@@ -542,7 +559,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                 _cgroup_cpu_ctl->update_cpu_soft_limit(
                         CgroupCpuCtl::cpu_soft_limit_default_value());
             } else {
-                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
+                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is "
+                             "illegal: "
                           << cpu_hard_limit << ", gid=" << tg_id;
             }
         } else {
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 2fbb4dd3030..933c5afdb4e 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -89,7 +89,8 @@ public:
             std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
     // call make_memory_tracker_snapshots, so also refresh total memory used.
     int64_t memory_used();
-    void refresh_memory(int64_t used_memory);
+
+    void do_sweep();
 
     int spill_threshold_low_water_mark() const {
         return _spill_low_watermark.load(std::memory_order_relaxed);
@@ -132,8 +133,6 @@ public:
 
     void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
 
-    void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
-
     // when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC
     // because mem_limit is not a required property
     bool is_mem_limit_valid() {
@@ -154,11 +153,6 @@ public:
         return Status::OK();
     }
 
-    void remove_query(TUniqueId query_id) {
-        std::unique_lock<std::shared_mutex> wlock(_mutex);
-        _query_ctxs.erase(query_id);
-    }
-
     void shutdown() {
         std::unique_lock<std::shared_mutex> wlock(_mutex);
         _is_shutdown = true;
@@ -169,11 +163,6 @@ public:
         return _is_shutdown && _query_ctxs.empty();
     }
 
-    int query_num() {
-        std::shared_lock<std::shared_mutex> r_lock(_mutex);
-        return _query_ctxs.size();
-    }
-
     int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);
 
     void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 65a8e3685c8..003f07f1db0 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -136,6 +136,13 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
               << ", before wg size=" << old_wg_size << ", after wg size=" << 
new_wg_size;
 }
 
+void WorkloadGroupMgr::do_sweep() {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    for (auto& [wg_id, wg] : _workload_groups) {
+        wg->do_sweep();
+    }
+}
+
 struct WorkloadGroupMemInfo {
     int64_t total_mem_used = 0;
     std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index d8547c3383e..f76e98d2606 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -50,6 +50,8 @@ public:
 
     WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);
 
+    void do_sweep();
+
     void stop();
 
     std::atomic<bool> _enable_cpu_hard_limit = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to