This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c6b2cc7067b [fix](memory) Fix Allocator release memory to correct MemTracker after TLS attach task ends (#39908) c6b2cc7067b is described below commit c6b2cc7067bb7868f5d1d766714f8ecbde1ad42f Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Aug 28 01:12:55 2024 +0800 [fix](memory) Fix Allocator release memory to correct MemTracker after TLS attach task ends (#39908) The Allocator saves the thread-local (TLS) MemTracker during its first alloc, and uses that saved tracker to release memory after the TLS attach task has ended. ``` 23:00:15 F20240824 22:56:49.773799 66432 thread_context.h:238] Check failed: doris::k_doris_exit || !doris::config::enable_memory_orphan_check || thread_mem_tracker()->label() != "Orphan" If you crash here, it means that SCOPED_ATTACH_TASK and SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. starting position of each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Do [...] 
23:00:15 *** Check failure stack trace: *** 23:00:15 @ 0x55645d3388e6 google::LogMessageFatal::~LogMessageFatal() 23:00:15 @ 0x5564439637c2 doris::ThreadContext::consume_memory() 23:00:15 @ 0x5564439914fe Allocator<>::release_memory() 23:00:15 @ 0x5564354be11e std::_Sp_counted_ptr<>::_M_dispose() 23:00:15 @ 0x55643557bc3b std::deque<>::pop_front() 23:00:15 @ 0x5564355756b1 doris::io::StreamLoadPipe::~StreamLoadPipe() 23:00:15 @ 0x5564354bfa77 doris::StreamLoadContext::~StreamLoadContext() 23:00:15 @ 0x556436ee5114 doris::HttpRequest::~HttpRequest() ``` --- be/src/common/config.cpp | 6 ++++- be/src/common/config.h | 3 +++ be/src/http/action/http_stream.cpp | 4 +-- be/src/io/file_factory.cpp | 6 ++--- be/src/olap/page_cache.cpp | 10 +++---- be/src/olap/page_cache.h | 1 - be/src/runtime/stream_load/stream_load_context.h | 17 +++++++----- be/src/runtime/thread_context.h | 28 ++++++++++---------- be/src/service/internal_service.cpp | 3 ++- be/src/util/byte_buffer.h | 6 ++++- be/src/vec/common/allocator.cpp | 33 ++++++++++++++++++++++-- be/src/vec/common/allocator.h | 8 +++++- 12 files changed, 86 insertions(+), 39 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 216c2e133c8..b477247c669 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -142,7 +142,11 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648"); DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1"); -DEFINE_mBool(enable_memory_orphan_check, "false"); +// default is true. if any memory tracking in Orphan mem tracker will report error. +// !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time. +// allocator free memory not need to check, because when the thread memory tracker label is Orphan, +// use the tracker saved in Allocator. +DEFINE_mBool(enable_memory_orphan_check, "true"); // The maximum time a thread waits for full GC. Currently only query will wait for full gc. 
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 78f94577d81..67048bc615e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -197,6 +197,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes); DECLARE_mInt64(crash_in_alloc_large_memory_bytes); // default is true. if any memory tracking in Orphan mem tracker will report error. +// !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time. +// allocator free memory not need to check, because when the thread memory tracker label is Orphan, +// use the tracker saved in Allocator. DECLARE_mBool(enable_memory_orphan_check); // The maximum time a thread waits for a full GC. Currently only query will wait for full gc. diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index afeb251ca41..7dbae6df731 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -247,8 +247,8 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { // schema_buffer stores 1M of data for parsing column information // need to determine whether to cache for the first time if (ctx->is_read_schema) { - if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) { - ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes); + if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); } else { LOG(INFO) << "use a portion of data to request fe to obtain column information"; ctx->is_read_schema = false; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 7f64ea50710..f4ce573c535 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -209,9 +209,9 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS // Here, a portion of the data is processed to parse column information auto pipe = 
std::make_shared<io::StreamLoadPipe>( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - stream_load_ctx->schema_buffer->pos /* total_length */); - stream_load_ctx->schema_buffer->flip(); - RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer)); + stream_load_ctx->schema_buffer()->pos /* total_length */); + stream_load_ctx->schema_buffer()->flip(); + RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer())); RETURN_IF_ERROR(pipe->finish()); *file_reader = std::move(pipe); } else { diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 1f0556f4642..b70dadc5b43 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -28,12 +28,10 @@ template <typename TAllocator> PageBase<TAllocator>::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type) : LRUCacheValueBase(), _size(b), _capacity(b) { if (use_cache) { - _mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + StoragePageCache::instance()->mem_tracker(page_type)); + _data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); } else { - _mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - } - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator); _data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); } } @@ -42,7 +40,7 @@ template <typename TAllocator> PageBase<TAllocator>::~PageBase() { if (_data != nullptr) { DCHECK(_capacity != 0 && _size != 0); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_); TAllocator::free(_data, _capacity); } } diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index 09fc689959c..ef25de7bc30 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -60,7 +60,6 @@ private: // Effective 
size, smaller than capacity, such as data page remove checksum suffix. size_t _size = 0; size_t _capacity = 0; - std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator; }; using DataPage = PageBase<Allocator<false>>; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index f7c4a0d474f..95e56e0b3fa 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -96,14 +96,9 @@ class StreamLoadContext { public: StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) { start_millis = UnixMillis(); - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); - schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); } ~StreamLoadContext() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->stream_load_pipe_tracker()); - schema_buffer.reset(); if (need_rollback) { _exec_env->stream_load_executor()->rollback_txn(this); need_rollback = false; @@ -126,6 +121,15 @@ public: bool is_mow_table() const; + ByteBufferPtr schema_buffer() { + if (_schema_buffer == nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->stream_load_pipe_tracker()); + _schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); + } + return _schema_buffer; + } + public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD @@ -190,8 +194,6 @@ public: std::shared_ptr<MessageBodySink> body_sink; std::shared_ptr<io::StreamLoadPipe> pipe; - ByteBufferPtr schema_buffer; - TStreamLoadPutResult put_result; TStreamLoadMultiTablePutResult multi_table_put_result; @@ -253,6 +255,7 @@ public: private: ExecEnv* _exec_env = nullptr; + ByteBufferPtr _schema_buffer; }; } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index ff8f2c6b0b5..6158f0535be 100644 --- a/be/src/runtime/thread_context.h +++ 
b/be/src/runtime/thread_context.h @@ -50,7 +50,7 @@ // Used after SCOPED_ATTACH_TASK, in order to count the memory into another // MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task. #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \ - auto VARNAME_LINENUM(switch_mem_tracker) = SwitchThreadMemTrackerLimiter(arg1) + auto VARNAME_LINENUM(switch_mem_tracker) = doris::SwitchThreadMemTrackerLimiter(arg1) // Looking forward to tracking memory during thread execution into MemTracker. // Usually used to record query more detailed memory, including ExecNode operators. @@ -170,8 +170,7 @@ static std::string memory_orphan_check_msg = "each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging " "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the " "thread will crash. If you want to switch MemTrackerLimiter during thread execution, " - "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach. Of course, you " - "can modify enable_memory_orphan_check=false in be.conf to avoid this crash."; + "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach."; // The thread context saves some info about a working thread. // 2 required info: @@ -222,9 +221,9 @@ public: ss << std::this_thread::get_id(); return ss.str(); } - // After thread_mem_tracker_mgr is initialized, the current thread Hook starts to - // consume/release mem_tracker. - // Note that the use of shared_ptr will cause a crash. The guess is that there is an + // Note that if set global Memory Hook, After thread_mem_tracker_mgr is initialized, + // the current thread Hook starts to consume/release mem_tracker. + // the use of shared_ptr will cause a crash. The guess is that there is an // intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal // to nullptr, but the object it points to is not initialized. 
At this time, when the memory // is released somewhere, the hook is triggered to cause the crash. @@ -318,7 +317,7 @@ public: // The brpc server should respond as quickly as possible. bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); // set the data so that next time bthread_getspecific in the thread returns the data. - CHECK(0 == bthread_setspecific(btls_key, bthread_context) || k_doris_exit); + CHECK(0 == bthread_setspecific(btls_key, bthread_context) || doris::k_doris_exit); } DCHECK(bthread_context != nullptr); bthread_context->thread_local_handle_count++; @@ -360,7 +359,7 @@ static ThreadContext* thread_context(bool allow_return_null = false) { // in bthread // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); - DCHECK(bthread_context != nullptr); + DCHECK(bthread_context != nullptr && bthread_context->thread_local_handle_count > 0); return bthread_context; } if (allow_return_null) { @@ -449,15 +448,16 @@ public: class SwitchThreadMemTrackerLimiter { public: - explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { + explicit SwitchThreadMemTrackerLimiter( + const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) { DCHECK(mem_tracker); - ThreadLocalHandle::create_thread_local_if_not_exits(); + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); } - explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) { - ThreadLocalHandle::create_thread_local_if_not_exits(); + explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) { + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); DCHECK(thread_context()->task_id() == 
query_thread_context.query_id); // workload group alse not change DCHECK(query_thread_context.query_mem_tracker); @@ -468,11 +468,11 @@ public: ~SwitchThreadMemTrackerLimiter() { thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); - ThreadLocalHandle::del_thread_local_if_count_is_zero(); + doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: - std::shared_ptr<MemTrackerLimiter> _old_mem_tracker; + std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker; }; class AddThreadMemTrackerConsumer { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index c2251c240ae..744024c940c 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -767,7 +767,8 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared( MemTrackerLimiter::Type::OTHER, - fmt::format("{}#{}", params.format_type, params.file_type)); + fmt::format("InternalService::fetch_table_schema:{}#{}", params.format_type, + params.file_type)); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); // make sure profile is desctructed after reader cause PrefetchBufferedReader diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index e8eadf69e02..1499f51c053 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -24,6 +24,7 @@ #include "common/logging.h" #include "common/status.h" +#include "runtime/thread_context.h" #include "vec/common/allocator.h" #include "vec/common/allocator_fwd.h" @@ -43,7 +44,10 @@ struct ByteBuffer : private Allocator<false> { return Status::OK(); } - ~ByteBuffer() { Allocator<false>::free(ptr, capacity); } + ~ByteBuffer() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_); + Allocator<false>::free(ptr, capacity); + } void put_bytes(const char* data, size_t size) { memcpy(ptr + pos, data, size); diff --git 
a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 82cd78a7fc1..ae5f27989b2 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -211,14 +211,43 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_ template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory( - size_t size) const { + size_t size) { + // Usually, an object that inherits Allocator has the same TLS tracker for each alloc. + // If an object that inherits Allocator needs to be reused by multiple queries, + // it is necessary to switch the same tracker to TLS when calling alloc. + // However, in ORC Reader, ORC DataBuffer will be reused, but we cannot switch TLS tracker, + // so we update the Allocator tracker when the TLS tracker changes. + // note that the tracker in thread context when object that inherit Allocator is constructed may be + // no attach memory tracker in tls. usually the memory tracker is attached in tls only during the first alloc. 
+ if (mem_tracker_ == nullptr || + mem_tracker_->label() != doris::thread_context()->thread_mem_tracker()->label()) { + mem_tracker_ = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + } CONSUME_THREAD_MEM_TRACKER(size); } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory( size_t size) const { - RELEASE_THREAD_MEM_TRACKER(size); + doris::ThreadContext* thread_context = doris::thread_context(true); + if ((thread_context && thread_context->thread_mem_tracker()->label() != "Orphan") || + mem_tracker_ == nullptr) { + // If thread_context exist and the label of thread_mem_tracker not equal to `Orphan`, + // this means that in the scope of SCOPED_ATTACH_TASK, + // so thread_mem_tracker should be used to release memory. + // If mem_tracker_ is nullptr there is a scenario where an object that inherits Allocator + // has never called alloc, but free memory. + // in phmap, the memory alloced by an object may be transferred to another object and then free. + // in this case, thread context must attach a memory tracker other than Orphan, + // otherwise memory tracking will be wrong. + RELEASE_THREAD_MEM_TRACKER(size); + } else { + // if thread_context does not exist or the label of thread_mem_tracker is equal to + // `Orphan`, it usually happens during object destruction. This means that + // the scope of SCOPED_ATTACH_TASK has been left, so release memory using Allocator tracker. 
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_); + RELEASE_THREAD_MEM_TRACKER(size); + } } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 2bcce9a9c68..f393886cf0b 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -87,6 +87,10 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; // is always a multiple of sixteen. (https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html) static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; +namespace doris { +class MemTrackerLimiter; +} + class DefaultMemoryAllocator { public: static void* malloc(size_t size) __THROW { return std::malloc(size); } @@ -228,7 +232,7 @@ public: // alloc will continue to execute, so the consume memtracker is forced. void memory_check(size_t size) const; // Increases consumption of this tracker by 'bytes'. - void consume_memory(size_t size) const; + void consume_memory(size_t size); void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; #ifndef NDEBUG @@ -400,6 +404,8 @@ protected: static constexpr bool clear_memory = clear_memory_; + std::shared_ptr<doris::MemTrackerLimiter> mem_tracker_ {nullptr}; + // Freshly mmapped pages are copy-on-write references to a global zero page. // On the first write, a page fault occurs, and an actual writable page is // allocated. If we are going to use this memory soon, such as when resizing --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org