This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new ecbf87d77b [bugfix](memtracker)fix exceed memory limit log (#11485)
ecbf87d77b is described below

commit ecbf87d77b36e345d9cf6deb6a776f2344de3bdc
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu Aug 4 10:22:20 2022 +0800

    [bugfix](memtracker)fix exceed memory limit log (#11485)
---
 be/src/exec/cross_join_node.cpp | 1 -
 be/src/exec/es/es_scroll_parser.cpp | 11 ++---
 be/src/exec/except_node.cpp | 1 -
 be/src/exec/hash_join_node.cpp | 2 -
 be/src/exec/intersect_node.cpp | 1 -
 be/src/exec/olap_scanner.cpp | 5 +-
 be/src/exec/partitioned_aggregation_node.cc | 10 ++--
 be/src/exec/partitioned_hash_table.cc | 7 ++-
 be/src/exec/set_operation_node.cpp | 1 -
 be/src/exprs/anyval_util.cpp | 6 +--
 be/src/exprs/expr_context.cpp | 6 +--
 be/src/runtime/mem_pool.h | 43 +++++++----------
 be/src/runtime/memory/mem_tracker_limiter.cpp | 59 +++++++++++-------------
 be/src/runtime/memory/mem_tracker_limiter.h | 16 +++----
 be/src/runtime/memory/mem_tracker_task_pool.cpp | 8 ++--
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 31 ++++++-------
 be/src/runtime/memory/thread_mem_tracker_mgr.h | 41 +++-------------
 be/src/runtime/plan_fragment_executor.cpp | 1 +
 be/src/runtime/runtime_state.cpp | 4 +-
 be/src/runtime/thread_context.cpp | 18 --------
 be/src/runtime/thread_context.h | 40 +++++-----------
 be/src/vec/exec/join/vhash_join_node.cpp | 1 -
 be/src/vec/exec/vaggregation_node.cpp | 2 -
 be/src/vec/exec/vcross_join_node.cpp | 1 -
 be/src/vec/exec/vset_operation_node.cpp | 1 -
 25 files changed, 106 insertions(+), 211 deletions(-)

diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp
index c80560bf82..fe748904f0 100644
--- a/be/src/exec/cross_join_node.cpp
+++ b/be/src/exec/cross_join_node.cpp
@@ -52,7 +52,6 @@ Status CrossJoinNode::close(RuntimeState* state) {
 Status CrossJoinNode::construct_build_side(RuntimeState* state) {
     // Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Cross join, while getting next from child 1"); while (true) { RowBatch* batch = diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp index a95af7bb51..7170057ac2 100644 --- a/be/src/exec/es/es_scroll_parser.cpp +++ b/be/src/exec/es/es_scroll_parser.cpp @@ -348,12 +348,11 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple, // obj[FIELD_ID] must not be nullptr std::string _id = obj[FIELD_ID].GetString(); size_t len = _id.length(); - Status rst; - char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(len, &rst)); + char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(len)); if (UNLIKELY(buffer == nullptr)) { std::string details = strings::Substitute(ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", len, "string slot"); - RETURN_LIMIT_EXCEEDED(nullptr, details, len, rst); + RETURN_LIMIT_EXCEEDED(nullptr, details, len); } memcpy(buffer, _id.data(), len); reinterpret_cast<StringValue*>(slot)->ptr = buffer; @@ -407,13 +406,11 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple, } } size_t val_size = val.length(); - Status rst; - char* buffer = - reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size, &rst)); + char* buffer = reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size)); if (UNLIKELY(buffer == nullptr)) { std::string details = strings::Substitute( ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot"); - RETURN_LIMIT_EXCEEDED(nullptr, details, val_size, rst); + RETURN_LIMIT_EXCEEDED(nullptr, details, val_size); } memcpy(buffer, val.data(), val_size); reinterpret_cast<StringValue*>(slot)->ptr = buffer; diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 8084fb47f4..8ad7ce9044 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -40,7 +40,6 @@ Status 
ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExceptNode::open(RuntimeState* state) { SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Except Node, while probing the hash table."); RETURN_IF_ERROR(SetOperationNode::open(state)); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index c642335924..b328c4b157 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -184,7 +184,6 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) { // The hash join node needs to keep in memory all build tuples, including the tuple // row ptrs. The row ptrs are copied into the hash table's internal structure so they // don't need to be stored in the _build_pool. - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash table."); RowBatch build_batch(child(1)->row_desc(), state->batch_size()); RETURN_IF_ERROR(child(1)->open(state)); @@ -301,7 +300,6 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo // In most cases, no additional memory overhead will be applied for at this stage, // but if the expression calculation in this node needs to apply for additional memory, // it may cause the memory to exceed the limit. 
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while execute get_next."); SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index c897945fed..9bf02cedd4 100644 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -44,7 +44,6 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { // repeat [2] this for all the rest child Status IntersectNode::open(RuntimeState* state) { SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Intersect Node, while probing the hash table."); RETURN_IF_ERROR(SetOperationNode::open(state)); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index a2dac28d30..4fe1bff489 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -310,10 +310,7 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) { Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); // 2. 
Allocate Row's Tuple buf - Status st = Status::OK(); - uint8_t* tuple_buf = - batch->tuple_data_pool()->allocate(_batch_size * _tuple_desc->byte_size(), &st); - RETURN_NOT_OK_STATUS_WITH_WARN(st, "Allocate mem for row batch failed"); + uint8_t* tuple_buf = batch->tuple_data_pool()->allocate(_batch_size * _tuple_desc->byte_size()); if (tuple_buf == nullptr) { LOG(WARNING) << "Allocate mem for row batch failed."; return Status::RuntimeError("Allocate mem for row batch failed."); diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc index ad5f6788d9..b508bee322 100644 --- a/be/src/exec/partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -404,14 +404,13 @@ Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_des Tuple* tuple = batch_iter.get()->get_tuple(0); StringValue* sv = reinterpret_cast<StringValue*>(tuple->get_slot(slot_desc.tuple_offset())); if (sv == nullptr || sv->len == 0) continue; - Status rst; - char* new_ptr = reinterpret_cast<char*>(pool->try_allocate(sv->len, &rst)); + char* new_ptr = reinterpret_cast<char*>(pool->try_allocate(sv->len)); if (UNLIKELY(new_ptr == nullptr)) { string details = Substitute( "Cannot perform aggregation at node with id $0." " Failed to allocate $1 output bytes.", _id, sv->len); - RETURN_LIMIT_EXCEEDED(state_, details, sv->len, rst); + RETURN_LIMIT_EXCEEDED(state_, details, sv->len); } memcpy(new_ptr, sv->ptr, sv->len); sv->ptr = new_ptr; @@ -921,8 +920,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( const int fixed_size = intermediate_tuple_desc_->byte_size(); const int varlen_size = GroupingExprsVarlenSize(); const int tuple_data_size = fixed_size + varlen_size; - Status rst; - uint8_t* tuple_data = pool->try_allocate(tuple_data_size, &rst); + uint8_t* tuple_data = pool->try_allocate(tuple_data_size); if (UNLIKELY(tuple_data == nullptr)) { stringstream str; str << "Memory exceed limit. 
Cannot perform aggregation at node with id $0. Failed " @@ -937,7 +935,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( string details = Substitute(str.str(), _id, tuple_data_size); *status = thread_context() ->_thread_mem_tracker_mgr->limiter_mem_tracker() - ->mem_limit_exceeded(state_, details, tuple_data_size, rst); + ->mem_limit_exceeded(state_, details, tuple_data_size); return nullptr; } memset(tuple_data, 0, fixed_size); diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc index cbcc85070c..b78622e137 100644 --- a/be/src/exec/partitioned_hash_table.cc +++ b/be/src/exec/partitioned_hash_table.cc @@ -307,13 +307,12 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state, MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_)); int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_); - Status st = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit( - mem_usage); - if (UNLIKELY(!st)) { + if (UNLIKELY(!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit( + mem_usage))) { capacity_ = 0; string details = Substitute( "PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes", mem_usage); - RETURN_LIMIT_EXCEEDED(state, details, mem_usage, st); + RETURN_LIMIT_EXCEEDED(state, details, mem_usage); } int expr_values_size = expr_values_bytes_per_row_ * capacity_; diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 32794051ef..12ba3d373f 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -136,7 +136,6 @@ Status SetOperationNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("SetOperation, while constructing the hash table."); RETURN_IF_CANCELLED(state); // open result expr lists. 
for (const std::vector<ExprContext*>& exprs : _child_expr_lists) { diff --git a/be/src/exprs/anyval_util.cpp b/be/src/exprs/anyval_util.cpp index 9d923bb4a7..1c3765578a 100644 --- a/be/src/exprs/anyval_util.cpp +++ b/be/src/exprs/anyval_util.cpp @@ -44,11 +44,9 @@ Status allocate_any_val(RuntimeState* state, MemPool* pool, const TypeDescriptor const std::string& mem_limit_exceeded_msg, AnyVal** result) { const int anyval_size = AnyValUtil::any_val_size(type); const int anyval_alignment = AnyValUtil::any_val_alignment(type); - Status rst; - *result = reinterpret_cast<AnyVal*>( - pool->try_allocate_aligned(anyval_size, anyval_alignment, &rst)); + *result = reinterpret_cast<AnyVal*>(pool->try_allocate_aligned(anyval_size, anyval_alignment)); if (*result == nullptr) { - RETURN_LIMIT_EXCEEDED(state, mem_limit_exceeded_msg, anyval_size, rst); + RETURN_LIMIT_EXCEEDED(state, mem_limit_exceeded_msg, anyval_size); } memset(static_cast<void*>(*result), 0, anyval_size); return Status::OK(); diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp index 3f63c93d2c..c37195a6bb 100644 --- a/be/src/exprs/expr_context.cpp +++ b/be/src/exprs/expr_context.cpp @@ -413,11 +413,9 @@ Status ExprContext::get_const_value(RuntimeState* state, Expr& expr, AnyVal** co StringVal* sv = reinterpret_cast<StringVal*>(*const_val); if (!sv->is_null && sv->len > 0) { // Make sure the memory is owned by this evaluator. 
- Status rst; - char* ptr_copy = reinterpret_cast<char*>(_pool->try_allocate(sv->len, &rst)); + char* ptr_copy = reinterpret_cast<char*>(_pool->try_allocate(sv->len)); if (ptr_copy == nullptr) { - RETURN_LIMIT_EXCEEDED(state, "Could not allocate constant string value", sv->len, - rst); + RETURN_LIMIT_EXCEEDED(state, "Could not allocate constant string value", sv->len); } memcpy(ptr_copy, sv->ptr, sv->len); sv->ptr = reinterpret_cast<uint8_t*>(ptr_copy); diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 8fccacc987..84a1b79a30 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -102,44 +102,40 @@ public: /// Allocates a section of memory of 'size' bytes with DEFAULT_ALIGNMENT at the end /// of the current chunk. Creates a new chunk if there aren't any chunks /// with enough capacity. - uint8_t* allocate(int64_t size, Status* rst = nullptr) { - return allocate<false>(size, DEFAULT_ALIGNMENT, rst); - } + uint8_t* allocate(int64_t size) { return allocate<false>(size, DEFAULT_ALIGNMENT); } - uint8_t* allocate_aligned(int64_t size, int alignment, Status* rst = nullptr) { + uint8_t* allocate_aligned(int64_t size, int alignment) { DCHECK_GE(alignment, 1); DCHECK_LE(alignment, config::memory_max_alignment); DCHECK_EQ(BitUtil::RoundUpToPowerOfTwo(alignment), alignment); - return allocate<false>(size, alignment, rst); + return allocate<false>(size, alignment); } /// Same as Allocate() expect add a check when return a nullptr - Status allocate_safely(int64_t size, uint8_t*& ret, Status* rst = nullptr) { - return allocate_safely<false>(size, DEFAULT_ALIGNMENT, ret, rst); + Status allocate_safely(int64_t size, uint8_t*& ret) { + return allocate_safely<false>(size, DEFAULT_ALIGNMENT, ret); } /// Same as Allocate() except the mem limit is checked before the allocation and /// this call will fail (returns nullptr) if it does. /// The caller must handle the nullptr case. 
This should be used for allocations /// where the size can be very big to bound the amount by which we exceed mem limits. - uint8_t* try_allocate(int64_t size, Status* rst = nullptr) { - return allocate<true>(size, DEFAULT_ALIGNMENT, rst); - } + uint8_t* try_allocate(int64_t size) { return allocate<true>(size, DEFAULT_ALIGNMENT); } /// Same as TryAllocate() except a non-default alignment can be specified. It /// should be a power-of-two in [1, alignof(std::max_align_t)]. - uint8_t* try_allocate_aligned(int64_t size, int alignment, Status* rst = nullptr) { + uint8_t* try_allocate_aligned(int64_t size, int alignment) { DCHECK_GE(alignment, 1); DCHECK_LE(alignment, config::memory_max_alignment); DCHECK_EQ(BitUtil::RoundUpToPowerOfTwo(alignment), alignment); - return allocate<true>(size, alignment, rst); + return allocate<true>(size, alignment); } /// Same as TryAllocate() except returned memory is not aligned at all. - uint8_t* try_allocate_unaligned(int64_t size, Status* rst = nullptr) { + uint8_t* try_allocate_unaligned(int64_t size) { // Call templated implementation directly so that it is inlined here and the // alignment logic can be optimised out. - return allocate<true>(size, 1, rst); + return allocate<true>(size, 1); } /// Makes all allocated chunks available for re-use, but doesn't delete any chunks. @@ -252,7 +248,7 @@ private: } template <bool CHECK_LIMIT_FIRST> - uint8_t* ALWAYS_INLINE allocate(int64_t size, int alignment, Status* rst) { + uint8_t* ALWAYS_INLINE allocate(int64_t size, int alignment) { DCHECK_GE(size, 0); if (UNLIKELY(size == 0)) return reinterpret_cast<uint8_t*>(&k_zero_length_region_); @@ -268,22 +264,15 @@ private: // guarantee alignment. 
//static_assert( //INITIAL_CHUNK_SIZE >= config::FLAGS_MEMORY_MAX_ALIGNMENT, "Min chunk size too low"); - if (rst == nullptr) { - if (UNLIKELY(!find_chunk(size + DEFAULT_PADDING_SIZE, CHECK_LIMIT_FIRST))) - return nullptr; - } else { - *rst = find_chunk(size + DEFAULT_PADDING_SIZE, CHECK_LIMIT_FIRST); - if (UNLIKELY(!*rst)) return nullptr; - } + if (UNLIKELY(!find_chunk(size + DEFAULT_PADDING_SIZE, CHECK_LIMIT_FIRST))) return nullptr; uint8_t* result = allocate_from_current_chunk(size, alignment); return result; } template <bool CHECK_LIMIT_FIRST> - Status ALWAYS_INLINE allocate_safely(int64_t size, int alignment, uint8_t*& ret, - Status* rst = nullptr) { - uint8_t* result = allocate<CHECK_LIMIT_FIRST>(size, alignment, rst); + Status ALWAYS_INLINE allocate_safely(int64_t size, int alignment, uint8_t*& ret) { + uint8_t* result = allocate<CHECK_LIMIT_FIRST>(size, alignment); if (result == nullptr) { return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR); } @@ -320,6 +309,6 @@ private: }; // Stamp out templated implementations here so they're included in IR module -template uint8_t* MemPool::allocate<false>(int64_t size, int alignment, Status* rst); -template uint8_t* MemPool::allocate<true>(int64_t size, int alignment, Status* rst); +template uint8_t* MemPool::allocate<false>(int64_t size, int alignment); +template uint8_t* MemPool::allocate<true>(int64_t size, int alignment); } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 6a37b1df86..4976c6ae05 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -143,8 +143,8 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) { Status MemTrackerLimiter::try_gc_memory(int64_t bytes) { if (UNLIKELY(gc_memory(_limit - bytes))) { return Status::MemoryLimitExceeded( - fmt::format("label={} TryConsume failed size={}, used={}, limit={}", label(), bytes, - 
_consumption->current_value(), _limit)); + fmt::format("label={}, limit={}, used={}, failed consume size={}", label(), _limit, + _consumption->current_value(), bytes)); } VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes << " consumption=" << _consumption->current_value() << " limit=" << _limit; @@ -197,9 +197,9 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge std::vector<MemTracker::Snapshot> snapshots; MemTracker::make_group_snapshot(&snapshots, 0, _group_num, _label); for (const auto& snapshot : snapshots) { - child_trackers_usage += MemTracker::log_usage(snapshot); + child_trackers_usage += "\n " + MemTracker::log_usage(snapshot); } - if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage; + if (!child_trackers_usage.empty()) detail += child_trackers_usage; return detail; } @@ -217,41 +217,38 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, return join(usage_strings, "\n"); } -Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& details, - int64_t failed_allocation_size, Status failed_alloc) { +Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); - std::string detail = - "Memory exceed limit. fragment={}, details={}, on backend={}. Memory left in process " - "limit={}."; - detail = fmt::format( - detail, state != nullptr ? print_id(state->fragment_instance_id()) : "", details, - BackendOptions::get_localhost(), + std::string detail = fmt::format( + "{}, failed mem consume:<consume_size={}, mem_limit={}, mem_used={}, tracker_label={}, " + "in backend={} free memory left={}. 
details mem usage see be.INFO.", + msg, PrettyPrinter::print(failed_consume_size, TUnit::BYTES), _limit, + _consumption->current_value(), _label, BackendOptions::get_localhost(), PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(), TUnit::BYTES)); - if (!failed_alloc) { - detail += " failed alloc=<{}>. current tracker={}."; - detail = fmt::format(detail, failed_alloc.to_string(), _label); - } else { - detail += " current tracker <label={}, used={}, limit={}, failed alloc size={}>."; - detail = fmt::format(detail, _label, _consumption->current_value(), _limit, - PrettyPrinter::print(failed_allocation_size, TUnit::BYTES)); - } - detail += " If this is a query, can change the limit by session variable exec_mem_limit."; Status status = Status::MemoryLimitExceeded(detail); - if (state != nullptr) state->log_error(detail); - detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace()); // only print the tracker log_usage in be log. - if (ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity() < failed_allocation_size) { - // Dumping the process MemTracker is expensive. Limiting the recursive depth to two - // levels limits the level of detail to a one-line summary for each query MemTracker. - detail += "\n" + ExecEnv::GetInstance()->process_mem_tracker()->log_usage(2); - } else { - detail += "\n" + log_usage(); + if (_print_log_usage) { + if (ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity() < failed_consume_size) { + // Dumping the process MemTracker is expensive. Limiting the recursive depth to two + // levels limits the level of detail to a one-line summary for each query MemTracker. 
+ detail += "\n" + ExecEnv::GetInstance()->process_mem_tracker()->log_usage(2); + } else { + detail += "\n" + log_usage(); + } + detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace()); + LOG(WARNING) << detail; + _print_log_usage = false; } - - LOG(WARNING) << detail; return status; } +Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& msg, + int64_t failed_alloc_size) { + Status rt = mem_limit_exceeded(msg, failed_alloc_size); + state->log_error(rt.to_string()); + return rt; +} + } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index c85205b2c0..1a8ba5a149 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -139,12 +139,13 @@ public: std::string log_usage(int max_recursive_depth = INT_MAX, int64_t* logged_consumption = nullptr); // Log the memory usage when memory limit is exceeded and return a status object with - // details of the allocation which caused the limit to be exceeded. + // msg of the allocation which caused the limit to be exceeded. // If 'failed_allocation_size' is greater than zero, logs the allocation size. If // 'failed_allocation_size' is zero, nothing about the allocation size is logged. // If 'state' is non-nullptr, logs the error to 'state'. - Status mem_limit_exceeded(RuntimeState* state, const std::string& details = std::string(), - int64_t failed_allocation = -1, Status failed_alloc = Status::OK()); + Status mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size); + Status mem_limit_exceeded(RuntimeState* state, const std::string& msg = std::string(), + int64_t failed_consume_size = -1); std::string debug_string() { std::stringstream msg; @@ -204,6 +205,8 @@ private: // The number of child trackers that have been added. std::atomic_size_t _had_child_count = 0; + bool _print_log_usage = true; + // Lock to protect gc_memory(). 
This prevents many GCs from occurring at once. std::mutex _gc_lock; // Functions to call after the limit is reached to free memory. @@ -280,11 +283,4 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { return Status::OK(); } -#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ - return thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->mem_limit_exceeded( \ - state, msg, ##__VA_ARGS__); -#define RETURN_IF_LIMIT_EXCEEDED(state, msg) \ - if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->any_limit_exceeded()) \ - RETURN_LIMIT_EXCEEDED(state, msg); - } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 3c775db5ec..b63b25df17 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -31,15 +31,13 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_task_mem_tracker // Combine new tracker and emplace into one operation to avoid the use of locks // Name for task MemTrackers. '$0' is replaced with the task id. 
std::shared_ptr<MemTrackerLimiter> tracker; - bool new_emplace = _task_mem_trackers.lazy_emplace_l( + bool new_emplace = _task_mem_trackers.try_emplace_l( task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) { tracker = v; }, - [&](const auto& ctor) { - tracker = std::make_shared<MemTrackerLimiter>(mem_limit, label, parent); - ctor(task_id, tracker); - }); + std::make_shared<MemTrackerLimiter>(mem_limit, label, parent)); if (new_emplace) { LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); + return get_task_mem_tracker(task_id); } return tracker; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index b6f1ebde33..a3a8dbbc8b 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -25,14 +25,12 @@ namespace doris { void ThreadMemTrackerMgr::attach_limiter_tracker( - const std::string& cancel_msg, const std::string& task_id, - const TUniqueId& fragment_instance_id, + const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { DCHECK(mem_tracker); flush_untracked_mem<false>(); _task_id = task_id; _fragment_instance_id = fragment_instance_id; - _exceed_cb.cancel_msg = cancel_msg; _limiter_tracker = mem_tracker; } @@ -40,7 +38,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() { flush_untracked_mem<false>(); _task_id = ""; _fragment_instance_id = TUniqueId(); - _exceed_cb.cancel_msg = ""; _limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker(); } @@ -52,20 +49,22 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details } } -void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status try_consume_st) { - if (_exceed_cb.cb_func != nullptr) { - _exceed_cb.cb_func(); +void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size) { + if (_cb_func != 
nullptr) { + _cb_func(); } - if (is_attach_task()) { - if (_exceed_cb.cancel_task) { - auto st = _limiter_tracker->mem_limit_exceeded( - nullptr, - fmt::format("Task mem limit exceeded and cancel it, msg:{}", - _exceed_cb.cancel_msg), - mem_usage, try_consume_st); - exceeded_cancel_task(st.to_string()); - _exceed_cb.cancel_task = false; // Make sure it will only be canceled once + if (is_attach_query()) { + std::string cancel_msg; + if (!_consumer_tracker_stack.empty()) { + cancel_msg = fmt::format( + "exec node:<name={}>, can change the limit by `set exec_mem_limit=xxx`", + _consumer_tracker_stack[-1]->label()); + } else { + cancel_msg = "exec node:unknown, can change the limit by `set exec_mem_limit=xxx`"; } + auto st = _limiter_tracker->mem_limit_exceeded(cancel_msg, failed_consume_size); + exceeded_cancel_task(st.to_string()); + _check_limit = false; // Make sure it will only be canceled once } } } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index d3940d1e50..1c9cdfc953 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -32,22 +32,6 @@ extern bthread_key_t btls_key; static const bthread_key_t EMPTY_BTLS_KEY = {0, 0}; using ExceedCallBack = void (*)(); -struct MemExceedCallBackInfo { - std::string cancel_msg; - bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit. - ExceedCallBack cb_func; - - MemExceedCallBackInfo() { init(); } - - MemExceedCallBackInfo(const std::string& cancel_msg, bool cancel_task, ExceedCallBack cb_func) - : cancel_msg(cancel_msg), cancel_task(cancel_task), cb_func(cb_func) {} - - void init() { - cancel_msg = ""; - cancel_task = true; - cb_func = nullptr; - } -}; // TCMalloc new/delete Hook is counted in the memory_tracker of the current thread. 
// @@ -61,7 +45,6 @@ public: ~ThreadMemTrackerMgr() { flush_untracked_mem<false>(); - _exceed_cb.init(); DCHECK(_consumer_tracker_stack.empty()); } @@ -75,8 +58,7 @@ public: void init(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker - void attach_limiter_tracker(const std::string& cancel_msg, const std::string& task_id, - const TUniqueId& fragment_instance_id, + void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr<MemTrackerLimiter>& mem_tracker); void detach_limiter_tracker(); @@ -86,16 +68,7 @@ public: void push_consumer_tracker(MemTracker* mem_tracker); void pop_consumer_tracker(); - MemExceedCallBackInfo update_exceed_call_back(const std::string& cancel_msg, bool cancel_task, - ExceedCallBack cb_func) { - _temp_exceed_cb = _exceed_cb; - _exceed_cb.cancel_msg = cancel_msg; - _exceed_cb.cancel_task = cancel_task; - _exceed_cb.cb_func = cb_func; - return _temp_exceed_cb; - } - - void update_exceed_call_back(const MemExceedCallBackInfo& exceed_cb) { _exceed_cb = exceed_cb; } + void set_exceed_call_back(ExceedCallBack cb_func) { _cb_func = cb_func; } // Note that, If call the memory allocation operation in TCMalloc new/delete Hook, // such as calling LOG/iostream/sstream/stringstream/etc. 
related methods, @@ -114,7 +87,7 @@ public: template <bool CheckLimit> void flush_untracked_mem(); - bool is_attach_task() { return _task_id != ""; } + bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return _limiter_tracker; } @@ -138,7 +111,7 @@ private: // If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled void exceeded_cancel_task(const std::string& cancel_details); - void exceeded(int64_t mem_usage, Status try_consume_st); + void exceeded(int64_t failed_consume_size); private: // Cache untracked mem, only update to _untracked_mems when switching mem tracker. @@ -155,14 +128,12 @@ private: bool _check_attach = true; std::string _task_id; TUniqueId _fragment_instance_id; - MemExceedCallBackInfo _exceed_cb; - MemExceedCallBackInfo _temp_exceed_cb; + ExceedCallBack _cb_func = nullptr; }; inline void ThreadMemTrackerMgr::init() { DCHECK(_consumer_tracker_stack.empty()); _task_id = ""; - _exceed_cb.init(); _limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker(); _check_limit = true; } @@ -219,7 +190,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. 
_limiter_tracker->consume(_untracked_mem); - exceeded(_untracked_mem, st); + exceeded(_untracked_mem); } } else { _limiter_tracker->consume(_untracked_mem); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 5b5aaae861..381649c730 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -251,6 +251,7 @@ Status PlanFragmentExecutor::open() { if (_cancel_reason == PPlanFragmentCancelReason::CALL_RPC_ERROR) { status = Status::RuntimeError(_cancel_msg); } else if (_cancel_reason == PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED) { + // status = Status::MemoryAllocFailed(_cancel_msg); status = Status::MemoryLimitExceeded(_cancel_msg); } } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 2d128c13e4..ea7b415622 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -351,7 +351,9 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) { Status RuntimeState::check_query_state(const std::string& msg) { // TODO: it would be nice if this also checked for cancellation, but doing so breaks // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached. 
- RETURN_IF_LIMIT_EXCEEDED(this, msg); + if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->any_limit_exceeded()) { + RETURN_LIMIT_EXCEEDED(this, msg); + } return query_status(); } diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 7a6968b30f..cc11c1b29b 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -79,24 +79,6 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { #endif // USE_MEM_TRACKER } -UpdateMemExceedCallBack::UpdateMemExceedCallBack(const std::string& cancel_msg, bool cancel_task, - ExceedCallBack cb_func) { -#ifdef USE_MEM_TRACKER - DCHECK(cancel_msg != std::string()); - _old_cb = thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back( - cancel_msg, cancel_task, cb_func); -#endif -} - -UpdateMemExceedCallBack::~UpdateMemExceedCallBack() { -#ifdef USE_MEM_TRACKER - thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back(_old_cb); -#ifndef NDEBUG - DorisMetrics::instance()->thread_mem_tracker_exceed_call_back_count->increment(1); -#endif -#endif // USE_MEM_TRACKER -} - SwitchBthread::SwitchBthread() { #ifdef USE_MEM_TRACKER _bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 0acc7c6556..9379f12a9d 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -33,9 +33,11 @@ #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) -#define SCOPED_UPDATE_MEM_EXCEED_CALL_BACK(cancel_msg, ...) \ - auto VARNAME_LINENUM(update_exceed_cb) = \ - doris::UpdateMemExceedCallBack(cancel_msg, ##__VA_ARGS__) +// Attach to task when thread starts +#define SCOPED_ATTACH_TASK(arg1, ...) 
\ + auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) + +#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() namespace doris { @@ -104,8 +106,8 @@ public: BRPC = 5 // to be added ... }; - inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", - "STORAGE"}; + inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", + "COMPACTION", "STORAGE", "BRPC"}; public: ThreadContext() { @@ -139,8 +141,7 @@ public: _type = type; _task_id = task_id; _fragment_instance_id = fragment_instance_id; - _thread_mem_tracker_mgr->attach_limiter_tracker(TaskTypeStr[_type], task_id, - fragment_instance_id, mem_tracker); + _thread_mem_tracker_mgr->attach_limiter_tracker(task_id, fragment_instance_id, mem_tracker); } void detach_task() { @@ -225,19 +226,6 @@ public: ~AddThreadMemTrackerConsumer(); }; -class UpdateMemExceedCallBack { -public: - explicit UpdateMemExceedCallBack(const std::string& cancel_msg, bool cancel_task = true, - ExceedCallBack cb_func = nullptr); - - ~UpdateMemExceedCallBack(); - -private: -#ifdef USE_MEM_TRACKER - MemExceedCallBackInfo _old_cb; -#endif -}; - class SwitchBthread { public: explicit SwitchBthread(); @@ -261,15 +249,8 @@ public: } }; -#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() - -// Attach to task when thread starts -#define SCOPED_ATTACH_TASK(arg1, ...) 
\ - auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) - #define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \ auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit() - #define CONSUME_THREAD_MEM_TRACKER(size) \ doris::thread_context()->_thread_mem_tracker_mgr->consume(size) #define RELEASE_THREAD_MEM_TRACKER(size) \ @@ -278,5 +259,8 @@ public: doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size, tracker) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size, tracker) - +#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ + return doris::thread_context() \ + ->_thread_mem_tracker_mgr->limiter_mem_tracker() \ + ->mem_limit_exceeded(state, msg, ##__VA_ARGS__); } // namespace doris diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 6cbdcfa53f..e7ae1c3a91 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -1115,7 +1115,6 @@ void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<St Status HashJoinNode::_hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(1)->open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash table."); SCOPED_TIMER(_build_timer); MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors()); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 1209750203..80817a0095 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -401,7 +401,6 @@ Status AggregationNode::prepare(RuntimeState* state) { Status AggregationNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute open."); 
RETURN_IF_ERROR(ExecNode::open(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); @@ -446,7 +445,6 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "AggregationNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute get_next."); if (_is_streaming_preagg) { bool child_eos = false; diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp index d3c9de5843..4a8c2f3b17 100644 --- a/be/src/vec/exec/vcross_join_node.cpp +++ b/be/src/vec/exec/vcross_join_node.cpp @@ -53,7 +53,6 @@ Status VCrossJoinNode::close(RuntimeState* state) { Status VCrossJoinNode::construct_build_side(RuntimeState* state) { // Do a full scan of child(1) and store all build row batches. RETURN_IF_ERROR(child(1)->open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Cross join, while getting next from the child 1"); bool eos = false; while (true) { diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 11a85ebd47..88747512e8 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -233,7 +233,6 @@ void VSetOperationNode::hash_table_init() { //build a hash table from child(0) Status VSetOperationNode::hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(0)->open(state)); - SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Set Operation Node, while constructing the hash table"); Block block; MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org