This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b6bdb3bdbc [fix] (mem tracker) Fix MemTracker accuracy (#11190) b6bdb3bdbc is described below commit b6bdb3bdbcc81d6751b21044a09485a3be7bf07e Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Jul 27 18:59:24 2022 +0800 [fix] (mem tracker) Fix MemTracker accuracy (#11190) --- be/src/common/config.h | 2 +- be/src/exec/broker_scan_node.cpp | 2 + be/src/exec/es_http_scan_node.cpp | 1 + be/src/exec/tablet_sink.cpp | 1 - be/src/http/ev_http_server.cpp | 2 + be/src/olap/compaction.cpp | 8 +++- be/src/olap/delta_writer.cpp | 39 ++++++++++++----- be/src/olap/delta_writer.h | 11 +++-- be/src/olap/memtable.cpp | 5 +-- be/src/olap/memtable.h | 4 +- be/src/olap/memtable_flush_executor.cpp | 52 ++++++++++++---------- be/src/olap/memtable_flush_executor.h | 7 ++- be/src/olap/olap_server.cpp | 10 ++++- be/src/olap/storage_engine.cpp | 7 ++- be/src/olap/storage_engine.h | 2 + be/src/runtime/disk_io_mgr.cc | 9 ++-- be/src/runtime/disk_io_mgr.h | 2 +- be/src/runtime/exec_env_init.cpp | 1 + be/src/runtime/fragment_mgr.cpp | 5 +-- be/src/runtime/load_channel.cpp | 4 +- be/src/runtime/load_channel.h | 1 - be/src/runtime/memory/mem_tracker.cpp | 8 ++-- be/src/runtime/memory/mem_tracker.h | 6 +-- be/src/runtime/memory/mem_tracker_limiter.h | 8 +++- be/src/runtime/memory/mem_tracker_task_pool.cpp | 56 +++++++++--------------- be/src/runtime/memory/mem_tracker_task_pool.h | 7 +-- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 8 ++++ be/src/runtime/memory/thread_mem_tracker_mgr.h | 18 ++++++++ be/src/runtime/tablets_channel.cpp | 15 +++---- be/src/runtime/tablets_channel.h | 7 ++- be/src/runtime/thread_context.cpp | 19 -------- be/src/runtime/thread_context.h | 1 - be/src/service/doris_main.cpp | 4 +- be/src/vec/exec/vbroker_scan_node.cpp | 3 ++ be/src/vec/exec/ves_http_scan_node.cpp | 1 + be/src/vec/sink/vtablet_sink.cpp | 1 + be/test/olap/delta_writer_test.cpp | 6 +-- 37 files changed, 192 insertions(+), 
151 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 9891df2f18..d1f43eaeba 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -653,7 +653,7 @@ CONF_Bool(memory_verbose_track, "false"); // smaller than this value will continue to accumulate. specified as number of bytes. // Decreasing this value will increase the frequency of consume/release. // Increasing this value will cause MemTracker statistics to be inaccurate. -CONF_mInt32(mem_tracker_consume_min_size_bytes, "4194304"); +CONF_mInt32(mem_tracker_consume_min_size_bytes, "1048576"); // The version information of the tablet will be stored in the memory // in an adjacency graph data structure. diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 9bc920a9b6..0b5fe1d940 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -373,6 +373,8 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, } void BrokerScanNode::scanner_worker(int start_idx, int length) { + SCOPED_ATTACH_TASK(_runtime_state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // Clone expr context std::vector<ExprContext*> scanner_expr_ctxs; auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs); diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index acfc14bb7d..ec23253a5b 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -419,6 +419,7 @@ static std::string get_host_port(const std::vector<TNetworkAddress>& es_hosts) { void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) { SCOPED_ATTACH_TASK(_runtime_state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // Clone expr context std::vector<ExprContext*> scanner_expr_ctxs; DCHECK(start_idx < length); diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 6254d906bd..ef3d97c2bf 100644 --- 
a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -600,7 +600,6 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, int64_t tablet_id) { - SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get()); const auto& it = _tablets_by_channel.find(node_id); if (it == _tablets_by_channel.end()) { return; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index 2d9031383a..bf612ca830 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -33,6 +33,7 @@ #include "http/http_handler.h" #include "http/http_headers.h" #include "http/http_request.h" +#include "runtime/thread_context.h" #include "service/brpc.h" #include "util/debug_util.h" #include "util/threadpool.h" @@ -98,6 +99,7 @@ void EvHttpServer::start() { _event_bases.resize(_num_workers); for (int i = 0; i < _num_workers; ++i) { CHECK(_workers->submit_func([this, i]() { + thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) { event_base_free(base); }); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index aa6ff0f6d3..a456370f85 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -41,7 +41,13 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label) #endif } -Compaction::~Compaction() {} +Compaction::~Compaction() { +#ifndef BE_TEST + // Compaction tracker cannot be completely accurate, offset the global impact. 
+ StorageEngine::instance()->compaction_mem_tracker()->consumption_revise( + -_mem_tracker->consumption()); +#endif +} Status Compaction::compact() { RETURN_NOT_OK(prepare_compact()); diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 73e9d7b992..04b7327f25 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -30,12 +30,14 @@ namespace doris { -Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, bool is_vec) { - *writer = new DeltaWriter(req, StorageEngine::instance(), is_vec); +Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, MemTrackerLimiter* parent_tracker, + bool is_vec) { + *writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, is_vec); return Status::OK(); } -DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec) +DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, + MemTrackerLimiter* parent_tracker, bool is_vec) : _req(*req), _tablet(nullptr), _cur_rowset(nullptr), @@ -43,6 +45,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool _tablet_schema(new TabletSchema), _delta_written_success(false), _storage_engine(storage_engine), + _parent_tracker(parent_tracker), _is_vec(is_vec) {} DeltaWriter::~DeltaWriter() { @@ -95,9 +98,9 @@ Status DeltaWriter::init() { << ", schema_hash=" << _req.schema_hash; return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND); } - - _flushed_mem_tracker = std::make_unique<MemTracker>( - fmt::format("DeltaWriter:tabletId={}", std::to_string(_tablet->tablet_id()))); + _mem_tracker = std::make_unique<MemTrackerLimiter>( + -1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), _parent_tracker); + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); // check tablet version number if (_tablet->version_count() > config::max_tablet_version_num) { //trigger quick compaction @@ -147,7 +150,10 @@ Status 
DeltaWriter::write(Tuple* tuple) { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } + int64_t prev_memtable_usage = _mem_table->memory_usage(); _mem_table->insert(tuple); + THREAD_MEM_TRACKER_TRANSFER_TO(_mem_table->memory_usage() - prev_memtable_usage, + _mem_tracker.get()); // if memtable is full, push it to the flush executor, // and create a new memtable for incoming data @@ -171,9 +177,15 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row if (_is_cancelled) { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } + + // Hook automatically records that the memory is lower than the real value, so manual tracking is used. + // Because multiple places freed memory that doesn't belong to DeltaWriter + int64_t prev_memtable_usage = _mem_table->memory_usage(); for (const auto& row_idx : row_idxs) { _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0)); } + THREAD_MEM_TRACKER_TRANSFER_TO(_mem_table->memory_usage() - prev_memtable_usage, + _mem_tracker.get()); if (_mem_table->memory_usage() >= config::write_buffer_size) { RETURN_NOT_OK(_flush_memtable_async()); @@ -196,6 +208,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); _mem_table->insert(block, row_idxs); if (_mem_table->need_to_agg()) { @@ -213,8 +226,7 @@ Status DeltaWriter::_flush_memtable_async() { if (++_segment_counter > config::max_segment_num_per_rowset) { return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS); } - _flushed_mem_tracker->consume(_mem_table->memory_usage()); - return _flush_token->submit(_mem_table); + return _flush_token->submit(std::move(_mem_table), _mem_tracker.get()); } Status DeltaWriter::flush_memtable_and_wait(bool need_wait) { @@ -231,6 +243,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) { return 
Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); if (mem_consumption() == _mem_table->memory_usage()) { // equal means there is no memtable in flush queue, just flush this memtable VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " @@ -267,7 +280,7 @@ Status DeltaWriter::wait_flush() { void DeltaWriter::_reset_mem_table() { _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema.get(), _req.slots, _req.tuple_desc, _tablet->keys_type(), - _rowset_writer.get(), _flushed_mem_tracker.get(), _is_vec)); + _rowset_writer.get(), _is_vec)); } Status DeltaWriter::close() { @@ -285,6 +298,7 @@ Status DeltaWriter::close() { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); RETURN_NOT_OK(_flush_memtable_async()); _mem_table.reset(); return Status::OK(); @@ -299,6 +313,7 @@ Status DeltaWriter::close_wait() { return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); } + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); // return error if previous flush failed RETURN_NOT_OK(_flush_token->wait()); @@ -350,12 +365,12 @@ int64_t DeltaWriter::get_mem_consumption_snapshot() const { } int64_t DeltaWriter::mem_consumption() const { - if (_flushed_mem_tracker == nullptr) { + if (_mem_tracker == nullptr) { // This method may be called before this writer is initialized. - // So _flushed_mem_tracker may be null. + // So _mem_tracker may be null. 
return 0; } - return _flushed_mem_tracker->consumption() + _mem_table->memory_usage(); + return _mem_tracker->consumption(); } int64_t DeltaWriter::partition_id() const { diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 0c11b8fcbe..1ce62de338 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -55,7 +55,8 @@ struct WriteRequest { // This class is NOT thread-safe, external synchronization is required. class DeltaWriter { public: - static Status open(WriteRequest* req, DeltaWriter** writer, bool is_vec = false); + static Status open(WriteRequest* req, DeltaWriter** writer, + MemTrackerLimiter* parent_tracker = nullptr, bool is_vec = false); ~DeltaWriter(); @@ -100,7 +101,8 @@ public: int64_t get_mem_consumption_snapshot() const; private: - DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec); + DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, MemTrackerLimiter* parent_tracker, + bool is_vec); // push a full memtable to flush executor Status _flush_memtable_async(); @@ -120,7 +122,7 @@ private: RowsetSharedPtr _cur_rowset; std::unique_ptr<RowsetWriter> _rowset_writer; // TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr - std::shared_ptr<MemTable> _mem_table; + std::unique_ptr<MemTable> _mem_table; std::unique_ptr<Schema> _schema; //const TabletSchema* _tablet_schema; // tablet schema owned by delta writer, all write will use this tablet schema @@ -131,7 +133,8 @@ private: StorageEngine* _storage_engine; std::unique_ptr<FlushToken> _flush_token; - std::unique_ptr<MemTracker> _flushed_mem_tracker; + std::unique_ptr<MemTrackerLimiter> _mem_tracker; + MemTrackerLimiter* _parent_tracker; // The counter of number of segment flushed already. 
int64_t _segment_counter = 0; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index a44c54c552..46607f6a6c 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -31,8 +31,7 @@ namespace doris { MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, - KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* writer_mem_tracker, - bool support_vec) + KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec) : _tablet_id(tablet_id), _schema(schema), _tablet_schema(tablet_schema), @@ -40,7 +39,6 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet _keys_type(keys_type), _mem_tracker(std::make_unique<MemTracker>( fmt::format("MemTable:tabletId={}", std::to_string(tablet_id)))), - _writer_mem_tracker(writer_mem_tracker), _buffer_mem_pool(new MemPool(_mem_tracker.get())), _table_mem_pool(new MemPool(_mem_tracker.get())), _schema_size(_schema->schema_size()), @@ -134,7 +132,6 @@ MemTable::~MemTable() { } } std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>()); - _writer_mem_tracker->release(_mem_tracker->consumption()); _mem_tracker->release(_mem_usage); _buffer_mem_pool->free_all(); _table_mem_pool->free_all(); diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index d92e7773a9..9be1f7d33d 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -42,8 +42,7 @@ class MemTable { public: MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, - KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* writer_mem_tracker, - bool support_vec = false); + KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec = false); ~MemTable(); int64_t tablet_id() const { return _tablet_id; } @@ -153,7 +152,6 @@ private: 
std::shared_ptr<RowInBlockComparator> _vec_row_comparator; std::unique_ptr<MemTracker> _mem_tracker; - MemTracker* _writer_mem_tracker; // This is a buffer, to hold the memory referenced by the rows that have not // been inserted into the SkipList std::unique_ptr<MemPool> _buffer_mem_pool; diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 3e722019e4..b9e622a9a7 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -26,6 +26,30 @@ namespace doris { +class MemtableFlushTask final : public Runnable { +public: + MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable, + int64_t submit_task_time, MemTrackerLimiter* tracker) + : _flush_token(flush_token), + _memtable(std::move(memtable)), + _submit_task_time(submit_task_time), + _tracker(tracker) {} + + ~MemtableFlushTask() override = default; + + void run() override { + SCOPED_ATTACH_TASK(_tracker, ThreadContext::TaskType::LOAD); + _flush_token->_flush_memtable(_memtable.get(), _submit_task_time); + _memtable.reset(); + } + +private: + FlushToken* _flush_token; + std::unique_ptr<MemTable> _memtable; + int64_t _submit_task_time; + MemTrackerLimiter* _tracker; +}; + std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS @@ -34,20 +58,15 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { return os; } -// The type of parameter is safe to be a reference. Because the function object -// returned by std::bind() will increase the reference count of Memtable. i.e., -// after the submit() method returns, even if the caller immediately releases the -// passed shared_ptr object, the Memtable object will not be destructed because -// its reference count is not 0. 
-Status FlushToken::submit(const std::shared_ptr<MemTable>& memtable) { +Status FlushToken::submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker) { ErrorCode s = _flush_status.load(); if (s != OLAP_SUCCESS) { return Status::OLAPInternalError(s); } int64_t submit_task_time = MonotonicNanos(); - _flush_token->submit_func( - std::bind(&FlushToken::_flush_memtable, this, memtable, submit_task_time)); - return Status::OK(); + auto task = std::make_shared<MemtableFlushTask>(this, std::move(mem_table), submit_task_time, + tracker); + return _flush_token->submit(std::move(task)); } void FlushToken::cancel() { @@ -60,21 +79,8 @@ Status FlushToken::wait() { return s == OLAP_SUCCESS ? Status::OK() : Status::OLAPInternalError(s); } -void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable, int64_t submit_task_time) { -#ifndef BE_TEST - // The memtable mem tracker needs to be completely accurate, - // because DeltaWriter judges whether to flush memtable according to the memtable memory usage. - // The macro of attach thread mem tracker is affected by the destructuring order of local variables, - // so it cannot completely correspond to the number of new/delete bytes recorded in scoped, - // and there is a small range of errors. So direct track load mem tracker. - // TODO(zxy) After rethinking the use of switch thread mem tracker, choose the appropriate way to get - // load mem tracke here. 
- // DCHECK(memtable->mem_tracker()->parent_task_mem_tracker_no_own()); - // SCOPED_ATTACH_TASK(ThreadContext::TaskType::LOAD, - // memtable->mem_tracker()->parent_task_mem_tracker_no_own()); -#endif +void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time) { _stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time); - SCOPED_CLEANUP({ memtable.reset(); }); // If previous flush has failed, return directly if (_flush_status.load() != OLAP_SUCCESS) { return; diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 36d5c21be8..6f986af3f5 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -31,6 +31,7 @@ class DataDir; class DeltaWriter; class ExecEnv; class MemTable; +class MemTrackerLimiter; // the statistic of a certain flush handler. // use atomic because it may be updated by multi threads @@ -56,7 +57,7 @@ public: explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token) : _flush_token(std::move(flush_pool_token)), _flush_status(OLAP_SUCCESS) {} - Status submit(const std::shared_ptr<MemTable>& mem_table); + Status submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker); // error has happpens, so we cancel this token // And remove all tasks in the queue. 
@@ -69,7 +70,9 @@ public: const FlushStatistic& get_stats() const { return _stats; } private: - void _flush_memtable(std::shared_ptr<MemTable> mem_table, int64_t submit_task_time); + friend class MemtableFlushTask; + + void _flush_memtable(MemTable* mem_table, int64_t submit_task_time); std::unique_ptr<ThreadPoolToken> _flush_token; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 28e9241985..ef53eeaee4 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -111,14 +111,20 @@ Status StorageEngine::start_bg_threads() { scoped_refptr<Thread> path_scan_thread; RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_scan_thread", - [this, data_dir]() { this->_path_scan_thread_callback(data_dir); }, + [this, data_dir]() { + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + this->_path_scan_thread_callback(data_dir); + }, &path_scan_thread)); _path_scan_threads.emplace_back(path_scan_thread); scoped_refptr<Thread> path_gc_thread; RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_gc_thread", - [this, data_dir]() { this->_path_gc_thread_callback(data_dir); }, + [this, data_dir]() { + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); + this->_path_gc_thread_callback(data_dir); + }, &path_gc_thread)); _path_gc_threads.emplace_back(path_gc_thread); } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 0179f3f064..9a7c414971 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -122,6 +122,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::BatchLoad")), _consistency_mem_tracker( std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Consistency")), + _mem_tracker(std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Self")), _stop_background_threads_latch(1), _tablet_manager(new TabletManager(config::tablet_map_shard_size)), _txn_manager(new 
TxnManager(config::txn_map_shard_size, config::txn_shard_size)), @@ -166,7 +167,8 @@ StorageEngine::~StorageEngine() { void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) { std::vector<std::thread> threads; for (auto data_dir : data_dirs) { - threads.emplace_back([data_dir] { + threads.emplace_back([this, data_dir] { + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); auto res = data_dir->load(); if (!res.ok()) { LOG(WARNING) << "io error when init load tables. res=" << res @@ -217,7 +219,8 @@ Status StorageEngine::_init_store_map() { DataDir* store = new DataDir(path.path, path.capacity_bytes, path.storage_medium, _tablet_manager.get(), _txn_manager.get()); tmp_stores.emplace_back(store); - threads.emplace_back([store, &error_msg_lock, &error_msg]() { + threads.emplace_back([this, store, &error_msg_lock, &error_msg]() { + SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE); auto st = store->init(); if (!st.ok()) { { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index df018e27d3..d508a617c5 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -346,6 +346,8 @@ private: std::unique_ptr<MemTrackerLimiter> _batch_load_mem_tracker; // Count the memory consumption of all EngineChecksumTask. 
std::unique_ptr<MemTrackerLimiter> _consistency_mem_tracker; + // StorageEngine oneself + std::unique_ptr<MemTrackerLimiter> _mem_tracker; CountDownLatch _stop_background_threads_latch; scoped_refptr<Thread> _unused_rowset_monitor_thread; diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index f2c68b1e99..e56c4bff69 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -373,8 +373,8 @@ Status DiskIoMgr::init(const int64_t mem_limit) { ss << "work-loop(Disk: " << i << ", Thread: " << j << ")"; // _disk_thread_group.AddThread(new Thread("disk-io-mgr", ss.str(), // &DiskIoMgr::work_loop, this, _disk_queues[i])); - _disk_thread_group.add_thread(new std::thread( - std::bind(&DiskIoMgr::work_loop, this, _disk_queues[i], _mem_tracker.get()))); + _disk_thread_group.add_thread( + new std::thread(std::bind(&DiskIoMgr::work_loop, this, _disk_queues[i]))); } } _request_context_cache.reset(new RequestContextCache(this)); @@ -947,7 +947,7 @@ void DiskIoMgr::handle_read_finished(DiskQueue* disk_queue, RequestContext* read state.decrement_request_thread(); } -void DiskIoMgr::work_loop(DiskQueue* disk_queue, MemTrackerLimiter* mem_tracker) { +void DiskIoMgr::work_loop(DiskQueue* disk_queue) { // The thread waits until there is work or the entire system is being shut down. // If there is work, performs the read or write requested and re-enqueues the // requesting context. @@ -960,8 +960,7 @@ void DiskIoMgr::work_loop(DiskQueue* disk_queue, MemTrackerLimiter* mem_tracker) // 3. Perform the read or write as specified. // Cancellation checking needs to happen in both steps 1 and 3. 
- // tracked in the process tracker - // SCOPED_ATTACH_TASK(ThreadContext::TaskType::STORAGE, mem_tracker); + thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); while (!_shut_down) { RequestContext* worker_context = nullptr; ; diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h index 3279ac2559..9d0aa2f5ae 100644 --- a/be/src/runtime/disk_io_mgr.h +++ b/be/src/runtime/disk_io_mgr.h @@ -795,7 +795,7 @@ private: // Disk worker thread loop. This function retrieves the next range to process on // the disk queue and invokes read_range() or Write() depending on the type of Range(). // There can be multiple threads per disk running this loop. - void work_loop(DiskQueue* queue, MemTrackerLimiter* mem_tracker); + void work_loop(DiskQueue* queue); // This is called from the disk thread to get the next range to process. It will // wait until a scan range and buffer are available, or a write range is available. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a35ae80ee3..8281375331 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -191,6 +191,7 @@ Status ExecEnv::_init_mem_tracker() { } _process_mem_tracker = new MemTrackerLimiter(global_memory_limit_bytes, "Process"); thread_context()->_thread_mem_tracker_mgr->init(); + thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) if (doris::config::enable_tcmalloc_hook) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c5fcfa97c4..7b16f853ce 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -477,6 +477,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi std::string func_name {"PlanFragmentExecutor::_exec_actual"}; #ifndef 
BE_TEST auto span = exec_state->executor()->runtime_state()->get_tracer()->StartSpan(func_name); + SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state()); #else auto span = telemetry::get_noop_tracer()->StartSpan(func_name); #endif @@ -493,9 +494,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi .query_id(exec_state->query_id()) .instance_id(exec_state->fragment_instance_id()) .tag("pthread_id", std::to_string((uintptr_t)pthread_self())); -#ifndef BE_TEST - SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state()); -#endif + exec_state->execute(); std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx(); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 9a9d1c808f..e1fc0a3463 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -50,7 +50,6 @@ LoadChannel::~LoadChannel() { } Status LoadChannel::open(const PTabletWriterOpenRequest& params) { - // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); int64_t index_id = params.index_id(); std::shared_ptr<TabletsChannel> channel; { @@ -61,7 +60,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } else { // create a new tablets channel TabletsChannelKey key(params.id(), index_id); - channel.reset(new TabletsChannel(key, _is_high_priority, _is_vec)); + channel.reset(new TabletsChannel(key, _mem_tracker.get(), _is_high_priority, _is_vec)); _tablets_channels.insert({index_id, channel}); } } @@ -137,7 +136,6 @@ bool LoadChannel::is_finished() { Status LoadChannel::cancel() { std::lock_guard<std::mutex> l(_lock); - // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); for (auto& it : _tablets_channels) { it.second->cancel(); } diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 4137c7fafc..ad8a476fcf 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -129,7 +129,6 @@ private: 
template <typename TabletWriterAddRequest, typename TabletWriterAddResult> Status LoadChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD); int64_t index_id = request.index_id(); // 1. get tablets channel std::shared_ptr<TabletsChannel> channel; diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index dec04a20f5..0f4e185e46 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -80,12 +80,14 @@ MemTracker::~MemTracker() { MemTracker* MemTracker::get_static_mem_tracker(const std::string& label) { // First time this label registered, make a new object, otherwise do nothing. // Avoid using locks to resolve erase conflicts. + MemTracker* tracker; _static_mem_trackers.lazy_emplace_l( - label, [&](MemTracker*) {}, + label, [&](MemTracker* v) { tracker = v; }, [&](const auto& ctor) { - ctor(label, new MemTracker(fmt::format("[Static]-{}", label))); + tracker = new MemTracker(fmt::format("[Static]-{}", label)); + ctor(label, tracker); }); - return _static_mem_trackers[label]; + return tracker; } MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const { diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 4e4af1723d..9f6e021a3c 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -30,9 +30,9 @@ class MemTrackerLimiter; // MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER, // which will automatically track all memory usage of the code segment where it is located. // -// MemTracker's father can only be MemTrackerLimiter, which is only used to print tree-like statistics. -// Consuming MemTracker will not consume its father synchronously. -// Usually, it is not necessary to specify the father. 
by default, the MemTrackerLimiter in the thread context +// MemTracker's parent can only be MemTrackerLimiter, which is only used to print tree-like statistics. +// Consuming MemTracker will not consume its parent synchronously. +// Usually, it is not necessary to specify the parent. by default, the MemTrackerLimiter in the thread context // is used as the parent, which is specified when the thread starts. // // This class is thread-safe. diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 1f3fb89800..786ad945bf 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -119,7 +119,13 @@ public: Status try_gc_memory(int64_t bytes); public: - void consumption_revise(int64_t bytes) { _consumption->add(bytes); } + // It is used for revise mem tracker consumption. + // If the location of memory alloc and free is different, the consumption value of mem tracker will be inaccurate. + // But the consumption value of the process mem tracker is not affected. + void consumption_revise(int64_t bytes) { + DCHECK(_label != "Process"); + _consumption->add(bytes); + } // Logs the usage of this tracker limiter and optionally its children (recursively). // If 'logged_consumption' is non-nullptr, sets the consumption value logged. diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index 86f2976f18..a4831114cf 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -23,11 +23,6 @@ namespace doris { -// When MemTracker is a negative value, it is considered that a memory leak has occurred, -// but the actual MemTracker records inaccurately will also cause a negative value, -// so this feature is in the experimental stage.
-const bool QUERY_MEMORY_LEAK_DETECTION = false; - MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id, int64_t mem_limit, const std::string& label, @@ -37,15 +32,15 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std: // Combine new tracker and emplace into one operation to avoid the use of locks // Name for task MemTrackers. '$0' is replaced with the task id. bool new_emplace = _task_mem_trackers.lazy_emplace_l( - task_id, [&](MemTrackerLimiter*) {}, + task_id, [&](std::shared_ptr<MemTrackerLimiter>) {}, [&](const auto& ctor) { - ctor(task_id, new MemTrackerLimiter(mem_limit, label, parent)); + ctor(task_id, std::make_unique<MemTrackerLimiter>(mem_limit, label, parent)); }); if (new_emplace) { LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); } - return _task_mem_trackers[task_id]; + return _task_mem_trackers[task_id].get(); } MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id, @@ -67,35 +62,35 @@ MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& t DCHECK(!task_id.empty()); MemTrackerLimiter* tracker = nullptr; // Avoid using locks to resolve erase conflicts - _task_mem_trackers.if_contains(task_id, [&tracker](MemTrackerLimiter* v) { tracker = v; }); + _task_mem_trackers.if_contains( + task_id, [&tracker](std::shared_ptr<MemTrackerLimiter> v) { tracker = v.get(); }); return tracker; } void MemTrackerTaskPool::logout_task_mem_tracker() { - std::vector<std::string> expired_tasks; - for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) { + for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end();) { if (!it->second) { - // https://github.com/apache/incubator-doris/issues/10006 - expired_tasks.emplace_back(it->first); + // Unknown exception case with high concurrency, after 
_task_mem_trackers.erase, + // the key still exists in _task_mem_trackers. https://github.com/apache/incubator-doris/issues/10006 + _task_mem_trackers._erase(it++); } else if (it->second->remain_child_count() == 0 && it->second->had_child_count() != 0) { // No RuntimeState uses this task MemTracker, it is only referenced by this map, // and tracker was not created soon, delete it. - if (QUERY_MEMORY_LEAK_DETECTION && it->second->consumption() != 0) { - // If consumption is not equal to 0 before query mem tracker is destructed, - // there are two possibilities in theory. - // 1. A memory leak occurs. - // 2. Some of the memory consumed/released on the query mem tracker is actually released/consume on - // other trackers such as the process mem tracker, and there is no manual transfer between the two trackers. - // - // The second case should be eliminated in theory, but it has not been done so far, so the query memory leak - // cannot be located, and the value of the query pool mem tracker statistics will be inaccurate. - LOG(WARNING) << "Task memory tracker memory leak:" << it->second->debug_string(); - } + // + // If consumption is not equal to 0 before query mem tracker is destructed, + // there are two possibilities in theory. + // 1. A memory leak occurs. + // 2. memory consumed on query mem tracker, released on other trackers, and no manual transfer + // between the two trackers. + // At present, it is impossible to effectively locate which memory consume and release on different trackers, + // so query memory leaks cannot be found. + // // In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers, // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is, // the negative number of the current value of consume. 
it->second->parent()->consumption_revise(-it->second->consumption()); - expired_tasks.emplace_back(it->first); + LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first; + _task_mem_trackers._erase(it++); } else { // Log limit exceeded query tracker. if (it->second->limit_exceeded()) { @@ -104,16 +99,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first), 0, Status::OK()); } - } - } - for (auto tid : expired_tasks) { - if (!_task_mem_trackers[tid]) { - _task_mem_trackers.erase(tid); - LOG(INFO) << "Deregister null query/load memory tracker, query/load id: " << tid; - } else { - delete _task_mem_trackers[tid]; - _task_mem_trackers.erase(tid); - LOG(INFO) << "Deregister not used query/load memory tracker, query/load id: " << tid; + ++it; } } } diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h index ae7d82caf4..4890d72713 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.h +++ b/be/src/runtime/memory/mem_tracker_task_pool.h @@ -24,9 +24,10 @@ namespace doris { using TaskTrackersMap = phmap::parallel_flat_hash_map< - std::string, MemTrackerLimiter*, phmap::priv::hash_default_hash<std::string>, - phmap::priv::hash_default_eq<std::string>, - std::allocator<std::pair<const std::string, MemTrackerLimiter*>>, 12, std::mutex>; + std::string, std::shared_ptr<MemTrackerLimiter>, + phmap::priv::hash_default_hash<std::string>, phmap::priv::hash_default_eq<std::string>, + std::allocator<std::pair<const std::string, std::shared_ptr<MemTrackerLimiter>>>, 12, + std::mutex>; // Global task pool for query MemTrackers. Owned by ExecEnv. 
class MemTrackerTaskPool { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index c129deb8ee..53a47202b1 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -29,6 +29,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg, const TUniqueId& fragment_instance_id, MemTrackerLimiter* mem_tracker) { DCHECK(mem_tracker); + flush_untracked_mem<false>(); _task_id = task_id; _fragment_instance_id = fragment_instance_id; _exceed_cb.cancel_msg = cancel_msg; @@ -36,6 +37,13 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg, } void ThreadMemTrackerMgr::detach_limiter_tracker() { +#ifndef BE_TEST + // Unexpectedly, the runtime state is destructed before the end of the query sub-thread, + // (_hash_table_build_thread has appeared) which is not a graceful exit. + // consider replacing CHECK with a conditional statement and checking for runtime state survival. 
+ CHECK(_task_id == "" || + ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(_task_id)); +#endif flush_untracked_mem<false>(); _task_id = ""; _fragment_instance_id = TUniqueId(); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index d32eaaf082..8ccb6f70b1 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -19,12 +19,18 @@ #include <fmt/format.h> #include <parallel_hashmap/phmap.h> +#include <service/brpc_conflict.h> #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" +// After brpc_conflict.h +#include <bthread/bthread.h> namespace doris { +extern bthread_key_t btls_key; +static const bthread_key_t EMPTY_BTLS_KEY = {0, 0}; + using ExceedCallBack = void (*)(); struct MemExceedCallBackInfo { std::string cancel_msg; @@ -113,6 +119,7 @@ public: MemTrackerLimiter* limiter_mem_tracker() { return _limiter_tracker; } void set_check_limit(bool check_limit) { _check_limit = check_limit; } + void set_check_attach(bool check_attach) { _check_attach = check_attach; } std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; @@ -144,6 +151,7 @@ private: bool _check_limit = false; // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. bool _stop_consume = false; + bool _check_attach = true; std::string _task_id; TUniqueId _fragment_instance_id; MemExceedCallBackInfo _exceed_cb; @@ -195,6 +203,16 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { _stop_consume = true; DCHECK(_limiter_tracker); if (CheckLimit) { +#ifndef BE_TEST + // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker. 
+ // If _check_attach is true and it is not in the brpc server (the protobuf will be operated when bthread is started), + // it will check whether the tracker label is equal to the default "Process" when flushing. + // If you do not want this check, call set_check_attach(false) + // TODO(zxy) The current p0 test cannot guarantee that all threads are checked, + // so disable it and try to open it when memory tracking is not on time. + DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || + _limiter_tracker->label() != "Process"); +#endif Status st = _limiter_tracker->try_consume(_untracked_mem); if (!st) { // The memory has been allocated, so when TryConsume fails, need to continue to complete diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index f35ffec2b7..043f4c7eab 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -30,12 +30,15 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count; -TabletsChannel::TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec) +TabletsChannel::TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker, + bool is_high_priority, bool is_vec) : _key(key), _state(kInitialized), _closed_senders(64), _is_high_priority(is_high_priority), _is_vec(is_vec) { + _mem_tracker = std::make_unique<MemTrackerLimiter>( + -1, fmt::format("TabletsChannel#indexID={}", key.index_id), parent_tracker); static std::once_flag once_flag; std::call_once(once_flag, [] { REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); }); @@ -207,14 +210,6 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { return Status::OK(); } -int64_t TabletsChannel::mem_consumption() { - int64_t mem_usage = 0; - for (auto& it : _tablet_writers) { - mem_usage += it.second->mem_consumption(); - } - return mem_usage; -} - Status
TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) { std::vector<SlotDescriptor*>* index_slots = nullptr; int32_t schema_hash = 0; @@ -245,7 +240,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request wrequest.ptable_schema_param = request.schema(); DeltaWriter* writer = nullptr; - auto st = DeltaWriter::open(&wrequest, &writer, _is_vec); + auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker.get(), _is_vec); if (!st.ok()) { std::stringstream ss; ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index bd7504cd2f..318dd879a4 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -60,7 +60,8 @@ class OlapTableSchemaParam; // Write channel for a particular (load, index). class TabletsChannel { public: - TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec); + TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker, + bool is_high_priority, bool is_vec); ~TabletsChannel(); @@ -88,7 +89,7 @@ public: // no-op when this channel has been closed or cancelled Status reduce_mem_usage(int64_t mem_limit); - int64_t mem_consumption(); + int64_t mem_consumption() const { return _mem_tracker->consumption(); } private: template <typename Request> @@ -143,6 +144,8 @@ private: static std::atomic<uint64_t> _s_tablet_writer_count; + std::unique_ptr<MemTrackerLimiter> _mem_tracker; + bool _is_high_priority = false; bool _is_vec = false; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 5bfa58fa39..6071defe64 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -37,25 +37,6 @@ AttachTask::AttachTask(MemTrackerLimiter* mem_tracker, const ThreadContext::Task #endif } -// AttachTask::AttachTask(const TQueryType::type& query_type, -// MemTrackerLimiter* mem_tracker) { -// 
DCHECK(mem_tracker); -// #ifdef USE_MEM_TRACKER -// thread_context()->attach_task(query_to_task_type(query_type), "", TUniqueId(), mem_tracker); -// #endif -// } - -// AttachTask::AttachTask(const TQueryType::type& query_type, -// MemTrackerLimiter* mem_tracker, const std::string& task_id, -// const TUniqueId& fragment_instance_id) { -// DCHECK(task_id != ""); -// DCHECK(fragment_instance_id != TUniqueId()); -// DCHECK(mem_tracker); -// #ifdef USE_MEM_TRACKER -// thread_context()->attach_task(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker); -// #endif -// } - AttachTask::AttachTask(RuntimeState* runtime_state) { #ifndef BE_TEST DCHECK(print_id(runtime_state->query_id()) != ""); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index f1439d1eea..2c14929432 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -43,7 +43,6 @@ class TUniqueId; class ThreadContext; extern bthread_key_t btls_key; -static const bthread_key_t EMPTY_BTLS_KEY = {0, 0}; // Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error, // see https://github.com/apache/doris/pull/7911 diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index c68add8a76..a78bb801b1 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -473,9 +473,7 @@ int main(int argc, char** argv) { #endif doris::PerfCounters::refresh_proc_status(); - // TODO(zxy) 10s is too long to clear the expired task mem tracker. - // A query mem tracker is about 57 bytes, assuming 10000 qps, which wastes about 55M of memory. - // It should be actively triggered at the end of query/load. + // 1s clear the expired task mem tracker, a query mem tracker is about 57 bytes. 
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); sleep(1); } diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index 0a85a60aa7..34cc7ddf26 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -109,6 +109,7 @@ Status VBrokerScanNode::start_scanners() { Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VBrokerScanNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // check if CANCELLED. if (state->is_cancelled()) { std::unique_lock<std::mutex> l(_batch_queue_lock); @@ -275,6 +276,8 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner void VBrokerScanNode::scanner_worker(int start_idx, int length) { START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker"); + SCOPED_ATTACH_TASK(_runtime_state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); Thread::set_self_name("vbroker_scanner"); Status status = Status::OK(); ScannerCounter counter; diff --git a/be/src/vec/exec/ves_http_scan_node.cpp b/be/src/vec/exec/ves_http_scan_node.cpp index 3029da4f8c..3a9a1ef673 100644 --- a/be/src/vec/exec/ves_http_scan_node.cpp +++ b/be/src/vec/exec/ves_http_scan_node.cpp @@ -385,6 +385,7 @@ void VEsHttpScanNode::debug_string(int ident_level, std::stringstream* out) cons void VEsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) { START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VEsHttpScanNode::scanner_worker"); SCOPED_ATTACH_TASK(_runtime_state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // Clone expr context std::vector<ExprContext*> scanner_expr_ctxs; DCHECK(start_idx < length); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 
048395eec5..1dbcc05daa 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -141,6 +141,7 @@ Status VNodeChannel::open_wait() { } Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) { + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 33c4a48737..3126b30f94 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -398,7 +398,7 @@ TEST_F(TestDeltaWriter, open) { SAFE_DELETE(delta_writer); // test vec delta writer - DeltaWriter::open(&write_req, &delta_writer, true); + DeltaWriter::open(&write_req, &delta_writer, nullptr, true); EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); @@ -551,7 +551,7 @@ TEST_F(TestDeltaWriter, vec_write) { WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002, 30002, load_id, tuple_desc, &(tuple_desc->slots())}; DeltaWriter* delta_writer = nullptr; - DeltaWriter::open(&write_req, &delta_writer, true); + DeltaWriter::open(&write_req, &delta_writer, nullptr, true); ASSERT_NE(delta_writer, nullptr); MemPool pool; @@ -764,7 +764,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, 30003, load_id, tuple_desc, &(tuple_desc->slots())}; DeltaWriter* delta_writer = nullptr; - DeltaWriter::open(&write_req, &delta_writer, true); + DeltaWriter::open(&write_req, &delta_writer, nullptr, true); ASSERT_NE(delta_writer, nullptr); MemPool pool; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org