This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9ceceb4d78f [fix](memory) Fix erase invalid MemTrackerLimiter from tracker pool (#33074) 9ceceb4d78f is described below commit 9ceceb4d78f8524c0f0eac723cfe60063dd1e0e3 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Sun Mar 31 13:28:34 2024 +0800 [fix](memory) Fix erase invalid MemTrackerLimiter from tracker pool (#33074) --- be/src/runtime/memory/mem_tracker_limiter.cpp | 41 +++++++++++++++------------ be/src/runtime/memory/mem_tracker_limiter.h | 3 ++ 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index ebe874834c5..57864c3b07b 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -159,12 +159,19 @@ void MemTrackerLimiter::refresh_global_counter() { {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0}, {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::OTHER, 0}}; // always ExecEnv::ready(), because Daemon::_stop_background_threads_latch - for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) { + for (unsigned i = 0; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) { + TrackerLimiterGroup& group = ExecEnv::GetInstance()->mem_tracker_limiter_pool[i]; std::lock_guard<std::mutex> l(group.group_lock); - for (auto trackerWptr : group.trackers) { - auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - type_mem_sum[tracker->type()] += tracker->consumption(); + for (auto it = group.trackers.begin(); it != group.trackers.end();) { + auto tracker = (*it).lock(); + if (tracker == nullptr) { + LOG(WARNING) << "abnormally invalid MemTrackerLimiter, env tracking memory: " + << ExecEnv::tracking_memory() << ", group num: " << i; + it = group.trackers.erase(it); + } else { + type_mem_sum[tracker->type()] += tracker->consumption(); + ++it; + } } } for (auto it : type_mem_sum) { @@ -223,9 +230,10 @@ void MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* s ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].group_lock); for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].trackers) { auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - (*snapshots).emplace_back(tracker->make_snapshot()); - MemTracker::make_group_snapshot(snapshots, tracker->group_num(), tracker->label()); + if (tracker != nullptr) { + (*snapshots).emplace_back(tracker->make_snapshot()); + MemTracker::make_group_snapshot(snapshots, tracker->group_num(), tracker->label()); + } } } else { for (unsigned i = 1; i < ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) { @@ -233,8 +241,7 @@ void MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* s ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock); for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) { auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - if (tracker->type() == type) { + if (tracker != nullptr && tracker->type() == type) { (*snapshots).emplace_back(tracker->make_snapshot()); MemTracker::make_group_snapshot(snapshots, tracker->group_num(), tracker->label()); @@ -253,8 +260,9 @@ void MemTrackerLimiter::make_top_consumption_snapshots(std::vector<MemTracker::S ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock); for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) { auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - max_pq.emplace(tracker->make_snapshot()); + if (tracker != nullptr) { + max_pq.emplace(tracker->make_snapshot()); + } } } @@ -286,8 +294,7 @@ std::string MemTrackerLimiter::type_detail_usage(const std::string& msg, Type ty ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock); for (auto trackerWptr : ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) { auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - if (tracker->type() == type) { + if (tracker != nullptr && tracker->type() == type) { detail += "\n " + MemTrackerLimiter::log_usage(tracker->make_snapshot()); } } @@ -468,8 +475,7 @@ int64_t MemTrackerLimiter::free_top_memory_query( std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); for (auto trackerWptr : tracker_groups[i].trackers) { auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - if (tracker->type() == type) { + if (tracker != nullptr && tracker->type() == type) { seek_num++; if (tracker->is_query_cancelled()) { canceling_task.push_back(fmt::format("{}:{} Bytes", tracker->label(), @@ -593,8 +599,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); for (auto trackerWptr : tracker_groups[i].trackers) { auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - if (tracker->type() == type) { + if (tracker != nullptr && tracker->type() == type) { seek_num++; // 32M small query does not cancel if (tracker->consumption() <= 33554432 || diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 164a9aa9e52..d22a79c73f7 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -46,6 +46,9 @@ class RuntimeProfile; constexpr auto MEM_TRACKER_GROUP_NUM = 1000; struct TrackerLimiterGroup { + // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support resize, + // the copy construction of TrackerLimiterGroup is disabled. + // so cannot copy TrackerLimiterGroup anywhere, should use reference. TrackerLimiterGroup() = default; TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {} TrackerLimiterGroup(const TrackerLimiterGroup&) {} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org