This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aaaaae5  [feature] (memory) Switch TLS mem tracker to separate more detailed memory usage (#8605)
aaaaae5 is described below

commit aaaaae53b53ce6060188dfd2c19bc1784031b90c
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu Mar 24 14:29:34 2022 +0800

    [feature] (memory) Switch TLS mem tracker to separate more detailed memory usage (#8605)

    In PR #8476, all memory usage of the process is recorded in the process mem tracker,
    all memory usage of a query is recorded in the query mem tracker, and it is still
    necessary to manually call `transfer_to` to track cached memory sizes.

    We want to separate out more detailed memory usage based on the Hook TCMalloc
    new/delete + TLS mem tracker mechanism.

    In this PR, the more detailed mem trackers are switched in TLS, which automatically
    and accurately counts finer-grained memory usage than before.
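
    A minimal usage sketch (illustrative only, not code from this commit; `SomeCache`,
    `SomeExecNode`, and their `_mem_tracker` members are hypothetical stand-ins for
    whatever tracker and context a caller has):

        #include "runtime/thread_context.h"

        // Non-query path: memory allocated/freed by the TCMalloc new/delete Hook
        // inside this scope is consumed/released against _mem_tracker instead of
        // the process mem tracker.
        void SomeCache::insert_value() {
            SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
            // ... allocate or free memory here ...
        }

        // Query path: an error-callback switch, so that exceeding the tracker
        // limit inside the TCMalloc Hook cancels the task with this message.
        Status SomeExecNode::open(RuntimeState* state) {
            SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SomeExecNode, while executing open.");
            // ... build hash tables, fetch batches from children, etc. ...
            return Status::OK();
        }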
---
 be/src/exec/cross_join_node.cpp               |  5 +-
 be/src/exec/except_node.cpp                   |  3 +-
 be/src/exec/exec_node.cpp                     |  3 +
 be/src/exec/hash_join_node.cpp                |  5 +-
 be/src/exec/intersect_node.cpp                |  3 +-
 be/src/exec/set_operation_node.cpp            |  3 +-
 be/src/olap/lru_cache.cpp                     | 16 ++++--
 be/src/olap/tablet_manager.cpp                | 15 ++---
 be/src/runtime/bufferpool/buffer_allocator.cc | 17 ++++--
 be/src/runtime/bufferpool/buffer_allocator.h  |  3 +
 be/src/runtime/memory/chunk_allocator.cpp     |  5 ++
 be/src/runtime/tcmalloc_hook.h                |  2 +
 be/src/runtime/thread_context.h               | 81 +++++++++++++++++++++++----
 be/src/runtime/thread_mem_tracker_mgr.cpp     | 20 ++++---
 be/src/runtime/thread_mem_tracker_mgr.h       | 56 +++++++++++-------
 be/src/service/doris_main.cpp                 | 16 ++++--
 be/src/util/doris_metrics.cpp                 |  8 +++
 be/src/util/doris_metrics.h                   |  4 ++
 be/src/vec/exec/join/vhash_join_node.cpp      |  5 +-
 be/src/vec/exec/vaggregation_node.cpp         |  8 ++-
 be/src/vec/exec/vcross_join_node.cpp          |  4 +-
 be/src/vec/exec/vset_operation_node.cpp       |  6 +-
 22 files changed, 202 insertions(+), 86 deletions(-)

diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp
index 5dcdc10..5def58a 100644
--- a/be/src/exec/cross_join_node.cpp
+++ b/be/src/exec/cross_join_node.cpp
@@ -23,6 +23,7 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 
@@ -52,6 +53,7 @@ Status CrossJoinNode::close(RuntimeState* state) {
 Status CrossJoinNode::construct_build_side(RuntimeState* state) {
     // Do a full scan of child(1) and store all build row batches.
     RETURN_IF_ERROR(child(1)->open(state));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Cross join, while getting 
next from child 1");
 
     while (true) {
         RowBatch* batch =
@@ -63,9 +65,6 @@ Status CrossJoinNode::construct_build_side(RuntimeState* 
state) {
         bool eos = false;
         RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));
 
-        // to prevent use too many memory
-        RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Cross join, while getting 
next from the child 1.");
-
         SCOPED_TIMER(_build_timer);
         _build_batches.add_row_batch(batch);
         VLOG_ROW << build_list_debug_string();
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 8ae1701..ec3e451 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -21,6 +21,7 @@
 #include "exprs/expr.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -40,6 +41,7 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status ExceptNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(SetOperationNode::open(state));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Except Node, while probing the hash table.");
     // if a table is empty, the result must be empty
     if (_hash_tbl->size() == 0) {
         _hash_tbl_iterator = _hash_tbl->begin();
@@ -62,7 +64,6 @@ Status ExceptNode::open(RuntimeState* state) {
         while (!eos) {
             RETURN_IF_CANCELLED(state);
             RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
-            RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
             for (int j = 0; j < _probe_batch->num_rows(); ++j) {
                 _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
                 if (_hash_tbl_iterator != _hash_tbl->end()) {
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4d8dd2a..e3891b6 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -57,6 +57,7 @@
 #include "runtime/mem_tracker.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
@@ -208,6 +209,7 @@ Status ExecNode::prepare(RuntimeState* state) {
     _mem_tracker = MemTracker::create_tracker(-1, "ExecNode:" + _runtime_profile->name(),
                                               state->instance_mem_tracker(),
                                               MemTrackerLevel::VERBOSE, _runtime_profile.get());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _expr_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(),
                                                    _mem_tracker);
 
@@ -226,6 +228,7 @@ Status ExecNode::prepare(RuntimeState* state) {
 }
 
 Status ExecNode::open(RuntimeState* state) {
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     if (_vconjunct_ctx_ptr) {
         RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index c92bb5a..30cd844 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -186,6 +186,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
     // The hash join node needs to keep in memory all build tuples, including the tuple
     // row ptrs.  The row ptrs are copied into the hash table's internal structure so they
     // don't need to be stored in the _build_pool.
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while constructing the hash table.");
     RowBatch build_batch(child(1)->row_desc(), state->batch_size());
     RETURN_IF_ERROR(child(1)->open(state));
 
@@ -303,7 +304,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo
     // In most cases, no additional memory overhead will be applied for at this stage,
     // but if the expression calculation in this node needs to apply for additional memory,
     // it may cause the memory to exceed the limit.
-    RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while execute get_next.");
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while execute get_next.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     if (reached_limit()) {
@@ -771,11 +772,9 @@ Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_ba
                                                    _build_pool.get(), false);
             }
         }
-        RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
     } else {
         // take ownership of tuple data of build_batch
         _build_pool->acquire_data(build_batch->tuple_data_pool(), false);
-        RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
         RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows()));
         for (int i = 0; i < build_batch->num_rows(); ++i) {
             _hash_tbl->insert_without_check(build_batch->get_row(i));
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index 2d8d2ee..df0e30d 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -21,6 +21,7 @@
 #include "exprs/expr.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -44,6 +45,7 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
 // repeat [2] this for all the rest child
 Status IntersectNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(SetOperationNode::open(state));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Intersect Node, while probing the hash table.");
     // if a table is empty, the result must be empty
     if (_hash_tbl->size() == 0) {
         _hash_tbl_iterator = _hash_tbl->begin();
@@ -66,7 +68,6 @@ Status IntersectNode::open(RuntimeState* state) {
         while (!eos) {
             RETURN_IF_CANCELLED(state);
             RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
-            RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table.");
             for (int j = 0; j < _probe_batch->num_rows(); ++j) {
                 _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
                 if (_hash_tbl_iterator != _hash_tbl->end()) {
diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp
index 7faa561..5574d3b 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -23,6 +23,7 @@
 #include "runtime/raw_value.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -137,6 +138,7 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) {
 Status SetOperationNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SetOperation, while constructing the hash table.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
     // open result expr lists.
@@ -156,7 +158,6 @@ Status SetOperationNode::open(RuntimeState* state) {
         RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
         // take ownership of tuple data of build_batch
         _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
-        RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " SetOperation, while constructing the hash table.");
         // build hash table and remove duplicate items
         RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch.num_rows()));
         for (int i = 0; i < build_batch.num_rows(); ++i) {
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 825f029..e494c54 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -364,13 +364,7 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash, MemTracker* tracker) {
     }
     // free handle out of mutex, when last_ref is true, e must not be nullptr
     if (last_ref) {
-        size_t charge = e->charge;
         e->free();
-        // The parameter tracker is ShardedLRUCache::_mem_tracker,
-        // because the memory released by LRUHandle is recorded in the tls mem tracker,
-        // so this part of the memory is subsidized from ShardedLRUCache::_mem_tracker to the tls mem tracker
-        tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
-                            charge);
     }
 }
 
@@ -449,11 +443,15 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
         : _name(name),
           _last_id(1),
           _mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
     for (int s = 0; s < kNumShards; s++) {
         _shards[s] = new LRUCache(type);
         _shards[s]->set_capacity(per_shard);
     }
+    // After the lru cache is created in the main thread, the main thread will not switch to the
+    // lru cache mem tracker again, so manually clear the untracked mem in tls.
+    thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
 
     _entity = DorisMetrics::instance()->metric_registry()->register_entity(
             std::string("lru_cache:") + name, {{"name", name}});
@@ -467,6 +465,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
 }
 
 ShardedLRUCache::~ShardedLRUCache() {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     for (int s = 0; s < kNumShards; s++) {
         delete _shards[s];
     }
@@ -481,6 +480,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t
     // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
     thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
                                                                                 charge);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const uint32_t hash = _hash_slice(key);
     return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority);
 }
@@ -491,11 +491,13 @@ Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
 }
 
 void ShardedLRUCache::release(Handle* handle) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
     _shards[_shard(h->hash)]->release(handle);
 }
 
 void ShardedLRUCache::erase(const CacheKey& key) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const uint32_t hash = _hash_slice(key);
     _shards[_shard(hash)]->erase(key, hash, _mem_tracker.get());
 }
@@ -514,6 +516,7 @@ uint64_t ShardedLRUCache::new_id() {
 }
 
 int64_t ShardedLRUCache::prune() {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t num_prune = 0;
     for (int s = 0; s < kNumShards; s++) {
         num_prune += _shards[s]->prune();
@@ -522,6 +525,7 @@ int64_t ShardedLRUCache::prune() {
 }
 
 int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t num_prune = 0;
     for (int s = 0; s < kNumShards; s++) {
         num_prune += _shards[s]->prune_if(pred);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index dd51b87..ed1a711 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -192,11 +192,6 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
     tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
     tablet_map[tablet_id] = tablet;
     _add_tablet_to_partition(tablet);
-    // TODO: remove multiply 2 of tablet meta mem size
-    // Because table schema will copy in tablet, there will be double mem cost
-    // so here multiply 2
-    thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(
-            _mem_tracker.get(), tablet->tablet_meta()->mem_size() * 2);
 
     VLOG_NOTICE << "add tablet to map successfully." << " tablet_id=" << tablet_id ;
 
@@ -215,6 +210,7 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) {
 
 OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
                                         std::vector<DataDir*> stores) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     DorisMetrics::instance()->create_tablet_requests_total->increment(1);
 
     int64_t tablet_id = request.tablet_id;
@@ -432,6 +428,7 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
 OLAPStatus TabletManager::drop_tablet(TTabletId tablet_id, SchemaHash schema_hash,
                                       bool keep_files) {
     WriteLock wrlock(_get_tablets_shard_lock(tablet_id));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     return _drop_tablet_unlocked(tablet_id, keep_files);
 }
 
@@ -460,6 +457,7 @@ OLAPStatus TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, bool keep_f
 
 OLAPStatus TabletManager::drop_tablets_on_error_root_path(
         const std::vector<TabletInfo>& tablet_info_vec) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     OLAPStatus res = OLAP_SUCCESS;
     if (tablet_info_vec.empty()) { // This is a high probability event
         return res;
@@ -670,6 +668,7 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab
                                                 TSchemaHash schema_hash, const string& meta_binary,
                                                 bool update_meta, bool force, bool restore,
                                                 bool check_path) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     TabletMetaSharedPtr tablet_meta(new TabletMeta());
     OLAPStatus status = tablet_meta->deserialize(meta_binary);
     if (status != OLAP_SUCCESS) {
@@ -752,6 +751,7 @@ OLAPStatus TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_
                                                SchemaHash schema_hash,
                                                const string& schema_hash_path, bool force,
                                                bool restore) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     LOG(INFO) << "begin to load tablet from dir. "
               << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
               << " path = " << schema_hash_path << " force = " << force << " restore = " << restore;
@@ -1219,11 +1219,6 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bo
     }
 
     dropped_tablet->deregister_tablet_from_dir();
-    // The dropped tablet meta is expected to be released in the TabletManager mem tracker,
-    // but is actually released in the tls mem tracker.
-    // So from TabletManager mem tracker compensate memory to tls tracker.
-    _mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
-                              dropped_tablet->tablet_meta()->mem_size() * 2);
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc b/be/src/runtime/bufferpool/buffer_allocator.cc
index a3bbe4c..82742a6 100644
--- a/be/src/runtime/bufferpool/buffer_allocator.cc
+++ b/be/src/runtime/bufferpool/buffer_allocator.cc
@@ -26,6 +26,7 @@
 #include "util/cpu_info.h"
 #include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
+#include "runtime/thread_context.h"
 
 //DECLARE_bool(disable_mem_pools);
 
@@ -48,7 +49,7 @@ public:
 
     /// Add a free buffer to the free lists. May free buffers to the system allocator
     /// if the list becomes full. Caller should not hold 'lock_'
-    void AddFreeBuffer(BufferHandle&& buffer);
+    bool AddFreeBuffer(BufferHandle&& buffer);
 
     /// Try to get a free buffer of 'buffer_len' bytes from this arena. Returns true and
     /// sets 'buffer' if found or false if not found. Caller should not hold 'lock_'.
@@ -193,7 +194,8 @@ BufferPool::BufferAllocator::BufferAllocator(BufferPool* pool, int64_t min_buffe
           clean_page_bytes_limit_(clean_page_bytes_limit),
           clean_page_bytes_remaining_(clean_page_bytes_limit),
           per_core_arenas_(CpuInfo::get_max_num_cores()),
-          max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS) {
+          max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS),
+          _mem_tracker(MemTracker::create_virtual_tracker(-1, "BufferAllocator", nullptr, MemTrackerLevel::OVERVIEW)) {
     DCHECK(BitUtil::IsPowerOf2(min_buffer_len_)) << min_buffer_len_;
     DCHECK(BitUtil::IsPowerOf2(max_buffer_len_)) << max_buffer_len_;
     DCHECK_LE(0, min_buffer_len_);
@@ -303,6 +305,7 @@ Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
         system_bytes_remaining_.add(len);
         return status;
     }
+    _mem_tracker->consume_cache(len);
     return Status::OK();
 }
 
@@ -375,7 +378,9 @@ void BufferPool::BufferAllocator::Free(BufferHandle&& handle) {
     handle.client_ = nullptr; // Buffer is no longer associated with a client.
     FreeBufferArena* arena = per_core_arenas_[handle.home_core_].get();
     handle.Poison();
-    arena->AddFreeBuffer(std::move(handle));
+    if (!arena->AddFreeBuffer(std::move(handle))) {
+        _mem_tracker->release_cache(handle.len());
+    }
 }
 
 void BufferPool::BufferAllocator::AddCleanPage(const std::unique_lock<std::mutex>& client_lock,
@@ -426,6 +431,7 @@ int64_t BufferPool::BufferAllocator::FreeToSystem(std::vector<BufferHandle>&& bu
         buffer.Unpoison();
         system_allocator_->Free(std::move(buffer));
     }
+    _mem_tracker->release_cache(bytes_freed);
     return bytes_freed;
 }
 
@@ -485,16 +491,17 @@ BufferPool::FreeBufferArena::~FreeBufferArena() {
     }
 }
 
-void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
+bool BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
     std::lock_guard<SpinLock> al(lock_);
     if (config::disable_mem_pools) {
         int64_t len = buffer.len();
         parent_->system_allocator_->Free(std::move(buffer));
         parent_->system_bytes_remaining_.add(len);
-        return;
+        return false;
     }
     PerSizeLists* lists = GetListsForSize(buffer.len());
     lists->AddFreeBuffer(std::move(buffer));
+    return true;
 }
 
 bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page) {
diff --git a/be/src/runtime/bufferpool/buffer_allocator.h b/be/src/runtime/bufferpool/buffer_allocator.h
index f2ab5e6..19b1c6c 100644
--- a/be/src/runtime/bufferpool/buffer_allocator.h
+++ b/be/src/runtime/bufferpool/buffer_allocator.h
@@ -21,6 +21,7 @@
 #include "runtime/bufferpool/buffer_pool_internal.h"
 #include "runtime/bufferpool/free_list.h"
 #include "util/aligned_new.h"
+#include "runtime/mem_tracker.h"
 
 namespace doris {
 
@@ -235,6 +236,8 @@ private:
     /// all arenas so may fail. The final attempt locks all arenas, which is expensive
     /// but is guaranteed to succeed.
     int max_scavenge_attempts_;
+
+    std::shared_ptr<MemTracker> _mem_tracker;
 };
 } // namespace doris
 
diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp
index 6f1306c..9aec083 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -99,6 +99,7 @@ public:
         // Poison this chunk to make asan can detect invalid access
         ASAN_POISON_MEMORY_REGION(ptr, size);
         std::lock_guard<SpinLock> l(_lock);
+        // TODO(zxy) The memory of vector resize is not recorded in chunk allocator mem tracker
         _chunk_lists[idx].push_back(ptr);
     }
 
@@ -118,9 +119,13 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit)
           _arenas(CpuInfo::get_max_num_cores()) {
     _mem_tracker =
             MemTracker::create_tracker(-1, "ChunkAllocator", nullptr, MemTrackerLevel::OVERVIEW);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     for (int i = 0; i < _arenas.size(); ++i) {
         _arenas[i].reset(new ChunkArena());
     }
+    // After the ChunkAllocator is created in the main thread, the main thread will not switch to the
+    // chunk allocator mem tracker again, so manually clear the untracked mem in tls.
+    thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
 
     _chunk_allocator_metric_entity =
             DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator");
diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h
index 4e3fbb8..9ba55fd 100644
--- a/be/src/runtime/tcmalloc_hook.h
+++ b/be/src/runtime/tcmalloc_hook.h
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#pragma once
+
 #include <gperftools/malloc_hook.h>
 #include <gperftools/nallocx.h>
 #include <gperftools/tcmalloc.h>
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 1718e8d..7c3beaf 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -25,14 +25,29 @@
 #include "runtime/runtime_state.h"
 #include "runtime/thread_mem_tracker_mgr.h"
 #include "runtime/threadlocal.h"
+#include "util/doris_metrics.h"
 
 // Attach to task when thread starts
 #define SCOPED_ATTACH_TASK_THREAD(type, ...) \
     auto VARNAME_LINENUM(attach_task_thread) = AttachTaskThread(type, ##__VA_ARGS__)
+// Be careful to stop the thread mem tracker, because the actual order of malloc and free memory
+// may be different from the order of execution of instructions, which will cause the position of
+// the memory track to be unexpected.
 #define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \
     auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(true)
 #define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \
     auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(false)
+// Switch thread mem tracker during task execution.
+// After the non-query thread switches the mem tracker, if the thread will not switch the mem
+// tracker again in the short term, can consider manually clear_untracked_mems.
+// The query thread will automatically clear_untracked_mems when detach_task.
+#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
+    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, false)
+#define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
+    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, true);
+#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) \
+    auto VARNAME_LINENUM(witch_tracker_cb) =                            \
+            SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__)
 
 namespace doris {
 
@@ -72,7 +87,7 @@ public:
         _type = type;
         _task_id = task_id;
         _fragment_instance_id = fragment_instance_id;
-        _thread_mem_tracker_mgr->attach_task(task_type_string(_type), task_id, fragment_instance_id,
+        _thread_mem_tracker_mgr->attach_task(TaskTypeStr[_type], task_id, fragment_instance_id,
                                              mem_tracker);
     }
 
@@ -88,10 +103,6 @@ public:
     const std::string& thread_id_str() const { return _thread_id_str; }
     const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
 
-    inline static const std::string task_type_string(ThreadContext::TaskType type) {
-        return TaskTypeStr[type];
-    }
-
     void consume_mem(int64_t size) {
         if (start_thread_mem_tracker) {
             _thread_mem_tracker_mgr->cache_consume(size);
@@ -166,13 +177,13 @@ public:
 
     explicit AttachTaskThread(const ThreadContext::TaskType& type,
                               const std::shared_ptr<MemTracker>& mem_tracker) {
-        DCHECK(mem_tracker != nullptr);
+        DCHECK(mem_tracker);
         thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker);
     }
 
     explicit AttachTaskThread(const TQueryType::type& query_type,
                               const std::shared_ptr<MemTracker>& mem_tracker) {
-        DCHECK(mem_tracker != nullptr);
+        DCHECK(mem_tracker);
         thread_local_ctx.get()->attach(query_to_task_type(query_type), "", TUniqueId(),
                                        mem_tracker);
     }
@@ -182,7 +193,7 @@ public:
                               const std::shared_ptr<MemTracker>& mem_tracker) {
         DCHECK(task_id != "");
         DCHECK(fragment_instance_id != TUniqueId());
-        DCHECK(mem_tracker != nullptr);
+        DCHECK(mem_tracker);
         thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id,
                                        fragment_instance_id, mem_tracker);
     }
@@ -192,7 +203,7 @@ public:
 #ifndef BE_TEST
         DCHECK(print_id(runtime_state->query_id()) != "");
         DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
-        DCHECK(mem_tracker != nullptr);
+        DCHECK(mem_tracker);
         thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()),
                                        print_id(runtime_state->query_id()),
                                        runtime_state->fragment_instance_id(), mem_tracker);
@@ -211,7 +222,12 @@ public:
         }
     }
 
-    ~AttachTaskThread() { thread_local_ctx.get()->detach(); }
+    ~AttachTaskThread() {
+#ifndef BE_TEST
+        thread_local_ctx.get()->detach();
+        DorisMetrics::instance()->attach_task_thread_count->increment(1);
+#endif
+    }
 };
 
 class StopThreadMemTracker {
@@ -228,4 +244,49 @@ private:
     bool _scope;
 };
 
+class SwitchThreadMemTracker {
+public:
+    explicit SwitchThreadMemTracker(const std::shared_ptr<MemTracker>& mem_tracker,
+                                    bool in_task = true) {
+#ifndef BE_TEST
+        DCHECK(mem_tracker);
+        // The thread tracker must be switched after the attach task, otherwise switching
+        // in the main thread will cause the cached tracker not be cleaned up in time.
+        DCHECK(in_task == false ||
+               thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task());
+        _old_tracker_id =
+                thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker(mem_tracker);
+#endif
+    }
+
+    ~SwitchThreadMemTracker() {
+#ifndef BE_TEST
+        thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
+        DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
+#endif
+    }
+
+private:
+    std::string _old_tracker_id;
+};
+
+class SwitchThreadMemTrackerErrCallBack {
+public:
+    explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type,
+                                               bool cancel_work = true,
+                                               ERRCALLBACK err_call_back_func = nullptr) {
+        DCHECK(action_type != std::string());
+        _old_tracker_cb = thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(
+                action_type, cancel_work, err_call_back_func);
+    }
+
+    ~SwitchThreadMemTrackerErrCallBack() {
+        thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb);
+        DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1);
+    }
+
+private:
+    ConsumeErrCallBackInfo _old_tracker_cb;
+};
+
 } // namespace doris
diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp
index 12b64d4..fd06c17 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/thread_mem_tracker_mgr.cpp
@@ -22,19 +22,21 @@
 
 namespace doris {
 
-void ThreadMemTrackerMgr::attach_task(const std::string& action_type, const std::string& task_id,
+void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std::string& task_id,
                                       const TUniqueId& fragment_instance_id,
                                       const std::shared_ptr<MemTracker>& mem_tracker) {
     _task_id = task_id;
     _fragment_instance_id = fragment_instance_id;
-    _consume_err_call_back.update(action_type, true, nullptr);
+    _consume_err_cb.cancel_msg = cancel_msg;
     if (mem_tracker == nullptr) {
 #ifdef BE_TEST
         if (ExecEnv::GetInstance()->task_pool_mem_tracker_registry() == nullptr) {
             return;
         }
 #endif
-        _temp_task_mem_tracker = ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(task_id);
+        _temp_task_mem_tracker =
+                ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(
+                        task_id);
         update_tracker(_temp_task_mem_tracker);
     } else {
         update_tracker(mem_tracker);
@@ -44,7 +46,7 @@ void ThreadMemTrackerMgr::attach_task(const std::string& action_type, const std:
 void ThreadMemTrackerMgr::detach_task() {
     _task_id = "";
     _fragment_instance_id = TUniqueId();
-    _consume_err_call_back.init();
+    _consume_err_cb.init();
     clear_untracked_mems();
     _tracker_id = "process";
     // The following memory changes for the two map operations of _untracked_mems and _mem_trackers
@@ -70,12 +72,12 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details
 
 void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status st) {
     auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded(
-            nullptr, "In TCMalloc Hook, " + _consume_err_call_back.action_type, mem_usage, st);
-    if (_consume_err_call_back.call_back_func != nullptr) {
-        _consume_err_call_back.call_back_func();
+            nullptr, "In TCMalloc Hook, " + _consume_err_cb.cancel_msg, mem_usage, st);
+    if (_consume_err_cb.cb_func != nullptr) {
+        _consume_err_cb.cb_func();
     }
-    if (_task_id != "") {
-        if (_consume_err_call_back.cancel_task == true) {
+    if (is_attach_task()) {
+        if (_consume_err_cb.cancel_task == true) {
             exceeded_cancel_task(rst.to_string());
         } else {
             // TODO(zxy) Need other processing, or log (not too often).
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index 7401910..2b581de 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -28,27 +28,19 @@ namespace doris {
 typedef void (*ERRCALLBACK)();
 
 struct ConsumeErrCallBackInfo {
-    std::string action_type;
+    std::string cancel_msg;
     bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit
-    ERRCALLBACK call_back_func;
+    ERRCALLBACK cb_func;
 
-    ConsumeErrCallBackInfo() {
-        init();
-    }
-
-    ConsumeErrCallBackInfo(std::string action_type, bool cancel_task, ERRCALLBACK call_back_func)
-            : action_type(action_type), cancel_task(cancel_task), call_back_func(call_back_func) {}
+    ConsumeErrCallBackInfo() { init(); }
 
-    void update(std::string new_action_type, bool new_cancel_task, ERRCALLBACK new_call_back_func) {
-        action_type = new_action_type;
-        cancel_task = new_cancel_task;
-        call_back_func = new_call_back_func;
-    }
+    ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task, ERRCALLBACK cb_func)
+            : cancel_msg(cancel_msg), cancel_task(cancel_task), cb_func(cb_func) {}
 
     void init() {
-        action_type = "";
+        cancel_msg = "";
         cancel_task = false;
-        call_back_func = nullptr;
+        cb_func = nullptr;
     }
 };
 
@@ -80,7 +72,7 @@ public:
     }
 
     void clear_untracked_mems() {
-        for(auto untracked_mem : _untracked_mems) {
+        for (const auto& untracked_mem : _untracked_mems) {
             if (untracked_mem.second != 0) {
                 DCHECK(_mem_trackers[untracked_mem.first]);
                 _mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
@@ -91,7 +83,7 @@ public:
     }
 
     // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
-    void attach_task(const std::string& action_type, const std::string& task_id,
+    void attach_task(const std::string& cancel_msg, const std::string& task_id,
                      const TUniqueId& fragment_instance_id,
                      const std::shared_ptr<MemTracker>& mem_tracker);
 
@@ -101,6 +93,27 @@ public:
     // Thread update_tracker may be called very frequently, adding a memory copy will be slow.
     std::string update_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
 
+    void update_tracker_id(const std::string& tracker_id) {
+        if (tracker_id != _tracker_id) {
+            _untracked_mems[_tracker_id] += _untracked_mem;
+            _untracked_mem = 0;
+            _tracker_id = tracker_id;
+        }
+    }
+
+    inline ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg,
+                                                        bool cancel_task, ERRCALLBACK cb_func) {
+        _temp_consume_err_cb = _consume_err_cb;
+        _consume_err_cb.cancel_msg = cancel_msg;
+        _consume_err_cb.cancel_task = cancel_task;
+        _consume_err_cb.cb_func = cb_func;
+        return _temp_consume_err_cb;
+    }
+
+    inline void update_consume_err_cb(const ConsumeErrCallBackInfo& consume_err_cb) {
+        _consume_err_cb = consume_err_cb;
+    }
+
     // Note that, If call the memory allocation operation in TCMalloc new/delete Hook,
     // such as calling LOG/iostream/sstream/stringstream/etc. related methods,
     // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
@@ -108,6 +121,8 @@ public:
 
     void noncache_consume();
 
+    bool is_attach_task() { return _task_id != ""; }
+
     std::shared_ptr<MemTracker> mem_tracker() {
         DCHECK(_mem_trackers[_tracker_id]);
         return _mem_trackers[_tracker_id];
@@ -137,15 +152,16 @@ private:
 
     // Avoid memory allocation in functions and fall into an infinite loop
     std::string _temp_tracker_id;
-    ConsumeErrCallBackInfo _temp_consume_err_call_back;
+    ConsumeErrCallBackInfo _temp_consume_err_cb;
     std::shared_ptr<MemTracker> _temp_task_mem_tracker;
 
     std::string _task_id;
     TUniqueId _fragment_instance_id;
-    ConsumeErrCallBackInfo _consume_err_call_back;
+    ConsumeErrCallBackInfo _consume_err_cb;
 };
 
-inline std::string ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
+inline std::string ThreadMemTrackerMgr::update_tracker(
+        const std::shared_ptr<MemTracker>& mem_tracker) {
     DCHECK(mem_tracker);
     _temp_tracker_id = mem_tracker->id();
     if (_temp_tracker_id == _tracker_id) {
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 2b06779..df15f9d 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -53,7 +53,6 @@
 #include "runtime/exec_env.h"
 #include "runtime/heartbeat_flags.h"
 #include "runtime/minidump.h"
-#include "runtime/tcmalloc_hook.h"
 #include "service/backend_options.h"
 #include "service/backend_service.h"
 #include "service/brpc_service.h"
@@ -65,6 +64,11 @@
 #include "util/thrift_server.h"
 #include "util/uid_util.h"
 
+#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
+        !defined(THREAD_SANITIZER)
+#include "runtime/tcmalloc_hook.h"
+#endif
+
 static void help(const char*);
 
 #include <dlfcn.h>
@@ -336,11 +340,8 @@ int main(int argc, char** argv) {
         return -1;
     }
 
-    if (doris::config::track_new_delete) {
-        init_hook();
-    }
-
-#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
+#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
+        !defined(THREAD_SANITIZER)
     // Aggressive decommit is required so that unused pages in the TCMalloc page heap are
     // not backed by physical pages and do not contribute towards memory consumption.
     MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit", 1);
@@ -351,6 +352,9 @@ int main(int argc, char** argv) {
         fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n");
         return -1;
     }
+    if (doris::config::track_new_delete) {
+        init_hook();
+    }
 #endif
 
     if (!doris::Env::init()) {
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 0c93390..a1d781a 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -132,6 +132,10 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total, MetricUnit::OPERATIONS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MICROSECONDS);
 
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_count, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_err_cb_count, MetricUnit::NOUNIT);
+
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES);
 DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num, MetricUnit::NOUNIT);
 DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_fd_num_used, MetricUnit::NOUNIT);
@@ -275,6 +279,10 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes);
 
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, attach_task_thread_count);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_count);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_err_cb_count);
+
     _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this));
 
     INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, query_cache_memory_total_byte);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index e99f598..ca5d05c 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -125,6 +125,10 @@ public:
     IntCounter* memtable_flush_total;
     IntCounter* memtable_flush_duration_us;
 
+    IntCounter* attach_task_thread_count;
+    IntCounter* switch_thread_mem_tracker_count;
+    IntCounter* switch_thread_mem_tracker_err_cb_count;
+
     IntGauge* memory_pool_bytes_total;
     IntGauge* process_thread_num;
     IntGauge* process_fd_num_used;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index aea3ff7..9f2e9fb 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -20,6 +20,7 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_filter_mgr.h"
+#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "vec/core/materialize_block.h"
 #include "vec/exprs/vexpr.h"
@@ -921,6 +922,7 @@ Status HashJoinNode::open(RuntimeState* state) {
 
 Status HashJoinNode::_hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(1)->open(state));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while 
constructing the hash table.");
     SCOPED_TIMER(_build_timer);
     MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
 
@@ -936,7 +938,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) 
{
         RETURN_IF_ERROR(child(1)->get_next(state, &block, &eos));
         _hash_table_mem_tracker->consume(block.allocated_bytes());
         _mem_used += block.allocated_bytes();
-        RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while getting 
next from the child 1.");
 
         if (block.rows() != 0) { mutable_block.merge(block); }
 
@@ -947,7 +948,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) 
{
             // TODO:: Rethink may we should do the proess after we recevie all 
build blocks ?
             // which is better.
             RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], 
index));
-            RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while 
constructing the hash table.");
 
             mutable_block = MutableBlock();
             ++index;
@@ -957,7 +957,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) 
{
 
     _build_blocks.emplace_back(mutable_block.to_block());
     RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index));
-    RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing 
the hash table.");
 
     return std::visit(
             [&](auto&& arg) -> Status {
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 2f8cf06..0788303 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -22,6 +22,7 @@
 #include "exec/exec_node.h"
 #include "runtime/mem_pool.h"
 #include "runtime/row_batch.h"
+#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_nullable.h"
@@ -332,6 +333,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
 
 Status AggregationNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::open(state));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute 
open.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
@@ -356,7 +358,6 @@ Status AggregationNode::open(RuntimeState* state) {
         }
         RETURN_IF_ERROR(_executor.execute(&block));
         _executor.update_memusage();
-        RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "aggregator, while execute 
open.");
     }
 
     return Status::OK();
@@ -366,7 +367,9 @@ Status AggregationNode::get_next(RuntimeState* state, 
RowBatch* row_batch, bool*
     return Status::NotSupported("Not Implemented Aggregation Node::get_next 
scalar");
 }
 
-Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) 
{    SCOPED_TIMER(_runtime_profile->total_time_counter());
+Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) 
{
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute 
get_next.");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     if (_is_streaming_preagg) {
         bool child_eos = false;
@@ -395,7 +398,6 @@ Status AggregationNode::get_next(RuntimeState* state, 
Block* block, bool* eos) {
     }
 
     _executor.update_memusage();
-    RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "aggregator, while execute 
get_next.");
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp
index 69b45dc..d2fc21d 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -23,6 +23,7 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
 
 namespace doris::vectorized {
@@ -53,6 +54,7 @@ Status VCrossJoinNode::close(RuntimeState* state) {
 Status VCrossJoinNode::construct_build_side(RuntimeState* state) {
     // Do a full scan of child(1) and store all build row batches.
     RETURN_IF_ERROR(child(1)->open(state));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Vec Cross join, while 
getting next from the child 1");
 
     bool eos = false;
     while (true) {
@@ -70,8 +72,6 @@ Status VCrossJoinNode::construct_build_side(RuntimeState* 
state) {
             _build_blocks.emplace_back(std::move(block));
             _block_mem_tracker->consume(mem_usage);
         }
-        // to prevent use too many memory
-        RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Cross join, while getting 
next from the child 1.");
 
         if (eos) {
             break;
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 0985599..bd7856f 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/exec/vset_operation_node.h"
 
+#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "vec/exprs/vexpr.h"
 namespace doris {
@@ -228,6 +229,8 @@ void VSetOperationNode::hash_table_init() {
 //build a hash table from child(0)
 Status VSetOperationNode::hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(0)->open(state));
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(
+                "Vec Set Operation Node, while constructing the hash table");
     Block block;
     MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
 
@@ -244,7 +247,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
         _hash_table_mem_tracker->consume(allocated_bytes);
         _mem_used += allocated_bytes;
 
-        RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node, while getting next from the child 0.");
         if (block.rows() != 0) { mutable_block.merge(block); }
 
         // make one block for each 4 gigabytes
@@ -254,7 +256,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
             // TODO:: Rethink may we should do the proess after we recevie all build blocks ?
             // which is better.
             RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
-            RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table.");
             mutable_block = MutableBlock();
             ++index;
             last_mem_used = _mem_used;
@@ -263,7 +264,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
 
     _build_blocks.emplace_back(mutable_block.to_block());
     RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
-    RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table.");
     return Status::OK();
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
