This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 42a34666a44 branch-3.0: [fix](memory) Fix
`ThreadMemTrackerMgr::limiter_mem_tracker()` performance (#50525)
42a34666a44 is described below
commit 42a34666a44f98ad322484ba84b8f9a97e968562
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed May 21 14:31:31 2025 +0800
branch-3.0: [fix](memory) Fix `ThreadMemTrackerMgr::limiter_mem_tracker()`
performance (#50525)
pick #50462
---
be/src/olap/page_cache.cpp | 3 ++-
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 8 +++++---
be/src/runtime/memory/thread_mem_tracker_mgr.h | 16 +++++++++++++---
be/src/runtime/thread_context.cpp | 2 +-
be/src/runtime/thread_context.h | 13 +++++++------
be/src/util/byte_buffer.h | 3 ++-
6 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index 1f0556f4642..b386da4d7c6 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -30,7 +30,8 @@ PageBase<TAllocator>::PageBase(size_t b, bool use_cache,
segment_v2::PageTypePB
if (use_cache) {
_mem_tracker_by_allocator =
StoragePageCache::instance()->mem_tracker(page_type);
} else {
- _mem_tracker_by_allocator =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+ _mem_tracker_by_allocator =
+
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr();
}
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 3b40426f6ef..dcdf4b1a4b4 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -51,12 +51,13 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
// _untracked_mem temporarily stores bytes that are not yet synchronized to
process reserved memory,
// but that have already been subtracted from the thread's _reserved_mem.
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
- _limiter_tracker->release_reserved(_untracked_mem);
+ _limiter_tracker_sptr->release_reserved(_untracked_mem);
_reserved_mem = 0;
_untracked_mem = 0;
}
_consumer_tracker_stack.clear();
- _limiter_tracker = mem_tracker;
+ _limiter_tracker_sptr = mem_tracker;
+ _limiter_tracker = _limiter_tracker_sptr.get();
}
void ThreadMemTrackerMgr::detach_limiter_tracker(
@@ -68,7 +69,8 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
_reserved_mem = _last_attach_snapshots_stack.back().reserved_mem;
_consumer_tracker_stack =
_last_attach_snapshots_stack.back().consumer_tracker_stack;
_last_attach_snapshots_stack.pop_back();
- _limiter_tracker = old_mem_tracker;
+ _limiter_tracker_sptr = old_mem_tracker;
+ _limiter_tracker = _limiter_tracker_sptr.get();
}
void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index db3b32a6298..e3a1409ddfc 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -93,11 +93,19 @@ public:
void reset_query_cancelled_flag(bool new_val) { _is_query_cancelled =
new_val; }
- std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
+ MemTrackerLimiter* limiter_mem_tracker() {
CHECK(init());
return _limiter_tracker;
}
+ // Prefer `limiter_mem_tracker`, which is faster than
`limiter_mem_tracker_sptr`.
+ // When multiple threads hold the same `std::shared_ptr` at the same time,
+ // modifying the `std::shared_ptr` reference count is expensive under
high concurrency.
+ std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker_sptr() {
+ CHECK(init());
+ return _limiter_tracker_sptr;
+ }
+
void enable_wait_gc() { _wait_gc = true; }
void disable_wait_gc() { _wait_gc = false; }
[[nodiscard]] bool wait_gc() const { return _wait_gc; }
@@ -141,7 +149,8 @@ private:
// A thread of query/load will only wait once during execution.
bool _wait_gc = false;
- std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
+ std::shared_ptr<MemTrackerLimiter> _limiter_tracker_sptr {nullptr};
+ MemTrackerLimiter* _limiter_tracker {nullptr};
std::vector<MemTracker*> _consumer_tracker_stack;
std::weak_ptr<WorkloadGroup> _wg_wptr;
@@ -156,7 +165,8 @@ inline bool ThreadMemTrackerMgr::init() {
// 2. ExecEnv not initialized when thread start, initialized in
limiter_mem_tracker().
if (_init) return true;
if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) {
- _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker();
+ _limiter_tracker_sptr = ExecEnv::GetInstance()->orphan_mem_tracker();
+ _limiter_tracker = _limiter_tracker_sptr.get();
_wait_gc = true;
_init = true;
return true;
diff --git a/be/src/runtime/thread_context.cpp
b/be/src/runtime/thread_context.cpp
index c89f532e592..2aee48819c6 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -28,7 +28,7 @@ class MemTracker;
QueryThreadContext ThreadContext::query_thread_context() {
DCHECK(doris::pthread_context_ptr_init);
ORPHAN_TRACKER_CHECK();
- return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker(), _wg_wptr};
+ return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker_sptr(),
_wg_wptr};
}
void AttachTask::init(const QueryThreadContext& query_thread_context) {
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index e0a44af69c1..30871399eed 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -235,7 +235,7 @@ public:
// to nullptr, but the object it points to is not initialized. At this
time, when the memory
// is released somewhere, the hook is triggered to cause the crash.
std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
- [[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker()
const {
+ [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const {
return thread_mem_tracker_mgr->limiter_mem_tracker();
}
@@ -402,7 +402,8 @@ public:
#ifndef BE_TEST
ORPHAN_TRACKER_CHECK();
query_id = doris::thread_context()->task_id();
- query_mem_tracker =
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+ query_mem_tracker =
+
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr();
wg_wptr = doris::thread_context()->workload_group();
#else
query_id = TUniqueId();
@@ -468,8 +469,8 @@ public:
const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
- if (mem_tracker !=
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
- _old_mem_tracker =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+ if (mem_tracker !=
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) {
+ _old_mem_tracker =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
}
}
@@ -480,8 +481,8 @@ public:
query_thread_context.query_id); // workload group also does not
change
DCHECK(query_thread_context.query_mem_tracker);
if (query_thread_context.query_mem_tracker !=
- thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
- _old_mem_tracker =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) {
+ _old_mem_tracker =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
}
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index 17764b9e4f6..474a50339dc 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -73,7 +73,8 @@ private:
: pos(0),
limit(capacity_),
capacity(capacity_),
-
mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker())
{
+ mem_tracker_(
+
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) {
ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]