This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ceceb4d78f [fix](memory) Fix erase invalid MemTrackerLimiter from 
tracker pool (#33074)
9ceceb4d78f is described below

commit 9ceceb4d78f8524c0f0eac723cfe60063dd1e0e3
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Sun Mar 31 13:28:34 2024 +0800

    [fix](memory) Fix erase invalid MemTrackerLimiter from tracker pool (#33074)
---
 be/src/runtime/memory/mem_tracker_limiter.cpp | 41 +++++++++++++++------------
 be/src/runtime/memory/mem_tracker_limiter.h   |  3 ++
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index ebe874834c5..57864c3b07b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -159,12 +159,19 @@ void MemTrackerLimiter::refresh_global_counter() {
             {Type::GLOBAL, 0},     {Type::QUERY, 0},         {Type::LOAD, 0},
             {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::OTHER, 0}};
     // always ExecEnv::ready(), because Daemon::_stop_background_threads_latch
-    for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
+    for (unsigned i = 0; i < 
ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
+        TrackerLimiterGroup& group = 
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i];
         std::lock_guard<std::mutex> l(group.group_lock);
-        for (auto trackerWptr : group.trackers) {
-            auto tracker = trackerWptr.lock();
-            CHECK(tracker != nullptr);
-            type_mem_sum[tracker->type()] += tracker->consumption();
+        for (auto it = group.trackers.begin(); it != group.trackers.end();) {
+            auto tracker = (*it).lock();
+            if (tracker == nullptr) {
+                LOG(WARNING) << "abnormally invalid MemTrackerLimiter, env 
tracking memory: "
+                             << ExecEnv::tracking_memory() << ", group num: " 
<< i;
+                it = group.trackers.erase(it);
+            } else {
+                type_mem_sum[tracker->type()] += tracker->consumption();
+                ++it;
+            }
         }
     }
     for (auto it : type_mem_sum) {
@@ -223,9 +230,10 @@ void 
MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* s
                 
ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].group_lock);
         for (auto trackerWptr : 
ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].trackers) {
             auto tracker = trackerWptr.lock();
-            CHECK(tracker != nullptr);
-            (*snapshots).emplace_back(tracker->make_snapshot());
-            MemTracker::make_group_snapshot(snapshots, tracker->group_num(), 
tracker->label());
+            if (tracker != nullptr) {
+                (*snapshots).emplace_back(tracker->make_snapshot());
+                MemTracker::make_group_snapshot(snapshots, 
tracker->group_num(), tracker->label());
+            }
         }
     } else {
         for (unsigned i = 1; i < 
ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
@@ -233,8 +241,7 @@ void 
MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* s
                     
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
             for (auto trackerWptr : 
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
                 auto tracker = trackerWptr.lock();
-                CHECK(tracker != nullptr);
-                if (tracker->type() == type) {
+                if (tracker != nullptr && tracker->type() == type) {
                     (*snapshots).emplace_back(tracker->make_snapshot());
                     MemTracker::make_group_snapshot(snapshots, 
tracker->group_num(),
                                                     tracker->label());
@@ -253,8 +260,9 @@ void 
MemTrackerLimiter::make_top_consumption_snapshots(std::vector<MemTracker::S
                 
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
         for (auto trackerWptr : 
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
             auto tracker = trackerWptr.lock();
-            CHECK(tracker != nullptr);
-            max_pq.emplace(tracker->make_snapshot());
+            if (tracker != nullptr) {
+                max_pq.emplace(tracker->make_snapshot());
+            }
         }
     }
 
@@ -286,8 +294,7 @@ std::string MemTrackerLimiter::type_detail_usage(const 
std::string& msg, Type ty
                 
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
         for (auto trackerWptr : 
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
             auto tracker = trackerWptr.lock();
-            CHECK(tracker != nullptr);
-            if (tracker->type() == type) {
+            if (tracker != nullptr && tracker->type() == type) {
                 detail += "\n    " + 
MemTrackerLimiter::log_usage(tracker->make_snapshot());
             }
         }
@@ -468,8 +475,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(
             std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
             for (auto trackerWptr : tracker_groups[i].trackers) {
                 auto tracker = trackerWptr.lock();
-                CHECK(tracker != nullptr);
-                if (tracker->type() == type) {
+                if (tracker != nullptr && tracker->type() == type) {
                     seek_num++;
                     if (tracker->is_query_cancelled()) {
                         canceling_task.push_back(fmt::format("{}:{} Bytes", 
tracker->label(),
@@ -593,8 +599,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
             std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
             for (auto trackerWptr : tracker_groups[i].trackers) {
                 auto tracker = trackerWptr.lock();
-                CHECK(tracker != nullptr);
-                if (tracker->type() == type) {
+                if (tracker != nullptr && tracker->type() == type) {
                     seek_num++;
                     // 32M small query does not cancel
                     if (tracker->consumption() <= 33554432 ||
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 164a9aa9e52..d22a79c73f7 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -46,6 +46,9 @@ class RuntimeProfile;
 constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
 
 struct TrackerLimiterGroup {
+    // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support 
resize,
+    // the copy construction of TrackerLimiterGroup is disabled.
+    // so cannot copy TrackerLimiterGroup anywhere, should use reference.
     TrackerLimiterGroup() = default;
     TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {}
     TrackerLimiterGroup(const TrackerLimiterGroup&) {}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to