This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c6b2cc7067b [fix](memory) Fix Allocator release memory to correct MemTracker after TLS attach task ends (#39908)
c6b2cc7067b is described below

commit c6b2cc7067bb7868f5d1d766714f8ecbde1ad42f
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Aug 28 01:12:55 2024 +0800

    [fix](memory) Fix Allocator release memory to correct MemTracker after TLS attach task ends (#39908)
    
    The Allocator saves the TLS MemTracker during its first alloc; the saved
    tracker is used to release memory after the TLS attach task has ended.
    ```
    23:00:15   F20240824 22:56:49.773799 66432 thread_context.h:238] Check 
failed: doris::k_doris_exit || !doris::config::enable_memory_orphan_check || 
thread_mem_tracker()->label() != "Orphan" If you crash here, it means that 
SCOPED_ATTACH_TASK and SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used 
correctly. starting position of each thread is expected to use 
SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging to 
Query/Load/Compaction/Other Tasks, otherwise memory alloc using Do [...]
    23:00:15   *** Check failure stack trace: ***
    23:00:15       @     0x55645d3388e6  
google::LogMessageFatal::~LogMessageFatal()
    23:00:15       @     0x5564439637c2  doris::ThreadContext::consume_memory()
    23:00:15       @     0x5564439914fe  Allocator<>::release_memory()
    23:00:15       @     0x5564354be11e  std::_Sp_counted_ptr<>::_M_dispose()
    23:00:15       @     0x55643557bc3b  std::deque<>::pop_front()
    23:00:15       @     0x5564355756b1  
doris::io::StreamLoadPipe::~StreamLoadPipe()
    23:00:15       @     0x5564354bfa77  
doris::StreamLoadContext::~StreamLoadContext()
    23:00:15       @     0x556436ee5114  doris::HttpRequest::~HttpRequest()
    ```
---
 be/src/common/config.cpp                         |  6 ++++-
 be/src/common/config.h                           |  3 +++
 be/src/http/action/http_stream.cpp               |  4 +--
 be/src/io/file_factory.cpp                       |  6 ++---
 be/src/olap/page_cache.cpp                       | 10 +++----
 be/src/olap/page_cache.h                         |  1 -
 be/src/runtime/stream_load/stream_load_context.h | 17 +++++++-----
 be/src/runtime/thread_context.h                  | 28 ++++++++++----------
 be/src/service/internal_service.cpp              |  3 ++-
 be/src/util/byte_buffer.h                        |  6 ++++-
 be/src/vec/common/allocator.cpp                  | 33 ++++++++++++++++++++++--
 be/src/vec/common/allocator.h                    |  8 +++++-
 12 files changed, 86 insertions(+), 39 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 216c2e133c8..b477247c669 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -142,7 +142,11 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, 
"2147483648");
 
 DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");
 
-DEFINE_mBool(enable_memory_orphan_check, "false");
+// default is true. if any memory tracking in Orphan mem tracker will report 
error.
+// !! not modify the default value of this conf!! otherwise memory errors 
cannot be detected in time.
+// allocator free memory not need to check, because when the thread memory 
tracker label is Orphan,
+// use the tracker saved in Allocator.
+DEFINE_mBool(enable_memory_orphan_check, "true");
 
 // The maximum time a thread waits for full GC. Currently only query will wait 
for full gc.
 DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 78f94577d81..67048bc615e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -197,6 +197,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
 DECLARE_mInt64(crash_in_alloc_large_memory_bytes);
 
 // default is true. if any memory tracking in Orphan mem tracker will report 
error.
+// !! not modify the default value of this conf!! otherwise memory errors 
cannot be detected in time.
+// allocator free memory not need to check, because when the thread memory 
tracker label is Orphan,
+// use the tracker saved in Allocator.
 DECLARE_mBool(enable_memory_orphan_check);
 
 // The maximum time a thread waits for a full GC. Currently only query will 
wait for full gc.
diff --git a/be/src/http/action/http_stream.cpp 
b/be/src/http/action/http_stream.cpp
index afeb251ca41..7dbae6df731 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -247,8 +247,8 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
             // schema_buffer stores 1M of data for parsing column information
             // need to determine whether to cache for the first time
             if (ctx->is_read_schema) {
-                if (ctx->schema_buffer->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
-                    ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
+                if (ctx->schema_buffer()->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
+                    ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
                 } else {
                     LOG(INFO) << "use a portion of data to request fe to 
obtain column information";
                     ctx->is_read_schema = false;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 7f64ea50710..f4ce573c535 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -209,9 +209,9 @@ Status FileFactory::create_pipe_reader(const TUniqueId& 
load_id, io::FileReaderS
         // Here, a portion of the data is processed to parse column information
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
-                stream_load_ctx->schema_buffer->pos /* total_length */);
-        stream_load_ctx->schema_buffer->flip();
-        RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer));
+                stream_load_ctx->schema_buffer()->pos /* total_length */);
+        stream_load_ctx->schema_buffer()->flip();
+        RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer()));
         RETURN_IF_ERROR(pipe->finish());
         *file_reader = std::move(pipe);
     } else {
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index 1f0556f4642..b70dadc5b43 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -28,12 +28,10 @@ template <typename TAllocator>
 PageBase<TAllocator>::PageBase(size_t b, bool use_cache, 
segment_v2::PageTypePB page_type)
         : LRUCacheValueBase(), _size(b), _capacity(b) {
     if (use_cache) {
-        _mem_tracker_by_allocator = 
StoragePageCache::instance()->mem_tracker(page_type);
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                StoragePageCache::instance()->mem_tracker(page_type));
+        _data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, 
ALLOCATOR_ALIGNMENT_16));
     } else {
-        _mem_tracker_by_allocator = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
-    }
-    {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
         _data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, 
ALLOCATOR_ALIGNMENT_16));
     }
 }
@@ -42,7 +40,7 @@ template <typename TAllocator>
 PageBase<TAllocator>::~PageBase() {
     if (_data != nullptr) {
         DCHECK(_capacity != 0 && _size != 0);
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_);
         TAllocator::free(_data, _capacity);
     }
 }
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index 09fc689959c..ef25de7bc30 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -60,7 +60,6 @@ private:
     // Effective size, smaller than capacity, such as data page remove 
checksum suffix.
     size_t _size = 0;
     size_t _capacity = 0;
-    std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
 };
 
 using DataPage = PageBase<Allocator<false>>;
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index f7c4a0d474f..95e56e0b3fa 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -96,14 +96,9 @@ class StreamLoadContext {
 public:
     StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), 
_exec_env(exec_env) {
         start_millis = UnixMillis();
-        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
-        schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
     }
 
     ~StreamLoadContext() {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
-                ExecEnv::GetInstance()->stream_load_pipe_tracker());
-        schema_buffer.reset();
         if (need_rollback) {
             _exec_env->stream_load_executor()->rollback_txn(this);
             need_rollback = false;
@@ -126,6 +121,15 @@ public:
 
     bool is_mow_table() const;
 
+    ByteBufferPtr schema_buffer() {
+        if (_schema_buffer == nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                    ExecEnv::GetInstance()->stream_load_pipe_tracker());
+            _schema_buffer = 
ByteBuffer::allocate(config::stream_tvf_buffer_size);
+        }
+        return _schema_buffer;
+    }
+
 public:
     static const int default_txn_id = -1;
     // load type, eg: ROUTINE LOAD/MANUAL LOAD
@@ -190,8 +194,6 @@ public:
     std::shared_ptr<MessageBodySink> body_sink;
     std::shared_ptr<io::StreamLoadPipe> pipe;
 
-    ByteBufferPtr schema_buffer;
-
     TStreamLoadPutResult put_result;
     TStreamLoadMultiTablePutResult multi_table_put_result;
 
@@ -253,6 +255,7 @@ public:
 
 private:
     ExecEnv* _exec_env = nullptr;
+    ByteBufferPtr _schema_buffer;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index ff8f2c6b0b5..6158f0535be 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -50,7 +50,7 @@
 // Used after SCOPED_ATTACH_TASK, in order to count the memory into another
 // MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task.
 #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \
-    auto VARNAME_LINENUM(switch_mem_tracker) = 
SwitchThreadMemTrackerLimiter(arg1)
+    auto VARNAME_LINENUM(switch_mem_tracker) = 
doris::SwitchThreadMemTrackerLimiter(arg1)
 
 // Looking forward to tracking memory during thread execution into MemTracker.
 // Usually used to record query more detailed memory, including ExecNode 
operators.
@@ -170,8 +170,7 @@ static std::string memory_orphan_check_msg =
         "each thread is expected to use SCOPED_ATTACH_TASK to bind a 
MemTrackerLimiter belonging "
         "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using 
Doris Allocator in the "
         "thread will crash. If you want to switch MemTrackerLimiter during 
thread execution, "
-        "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat 
Attach. Of course, you "
-        "can modify enable_memory_orphan_check=false in be.conf to avoid this 
crash.";
+        "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat 
Attach.";
 
 // The thread context saves some info about a working thread.
 // 2 required info:
@@ -222,9 +221,9 @@ public:
         ss << std::this_thread::get_id();
         return ss.str();
     }
-    // After thread_mem_tracker_mgr is initialized, the current thread Hook 
starts to
-    // consume/release mem_tracker.
-    // Note that the use of shared_ptr will cause a crash. The guess is that 
there is an
+    // Note that if set global Memory Hook, After thread_mem_tracker_mgr is 
initialized,
+    // the current thread Hook starts to consume/release mem_tracker.
+    // the use of shared_ptr will cause a crash. The guess is that there is an
     // intermediate state during the copy construction of shared_ptr. 
Shared_ptr is not equal
     // to nullptr, but the object it points to is not initialized. At this 
time, when the memory
     // is released somewhere, the hook is triggered to cause the crash.
@@ -318,7 +317,7 @@ public:
                 // The brpc server should respond as quickly as possible.
                 bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
                 // set the data so that next time bthread_getspecific in the 
thread returns the data.
-                CHECK(0 == bthread_setspecific(btls_key, bthread_context) || 
k_doris_exit);
+                CHECK(0 == bthread_setspecific(btls_key, bthread_context) || 
doris::k_doris_exit);
             }
             DCHECK(bthread_context != nullptr);
             bthread_context->thread_local_handle_count++;
@@ -360,7 +359,7 @@ static ThreadContext* thread_context(bool allow_return_null 
= false) {
         // in bthread
         // bthread switching pthread may be very frequent, remember not to use 
lock or other time-consuming operations.
         auto* bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-        DCHECK(bthread_context != nullptr);
+        DCHECK(bthread_context != nullptr && 
bthread_context->thread_local_handle_count > 0);
         return bthread_context;
     }
     if (allow_return_null) {
@@ -449,15 +448,16 @@ public:
 
 class SwitchThreadMemTrackerLimiter {
 public:
-    explicit SwitchThreadMemTrackerLimiter(const 
std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
+    explicit SwitchThreadMemTrackerLimiter(
+            const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
         DCHECK(mem_tracker);
-        ThreadLocalHandle::create_thread_local_if_not_exits();
+        doris::ThreadLocalHandle::create_thread_local_if_not_exits();
         _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
         
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
     }
 
-    explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& 
query_thread_context) {
-        ThreadLocalHandle::create_thread_local_if_not_exits();
+    explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& 
query_thread_context) {
+        doris::ThreadLocalHandle::create_thread_local_if_not_exits();
         DCHECK(thread_context()->task_id() ==
                query_thread_context.query_id); // workload group alse not 
change
         DCHECK(query_thread_context.query_mem_tracker);
@@ -468,11 +468,11 @@ public:
 
     ~SwitchThreadMemTrackerLimiter() {
         
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
-        ThreadLocalHandle::del_thread_local_if_count_is_zero();
+        doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
     }
 
 private:
-    std::shared_ptr<MemTrackerLimiter> _old_mem_tracker;
+    std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker;
 };
 
 class AddThreadMemTrackerConsumer {
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index c2251c240ae..744024c940c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -767,7 +767,8 @@ void 
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
 
         std::shared_ptr<MemTrackerLimiter> mem_tracker = 
MemTrackerLimiter::create_shared(
                 MemTrackerLimiter::Type::OTHER,
-                fmt::format("{}#{}", params.format_type, params.file_type));
+                fmt::format("InternalService::fetch_table_schema:{}#{}", 
params.format_type,
+                            params.file_type));
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
 
         // make sure profile is desctructed after reader cause 
PrefetchBufferedReader
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index e8eadf69e02..1499f51c053 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -24,6 +24,7 @@
 
 #include "common/logging.h"
 #include "common/status.h"
+#include "runtime/thread_context.h"
 #include "vec/common/allocator.h"
 #include "vec/common/allocator_fwd.h"
 
@@ -43,7 +44,10 @@ struct ByteBuffer : private Allocator<false> {
         return Status::OK();
     }
 
-    ~ByteBuffer() { Allocator<false>::free(ptr, capacity); }
+    ~ByteBuffer() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
+        Allocator<false>::free(ptr, capacity);
+    }
 
     void put_bytes(const char* data, size_t size) {
         memcpy(ptr + pos, data, size);
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 82cd78a7fc1..ae5f27989b2 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -211,14 +211,43 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::consume_memory(
-        size_t size) const {
+        size_t size) {
+    // Usually, an object that inherits Allocator has the same TLS tracker for 
each alloc.
+    // If an object that inherits Allocator needs to be reused by multiple 
queries,
+    // it is necessary to switch the same tracker to TLS when calling alloc.
+    // However, in ORC Reader, ORC DataBuffer will be reused, but we cannot 
switch TLS tracker,
+    // so we update the Allocator tracker when the TLS tracker changes.
+    // note that the tracker in thread context when object that inherit 
Allocator is constructed may be
+    // no attach memory tracker in tls. usually the memory tracker is attached 
in tls only during the first alloc.
+    if (mem_tracker_ == nullptr ||
+        mem_tracker_->label() != 
doris::thread_context()->thread_mem_tracker()->label()) {
+        mem_tracker_ = 
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+    }
     CONSUME_THREAD_MEM_TRACKER(size);
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::release_memory(
         size_t size) const {
-    RELEASE_THREAD_MEM_TRACKER(size);
+    doris::ThreadContext* thread_context = doris::thread_context(true);
+    if ((thread_context && thread_context->thread_mem_tracker()->label() != 
"Orphan") ||
+        mem_tracker_ == nullptr) {
+        // If thread_context exist and the label of thread_mem_tracker not 
equal to `Orphan`,
+        // this means that in the scope of SCOPED_ATTACH_TASK,
+        // so thread_mem_tracker should be used to release memory.
+        // If mem_tracker_ is nullptr there is a scenario where an object that 
inherits Allocator
+        // has never called alloc, but free memory.
+        // in phmap, the memory alloced by an object may be transferred to 
another object and then free.
+        // in this case, thread context must attach a memory tracker other 
than Orphan,
+        // otherwise memory tracking will be wrong.
+        RELEASE_THREAD_MEM_TRACKER(size);
+    } else {
+        // if thread_context does not exist or the label of thread_mem_tracker 
is equal to
+        // `Orphan`, it usually happens during object destruction. This means 
that
+        // the scope of SCOPED_ATTACH_TASK has been left,  so release memory 
using Allocator tracker.
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
+        RELEASE_THREAD_MEM_TRACKER(size);
+    }
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 2bcce9a9c68..f393886cf0b 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -87,6 +87,10 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
 // is always a multiple of sixteen. 
(https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html)
 static constexpr int ALLOCATOR_ALIGNMENT_16 = 16;
 
+namespace doris {
+class MemTrackerLimiter;
+}
+
 class DefaultMemoryAllocator {
 public:
     static void* malloc(size_t size) __THROW { return std::malloc(size); }
@@ -228,7 +232,7 @@ public:
     // alloc will continue to execute, so the consume memtracker is forced.
     void memory_check(size_t size) const;
     // Increases consumption of this tracker by 'bytes'.
-    void consume_memory(size_t size) const;
+    void consume_memory(size_t size);
     void release_memory(size_t size) const;
     void throw_bad_alloc(const std::string& err) const;
 #ifndef NDEBUG
@@ -400,6 +404,8 @@ protected:
 
     static constexpr bool clear_memory = clear_memory_;
 
+    std::shared_ptr<doris::MemTrackerLimiter> mem_tracker_ {nullptr};
+
     // Freshly mmapped pages are copy-on-write references to a global zero 
page.
     // On the first write, a page fault occurs, and an actual writable page is
     // allocated. If we are going to use this memory soon, such as when 
resizing


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to