This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new df54c6b63a [enhancement](memtracker) Add independent and unique scanner mem tracker for each query (#13262) df54c6b63a is described below commit df54c6b63af4570f9f276fd885d94420159e8128 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Tue Oct 11 19:47:12 2022 +0800 [enhancement](memtracker) Add independent and unique scanner mem tracker for each query (#13262) --- be/src/runtime/memory/mem_tracker_task_pool.cpp | 20 ++++++++++++++++---- be/src/runtime/memory/mem_tracker_task_pool.h | 4 ++++ be/src/runtime/runtime_state.cpp | 7 +++++++ be/src/runtime/runtime_state.h | 6 +++++- be/src/runtime/thread_context.cpp | 7 ++++--- be/src/runtime/thread_context.h | 24 ++++++++++++------------ be/src/vec/exec/scan/new_es_scan_node.cpp | 7 ++----- be/src/vec/exec/scan/new_es_scan_node.h | 2 -- be/src/vec/exec/scan/new_es_scanner.cpp | 10 +++------- be/src/vec/exec/scan/new_es_scanner.h | 4 ++-- be/src/vec/exec/scan/new_file_arrow_scanner.cpp | 16 ++++++---------- be/src/vec/exec/scan/new_file_arrow_scanner.h | 12 ++++++------ be/src/vec/exec/scan/new_file_scan_node.cpp | 13 ++++--------- be/src/vec/exec/scan/new_file_scan_node.h | 1 - be/src/vec/exec/scan/new_file_scanner.cpp | 8 +++----- be/src/vec/exec/scan/new_file_scanner.h | 2 +- be/src/vec/exec/scan/new_file_text_scanner.cpp | 5 ++--- be/src/vec/exec/scan/new_file_text_scanner.h | 4 ++-- be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 5 ++--- be/src/vec/exec/scan/new_jdbc_scan_node.h | 2 -- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 7 ++----- be/src/vec/exec/scan/new_jdbc_scanner.h | 4 ++-- be/src/vec/exec/scan/new_odbc_scan_node.cpp | 4 +--- be/src/vec/exec/scan/new_odbc_scan_node.h | 1 - be/src/vec/exec/scan/new_odbc_scanner.cpp | 6 ++---- be/src/vec/exec/scan/new_odbc_scanner.h | 2 +- be/src/vec/exec/scan/new_olap_scan_node.cpp | 7 +++---- be/src/vec/exec/scan/new_olap_scan_node.h | 2 -- be/src/vec/exec/scan/new_olap_scanner.cpp | 7 ++----- 
be/src/vec/exec/scan/new_olap_scanner.h | 2 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 6 ++++-- be/src/vec/exec/scan/vfile_scanner.cpp | 6 ++---- be/src/vec/exec/scan/vfile_scanner.h | 2 +- be/src/vec/exec/scan/vscanner.cpp | 4 +--- be/src/vec/exec/scan/vscanner.h | 3 +-- be/src/vec/exec/volap_scan_node.cpp | 13 ++++++------- be/src/vec/exec/volap_scan_node.h | 2 -- be/src/vec/exec/volap_scanner.cpp | 9 ++------- be/src/vec/exec/volap_scanner.h | 4 +--- 39 files changed, 113 insertions(+), 137 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 143e7486fa..0eff6c7460 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -47,16 +47,28 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_task_mem_tracker std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_mem_tracker( const std::string& query_id, int64_t mem_limit) { - return register_task_mem_tracker_impl(query_id, mem_limit, - fmt::format("Query#queryId={}", query_id), + return register_task_mem_tracker_impl(query_id, mem_limit, fmt::format("Query#Id={}", query_id), + ExecEnv::GetInstance()->query_pool_mem_tracker()); +} + +std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_scanner_mem_tracker( + const std::string& query_id) { + return register_task_mem_tracker_impl("Scanner#" + query_id, -1, + fmt::format("Scanner#Query#Id={}", query_id), ExecEnv::GetInstance()->query_pool_mem_tracker()); } std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_mem_tracker( const std::string& load_id, int64_t mem_limit) { // In load, the query id of the fragment is executed, which is the same as the load id of the load channel. 
- return register_task_mem_tracker_impl(load_id, mem_limit, - fmt::format("Load#queryId={}", load_id), + return register_task_mem_tracker_impl(load_id, mem_limit, fmt::format("Load#Id={}", load_id), + ExecEnv::GetInstance()->load_pool_mem_tracker()); +} + +std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_scanner_mem_tracker( + const std::string& load_id) { + return register_task_mem_tracker_impl("Scanner#" + load_id, -1, + fmt::format("Scanner#Load#Id={}", load_id), ExecEnv::GetInstance()->load_pool_mem_tracker()); } diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h index f8c5039eab..4d3a514f6d 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.h +++ b/be/src/runtime/memory/mem_tracker_task_pool.h @@ -43,8 +43,12 @@ public: const std::shared_ptr<MemTrackerLimiter>& parent); std::shared_ptr<MemTrackerLimiter> register_query_mem_tracker(const std::string& query_id, int64_t mem_limit); + std::shared_ptr<MemTrackerLimiter> register_query_scanner_mem_tracker( + const std::string& query_id); std::shared_ptr<MemTrackerLimiter> register_load_mem_tracker(const std::string& load_id, int64_t mem_limit); + std::shared_ptr<MemTrackerLimiter> register_load_scanner_mem_tracker( + const std::string& load_id); std::shared_ptr<MemTrackerLimiter> get_task_mem_tracker(const std::string& task_id); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 646ae960ca..764bf1471f 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -233,14 +233,21 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { _query_mem_tracker = _exec_env->task_pool_mem_tracker_registry()->register_query_mem_tracker( print_id(query_id), bytes_limit); + _scanner_mem_tracker = + _exec_env->task_pool_mem_tracker_registry()->register_query_scanner_mem_tracker( + print_id(query_id)); } else if (query_type() == TQueryType::LOAD) { _query_mem_tracker = 
_exec_env->task_pool_mem_tracker_registry()->register_load_mem_tracker( print_id(query_id), bytes_limit); + _scanner_mem_tracker = + _exec_env->task_pool_mem_tracker_registry()->register_load_scanner_mem_tracker( + print_id(query_id)); } else { DCHECK(false); _query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker(); } _query_mem_tracker->enable_reset_zero(); + _scanner_mem_tracker->enable_reset_zero(); _instance_mem_tracker = std::make_shared<MemTrackerLimiter>( -1, "RuntimeState:instance:" + print_id(_fragment_instance_id), _query_mem_tracker, diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 1b7042815a..71831667d0 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -124,6 +124,7 @@ public: ExecEnv* exec_env() { return _exec_env; } std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return _query_mem_tracker; } std::shared_ptr<MemTrackerLimiter> instance_mem_tracker() { return _instance_mem_tracker; } + std::shared_ptr<MemTrackerLimiter> scanner_mem_tracker() { return _scanner_mem_tracker; } ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; } void set_fragment_root_id(PlanNodeId id) { @@ -400,9 +401,12 @@ private: // MemTracker that is shared by all fragment instances running on this host. // The query mem tracker must be released after the _instance_mem_tracker. std::shared_ptr<MemTrackerLimiter> _query_mem_tracker; - // Memory usage of this fragment instance std::shared_ptr<MemTrackerLimiter> _instance_mem_tracker; + // Count the memory consumption of Scanner, independent and unique for each query, + // this means that scanner memory does not count into query mem tracker, + // label is `Scanner#{queryId}`. + std::shared_ptr<MemTrackerLimiter> _scanner_mem_tracker; // put runtime state before _obj_pool, so that it will be deconstructed after // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. 
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 4a02c45560..432c5be879 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -42,9 +42,10 @@ AttachTask::AttachTask(RuntimeState* runtime_state) { DCHECK(runtime_state->fragment_instance_id() != TUniqueId()); #endif // BE_TEST DCHECK(runtime_state->instance_mem_tracker()); - thread_context()->attach_task( - query_to_task_type(runtime_state->query_type()), print_id(runtime_state->query_id()), - runtime_state->fragment_instance_id(), runtime_state->instance_mem_tracker()); + thread_context()->attach_task(ThreadContext::query_to_task_type(runtime_state->query_type()), + print_id(runtime_state->query_id()), + runtime_state->fragment_instance_id(), + runtime_state->instance_mem_tracker()); } AttachTask::~AttachTask() { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 26030ee2e6..46b2e86d44 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -159,6 +159,18 @@ public: const std::string& thread_id_str() const { return _thread_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } + static TaskType query_to_task_type(const TQueryType::type& query_type) { + switch (query_type) { + case TQueryType::SELECT: + return TaskType::QUERY; + case TQueryType::LOAD: + return TaskType::LOAD; + default: + DCHECK(false); + return TaskType::UNKNOWN; + } + } + std::string get_thread_id() { std::stringstream ss; ss << std::this_thread::get_id(); @@ -206,18 +218,6 @@ public: explicit AttachTask(RuntimeState* runtime_state); - const ThreadContext::TaskType query_to_task_type(const TQueryType::type& query_type) { - switch (query_type) { - case TQueryType::SELECT: - return ThreadContext::TaskType::QUERY; - case TQueryType::LOAD: - return ThreadContext::TaskType::LOAD; - default: - DCHECK(false); - return ThreadContext::TaskType::UNKNOWN; - } - } - ~AttachTask(); }; diff 
--git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index 31e439281d..dcad57a418 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -49,7 +49,6 @@ NewEsScanNode::NewEsScanNode(ObjectPool* pool, const TPlanNode& tnode, const Des : VScanNode(pool, tnode, descs), _tuple_id(tnode.es_scan_node.tuple_id), _tuple_desc(nullptr), - _scanner_mem_tracker(nullptr), _es_profile(nullptr) { _output_tuple_id = tnode.es_scan_node.tuple_id; } @@ -78,7 +77,6 @@ Status NewEsScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - _scanner_mem_tracker = std::make_unique<MemTracker>("NewEsScanner"); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (_tuple_desc == nullptr) { @@ -203,9 +201,8 @@ Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) { properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build( properties, _column_names, _predicates, _docvalue_context, &doc_value_mode); - NewEsScanner* scanner = - new NewEsScanner(_state, this, _limit_per_scanner, _mem_tracker.get(), _tuple_id, - properties, _docvalue_context, doc_value_mode); + NewEsScanner* scanner = new NewEsScanner(_state, this, _limit_per_scanner, _tuple_id, + properties, _docvalue_context, doc_value_mode); _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(_state)); diff --git a/be/src/vec/exec/scan/new_es_scan_node.h b/be/src/vec/exec/scan/new_es_scan_node.h index 88586f7383..55aab31dcc 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.h +++ b/be/src/vec/exec/scan/new_es_scan_node.h @@ -59,8 +59,6 @@ private: std::vector<int> _predicate_to_conjunct; std::vector<int> _conjunct_to_predicate; - std::unique_ptr<MemTracker> _scanner_mem_tracker; - // Profile std::unique_ptr<RuntimeProfile> _es_profile; RuntimeProfile::Counter* _rows_read_counter; diff 
--git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp index 03f4526a23..cfde06ab3c 100644 --- a/be/src/vec/exec/scan/new_es_scanner.cpp +++ b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -24,11 +24,10 @@ static const std::string NEW_SCANNER_TYPE = "NewEsScanner"; namespace doris::vectorized { NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, - MemTracker* mem_tracker, TupleId tuple_id, - const std::map<std::string, std::string>& properties, + TupleId tuple_id, const std::map<std::string, std::string>& properties, const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode) - : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker), + : VScanner(state, static_cast<VScanNode*>(parent), limit), _is_init(false), _es_eof(false), _properties(properties), @@ -53,8 +52,6 @@ Status NewEsScanner::prepare(RuntimeState* state) { return Status::InternalError("input pointer is null."); } - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (nullptr == _tuple_desc) { return Status::InternalError("Failed to get tuple descriptor, tuple_id={}", _tuple_id); @@ -83,10 +80,9 @@ Status NewEsScanner::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(VScanner::open(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); RETURN_IF_ERROR(_es_reader->open()); - _mem_pool.reset(new MemPool(_mem_tracker)); + _mem_pool.reset(new MemPool()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h index 2e776f08c9..be4d50448b 100644 --- a/be/src/vec/exec/scan/new_es_scanner.h +++ b/be/src/vec/exec/scan/new_es_scanner.h @@ -28,8 +28,8 @@ class NewEsScanNode; class NewEsScanner : public VScanner { public: - NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, MemTracker* mem_tracker, - TupleId tuple_id, const std::map<std::string, 
std::string>& properties, + NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, TupleId tuple_id, + const std::map<std::string, std::string>& properties, const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode); Status open(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp index 9355e36a4c..77506dafb9 100644 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp @@ -28,9 +28,9 @@ namespace doris::vectorized { NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, - MemTracker* tracker, RuntimeProfile* profile, + RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs) - : NewFileScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs), + : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs), _cur_file_reader(nullptr), _cur_file_eof(false), _batch(nullptr), @@ -39,7 +39,6 @@ NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* p Status NewFileArrowScanner::open(RuntimeState* state) { RETURN_IF_ERROR(NewFileScanner::open(state)); // SCOPED_TIMER(_parent->_reader_init_timer); - // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); // _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); return Status::OK(); @@ -239,10 +238,9 @@ Status NewFileArrowScanner::_open_next_reader() { NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, - MemTracker* tracker, RuntimeProfile* profile, + RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs) - : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile, - pre_filter_texprs) { + : NewFileArrowScanner(state, parent, limit, scan_range, profile, 
pre_filter_texprs) { // _init_profiles(profile); } @@ -254,11 +252,9 @@ ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader( } NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile, + const TFileScanRange& scan_range, RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs) - : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile, - pre_filter_texprs) {} + : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) {} ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader( const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader, diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h b/be/src/vec/exec/scan/new_file_arrow_scanner.h index 89e76f6623..281373a70d 100644 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.h +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.h @@ -28,8 +28,8 @@ namespace doris::vectorized { class NewFileArrowScanner : public NewFileScanner { public: NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs); + const TFileScanRange& scan_range, RuntimeProfile* profile, + const std::vector<TExpr>& pre_filter_texprs); Status open(RuntimeState* state) override; protected: @@ -59,8 +59,8 @@ private: class NewFileParquetScanner final : public NewFileArrowScanner { public: NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs); + const TFileScanRange& scan_range, RuntimeProfile* profile, + const std::vector<TExpr>& pre_filter_texprs); ~NewFileParquetScanner() override = default; @@ -75,8 +75,8 @@ protected: class NewFileORCScanner final : 
public NewFileArrowScanner { public: NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs); + const TFileScanRange& scan_range, RuntimeProfile* profile, + const std::vector<TExpr>& pre_filter_texprs); ~NewFileORCScanner() override = default; diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 19efa0a555..039082ab1c 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -39,7 +39,6 @@ Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { Status NewFileScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); - _scanner_mem_tracker = std::make_unique<MemTracker>("NewFileScanners"); return Status::OK(); } @@ -105,26 +104,22 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) { VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) { VScanner* scanner = nullptr; if (config::enable_new_file_scanner) { - scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile()); + scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile()); ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range); } else { switch (scan_range.params.format_type) { case TFileFormatType::FORMAT_PARQUET: scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile(), - std::vector<TExpr>()); + runtime_profile(), std::vector<TExpr>()); break; case TFileFormatType::FORMAT_ORC: scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile(), - std::vector<TExpr>()); + runtime_profile(), std::vector<TExpr>()); 
break; default: scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range, - _scanner_mem_tracker.get(), runtime_profile(), - std::vector<TExpr>()); + runtime_profile(), std::vector<TExpr>()); break; } ((NewFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get()); diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index 5e08d05ae1..53b11e408d 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -41,6 +41,5 @@ private: private: std::vector<TScanRangeParams> _scan_ranges; - std::unique_ptr<MemTracker> _scanner_mem_tracker; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scanner.cpp b/be/src/vec/exec/scan/new_file_scanner.cpp index 6acfe2810f..6e511fe10f 100644 --- a/be/src/vec/exec/scan/new_file_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_scanner.cpp @@ -35,9 +35,9 @@ namespace doris::vectorized { NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs) - : VScanner(state, static_cast<VScanNode*>(parent), limit, tracker), + const TFileScanRange& scan_range, RuntimeProfile* profile, + const std::vector<TExpr>& pre_filter_texprs) + : VScanner(state, static_cast<VScanNode*>(parent), limit), _params(scan_range.params), _ranges(scan_range.ranges), _next_range(0), @@ -53,8 +53,6 @@ Status NewFileScanner::open(RuntimeState* state) { } Status NewFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - if (vconjunct_ctx_ptr != nullptr) { // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. 
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); diff --git a/be/src/vec/exec/scan/new_file_scanner.h b/be/src/vec/exec/scan/new_file_scanner.h index e8534416d9..50423bd3e6 100644 --- a/be/src/vec/exec/scan/new_file_scanner.h +++ b/be/src/vec/exec/scan/new_file_scanner.h @@ -30,7 +30,7 @@ class NewFileScanNode; class NewFileScanner : public VScanner { public: NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile, + const TFileScanRange& scan_range, RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs); Status open(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/new_file_text_scanner.cpp b/be/src/vec/exec/scan/new_file_text_scanner.cpp index a1e2cc9b56..1222d1d6d9 100644 --- a/be/src/vec/exec/scan/new_file_text_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_text_scanner.cpp @@ -25,10 +25,9 @@ namespace doris::vectorized { NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile, + const TFileScanRange& scan_range, RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs) - : NewFileScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs), + : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs), _cur_file_reader(nullptr), _cur_line_reader(nullptr), _cur_line_reader_eof(false), diff --git a/be/src/vec/exec/scan/new_file_text_scanner.h b/be/src/vec/exec/scan/new_file_text_scanner.h index ab12389450..ccb92a44c3 100644 --- a/be/src/vec/exec/scan/new_file_text_scanner.h +++ b/be/src/vec/exec/scan/new_file_text_scanner.h @@ -29,8 +29,8 @@ namespace doris::vectorized { class NewFileTextScanner : public NewFileScanner { public: NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& 
scan_range, MemTracker* tracker, - RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs); + const TFileScanRange& scan_range, RuntimeProfile* profile, + const std::vector<TExpr>& pre_filter_texprs); Status open(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp index da76859915..5f209a09c2 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -38,7 +38,6 @@ Status NewJdbcScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << "VNewJdbcScanNode::Prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - _scanner_mem_tracker = std::make_unique<MemTracker>("NewJdbcScanners"); return Status::OK(); } @@ -51,8 +50,8 @@ Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) { if (_eos == true) { return Status::OK(); } - NewJdbcScanner* scanner = new NewJdbcScanner( - _state, this, _limit_per_scanner, _scanner_mem_tracker.get(), _tuple_id, _query_string); + NewJdbcScanner* scanner = + new NewJdbcScanner(_state, this, _limit_per_scanner, _tuple_id, _query_string); _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(_state)); scanners->push_back(static_cast<VScanner*>(scanner)); diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.h b/be/src/vec/exec/scan/new_jdbc_scan_node.h index 287522fc0d..7463e55daf 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.h +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.h @@ -38,8 +38,6 @@ private: std::string _table_name; TupleId _tuple_id; std::string _query_string; - - std::unique_ptr<MemTracker> _scanner_mem_tracker; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index c571714e94..61c9ff53c5 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -21,8 +21,8 @@ namespace 
doris::vectorized { NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, - MemTracker* mem_tracker, TupleId tuple_id, std::string query_string) - : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker), + TupleId tuple_id, std::string query_string) + : VScanner(state, static_cast<VScanNode*>(parent), limit), _is_init(false), _jdbc_eos(false), _tuple_id(tuple_id), @@ -39,8 +39,6 @@ Status NewJdbcScanner::prepare(RuntimeState* state) { return Status::InternalError("input pointer is NULL of VJdbcScanNode::prepare."); } - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (_tuple_desc == nullptr) { @@ -83,7 +81,6 @@ Status NewJdbcScanner::open(RuntimeState* state) { } RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(VScanner::open(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); RETURN_IF_ERROR(_jdbc_connector->open()); RETURN_IF_ERROR(_jdbc_connector->query()); return Status::OK(); diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h index 75dabcacfa..984e239c2d 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.h +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -26,8 +26,8 @@ namespace doris { namespace vectorized { class NewJdbcScanner : public VScanner { public: - NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, - MemTracker* mem_tracker, TupleId tuple_id, std::string query_string); + NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, TupleId tuple_id, + std::string query_string); Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp index 48043dd22f..571566b88a 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -39,7 +39,6 @@ Status 
NewOdbcScanNode::prepare(RuntimeState* state) { VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; RETURN_IF_ERROR(VScanNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - _scanner_mem_tracker = std::make_unique<MemTracker>("NewOdbcScanner"); return Status::OK(); } @@ -52,8 +51,7 @@ Status NewOdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) { if (_eos == true) { return Status::OK(); } - NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, - _scanner_mem_tracker.get(), _odbc_scan_node); + NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, _odbc_scan_node); _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(_state)); scanners->push_back(static_cast<VScanner*>(scanner)); diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h b/be/src/vec/exec/scan/new_odbc_scan_node.h index 40d2bdd4bd..03d18ce58f 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.h +++ b/be/src/vec/exec/scan/new_odbc_scan_node.h @@ -37,6 +37,5 @@ protected: private: std::string _table_name; TOdbcScanNode _odbc_scan_node; - std::unique_ptr<MemTracker> _scanner_mem_tracker; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp index ee383ac636..a69ecb1f32 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp @@ -26,8 +26,8 @@ static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner"; namespace doris::vectorized { NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit, - MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node) - : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker), + const TOdbcScanNode& odbc_scan_node) + : VScanner(state, static_cast<VScanNode*>(parent), limit), _is_init(false), _odbc_eof(false), _table_name(odbc_scan_node.table_name), @@ -47,7 +47,6 @@ Status NewOdbcScanner::prepare(RuntimeState* state) 
{ return Status::InternalError("input pointer is null."); } - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -89,7 +88,6 @@ Status NewOdbcScanner::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(VScanner::open(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); RETURN_IF_ERROR(_odbc_connector->open()); RETURN_IF_ERROR(_odbc_connector->query()); // check materialize slot num diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h index 34cedb8095..0b8d28ed99 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.h +++ b/be/src/vec/exec/scan/new_odbc_scanner.h @@ -26,7 +26,7 @@ namespace doris::vectorized { class NewOdbcScanner : public VScanner { public: NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit, - MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node); + const TOdbcScanNode& odbc_scan_node); Status open(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 8242abef77..3fdf0770af 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -38,7 +38,6 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, Status NewOlapScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - _scanner_mem_tracker = std::make_unique<MemTracker>("OlapScanners"); return Status::OK(); } @@ -307,9 +306,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) { scanner_ranges.push_back((*ranges)[i].get()); } - NewOlapScanner* scanner = new NewOlapScanner( - _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, - _need_agg_finalize, *scan_range, _scanner_mem_tracker.get()); + NewOlapScanner* scanner = new NewOlapScanner(_state, this, _limit_per_scanner, 
+ _olap_scan_node.is_preaggregation, + _need_agg_finalize, *scan_range); // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. _scanner_pool.add(scanner); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index faea367089..2315d67047 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -57,8 +57,6 @@ private: OlapScanKeys _scan_keys; std::vector<TCondition> _olap_filters; - std::unique_ptr<MemTracker> _scanner_mem_tracker; - private: std::unique_ptr<RuntimeProfile> _segment_profile; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index ea07536a76..ec33346b04 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -25,8 +25,8 @@ namespace doris::vectorized { NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, bool need_agg_finalize, - const TPaloScanRange& scan_range, MemTracker* tracker) - : VScanner(state, static_cast<VScanNode*>(parent), limit, tracker), + const TPaloScanRange& scan_range) + : VScanner(state, static_cast<VScanNode*>(parent), limit), _aggregation(aggregation), _need_agg_finalize(need_agg_finalize), _version(-1) { @@ -38,8 +38,6 @@ Status NewOlapScanner::prepare( VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters, const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - if (vconjunct_ctx_ptr != nullptr) { // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. 
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); @@ -114,7 +112,6 @@ Status NewOlapScanner::prepare( Status NewOlapScanner::open(RuntimeState* state) { RETURN_IF_ERROR(VScanner::open(state)); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); auto res = _tablet_reader->init(_tablet_reader_params); if (!res.ok()) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 41b2888879..6b07438bf1 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -34,7 +34,7 @@ class NewOlapScanNode; class NewOlapScanner : public VScanner { public: NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, - bool need_agg_finalize, const TPaloScanRange& scan_range, MemTracker* tracker); + bool need_agg_finalize, const TPaloScanRange& scan_range); Status open(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 9787ecae5d..b561fca483 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -185,8 +185,10 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext VScanner* scanner) { INIT_AND_SCOPE_REENTRANT_SPAN_IF(ctx->state()->enable_profile(), ctx->state()->get_tracer(), ctx->scan_span(), "VScanner::scan"); - SCOPED_ATTACH_TASK(scanner->runtime_state()); - + SCOPED_ATTACH_TASK(scanner->runtime_state()->scanner_mem_tracker(), + ThreadContext::query_to_task_type(scanner->runtime_state()->query_type()), + print_id(scanner->runtime_state()->query_id()), + scanner->runtime_state()->fragment_instance_id()); Thread::set_self_name("_scanner_scan"); scanner->update_wait_worker_timer(); // Do not use ScopedTimer. 
There is no guarantee that, the counter diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 6c798c77c4..ef10257735 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -37,9 +37,8 @@ namespace doris::vectorized { VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, - RuntimeProfile* profile) - : VScanner(state, static_cast<VScanNode*>(parent), limit, tracker), + const TFileScanRange& scan_range, RuntimeProfile* profile) + : VScanner(state, static_cast<VScanNode*>(parent), limit), _params(scan_range.params), _ranges(scan_range.ranges), _next_range(0), @@ -52,7 +51,6 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t Status VFileScanner::prepare( VExprContext** vconjunct_ctx_ptr, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); _colname_to_value_range = colname_to_value_range; _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime"); diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index cbb4a95707..2bd019b955 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -43,7 +43,7 @@ struct ScannerCounter { class VFileScanner : public VScanner { public: VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile); + const TFileScanRange& scan_range, RuntimeProfile* profile); Status open(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index f92e291e33..fdda015c20 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -21,11 +21,10 @@ namespace doris::vectorized { 
-VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTracker* mem_tracker) +VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit) : _state(state), _parent(parent), _limit(limit), - _mem_tracker(mem_tracker), _input_tuple_desc(parent->input_tuple_desc()), _output_tuple_desc(parent->output_tuple_desc()) { _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc; @@ -36,7 +35,6 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTra Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { // only empty block should be here DCHECK(block->rows() == 0); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; if (!block->mem_reuse()) { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index d5d8025256..49b9c780bd 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -30,7 +30,7 @@ class VScanNode; class VScanner { public: - VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTracker* mem_tracker); + VScanner(RuntimeState* state, VScanNode* parent, int64_t limit); virtual ~VScanner() {} @@ -117,7 +117,6 @@ protected: VScanNode* _parent; // Set if scan node has sort limit info int64_t _limit = -1; - MemTracker* _mem_tracker; const TupleDescriptor* _input_tuple_desc = nullptr; const TupleDescriptor* _output_tuple_desc = nullptr; diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 26925a78fe..0b7ba331af 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -209,8 +209,6 @@ Status VOlapScanNode::prepare(RuntimeState* state) { _init_counter(state); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); - _scanner_mem_tracker = std::make_unique<MemTracker>("OlapScanners"); - if (_tuple_desc == nullptr) { // TODO: make sure we 
print all available diagnostic output to our error log return Status::InternalError("Failed to get tuple descriptor."); @@ -389,8 +387,10 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { } void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { - // SCOPED_ATTACH_TASK(_runtime_state); // TODO Recorded on an independent tracker - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + SCOPED_ATTACH_TASK(_runtime_state->scanner_mem_tracker(), + ThreadContext::query_to_task_type(_runtime_state->query_type()), + print_id(_runtime_state->query_id()), + _runtime_state->fragment_instance_id()); Thread::set_self_name("volap_scanner"); int64_t wait_time = scanner->update_wait_worker_timer(); // Do not use ScopedTimer. There is no guarantee that, the counter @@ -892,9 +892,8 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) { ++j, ++i) { scanner_ranges.push_back((*ranges)[i].get()); } - VOlapScanner* scanner = - new VOlapScanner(state, this, _olap_scan_node.is_preaggregation, - _need_agg_finalize, *scan_range, _scanner_mem_tracker.get()); + VOlapScanner* scanner = new VOlapScanner(state, this, _olap_scan_node.is_preaggregation, + _need_agg_finalize, *scan_range); // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. _scanner_pool.add(scanner); diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h index 44b3829402..51f4ed97c0 100644 --- a/be/src/vec/exec/volap_scan_node.h +++ b/be/src/vec/exec/volap_scan_node.h @@ -221,8 +221,6 @@ private: TResourceInfo* _resource_info; int64_t _buffered_bytes; - // Count the memory consumption of Rowset Reader and Tablet Reader in OlapScanner. - std::unique_ptr<MemTracker> _scanner_mem_tracker; EvalConjunctsFn _eval_conjuncts_fn; // the max num of scan keys of this scan request. 
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 5761705a84..202f54bcb2 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -28,8 +28,7 @@ namespace doris::vectorized { VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, bool aggregation, - bool need_agg_finalize, const TPaloScanRange& scan_range, - MemTracker* tracker) + bool need_agg_finalize, const TPaloScanRange& scan_range) : _runtime_state(runtime_state), _parent(parent), _tuple_desc(parent->_tuple_desc), @@ -37,8 +36,7 @@ VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, b _is_open(false), _aggregation(aggregation), _need_agg_finalize(need_agg_finalize), - _version(-1), - _mem_tracker(tracker) { + _version(-1) { _tablet_schema = std::make_shared<TabletSchema>(); } @@ -47,7 +45,6 @@ Status VOlapScanner::prepare( const std::vector<TCondition>& filters, const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters, const std::vector<FunctionFilter>& function_filters) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); set_tablet_reader(); // set limit to reduce end of rowset and segment mem use _tablet_reader->set_batch_size( @@ -119,7 +116,6 @@ Status VOlapScanner::prepare( Status VOlapScanner::open() { SCOPED_TIMER(_parent->_reader_init_timer); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); @@ -325,7 +321,6 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) { Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bool* eof) { // only empty block should be here DCHECK(block->rows() == 0); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; if (!block->mem_reuse()) { diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index d3cd791b63..3bc286eb40 100644 
--- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -36,7 +36,7 @@ class VOlapScanNode; class VOlapScanner { public: VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, bool aggregation, - bool need_agg_finalize, const TPaloScanRange& scan_range, MemTracker* tracker); + bool need_agg_finalize, const TPaloScanRange& scan_range); virtual ~VOlapScanner() = default; Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, @@ -141,8 +141,6 @@ private: MonotonicStopWatch _watcher; - MemTracker* _mem_tracker; - VExprContext* _vconjunct_ctx = nullptr; bool _need_to_close = false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org