morningman commented on a change in pull request #8322: URL: https://github.com/apache/incubator-doris/pull/8322#discussion_r823292255
########## File path: be/src/exec/partitioned_hash_table.cc ########## @@ -310,13 +310,13 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state, MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_)); int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_); - Status st = tracker->TryConsume(mem_usage); + Status st = tracker->check_limit(mem_usage); WARN_IF_ERROR(st, "PartitionedHashTableCtx::ExprValuesCache failed"); Review comment: Can this `WARN_IF_ERROR` be removed? It duplicates the following `RETURN_LIMIT_EXCEEDED`. ########## File path: be/src/exec/exec_node.cpp ########## @@ -268,10 +267,6 @@ Status ExecNode::close(RuntimeState* state) { if (_vconjunct_ctx_ptr) (*_vconjunct_ctx_ptr)->close(state); Expr::close(_conjunct_ctxs, state); - if (expr_mem_pool() != nullptr) { Review comment: Why can this be removed? ########## File path: be/src/olap/storage_engine.h ########## @@ -326,6 +330,9 @@ class StorageEngine { std::shared_ptr<MemTracker> _compaction_mem_tracker; std::shared_ptr<MemTracker> _tablet_mem_tracker; std::shared_ptr<MemTracker> _schema_change_mem_tracker; + std::shared_ptr<MemTracker> _clone_mem_tracker; Review comment: Add comments for all these trackers. ########## File path: be/src/runtime/disk_io_mgr.cc ########## @@ -359,14 +363,14 @@ DiskIoMgr::~DiskIoMgr() { */ } -Status DiskIoMgr::init(const std::shared_ptr<MemTracker>& process_mem_tracker) { - DCHECK(process_mem_tracker != nullptr); - _process_mem_tracker = process_mem_tracker; +Status DiskIoMgr::init(const int64_t mem_limit) { + _mem_tracker->set_limit(mem_limit); + _cached_buffers_mem_tracker = MemTracker::create_tracker( + mem_limit, "DiskIO:CachedBuffers", _mem_tracker, MemTrackerLevel::OVERVIEW); // If we hit the process limit, see if we can reclaim some memory by removing // previously allocated (but unused) io buffers. 
- /* - * process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::gc_io_buffers, this)); - */ + MemTracker::get_process_tracker()->add_gc_function( + std::bind<void>(&DiskIoMgr::gc_io_buffers, this, std::placeholders::_1)); Review comment: Why didn't we need to call `AddGcFunction` before? ########## File path: be/src/service/doris_main.cpp ########## @@ -484,6 +484,7 @@ int main(int argc, char** argv) { #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) doris::MemInfo::refresh_current_mem(); #endif + doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); Review comment: Not a good implementation. Add a TODO here. ########## File path: be/src/runtime/memory/chunk_allocator.cpp ########## @@ -138,7 +149,8 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { DCHECK_GE(_reserved_bytes, 0); _reserved_bytes.fetch_sub(size); chunk_pool_local_core_alloc_count->increment(1); - return true; + if (tracker) _mem_tracker->release_cache(size); Review comment: Add a comment for this line. ########## File path: be/src/exec/olap_scan_node.h ########## @@ -244,6 +244,7 @@ class OlapScanNode : public ScanNode { TResourceInfo* _resource_info; int64_t _buffered_bytes; + std::shared_ptr<MemTracker> _scanner_mem_tracker; Review comment: Add a comment for this mem tracker. ########## File path: be/src/olap/rowset/segment_reader.cpp ########## @@ -86,10 +86,6 @@ SegmentReader::~SegmentReader() { _lru_cache = nullptr; _file_handler.close(); - if (_is_data_loaded && _runtime_state != nullptr) { - MemTracker::update_limits(_buffer_size * -1, _runtime_state->mem_trackers()); Review comment: What was this for?
########## File path: be/src/exprs/expr_context.h ########## @@ -170,6 +170,8 @@ class ExprContext { /// TODO: revisit this FunctionContext** _fn_contexts_ptr; + std::shared_ptr<MemTracker> _mem_tracker; Review comment: Add a comment for this tracker. ########## File path: be/src/olap/delta_writer.cpp ########## @@ -297,7 +297,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf // return error if previous flush failed RETURN_NOT_OK(_flush_token->wait()); - DCHECK_EQ(_mem_tracker->consumption(), 0); + MemTracker::memory_leak_check(_mem_tracker.get()); Review comment: Explain more about this `MemTracker::memory_leak_check`. ########## File path: be/src/exprs/new_agg_fn_evaluator.cc ########## @@ -88,19 +88,13 @@ typedef StringVal (*SerializeFn)(FunctionContext*, const StringVal&); typedef AnyVal (*GetValueFn)(FunctionContext*, const AnyVal&); typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&); -NewAggFnEvaluator::NewAggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, - const std::shared_ptr<MemTracker>& tracker, bool is_clone) - : _total_mem_consumption(0), Review comment: Why can `_total_mem_consumption` be removed? ########## File path: be/src/exprs/expr_context.cpp ########## @@ -49,15 +49,18 @@ ExprContext::~ExprContext() { } } -// TODO(zc): memory tracker Status ExprContext::prepare(RuntimeState* state, const RowDescriptor& row_desc, const std::shared_ptr<MemTracker>& tracker) { DCHECK(tracker != nullptr) << std::endl << get_stack_trace(); + if (_prepared) { Review comment: Was this a bug before? ########## File path: be/src/runtime/disk_io_mgr.h ########## @@ -691,8 +695,9 @@ class DiskIoMgr { // Pool to allocate BufferDescriptors. ObjectPool _pool; - // Process memory tracker; needed to account for io buffers. - std::shared_ptr<MemTracker> _process_mem_tracker; + std::shared_ptr<MemTracker> _mem_tracker; + // account for io buffers.
+ std::shared_ptr<MemTracker> _cached_buffers_mem_tracker; Review comment: Why does the disk io mgr need 2 mem trackers? ########## File path: be/src/runtime/mem_pool.cpp ########## @@ -37,6 +37,24 @@ const int MemPool::MAX_CHUNK_SIZE; const int MemPool::DEFAULT_ALIGNMENT; uint32_t MemPool::k_zero_length_region_ alignas(std::max_align_t) = MEM_POOL_POISON; +MemPool::MemPool(MemTracker* mem_tracker) + : current_chunk_idx_(-1), + next_chunk_size_(INITIAL_CHUNK_SIZE), + total_allocated_bytes_(0), + total_reserved_bytes_(0), + peak_allocated_bytes_(0), + _mem_tracker(mem_tracker) {} + +MemPool::MemPool(std::string label) Review comment: ```suggestion MemPool::MemPool(const std::string& label) ``` ########## File path: be/src/runtime/mem_pool.cpp ########## @@ -45,9 +63,8 @@ MemPool::~MemPool() { int64_t total_bytes_released = 0; for (auto& chunk : chunks_) { total_bytes_released += chunk.chunk.size; - ChunkAllocator::instance()->free(chunk.chunk); + ChunkAllocator::instance()->free(chunk.chunk, _mem_tracker); } - mem_tracker_->Release(total_bytes_released); Review comment: Why can this be removed? ########## File path: be/src/runtime/mem_pool.cpp ########## @@ -115,20 +131,10 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { } chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); - if (check_limits) { Review comment: What is `check_limits` for? ########## File path: be/src/runtime/data_stream_recvr.cc ########## @@ -184,6 +184,8 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) { if (!_pending_closures.empty()) { auto closure_pair = _pending_closures.front(); + // When the batch queue reaches the upper limit of memory, calling run to let + // brpc send data packets may cause additional memory to be released Review comment: I don't understand this comment. Is this a TODO, or something else?
########## File path: be/src/runtime/memory/chunk_allocator.cpp ########## @@ -164,15 +177,18 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { chunk_pool_system_alloc_count->increment(1); chunk_pool_system_alloc_cost_ns->increment(cost_ns); if (chunk->data == nullptr) { - return false; + if (tracker) tracker->release_cache(size); + return Status::MemoryAllocFailed( + fmt::format("ChunkAllocator failed to allocate chunk {} bytes", size)); } - return true; + return Status::OK(); } -void ChunkAllocator::free(const Chunk& chunk) { +void ChunkAllocator::free(Chunk& chunk, MemTracker* tracker) { Review comment: ```suggestion void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { ``` ########## File path: be/src/runtime/mem_tracker_task_pool.cpp ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/mem_tracker_task_pool.h" + +#include "common/config.h" +#include "runtime/exec_env.h" +#include "util/pretty_printer.h" + +namespace doris { + +std::shared_ptr<MemTracker> MemTrackerTaskPool::register_task_mem_tracker_impl( + const std::string& task_id, int64_t mem_limit, const std::string& label, + std::shared_ptr<MemTracker> parent) { + DCHECK(!task_id.empty()); + // First time this task_id registered, make a new object, otherwise do nothing. + // Combine create_tracker and emplace into one operation to avoid the use of locks + // Name for task MemTrackers. '$0' is replaced with the task id. + _task_mem_trackers.try_emplace_l( + task_id, [](std::shared_ptr<MemTracker>) {}, + MemTracker::create_tracker(mem_limit, label, parent, MemTrackerLevel::TASK)); + std::shared_ptr<MemTracker> tracker = get_task_mem_tracker(task_id); + return tracker; +} + +std::shared_ptr<MemTracker> MemTrackerTaskPool::register_query_mem_tracker( + const std::string& query_id, int64_t mem_limit) { + VLOG_FILE << "Register Query memory tracker, query id: " << query_id + << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); + return register_task_mem_tracker_impl(query_id, mem_limit, fmt::format("queryId={}", query_id), + ExecEnv::GetInstance()->query_pool_mem_tracker()); +} + +std::shared_ptr<MemTracker> MemTrackerTaskPool::register_load_mem_tracker( + const std::string& load_id, int64_t mem_limit) { + VLOG_FILE << "Register Load memory tracker, load id: " << load_id + << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); + return register_task_mem_tracker_impl(load_id, mem_limit, fmt::format("loadId={}", load_id), + ExecEnv::GetInstance()->load_pool_mem_tracker()); +} + +std::shared_ptr<MemTracker> MemTrackerTaskPool::get_task_mem_tracker(const std::string& task_id) { + DCHECK(!task_id.empty()); + std::shared_ptr<MemTracker> tracker = nullptr; + // Avoid using locks to resolve erase conflicts + _task_mem_trackers.if_contains(task_id, + 
[&tracker](std::shared_ptr<MemTracker> v) { tracker = v; }); + return tracker; +} + +void MemTrackerTaskPool::logout_task_mem_tracker() { + std::vector<std::string> expired_tasks; + for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) { + // No RuntimeState uses this task MemTracker, it is only referenced by this map, delete it + if (it->second.use_count() == 1) { + if (!config::memory_leak_detection || it->second->consumption() == 0) { + // If consumption is not equal to 0 before query mem tracker is destructed, + // there are two possibilities in theory. + // 1. A memory leak occurs. + // 2. Some of the memory consumed/released on the query mem tracker is actually released/consume on + // other trackers such as the process mem tracker, and there is no manual transfer between the two trackers. + // + // The second case should be eliminated in theory, but it has not been done so far, so the query memory leak + // cannot be located, and the value of the query pool mem tracker statistics will be inaccurate. + // + // In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers, + // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is, + // the negative number of the current value of consume. + it->second->parent()->consume(-it->second->consumption(), + MemTracker::get_process_tracker().get()); + expired_tasks.emplace_back(it->first); + } else { + LOG(WARNING) << "Memory tracker " << it->second->debug_string() << " Memory leak " Review comment: If `memory_leak_detection` is true, and `it->second->consumption() != 0`, there may be too many logs? 
########## File path: be/src/runtime/memory/chunk_allocator.cpp ########## @@ -150,7 +162,8 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { chunk_pool_other_core_alloc_count->increment(1); // reset chunk's core_id to other chunk->core_id = core_id % _arenas.size(); - return true; + if (tracker) _mem_tracker->release_cache(size); Review comment: Add a comment for this line. ########## File path: be/src/runtime/plan_fragment_executor.h ########## @@ -147,7 +147,6 @@ class PlanFragmentExecutor { ExecEnv* _exec_env; // not owned ExecNode* _plan; // lives in _runtime_state->obj_pool() TUniqueId _query_id; - std::shared_ptr<MemTracker> _mem_tracker; Review comment: Where did this mem tracker go? ########## File path: be/src/util/mem_info.h ########## @@ -34,6 +34,8 @@ class MemInfo { // Initialize MemInfo. static void init(); + static inline bool initialized() { return _s_initialized; } Review comment: What is this for? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org