This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f42e7fc2dee [bugfix](memory) Reserve memory print failure reason (#39862)
f42e7fc2dee is described below

commit f42e7fc2dee73f66af49e90070f0f4056d513417
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Aug 27 18:35:53 2024 +0800

    [bugfix](memory) Reserve memory print failure reason (#39862)
    
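    1. Report why a memory reservation failed: try_reserve() now returns a
       doris::Status (MemoryLimitExceeded) describing the memory tracker,
       workload group, or process memory state, instead of a bare bool.
    2. Fix bugs: guard the workload group rollback in try_reserve() against a
       null pointer, fix the inverted null check in release_reserved(), and
       include the requested size in the workload group memory limit check.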
---
 be/src/runtime/memory/thread_mem_tracker_mgr.h     | 29 +++++++++++++------
 be/src/runtime/thread_context.h                    |  2 +-
 be/src/runtime/workload_group/workload_group.cpp   | 15 ++++++++++
 be/src/runtime/workload_group/workload_group.h     |  4 ++-
 .../runtime/memory/thread_mem_tracker_mgr_test.cpp | 33 ++++++++++++----------
 5 files changed, 57 insertions(+), 26 deletions(-)

diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 73cdd3243da..50a8a5e0f7e 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -95,7 +95,8 @@ public:
     void consume(int64_t size, int skip_large_memory_check = 0);
     void flush_untracked_mem();
 
-    bool try_reserve(int64_t size);
+    doris::Status try_reserve(int64_t size);
+
     void release_reserved();
 
     bool is_attach_query() { return _query_id != TUniqueId(); }
@@ -295,7 +296,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     _stop_consume = false;
 }
 
-inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
+inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
     DCHECK(_limiter_tracker_raw);
     DCHECK(size >= 0);
     CHECK(init());
@@ -303,19 +304,29 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t 
size) {
     // _untracked_mem store bytes that not synchronized to process reserved 
memory.
     flush_untracked_mem();
     if (!_limiter_tracker_raw->try_consume(size)) {
-        return false;
+        auto err_msg = fmt::format(
+                "reserve memory failed, size: {}, because memory tracker 
consumption: {}, limit: "
+                "{}",
+                size, _limiter_tracker_raw->consumption(), 
_limiter_tracker_raw->limit());
+        return doris::Status::MemoryLimitExceeded(err_msg);
     }
     auto wg_ptr = _wg_wptr.lock();
     if (wg_ptr) {
         if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
+            auto err_msg = fmt::format("reserve memory failed, size: {}, 
because {}", size,
+                                       wg_ptr->memory_debug_string());
             _limiter_tracker_raw->release(size); // rollback
-            return false;
+            return doris::Status::MemoryLimitExceeded(err_msg);
         }
     }
     if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
-        _limiter_tracker_raw->release(size);                 // rollback
-        wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
-        return false;
+        auto err_msg = fmt::format("reserve memory failed, size: {}, because 
{}", size,
+                                   
GlobalMemoryArbitrator::process_mem_log_str());
+        _limiter_tracker_raw->release(size); // rollback
+        if (wg_ptr) {
+            wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
+        }
+        return doris::Status::MemoryLimitExceeded(err_msg);
     }
     if (_count_scope_mem) {
         _scope_mem += size;
@@ -324,7 +335,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
         tracker->consume(size);
     }
     _reserved_mem += size;
-    return true;
+    return doris::Status::OK();
 }
 
 inline void ThreadMemTrackerMgr::release_reserved() {
@@ -333,7 +344,7 @@ inline void ThreadMemTrackerMgr::release_reserved() {
                                                                        
_untracked_mem);
         _limiter_tracker_raw->release(_reserved_mem);
         auto wg_ptr = _wg_wptr.lock();
-        if (!wg_ptr) {
+        if (wg_ptr) {
             wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem);
         }
         if (_count_scope_mem) {
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index c54b1a6892b..ff8f2c6b0b5 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -244,7 +244,7 @@ public:
         thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
     }
 
-    bool try_reserve_memory(const int64_t size) const {
+    doris::Status try_reserve_memory(const int64_t size) const {
 #ifdef USE_MEM_TRACKER
         DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
                thread_mem_tracker()->label() != "Orphan")
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index e37f83a00e8..e263685d07f 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -96,6 +96,21 @@ std::string WorkloadGroup::debug_string() const {
             _scan_bytes_per_second, _remote_scan_bytes_per_second);
 }
 
+std::string WorkloadGroup::memory_debug_string() const {
+    return fmt::format(
+            "TG[id = {}, name = {}, memory_limit = {}, 
enable_memory_overcommit = "
+            "{}, weighted_memory_limit = {}, total_mem_used = {}, "
+            "wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, 
"
+            "spill_high_watermark = {}, version = {}, is_shutdown = {}, 
query_num = {}]",
+            _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+            _enable_memory_overcommit ? "true" : "false",
+            PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES),
+            PrettyPrinter::print(_total_mem_used, TUnit::BYTES),
+            PrettyPrinter::print(_wg_refresh_interval_memory_growth, 
TUnit::BYTES),
+            _spill_low_watermark, _spill_high_watermark, _version, 
_is_shutdown,
+            _query_ctxs.size());
+}
+
 void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
     if (UNLIKELY(tg_info.id != _id)) {
         return;
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 3561098b6ce..2fbb4dd3030 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -100,7 +100,8 @@ public:
 
     void set_weighted_memory_ratio(double ratio);
     bool add_wg_refresh_interval_memory_growth(int64_t size) {
-        auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
+        auto realtime_total_mem_used =
+                _total_mem_used + _wg_refresh_interval_memory_growth.load() + 
size;
         if ((realtime_total_mem_used >
              ((double)_weighted_memory_limit *
               _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
@@ -125,6 +126,7 @@ public:
     }
 
     std::string debug_string() const;
+    std::string memory_debug_string() const;
 
     void check_and_update(const WorkloadGroupInfo& tg_info);
 
diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp 
b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
index ab15fce05a7..bab94ace470 100644
--- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
+++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
@@ -264,7 +264,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
     thread_context->consume_memory(size2);
     EXPECT_EQ(t->consumption(), size1 + size2);
 
-    thread_context->try_reserve_memory(size3);
+    auto st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t->consumption(), size1 + size2 + size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
 
@@ -284,14 +285,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
     EXPECT_EQ(t->consumption(), size1 + size2 + size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 
+ size1);
 
-    std::cout << "11111 " << 
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
-              << thread_context->thread_mem_tracker_mgr->reserved_mem() << 
std::endl;
     thread_context->consume_memory(size1);
     thread_context->consume_memory(size1);
-    std::cout << "2222 " << 
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
-              << thread_context->thread_mem_tracker_mgr->reserved_mem() << 
std::endl;
-    std::cout << "3333 " << 
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
-              << thread_context->thread_mem_tracker_mgr->reserved_mem() << 
std::endl;
     // reserved memory used done
     EXPECT_EQ(t->consumption(), size1 + size2 + size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
@@ -308,7 +303,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
     EXPECT_EQ(t->consumption(), size1 + size2);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
 
-    thread_context->try_reserve_memory(size3);
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t->consumption(), size1 + size2 + size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
 
@@ -358,7 +354,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
     int64_t size3 = size2 * 1024;
 
     thread_context->attach_task(TUniqueId(), t, workload_group);
-    thread_context->try_reserve_memory(size3);
+    auto st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
 
@@ -369,15 +366,18 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
     EXPECT_EQ(t->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
 
-    thread_context->try_reserve_memory(size2);
+    st = thread_context->try_reserve_memory(size2);
+    EXPECT_TRUE(st.ok());
     // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2
     // ThreadMemTrackerMgr _untracked_mem = 0
     EXPECT_EQ(t->consumption(), size3 + size2);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
               size3); // size3 - size2 + size2
 
-    thread_context->try_reserve_memory(size3);
-    thread_context->try_reserve_memory(size3);
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     thread_context->consume_memory(size3);
     thread_context->consume_memory(size2);
     thread_context->consume_memory(size3);
@@ -411,13 +411,15 @@ TEST_F(ThreadMemTrackerMgrTest, 
NestedSwitchMemTrackerReserveMemory) {
     int64_t size3 = size2 * 1024;
 
     thread_context->attach_task(TUniqueId(), t1, workload_group);
-    thread_context->try_reserve_memory(size3);
+    auto st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     thread_context->consume_memory(size2);
     EXPECT_EQ(t1->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
 
     thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
-    thread_context->try_reserve_memory(size3);
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t1->consumption(), size3);
     EXPECT_EQ(t2->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2 + size3);
@@ -428,7 +430,8 @@ TEST_F(ThreadMemTrackerMgrTest, 
NestedSwitchMemTrackerReserveMemory) {
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
 
     thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
-    thread_context->try_reserve_memory(size3);
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t1->consumption(), size3);
     EXPECT_EQ(t2->consumption(), size3 + size2);
     EXPECT_EQ(t3->consumption(), size3);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to