This is an automated email from the ASF dual-hosted git repository. lingmiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 66d2f4e1fd [fix][mem tracker] Fix MemTracker null pointer in vectorized (#8925) 66d2f4e1fd is described below commit 66d2f4e1fdc490536d383e0878ed2e3ad011bd6c Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Apr 12 10:17:10 2022 +0800 [fix][mem tracker] Fix MemTracker null pointer in vectorized (#8925) Fix ThreadMemTrackerMgr::update_tracker null pointer and some details. Issue Number: close #8920 --- be/src/exec/exchange_node.cpp | 1 + be/src/runtime/data_stream_recvr.h | 1 + be/src/runtime/fold_constant_executor.cpp | 4 ++-- be/src/runtime/thread_context.h | 2 +- be/src/runtime/thread_mem_tracker_mgr.h | 13 ++++++++----- be/src/vec/exec/vexchange_node.cpp | 5 +++++ be/src/vec/runtime/vdata_stream_recvr.h | 1 + be/src/vec/sink/vtablet_sink.cpp | 3 ++- 8 files changed, 21 insertions(+), 9 deletions(-) diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 083e40518e..d79170f912 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -212,6 +212,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next.")); + ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos)); while ((_num_rows_skipped < _offset)) { _num_rows_skipped += output_batch->num_rows(); diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index 3b1edac081..28a4a9dadf 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -100,6 +100,7 @@ public: const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } PlanNodeId dest_node_id() const { return _dest_node_id; } const RowDescriptor& row_desc() const { return _row_desc; } + const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; } void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) { _sub_plan_query_statistics_recvr->insert(statistics, sender_id); diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 9b5300f16a..274750b8b3 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -44,7 +44,6 @@ TUniqueId FoldConstantExecutor::_dummy_id; Status FoldConstantExecutor::fold_constant_expr( const TFoldConstantParams& params, PConstantExprResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); const auto& expr_map = params.expr_map; auto expr_result_map = response->mutable_expr_result_map(); @@ -54,6 +53,7 @@ Status FoldConstantExecutor::fold_constant_expr( if (UNLIKELY(!status.ok())) { return status; } + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); for (const auto& m : expr_map) { PExprResultMap pexpr_result_map; @@ -108,9 +108,9 @@ Status FoldConstantExecutor::fold_constant_vexpr( // init Status status = _init(query_globals); if (UNLIKELY(!status.ok())) { - LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg(); return status; } + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); for (const auto& m : expr_map) { PExprResultMap pexpr_result_map; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index a7c8261930..4d9d60078c 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -156,7 +156,7 @@ private: // The func provided by pthread and std::thread doesn't help either. // // So, kudu Class-scoped static thread local implementation was introduced. Solve the above problem by -// Thread-scopedthread local + Class-scoped thread local. +// Thread-scoped thread local + Class-scoped thread local. // // This may look very trick, but it's the best way I can find. // diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index a709a536bf..79c152c243 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -65,6 +65,7 @@ public: _mem_trackers[0] = MemTracker::get_process_tracker(); _untracked_mems[0] = 0; _tracker_id = 0; + _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); start_thread_mem_tracker = true; } ~ThreadMemTrackerMgr() { @@ -75,7 +76,8 @@ public: void clear_untracked_mems() { for (const auto& untracked_mem : _untracked_mems) { if (untracked_mem.second != 0) { - DCHECK(_mem_trackers[untracked_mem.first]) << ", label: " << _mem_tracker_labels[untracked_mem.first]; + DCHECK(_mem_trackers[untracked_mem.first]) + << ", label: " << _mem_tracker_labels[untracked_mem.first]; if (_mem_trackers[untracked_mem.first]) { _mem_trackers[untracked_mem.first]->consume(untracked_mem.second); } else { @@ -195,7 +197,7 @@ inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTrac _untracked_mems[_tracker_id] += _untracked_mem; _untracked_mem = 0; std::swap(_tracker_id, _temp_tracker_id); - DCHECK(_mem_trackers[_tracker_id]); + DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id]; return _temp_tracker_id; // old tracker_id } @@ -204,7 +206,8 @@ inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) { _untracked_mems[_tracker_id] += _untracked_mem; _untracked_mem = 0; _tracker_id = tracker_id; - DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()); + DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) + << ", label: " << _mem_tracker_labels[_tracker_id]; DCHECK(_mem_trackers[_tracker_id]); } } @@ -217,14 +220,14 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) { if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes || _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) { DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()); + // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, infinite recursion. + // Needs to ensure that all memory allocated in mem_tracker.consume/try_consume is freed in time to avoid tracking misses. start_thread_mem_tracker = false; // When switching to the current tracker last time, the remaining untracked memory. if (_untracked_mems[_tracker_id] != 0) { _untracked_mem += _untracked_mems[_tracker_id]; _untracked_mems[_tracker_id] = 0; } - // Avoid getting stuck in infinite loop if there is memory allocation in noncache_consume. - // For example: GC function when try_consume; mem_limit_exceeded. noncache_consume(); start_thread_mem_tracker = true; } diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 91b107904d..ea4e61e7fe 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -19,6 +19,7 @@ #include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "runtime/thread_context.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/runtime/vdata_stream_recvr.h" @@ -48,6 +49,7 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { Status VExchangeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); DCHECK_GT(_num_senders, 0); _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr()); _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( @@ -63,6 +65,7 @@ Status VExchangeNode::prepare(RuntimeState* state) { } Status VExchangeNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); RETURN_IF_ERROR(ExecNode::open(state)); if (_is_merging) { @@ -80,6 +83,8 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { SCOPED_TIMER(runtime_profile()->total_time_counter()); + SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker()); + ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker()); auto status = _stream_recvr->get_next(block, eos); if (block != nullptr) { if (_num_rows_returned + block->rows() < _limit) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 7e74944712..9c18cb6baa 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -73,6 +73,7 @@ public: const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } PlanNodeId dest_node_id() const { return _dest_node_id; } const RowDescriptor& row_desc() const { return _row_desc; } + const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; } void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) { _sub_plan_query_statistics_recvr->insert(statistics, sender_id); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 58bb03c69c..0804cde38f 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -43,10 +43,11 @@ Status VOlapTableSink::init(const TDataSink& sink) { } Status VOlapTableSink::prepare(RuntimeState* state) { + RETURN_IF_ERROR(OlapTableSink::prepare(state)); // Prepare the exprs to run. RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _input_row_desc, _expr_mem_tracker)); - return OlapTableSink::prepare(state); + return Status::OK(); } Status VOlapTableSink::open(RuntimeState* state) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org