This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 62a64d10ae8 [enhancement](mem-tracker) Use thread local mem tracker to track s3 file buffer memory usage (#40597) 62a64d10ae8 is described below commit 62a64d10ae8d1d6431834f09ad296836b8cdedc3 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Thu Sep 26 12:32:12 2024 +0800 [enhancement](mem-tracker) Use thread local mem tracker to track s3 file buffer memory usage (#40597) Track s3 file buffer memory usage with the thread-local mem tracker when one is set, so that the memory usage is attributed in more detail. --- be/src/io/fs/s3_file_bufferpool.cpp | 25 +++++++++++++++++++------ be/src/io/fs/s3_file_bufferpool.h | 14 +++++++++----- be/src/olap/tablet.cpp | 6 ++++++ be/src/olap/tablet.h | 3 +++ be/src/runtime/snapshot_loader.cpp | 5 +++++ be/src/runtime/snapshot_loader.h | 2 ++ 6 files changed, 44 insertions(+), 11 deletions(-) diff --git a/be/src/io/fs/s3_file_bufferpool.cpp b/be/src/io/fs/s3_file_bufferpool.cpp index f1f90ea7f2e..0d59ea0ed88 100644 --- a/be/src/io/fs/s3_file_bufferpool.cpp +++ b/be/src/io/fs/s3_file_bufferpool.cpp @@ -31,6 +31,7 @@ #include "io/cache/file_cache_common.h" #include "io/fs/s3_common.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" #include "util/defer_op.h" #include "util/slice.h" @@ -77,17 +78,19 @@ Slice FileBuffer::get_slice() const { } FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, - size_t offset, OperationState state) + size_t offset, OperationState state, + std::shared_ptr<MemTrackerLimiter> mem_tracker) : _type(type), _alloc_holder(std::move(alloc_holder)), _offset(offset), _size(0), _state(std::move(state)), _inner_data(std::make_unique<FileBuffer::PartData>()), - _capacity(_inner_data->size()) {} + _capacity(_inner_data->size()), + _mem_tracker(std::move(mem_tracker)) {} FileBuffer::~FileBuffer() { - 
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); _inner_data.reset(); } @@ -240,13 +243,22 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_blocks_holder( } Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker()); + auto mem_tracker = ExecEnv::GetInstance()->s3_file_buffer_tracker(); + auto* thread_ctx = doris::thread_context(true); + if (thread_ctx != nullptr) { + // if thread local mem tracker is set, use it instead. + auto curr_tracker = thread_ctx->thread_mem_tracker_mgr->limiter_mem_tracker(); + if (curr_tracker != ExecEnv::GetInstance()->orphan_mem_tracker()) { + mem_tracker = std::move(curr_tracker); + } + } + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); OperationState state(_sync_after_complete_task, _is_cancelled); if (_type == BufferType::UPLOAD) { RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>( std::move(_upload_cb), std::move(state), _offset, - std::move(_alloc_holder_cb))); + std::move(_alloc_holder_cb), std::move(mem_tracker))); return Status::OK(); } if (_type == BufferType::DOWNLOAD) { @@ -254,7 +266,8 @@ Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) { std::move(_download), std::move(_write_to_local_file_cache), std::move(_write_to_use_buffer), std::move(state), - _offset, std::move(_alloc_holder_cb))); + _offset, std::move(_alloc_holder_cb), + std::move(mem_tracker))); return Status::OK(); } // should never come here diff --git a/be/src/io/fs/s3_file_bufferpool.h b/be/src/io/fs/s3_file_bufferpool.h index 1b552850ae3..a603c3cb29a 100644 --- a/be/src/io/fs/s3_file_bufferpool.h +++ b/be/src/io/fs/s3_file_bufferpool.h @@ -27,6 +27,7 @@ #include "common/status.h" #include "io/cache/file_block.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/crc32c.h" #include "util/slice.h" 
#include "util/threadpool.h" @@ -77,7 +78,7 @@ struct OperationState { struct FileBuffer { FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset, - OperationState state); + OperationState state, std::shared_ptr<MemTrackerLimiter> mem_tracker); virtual ~FileBuffer(); /** * submit the correspoding task to async executor @@ -127,14 +128,16 @@ struct FileBuffer { struct PartData; std::unique_ptr<PartData> _inner_data; size_t _capacity; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; }; struct DownloadFileBuffer final : public FileBuffer { DownloadFileBuffer(std::function<Status(Slice&)> download, std::function<void(FileBlocksHolderPtr, Slice)> write_to_cache, std::function<void(Slice, size_t)> write_to_use_buffer, OperationState state, - size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder) - : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state), + size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder, + std::shared_ptr<MemTrackerLimiter> mem_tracker) + : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state, std::move(mem_tracker)), _download(std::move(download)), _write_to_local_file_cache(std::move(write_to_cache)), _write_to_use_buffer(std::move(write_to_use_buffer)) {} @@ -153,8 +156,9 @@ struct DownloadFileBuffer final : public FileBuffer { struct UploadFileBuffer final : public FileBuffer { UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state, - size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder) - : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state), + size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder, + std::shared_ptr<MemTrackerLimiter> mem_tracker) + : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state, std::move(mem_tracker)), _upload_to_remote(std::move(upload_cb)) {} ~UploadFileBuffer() override = default; Status append_data(const Slice& s) override; diff --git a/be/src/olap/tablet.cpp 
b/be/src/olap/tablet.cpp index 51eabe5495e..ad2671962d8 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -106,6 +106,7 @@ #include "olap/txn_manager.h" #include "olap/types.h" #include "olap/utils.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "segment_loader.h" #include "service/point_query_executor.h" #include "tablet.h" @@ -268,6 +269,9 @@ Tablet::Tablet(StorageEngine& engine, TabletMetaSharedPtr tablet_meta, DataDir* _tablet_path = fmt::format("{}/{}/{}/{}/{}", _data_dir->path(), DATA_PREFIX, _tablet_meta->shard_id(), tablet_id(), schema_hash()); } + _upload_cooldown_meta_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, + fmt::format("UploadCoolDownMeta#tablet_id={}", tablet_id())); } bool Tablet::set_tablet_schema_into_rowset_meta() { @@ -2100,6 +2104,8 @@ Status Tablet::write_cooldown_meta() { _cooldown_conf.cooldown_replica_id, tablet_id()); } + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_upload_cooldown_meta_tracker); + auto storage_resource = DORIS_TRY(get_resource_by_storage_policy_id(storage_policy_id())); std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 33253e82ced..71af08e61cd 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -46,6 +46,7 @@ #include "olap/rowset/rowset_reader.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/version_graph.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "segment_loader.h" #include "util/metrics.h" #include "util/once.h" @@ -608,6 +609,8 @@ private: std::shared_ptr<const VersionWithTime> _visible_version; std::atomic_bool _is_full_compaction_running = false; + + std::shared_ptr<MemTrackerLimiter> _upload_cooldown_meta_tracker; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index d04a5463879..0a13ac6085c 100644 --- 
a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -52,6 +52,7 @@ #include "olap/tablet_manager.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "util/s3_uri.h" #include "util/s3_util.h" #include "util/thrift_rpc_helper.h" @@ -115,6 +116,9 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l } else { return Status::InternalError("Unknown storage type: {}", type); } + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, + fmt::format("SnapShotLoader#job_id={}#task_id={}", _job_id, _task_id)); return Status::OK(); } @@ -125,6 +129,7 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d if (!_remote_fs) { return Status::InternalError("Storage backend not initialized."); } + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size() << ", broker addr: " << _broker_addr << ", job: " << _job_id << ", task" << _task_id; diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h index 7b1d5a0d942..b6da1a2adae 100644 --- a/be/src/runtime/snapshot_loader.h +++ b/be/src/runtime/snapshot_loader.h @@ -26,6 +26,7 @@ #include "common/status.h" #include "olap/tablet_fwd.h" +#include "runtime/memory/mem_tracker_limiter.h" namespace doris { namespace io { @@ -111,6 +112,7 @@ private: const TNetworkAddress _broker_addr; const std::map<std::string, std::string> _prop; std::shared_ptr<io::RemoteFileSystem> _remote_fs; + std::shared_ptr<MemTrackerLimiter> _mem_tracker; }; } // end namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org