This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new aaaaae5 [feature] (memory) Switch TLS mem tracker to separate more detailed memory usage (#8605) aaaaae5 is described below commit aaaaae53b53ce6060188dfd2c19bc1784031b90c Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Mar 24 14:29:34 2022 +0800 [feature] (memory) Switch TLS mem tracker to separate more detailed memory usage (#8605) In PR #8476, all memory usage of a process is recorded in the process mem tracker and all memory usage of a query is recorded in the query mem tracker, but it is still necessary to manually call `transfer_to` to track the cached memory size. We want to separate out more detailed memory usage by building on the TCMalloc new/delete hook together with the TLS mem tracker. In this PR, the TLS mem tracker is switched to the more detailed mem trackers (for example those of the exec nodes, the LRU cache and the tablet manager), so that more detailed memory usage is counted automatically and more accurately than before. --- be/src/exec/cross_join_node.cpp | 5 +- be/src/exec/except_node.cpp | 3 +- be/src/exec/exec_node.cpp | 3 + be/src/exec/hash_join_node.cpp | 5 +- be/src/exec/intersect_node.cpp | 3 +- be/src/exec/set_operation_node.cpp | 3 +- be/src/olap/lru_cache.cpp | 16 ++++-- be/src/olap/tablet_manager.cpp | 15 ++--- be/src/runtime/bufferpool/buffer_allocator.cc | 17 ++++-- be/src/runtime/bufferpool/buffer_allocator.h | 3 + be/src/runtime/memory/chunk_allocator.cpp | 5 ++ be/src/runtime/tcmalloc_hook.h | 2 + be/src/runtime/thread_context.h | 81 +++++++++++++++++++++++---- be/src/runtime/thread_mem_tracker_mgr.cpp | 20 ++++--- be/src/runtime/thread_mem_tracker_mgr.h | 56 +++++++++++------- be/src/service/doris_main.cpp | 16 ++++-- be/src/util/doris_metrics.cpp | 8 +++ be/src/util/doris_metrics.h | 4 ++ be/src/vec/exec/join/vhash_join_node.cpp | 5 +- be/src/vec/exec/vaggregation_node.cpp | 8 ++- be/src/vec/exec/vcross_join_node.cpp | 4 +- be/src/vec/exec/vset_operation_node.cpp | 6 +- 22 files changed, 202 insertions(+), 86 deletions(-) diff --git a/be/src/exec/cross_join_node.cpp 
b/be/src/exec/cross_join_node.cpp index 5dcdc10..5def58a 100644 --- a/be/src/exec/cross_join_node.cpp +++ b/be/src/exec/cross_join_node.cpp @@ -23,6 +23,7 @@ #include "gen_cpp/PlanNodes_types.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "util/debug_util.h" #include "util/runtime_profile.h" @@ -52,6 +53,7 @@ Status CrossJoinNode::close(RuntimeState* state) { Status CrossJoinNode::construct_build_side(RuntimeState* state) { // Do a full scan of child(1) and store all build row batches. RETURN_IF_ERROR(child(1)->open(state)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Cross join, while getting next from child 1"); while (true) { RowBatch* batch = @@ -63,9 +65,6 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) { bool eos = false; RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos)); - // to prevent use too many memory - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Cross join, while getting next from the child 1."); - SCOPED_TIMER(_build_timer); _build_batches.add_row_batch(batch); VLOG_ROW << build_list_debug_string(); diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 8ae1701..ec3e451 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -21,6 +21,7 @@ #include "exprs/expr.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" namespace doris { ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) @@ -40,6 +41,7 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExceptNode::open(RuntimeState* state) { RETURN_IF_ERROR(SetOperationNode::open(state)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Except Node, while probing the hash table."); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); @@ -62,7 +64,6 @@ Status 
ExceptNode::open(RuntimeState* state) { while (!eos) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Except , while probing the hash table."); for (int j = 0; j < _probe_batch->num_rows(); ++j) { _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4d8dd2a..e3891b6 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -57,6 +57,7 @@ #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "util/debug_util.h" #include "util/runtime_profile.h" #include "vec/core/block.h" @@ -208,6 +209,7 @@ Status ExecNode::prepare(RuntimeState* state) { _mem_tracker = MemTracker::create_tracker(-1, "ExecNode:" + _runtime_profile->name(), state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _runtime_profile.get()); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); _expr_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(), _mem_tracker); @@ -226,6 +228,7 @@ Status ExecNode::prepare(RuntimeState* state) { } Status ExecNode::open(RuntimeState* state) { + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); if (_vconjunct_ctx_ptr) { RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state)); diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index c92bb5a..30cd844 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -186,6 +186,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) { // The hash join node needs to keep in memory all build tuples, including the tuple // row ptrs. 
The row ptrs are copied into the hash table's internal structure so they // don't need to be stored in the _build_pool. + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while constructing the hash table."); RowBatch build_batch(child(1)->row_desc(), state->batch_size()); RETURN_IF_ERROR(child(1)->open(state)); @@ -303,7 +304,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo // In most cases, no additional memory overhead will be applied for at this stage, // but if the expression calculation in this node needs to apply for additional memory, // it may cause the memory to exceed the limit. - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while execute get_next."); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while execute get_next."); SCOPED_TIMER(_runtime_profile->total_time_counter()); if (reached_limit()) { @@ -771,11 +772,9 @@ Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_ba _build_pool.get(), false); } } - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table."); } else { // take ownership of tuple data of build_batch _build_pool->acquire_data(build_batch->tuple_data_pool(), false); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table."); RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows())); for (int i = 0; i < build_batch->num_rows(); ++i) { _hash_tbl->insert_without_check(build_batch->get_row(i)); diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index 2d8d2ee..df0e30d 100644 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -21,6 +21,7 @@ #include "exprs/expr.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" namespace doris { IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) @@ -44,6 +45,7 @@ Status 
IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { // repeat [2] this for all the rest child Status IntersectNode::open(RuntimeState* state) { RETURN_IF_ERROR(SetOperationNode::open(state)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Intersect Node, while probing the hash table."); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); @@ -66,7 +68,6 @@ Status IntersectNode::open(RuntimeState* state) { while (!eos) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table."); for (int j = 0; j < _probe_batch->num_rows(); ++j) { _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 7faa561..5574d3b 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -23,6 +23,7 @@ #include "runtime/raw_value.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" namespace doris { SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode, @@ -137,6 +138,7 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) { Status SetOperationNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SetOperation, while constructing the hash table."); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); // open result expr lists. 
@@ -156,7 +158,6 @@ Status SetOperationNode::open(RuntimeState* state) { RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos)); // take ownership of tuple data of build_batch _build_pool->acquire_data(build_batch.tuple_data_pool(), false); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, " SetOperation, while constructing the hash table."); // build hash table and remove duplicate items RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch.num_rows())); for (int i = 0; i < build_batch.num_rows(); ++i) { diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 825f029..e494c54 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -364,13 +364,7 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash, MemTracker* tracker) { } // free handle out of mutex, when last_ref is true, e must not be nullptr if (last_ref) { - size_t charge = e->charge; e->free(); - // The parameter tracker is ShardedLRUCache::_mem_tracker, - // because the memory released by LRUHandle is recorded in the tls mem tracker, - // so this part of the memory is subsidized from ShardedLRUCache::_mem_tracker to the tls mem tracker - tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), - charge); } } @@ -449,11 +443,15 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity, : _name(name), _last_id(1), _mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards; for (int s = 0; s < kNumShards; s++) { _shards[s] = new LRUCache(type); _shards[s]->set_capacity(per_shard); } + // After the lru cache is created in the main thread, the main thread will not switch to the + // lru cache mem tracker again, so manually clear the untracked mem in tls. 
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems(); _entity = DorisMetrics::instance()->metric_registry()->register_entity( std::string("lru_cache:") + name, {{"name", name}}); @@ -467,6 +465,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity, } ShardedLRUCache::~ShardedLRUCache() { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); for (int s = 0; s < kNumShards; s++) { delete _shards[s]; } @@ -481,6 +480,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker. thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const uint32_t hash = _hash_slice(key); return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority); } @@ -491,11 +491,13 @@ Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) { } void ShardedLRUCache::release(Handle* handle) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); LRUHandle* h = reinterpret_cast<LRUHandle*>(handle); _shards[_shard(h->hash)]->release(handle); } void ShardedLRUCache::erase(const CacheKey& key) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const uint32_t hash = _hash_slice(key); _shards[_shard(hash)]->erase(key, hash, _mem_tracker.get()); } @@ -514,6 +516,7 @@ uint64_t ShardedLRUCache::new_id() { } int64_t ShardedLRUCache::prune() { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t num_prune = 0; for (int s = 0; s < kNumShards; s++) { num_prune += _shards[s]->prune(); @@ -522,6 +525,7 @@ int64_t ShardedLRUCache::prune() { } int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t num_prune = 0; for (int s = 0; s < kNumShards; s++) { num_prune += _shards[s]->prune_if(pred); diff --git a/be/src/olap/tablet_manager.cpp 
b/be/src/olap/tablet_manager.cpp index dd51b87..ed1a711 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -192,11 +192,6 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, tablet_map_t& tablet_map = _get_tablet_map(tablet_id); tablet_map[tablet_id] = tablet; _add_tablet_to_partition(tablet); - // TODO: remove multiply 2 of tablet meta mem size - // Because table schema will copy in tablet, there will be double mem cost - // so here multiply 2 - thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to( - _mem_tracker.get(), tablet->tablet_meta()->mem_size() * 2); VLOG_NOTICE << "add tablet to map successfully." << " tablet_id=" << tablet_id ; @@ -215,6 +210,7 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); DorisMetrics::instance()->create_tablet_requests_total->increment(1); int64_t tablet_id = request.tablet_id; @@ -432,6 +428,7 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( OLAPStatus TabletManager::drop_tablet(TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) { WriteLock wrlock(_get_tablets_shard_lock(tablet_id)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); return _drop_tablet_unlocked(tablet_id, keep_files); } @@ -460,6 +457,7 @@ OLAPStatus TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, bool keep_f OLAPStatus TabletManager::drop_tablets_on_error_root_path( const std::vector<TabletInfo>& tablet_info_vec) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); OLAPStatus res = OLAP_SUCCESS; if (tablet_info_vec.empty()) { // This is a high probability event return res; @@ -670,6 +668,7 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab TSchemaHash schema_hash, const string& meta_binary, bool update_meta, 
bool force, bool restore, bool check_path) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); TabletMetaSharedPtr tablet_meta(new TabletMeta()); OLAPStatus status = tablet_meta->deserialize(meta_binary); if (status != OLAP_SUCCESS) { @@ -752,6 +751,7 @@ OLAPStatus TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_ SchemaHash schema_hash, const string& schema_hash_path, bool force, bool restore) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); LOG(INFO) << "begin to load tablet from dir. " << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash << " path = " << schema_hash_path << " force = " << force << " restore = " << restore; @@ -1219,11 +1219,6 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bo } dropped_tablet->deregister_tablet_from_dir(); - // The dropped tablet meta is expected to be released in the TabletManager mem tracker, - // but is actually released in the tls mem tracker. - // So from TabletManager mem tracker compensate memory to tls tracker. - _mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), - dropped_tablet->tablet_meta()->mem_size() * 2); return OLAP_SUCCESS; } diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc b/be/src/runtime/bufferpool/buffer_allocator.cc index a3bbe4c..82742a6 100644 --- a/be/src/runtime/bufferpool/buffer_allocator.cc +++ b/be/src/runtime/bufferpool/buffer_allocator.cc @@ -26,6 +26,7 @@ #include "util/cpu_info.h" #include "util/pretty_printer.h" #include "util/runtime_profile.h" +#include "runtime/thread_context.h" //DECLARE_bool(disable_mem_pools); @@ -48,7 +49,7 @@ public: /// Add a free buffer to the free lists. May free buffers to the system allocator /// if the list becomes full. Caller should not hold 'lock_' - void AddFreeBuffer(BufferHandle&& buffer); + bool AddFreeBuffer(BufferHandle&& buffer); /// Try to get a free buffer of 'buffer_len' bytes from this arena. 
Returns true and /// sets 'buffer' if found or false if not found. Caller should not hold 'lock_'. @@ -193,7 +194,8 @@ BufferPool::BufferAllocator::BufferAllocator(BufferPool* pool, int64_t min_buffe clean_page_bytes_limit_(clean_page_bytes_limit), clean_page_bytes_remaining_(clean_page_bytes_limit), per_core_arenas_(CpuInfo::get_max_num_cores()), - max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS) { + max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS), + _mem_tracker(MemTracker::create_virtual_tracker(-1, "BufferAllocator", nullptr, MemTrackerLevel::OVERVIEW)) { DCHECK(BitUtil::IsPowerOf2(min_buffer_len_)) << min_buffer_len_; DCHECK(BitUtil::IsPowerOf2(max_buffer_len_)) << max_buffer_len_; DCHECK_LE(0, min_buffer_len_); @@ -303,6 +305,7 @@ Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle* system_bytes_remaining_.add(len); return status; } + _mem_tracker->consume_cache(len); return Status::OK(); } @@ -375,7 +378,9 @@ void BufferPool::BufferAllocator::Free(BufferHandle&& handle) { handle.client_ = nullptr; // Buffer is no longer associated with a client. 
FreeBufferArena* arena = per_core_arenas_[handle.home_core_].get(); handle.Poison(); - arena->AddFreeBuffer(std::move(handle)); + if (!arena->AddFreeBuffer(std::move(handle))) { + _mem_tracker->release_cache(handle.len()); + } } void BufferPool::BufferAllocator::AddCleanPage(const std::unique_lock<std::mutex>& client_lock, @@ -426,6 +431,7 @@ int64_t BufferPool::BufferAllocator::FreeToSystem(std::vector<BufferHandle>&& bu buffer.Unpoison(); system_allocator_->Free(std::move(buffer)); } + _mem_tracker->release_cache(bytes_freed); return bytes_freed; } @@ -485,16 +491,17 @@ BufferPool::FreeBufferArena::~FreeBufferArena() { } } -void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) { +bool BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) { std::lock_guard<SpinLock> al(lock_); if (config::disable_mem_pools) { int64_t len = buffer.len(); parent_->system_allocator_->Free(std::move(buffer)); parent_->system_bytes_remaining_.add(len); - return; + return false; } PerSizeLists* lists = GetListsForSize(buffer.len()); lists->AddFreeBuffer(std::move(buffer)); + return true; } bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page) { diff --git a/be/src/runtime/bufferpool/buffer_allocator.h b/be/src/runtime/bufferpool/buffer_allocator.h index f2ab5e6..19b1c6c 100644 --- a/be/src/runtime/bufferpool/buffer_allocator.h +++ b/be/src/runtime/bufferpool/buffer_allocator.h @@ -21,6 +21,7 @@ #include "runtime/bufferpool/buffer_pool_internal.h" #include "runtime/bufferpool/free_list.h" #include "util/aligned_new.h" +#include "runtime/mem_tracker.h" namespace doris { @@ -235,6 +236,8 @@ private: /// all arenas so may fail. The final attempt locks all arenas, which is expensive /// but is guaranteed to succeed. 
int max_scavenge_attempts_; + + std::shared_ptr<MemTracker> _mem_tracker; }; } // namespace doris diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 6f1306c..9aec083 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -99,6 +99,7 @@ public: // Poison this chunk to make asan can detect invalid access ASAN_POISON_MEMORY_REGION(ptr, size); std::lock_guard<SpinLock> l(_lock); + // TODO(zxy) The memory of vector resize is not recorded in chunk allocator mem tracker _chunk_lists[idx].push_back(ptr); } @@ -118,9 +119,13 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit) _arenas(CpuInfo::get_max_num_cores()) { _mem_tracker = MemTracker::create_tracker(-1, "ChunkAllocator", nullptr, MemTrackerLevel::OVERVIEW); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); for (int i = 0; i < _arenas.size(); ++i) { _arenas[i].reset(new ChunkArena()); } + // After the ChunkAllocator is created in the main thread, the main thread will not switch to the + // chunk allocator mem tracker again, so manually clear the untracked mem in tls. + thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems(); _chunk_allocator_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator"); diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 4e3fbb8..9ba55fd 100644 --- a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+#pragma once + #include <gperftools/malloc_hook.h> #include <gperftools/nallocx.h> #include <gperftools/tcmalloc.h> diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 1718e8d..7c3beaf 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -25,14 +25,29 @@ #include "runtime/runtime_state.h" #include "runtime/thread_mem_tracker_mgr.h" #include "runtime/threadlocal.h" +#include "util/doris_metrics.h" // Attach to task when thread starts #define SCOPED_ATTACH_TASK_THREAD(type, ...) \ auto VARNAME_LINENUM(attach_task_thread) = AttachTaskThread(type, ##__VA_ARGS__) +// Be careful to stop the thread mem tracker, because the actual order of malloc and free memory +// may be different from the order of execution of instructions, which will cause the position of +// the memory track to be unexpected. #define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \ auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(true) #define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \ auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(false) +// Switch thread mem tracker during task execution. +// After the non-query thread switches the mem tracker, if the thread will not switch the mem +// tracker again in the short term, can consider manually clear_untracked_mems. +// The query thread will automatically clear_untracked_mems when detach_task. +#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ + auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, false) +#define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ + auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, true); +#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) 
\ + auto VARNAME_LINENUM(witch_tracker_cb) = \ + SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__) namespace doris { @@ -72,7 +87,7 @@ public: _type = type; _task_id = task_id; _fragment_instance_id = fragment_instance_id; - _thread_mem_tracker_mgr->attach_task(task_type_string(_type), task_id, fragment_instance_id, + _thread_mem_tracker_mgr->attach_task(TaskTypeStr[_type], task_id, fragment_instance_id, mem_tracker); } @@ -88,10 +103,6 @@ public: const std::string& thread_id_str() const { return _thread_id_str; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } - inline static const std::string task_type_string(ThreadContext::TaskType type) { - return TaskTypeStr[type]; - } - void consume_mem(int64_t size) { if (start_thread_mem_tracker) { _thread_mem_tracker_mgr->cache_consume(size); @@ -166,13 +177,13 @@ public: explicit AttachTaskThread(const ThreadContext::TaskType& type, const std::shared_ptr<MemTracker>& mem_tracker) { - DCHECK(mem_tracker != nullptr); + DCHECK(mem_tracker); thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker); } explicit AttachTaskThread(const TQueryType::type& query_type, const std::shared_ptr<MemTracker>& mem_tracker) { - DCHECK(mem_tracker != nullptr); + DCHECK(mem_tracker); thread_local_ctx.get()->attach(query_to_task_type(query_type), "", TUniqueId(), mem_tracker); } @@ -182,7 +193,7 @@ public: const std::shared_ptr<MemTracker>& mem_tracker) { DCHECK(task_id != ""); DCHECK(fragment_instance_id != TUniqueId()); - DCHECK(mem_tracker != nullptr); + DCHECK(mem_tracker); thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker); } @@ -192,7 +203,7 @@ public: #ifndef BE_TEST DCHECK(print_id(runtime_state->query_id()) != ""); DCHECK(runtime_state->fragment_instance_id() != TUniqueId()); - DCHECK(mem_tracker != nullptr); + DCHECK(mem_tracker); thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()), 
print_id(runtime_state->query_id()), runtime_state->fragment_instance_id(), mem_tracker); @@ -211,7 +222,12 @@ public: } } - ~AttachTaskThread() { thread_local_ctx.get()->detach(); } + ~AttachTaskThread() { +#ifndef BE_TEST + thread_local_ctx.get()->detach(); + DorisMetrics::instance()->attach_task_thread_count->increment(1); +#endif + } }; class StopThreadMemTracker { @@ -228,4 +244,49 @@ private: bool _scope; }; +class SwitchThreadMemTracker { +public: + explicit SwitchThreadMemTracker(const std::shared_ptr<MemTracker>& mem_tracker, + bool in_task = true) { +#ifndef BE_TEST + DCHECK(mem_tracker); + // The thread tracker must be switched after the attach task, otherwise switching + // in the main thread will cause the cached tracker not be cleaned up in time. + DCHECK(in_task == false || + thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task()); + _old_tracker_id = + thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker(mem_tracker); +#endif + } + + ~SwitchThreadMemTracker() { +#ifndef BE_TEST + thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id); + DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1); +#endif + } + +private: + std::string _old_tracker_id; +}; + +class SwitchThreadMemTrackerErrCallBack { +public: + explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type, + bool cancel_work = true, + ERRCALLBACK err_call_back_func = nullptr) { + DCHECK(action_type != std::string()); + _old_tracker_cb = thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb( + action_type, cancel_work, err_call_back_func); + } + + ~SwitchThreadMemTrackerErrCallBack() { + thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); + DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); + } + +private: + ConsumeErrCallBackInfo _old_tracker_cb; +}; + } // namespace doris diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/thread_mem_tracker_mgr.cpp index 12b64d4..fd06c17 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/thread_mem_tracker_mgr.cpp @@ -22,19 +22,21 @@ namespace doris { -void ThreadMemTrackerMgr::attach_task(const std::string& action_type, const std::string& task_id, +void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr<MemTracker>& mem_tracker) { _task_id = task_id; _fragment_instance_id = fragment_instance_id; - _consume_err_call_back.update(action_type, true, nullptr); + _consume_err_cb.cancel_msg = cancel_msg; if (mem_tracker == nullptr) { #ifdef BE_TEST if (ExecEnv::GetInstance()->task_pool_mem_tracker_registry() == nullptr) { return; } #endif - _temp_task_mem_tracker = ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(task_id); + _temp_task_mem_tracker = + ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( + task_id); update_tracker(_temp_task_mem_tracker); } else { update_tracker(mem_tracker); @@ -44,7 +46,7 @@ void ThreadMemTrackerMgr::attach_task(const std::string& action_type, const std: void ThreadMemTrackerMgr::detach_task() { _task_id = ""; _fragment_instance_id = TUniqueId(); - _consume_err_call_back.init(); + _consume_err_cb.init(); clear_untracked_mems(); _tracker_id = "process"; // The following memory changes for the two map operations of _untracked_mems and _mem_trackers @@ -70,12 +72,12 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status st) { auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded( - nullptr, "In TCMalloc Hook, " + _consume_err_call_back.action_type, mem_usage, st); - if (_consume_err_call_back.call_back_func != nullptr) { - _consume_err_call_back.call_back_func(); + nullptr, "In TCMalloc Hook, " + _consume_err_cb.cancel_msg, mem_usage, 
st); + if (_consume_err_cb.cb_func != nullptr) { + _consume_err_cb.cb_func(); } - if (_task_id != "") { - if (_consume_err_call_back.cancel_task == true) { + if (is_attach_task()) { + if (_consume_err_cb.cancel_task == true) { exceeded_cancel_task(rst.to_string()); } else { // TODO(zxy) Need other processing, or log (not too often). diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 7401910..2b581de 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -28,27 +28,19 @@ namespace doris { typedef void (*ERRCALLBACK)(); struct ConsumeErrCallBackInfo { - std::string action_type; + std::string cancel_msg; bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit - ERRCALLBACK call_back_func; + ERRCALLBACK cb_func; - ConsumeErrCallBackInfo() { - init(); - } - - ConsumeErrCallBackInfo(std::string action_type, bool cancel_task, ERRCALLBACK call_back_func) - : action_type(action_type), cancel_task(cancel_task), call_back_func(call_back_func) {} + ConsumeErrCallBackInfo() { init(); } - void update(std::string new_action_type, bool new_cancel_task, ERRCALLBACK new_call_back_func) { - action_type = new_action_type; - cancel_task = new_cancel_task; - call_back_func = new_call_back_func; - } + ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task, ERRCALLBACK cb_func) + : cancel_msg(cancel_msg), cancel_task(cancel_task), cb_func(cb_func) {} void init() { - action_type = ""; + cancel_msg = ""; cancel_task = false; - call_back_func = nullptr; + cb_func = nullptr; } }; @@ -80,7 +72,7 @@ public: } void clear_untracked_mems() { - for(auto untracked_mem : _untracked_mems) { + for (const auto& untracked_mem : _untracked_mems) { if (untracked_mem.second != 0) { DCHECK(_mem_trackers[untracked_mem.first]); _mem_trackers[untracked_mem.first]->consume(untracked_mem.second); @@ -91,7 +83,7 @@ public: } // After attach, the current thread TCMalloc 
Hook starts to consume/release task mem_tracker - void attach_task(const std::string& action_type, const std::string& task_id, + void attach_task(const std::string& cancel_msg, const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr<MemTracker>& mem_tracker); @@ -101,6 +93,27 @@ public: // Thread update_tracker may be called very frequently, adding a memory copy will be slow. std::string update_tracker(const std::shared_ptr<MemTracker>& mem_tracker); + void update_tracker_id(const std::string& tracker_id) { + if (tracker_id != _tracker_id) { + _untracked_mems[_tracker_id] += _untracked_mem; + _untracked_mem = 0; + _tracker_id = tracker_id; + } + } + + inline ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg, + bool cancel_task, ERRCALLBACK cb_func) { + _temp_consume_err_cb = _consume_err_cb; + _consume_err_cb.cancel_msg = cancel_msg; + _consume_err_cb.cancel_task = cancel_task; + _consume_err_cb.cb_func = cb_func; + return _temp_consume_err_cb; + } + + inline void update_consume_err_cb(const ConsumeErrCallBackInfo& consume_err_cb) { + _consume_err_cb = consume_err_cb; + } + // Note that, If call the memory allocation operation in TCMalloc new/delete Hook, // such as calling LOG/iostream/sstream/stringstream/etc. 
related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, @@ -108,6 +121,8 @@ public: void noncache_consume(); + bool is_attach_task() { return _task_id != ""; } + std::shared_ptr<MemTracker> mem_tracker() { DCHECK(_mem_trackers[_tracker_id]); return _mem_trackers[_tracker_id]; @@ -137,15 +152,16 @@ private: // Avoid memory allocation in functions and fall into an infinite loop std::string _temp_tracker_id; - ConsumeErrCallBackInfo _temp_consume_err_call_back; + ConsumeErrCallBackInfo _temp_consume_err_cb; std::shared_ptr<MemTracker> _temp_task_mem_tracker; std::string _task_id; TUniqueId _fragment_instance_id; - ConsumeErrCallBackInfo _consume_err_call_back; + ConsumeErrCallBackInfo _consume_err_cb; }; -inline std::string ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { +inline std::string ThreadMemTrackerMgr::update_tracker( + const std::shared_ptr<MemTracker>& mem_tracker) { DCHECK(mem_tracker); _temp_tracker_id = mem_tracker->id(); if (_temp_tracker_id == _tracker_id) { diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 2b06779..df15f9d 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -53,7 +53,6 @@ #include "runtime/exec_env.h" #include "runtime/heartbeat_flags.h" #include "runtime/minidump.h" -#include "runtime/tcmalloc_hook.h" #include "service/backend_options.h" #include "service/backend_service.h" #include "service/brpc_service.h" @@ -65,6 +64,11 @@ #include "util/thrift_server.h" #include "util/uid_util.h" +#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ + !defined(THREAD_SANITIZER) +#include "runtime/tcmalloc_hook.h" +#endif + static void help(const char*); #include <dlfcn.h> @@ -336,11 +340,8 @@ int main(int argc, char** argv) { return -1; } - if (doris::config::track_new_delete) { - init_hook(); - } - -#if !defined(ADDRESS_SANITIZER) && 
!defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) +#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ + !defined(THREAD_SANITIZER) // Aggressive decommit is required so that unused pages in the TCMalloc page heap are // not backed by physical pages and do not contribute towards memory consumption. MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit", 1); @@ -351,6 +352,9 @@ int main(int argc, char** argv) { fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n"); return -1; } + if (doris::config::track_new_delete) { + init_hook(); + } #endif if (!doris::Env::init()) { diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 0c93390..a1d781a 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -132,6 +132,10 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MICROSECONDS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_count, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_err_cb_count, MetricUnit::NOUNIT); + DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES); DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num, MetricUnit::NOUNIT); DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_fd_num_used, MetricUnit::NOUNIT); @@ -275,6 +279,10 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, attach_task_thread_count); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
switch_thread_mem_tracker_count); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_err_cb_count); + _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this)); INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, query_cache_memory_total_byte); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index e99f598..ca5d05c 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -125,6 +125,10 @@ public: IntCounter* memtable_flush_total; IntCounter* memtable_flush_duration_us; + IntCounter* attach_task_thread_count; + IntCounter* switch_thread_mem_tracker_count; + IntCounter* switch_thread_mem_tracker_err_cb_count; + IntGauge* memory_pool_bytes_total; IntGauge* process_thread_num; IntGauge* process_fd_num_used; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index aea3ff7..9f2e9fb 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -20,6 +20,7 @@ #include "gen_cpp/PlanNodes_types.h" #include "runtime/mem_tracker.h" #include "runtime/runtime_filter_mgr.h" +#include "runtime/thread_context.h" #include "util/defer_op.h" #include "vec/core/materialize_block.h" #include "vec/exprs/vexpr.h" @@ -921,6 +922,7 @@ Status HashJoinNode::open(RuntimeState* state) { Status HashJoinNode::_hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(1)->open(state)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while constructing the hash table."); SCOPED_TIMER(_build_timer); MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors()); @@ -936,7 +938,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(1)->get_next(state, &block, &eos)); _hash_table_mem_tracker->consume(block.allocated_bytes()); _mem_used += block.allocated_bytes(); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while getting next from the child 1."); if 
(block.rows() != 0) { mutable_block.merge(block); } @@ -947,7 +948,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { // TODO:: Rethink may we should do the proess after we recevie all build blocks ? // which is better. RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index)); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table."); mutable_block = MutableBlock(); ++index; @@ -957,7 +957,6 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { _build_blocks.emplace_back(mutable_block.to_block()); RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index)); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table."); return std::visit( [&](auto&& arg) -> Status { diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 2f8cf06..0788303 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -22,6 +22,7 @@ #include "exec/exec_node.h" #include "runtime/mem_pool.h" #include "runtime/row_batch.h" +#include "runtime/thread_context.h" #include "util/defer_op.h" #include "vec/core/block.h" #include "vec/data_types/data_type_nullable.h" @@ -332,6 +333,7 @@ Status AggregationNode::prepare(RuntimeState* state) { Status AggregationNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute open."); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state)); @@ -356,7 +358,6 @@ Status AggregationNode::open(RuntimeState* state) { } RETURN_IF_ERROR(_executor.execute(&block)); _executor.update_memusage(); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "aggregator, while execute open."); } return Status::OK(); @@ -366,7 +367,9 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* return Status::NotSupported("Not 
Implemented Aggregation Node::get_next scalar"); } -Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); +Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute get_next."); + SCOPED_TIMER(_runtime_profile->total_time_counter()); if (_is_streaming_preagg) { bool child_eos = false; @@ -395,7 +398,6 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { } _executor.update_memusage(); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "aggregator, while execute get_next."); return Status::OK(); } diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp index 69b45dc..d2fc21d 100644 --- a/be/src/vec/exec/vcross_join_node.cpp +++ b/be/src/vec/exec/vcross_join_node.cpp @@ -23,6 +23,7 @@ #include "gen_cpp/PlanNodes_types.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "util/runtime_profile.h" namespace doris::vectorized { @@ -53,6 +54,7 @@ Status VCrossJoinNode::close(RuntimeState* state) { Status VCrossJoinNode::construct_build_side(RuntimeState* state) { // Do a full scan of child(1) and store all build row batches. 
RETURN_IF_ERROR(child(1)->open(state)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Vec Cross join, while getting next from the child 1"); bool eos = false; while (true) { @@ -70,8 +72,6 @@ Status VCrossJoinNode::construct_build_side(RuntimeState* state) { _build_blocks.emplace_back(std::move(block)); _block_mem_tracker->consume(mem_usage); } - // to prevent use too many memory - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Cross join, while getting next from the child 1."); if (eos) { break; diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 0985599..bd7856f 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -17,6 +17,7 @@ #include "vec/exec/vset_operation_node.h" +#include "runtime/thread_context.h" #include "util/defer_op.h" #include "vec/exprs/vexpr.h" namespace doris { @@ -228,6 +229,8 @@ void VSetOperationNode::hash_table_init() { //build a hash table from child(0) Status VSetOperationNode::hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(0)->open(state)); + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB( + "Vec Set Operation Node, while constructing the hash table"); Block block; MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors()); @@ -244,7 +247,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { _hash_table_mem_tracker->consume(allocated_bytes); _mem_used += allocated_bytes; - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node, while getting next from the child 0."); if (block.rows() != 0) { mutable_block.merge(block); } // make one block for each 4 gigabytes @@ -254,7 +256,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { // TODO:: Rethink may we should do the proess after we recevie all build blocks ? // which is better. 
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index)); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table."); mutable_block = MutableBlock(); ++index; last_mem_used = _mem_used; @@ -263,7 +264,6 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { _build_blocks.emplace_back(mutable_block.to_block()); RETURN_IF_ERROR(process_build_block(_build_blocks[index], index)); - RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table."); return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org