This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f42e7fc2dee [bugfix](memory) Reserve memory print failure reason (#39862) f42e7fc2dee is described below commit f42e7fc2dee73f66af49e90070f0f4056d513417 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Aug 27 18:35:53 2024 +0800 [bugfix](memory) Reserve memory print failure reason (#39862) 1. reserve memory print failure reason 2. fix bugs --- be/src/runtime/memory/thread_mem_tracker_mgr.h | 29 +++++++++++++------ be/src/runtime/thread_context.h | 2 +- be/src/runtime/workload_group/workload_group.cpp | 15 ++++++++++ be/src/runtime/workload_group/workload_group.h | 4 ++- .../runtime/memory/thread_mem_tracker_mgr_test.cpp | 33 ++++++++++++---------- 5 files changed, 57 insertions(+), 26 deletions(-) diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 73cdd3243da..50a8a5e0f7e 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -95,7 +95,8 @@ public: void consume(int64_t size, int skip_large_memory_check = 0); void flush_untracked_mem(); - bool try_reserve(int64_t size); + doris::Status try_reserve(int64_t size); + void release_reserved(); bool is_attach_query() { return _query_id != TUniqueId(); } @@ -295,7 +296,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { _stop_consume = false; } -inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) { +inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { DCHECK(_limiter_tracker_raw); DCHECK(size >= 0); CHECK(init()); @@ -303,19 +304,29 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) { // _untracked_mem store bytes that not synchronized to process reserved memory. 
flush_untracked_mem(); if (!_limiter_tracker_raw->try_consume(size)) { - return false; + auto err_msg = fmt::format( + "reserve memory failed, size: {}, because memory tracker consumption: {}, limit: " + "{}", + size, _limiter_tracker_raw->consumption(), _limiter_tracker_raw->limit()); + return doris::Status::MemoryLimitExceeded(err_msg); } auto wg_ptr = _wg_wptr.lock(); if (wg_ptr) { if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) { + auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, + wg_ptr->memory_debug_string()); _limiter_tracker_raw->release(size); // rollback - return false; + return doris::Status::MemoryLimitExceeded(err_msg); } } if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { - _limiter_tracker_raw->release(size); // rollback - wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback - return false; + auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, + GlobalMemoryArbitrator::process_mem_log_str()); + _limiter_tracker_raw->release(size); // rollback + if (wg_ptr) { + wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback + } + return doris::Status::MemoryLimitExceeded(err_msg); } if (_count_scope_mem) { _scope_mem += size; @@ -324,7 +335,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) { tracker->consume(size); } _reserved_mem += size; - return true; + return doris::Status::OK(); } inline void ThreadMemTrackerMgr::release_reserved() { @@ -333,7 +344,7 @@ inline void ThreadMemTrackerMgr::release_reserved() { _untracked_mem); _limiter_tracker_raw->release(_reserved_mem); auto wg_ptr = _wg_wptr.lock(); - if (!wg_ptr) { + if (wg_ptr) { wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem); } if (_count_scope_mem) { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index c54b1a6892b..ff8f2c6b0b5 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -244,7 +244,7 @@ 
public: thread_mem_tracker_mgr->consume(size, skip_large_memory_check); } - bool try_reserve_memory(const int64_t size) const { + doris::Status try_reserve_memory(const int64_t size) const { #ifdef USE_MEM_TRACKER DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan") diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index e37f83a00e8..e263685d07f 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -96,6 +96,21 @@ std::string WorkloadGroup::debug_string() const { _scan_bytes_per_second, _remote_scan_bytes_per_second); } +std::string WorkloadGroup::memory_debug_string() const { + return fmt::format( + "TG[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = " + "{}, weighted_memory_limit = {}, total_mem_used = {}, " + "wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, " + "spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]", + _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES), + _enable_memory_overcommit ? 
"true" : "false", + PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES), + PrettyPrinter::print(_total_mem_used, TUnit::BYTES), + PrettyPrinter::print(_wg_refresh_interval_memory_growth, TUnit::BYTES), + _spill_low_watermark, _spill_high_watermark, _version, _is_shutdown, + _query_ctxs.size()); +} + void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { if (UNLIKELY(tg_info.id != _id)) { return; diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 3561098b6ce..2fbb4dd3030 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -100,7 +100,8 @@ public: void set_weighted_memory_ratio(double ratio); bool add_wg_refresh_interval_memory_growth(int64_t size) { - auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load(); + auto realtime_total_mem_used = + _total_mem_used + _wg_refresh_interval_memory_growth.load() + size; if ((realtime_total_mem_used > ((double)_weighted_memory_limit * _spill_high_watermark.load(std::memory_order_relaxed) / 100))) { @@ -125,6 +126,7 @@ public: } std::string debug_string() const; + std::string memory_debug_string() const; void check_and_update(const WorkloadGroupInfo& tg_info); diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp index ab15fce05a7..bab94ace470 100644 --- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp +++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp @@ -264,7 +264,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { thread_context->consume_memory(size2); EXPECT_EQ(t->consumption(), size1 + size2); - thread_context->try_reserve_memory(size3); + auto st = thread_context->try_reserve_memory(size3); + EXPECT_TRUE(st.ok()); EXPECT_EQ(t->consumption(), size1 + size2 + size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); @@ 
-284,14 +285,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { EXPECT_EQ(t->consumption(), size1 + size2 + size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 + size1); - std::cout << "11111 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", " - << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl; thread_context->consume_memory(size1); thread_context->consume_memory(size1); - std::cout << "2222 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", " - << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl; - std::cout << "3333 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", " - << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl; // reserved memory used done EXPECT_EQ(t->consumption(), size1 + size2 + size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); @@ -308,7 +303,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { EXPECT_EQ(t->consumption(), size1 + size2); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); - thread_context->try_reserve_memory(size3); + st = thread_context->try_reserve_memory(size3); + EXPECT_TRUE(st.ok()); EXPECT_EQ(t->consumption(), size1 + size2 + size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); @@ -358,7 +354,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { int64_t size3 = size2 * 1024; thread_context->attach_task(TUniqueId(), t, workload_group); - thread_context->try_reserve_memory(size3); + auto st = thread_context->try_reserve_memory(size3); + EXPECT_TRUE(st.ok()); EXPECT_EQ(t->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); @@ -369,15 +366,18 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { EXPECT_EQ(t->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); - 
thread_context->try_reserve_memory(size2); + st = thread_context->try_reserve_memory(size2); + EXPECT_TRUE(st.ok()); // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2 // ThreadMemTrackerMgr _untracked_mem = 0 EXPECT_EQ(t->consumption(), size3 + size2); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); // size3 - size2 + size2 - thread_context->try_reserve_memory(size3); - thread_context->try_reserve_memory(size3); + st = thread_context->try_reserve_memory(size3); + EXPECT_TRUE(st.ok()); + st = thread_context->try_reserve_memory(size3); + EXPECT_TRUE(st.ok()); thread_context->consume_memory(size3); thread_context->consume_memory(size2); thread_context->consume_memory(size3); @@ -411,13 +411,15 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) { int64_t size3 = size2 * 1024; thread_context->attach_task(TUniqueId(), t1, workload_group); - thread_context->try_reserve_memory(size3); + auto st = thread_context->try_reserve_memory(size3); + EXPECT_TRUE(st.ok()); thread_context->consume_memory(size2); EXPECT_EQ(t1->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2); - thread_context->try_reserve_memory(size3); + st = thread_context->try_reserve_memory(size3); + EXPECT_TRUE(st.ok()); EXPECT_EQ(t1->consumption(), size3); EXPECT_EQ(t2->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3); @@ -428,7 +430,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) { EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3); - thread_context->try_reserve_memory(size3); + st = thread_context->try_reserve_memory(size3); + EXPECT_TRUE(st.ok()); EXPECT_EQ(t1->consumption(), size3); EXPECT_EQ(t2->consumption(), size3 + size2); 
EXPECT_EQ(t3->consumption(), size3);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org