This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 62a64d10ae8 [enhancement](mem-tracker) Use thread local mem tracker to track s3 file buffer memory usage (#40597)
62a64d10ae8 is described below

commit 62a64d10ae8d1d6431834f09ad296836b8cdedc3
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Thu Sep 26 12:32:12 2024 +0800

    [enhancement](mem-tracker) Use thread local mem tracker to track s3 file buffer memory usage (#40597)
    
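    Track s3 file buffer memory usage with the thread-local mem tracker when
    one is set, so that the usage is attributed in finer detail to the task
    that allocates the buffer instead of only to the shared s3 file buffer
    tracker.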
---
 be/src/io/fs/s3_file_bufferpool.cpp | 25 +++++++++++++++++++------
 be/src/io/fs/s3_file_bufferpool.h   | 14 +++++++++-----
 be/src/olap/tablet.cpp              |  6 ++++++
 be/src/olap/tablet.h                |  3 +++
 be/src/runtime/snapshot_loader.cpp  |  5 +++++
 be/src/runtime/snapshot_loader.h    |  2 ++
 6 files changed, 44 insertions(+), 11 deletions(-)

diff --git a/be/src/io/fs/s3_file_bufferpool.cpp 
b/be/src/io/fs/s3_file_bufferpool.cpp
index f1f90ea7f2e..0d59ea0ed88 100644
--- a/be/src/io/fs/s3_file_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -31,6 +31,7 @@
 #include "io/cache/file_cache_common.h"
 #include "io/fs/s3_common.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "util/slice.h"
@@ -77,17 +78,19 @@ Slice FileBuffer::get_slice() const {
 }
 
 FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> 
alloc_holder,
-                       size_t offset, OperationState state)
+                       size_t offset, OperationState state,
+                       std::shared_ptr<MemTrackerLimiter> mem_tracker)
         : _type(type),
           _alloc_holder(std::move(alloc_holder)),
           _offset(offset),
           _size(0),
           _state(std::move(state)),
           _inner_data(std::make_unique<FileBuffer::PartData>()),
-          _capacity(_inner_data->size()) {}
+          _capacity(_inner_data->size()),
+          _mem_tracker(std::move(mem_tracker)) {}
 
 FileBuffer::~FileBuffer() {
-    
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
     _inner_data.reset();
 }
 
@@ -240,13 +243,22 @@ FileBufferBuilder& 
FileBufferBuilder::set_allocate_file_blocks_holder(
 }
 
 Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
-    
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+    auto mem_tracker = ExecEnv::GetInstance()->s3_file_buffer_tracker();
+    auto* thread_ctx = doris::thread_context(true);
+    if (thread_ctx != nullptr) {
+        // if thread local mem tracker is set, use it instead.
+        auto curr_tracker = 
thread_ctx->thread_mem_tracker_mgr->limiter_mem_tracker();
+        if (curr_tracker != ExecEnv::GetInstance()->orphan_mem_tracker()) {
+            mem_tracker = std::move(curr_tracker);
+        }
+    }
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
     OperationState state(_sync_after_complete_task, _is_cancelled);
 
     if (_type == BufferType::UPLOAD) {
         RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
                                           std::move(_upload_cb), 
std::move(state), _offset,
-                                          std::move(_alloc_holder_cb)));
+                                          std::move(_alloc_holder_cb), 
std::move(mem_tracker)));
         return Status::OK();
     }
     if (_type == BufferType::DOWNLOAD) {
@@ -254,7 +266,8 @@ Status 
FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
                                           std::move(_download),
                                           
std::move(_write_to_local_file_cache),
                                           std::move(_write_to_use_buffer), 
std::move(state),
-                                          _offset, 
std::move(_alloc_holder_cb)));
+                                          _offset, std::move(_alloc_holder_cb),
+                                          std::move(mem_tracker)));
         return Status::OK();
     }
     // should never come here
diff --git a/be/src/io/fs/s3_file_bufferpool.h 
b/be/src/io/fs/s3_file_bufferpool.h
index 1b552850ae3..a603c3cb29a 100644
--- a/be/src/io/fs/s3_file_bufferpool.h
+++ b/be/src/io/fs/s3_file_bufferpool.h
@@ -27,6 +27,7 @@
 
 #include "common/status.h"
 #include "io/cache/file_block.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "util/crc32c.h"
 #include "util/slice.h"
 #include "util/threadpool.h"
@@ -77,7 +78,7 @@ struct OperationState {
 
 struct FileBuffer {
     FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> 
alloc_holder, size_t offset,
-               OperationState state);
+               OperationState state, std::shared_ptr<MemTrackerLimiter> 
mem_tracker);
     virtual ~FileBuffer();
     /**
     * submit the correspoding task to async executor
@@ -127,14 +128,16 @@ struct FileBuffer {
     struct PartData;
     std::unique_ptr<PartData> _inner_data;
     size_t _capacity;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 };
 
 struct DownloadFileBuffer final : public FileBuffer {
     DownloadFileBuffer(std::function<Status(Slice&)> download,
                        std::function<void(FileBlocksHolderPtr, Slice)> 
write_to_cache,
                        std::function<void(Slice, size_t)> write_to_use_buffer, 
OperationState state,
-                       size_t offset, std::function<FileBlocksHolderPtr()> 
alloc_holder)
-            : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state),
+                       size_t offset, std::function<FileBlocksHolderPtr()> 
alloc_holder,
+                       std::shared_ptr<MemTrackerLimiter> mem_tracker)
+            : FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state, 
std::move(mem_tracker)),
               _download(std::move(download)),
               _write_to_local_file_cache(std::move(write_to_cache)),
               _write_to_use_buffer(std::move(write_to_use_buffer)) {}
@@ -153,8 +156,9 @@ struct DownloadFileBuffer final : public FileBuffer {
 
 struct UploadFileBuffer final : public FileBuffer {
     UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, 
OperationState state,
-                     size_t offset, std::function<FileBlocksHolderPtr()> 
alloc_holder)
-            : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
+                     size_t offset, std::function<FileBlocksHolderPtr()> 
alloc_holder,
+                     std::shared_ptr<MemTrackerLimiter> mem_tracker)
+            : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state, 
std::move(mem_tracker)),
               _upload_to_remote(std::move(upload_cb)) {}
     ~UploadFileBuffer() override = default;
     Status append_data(const Slice& s) override;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 51eabe5495e..ad2671962d8 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -106,6 +106,7 @@
 #include "olap/txn_manager.h"
 #include "olap/types.h"
 #include "olap/utils.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "segment_loader.h"
 #include "service/point_query_executor.h"
 #include "tablet.h"
@@ -268,6 +269,9 @@ Tablet::Tablet(StorageEngine& engine, TabletMetaSharedPtr 
tablet_meta, DataDir*
         _tablet_path = fmt::format("{}/{}/{}/{}/{}", _data_dir->path(), 
DATA_PREFIX,
                                    _tablet_meta->shard_id(), tablet_id(), 
schema_hash());
     }
+    _upload_cooldown_meta_tracker = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER,
+            fmt::format("UploadCoolDownMeta#tablet_id={}", tablet_id()));
 }
 
 bool Tablet::set_tablet_schema_into_rowset_meta() {
@@ -2100,6 +2104,8 @@ Status Tablet::write_cooldown_meta() {
                                       _cooldown_conf.cooldown_replica_id, 
tablet_id());
     }
 
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_upload_cooldown_meta_tracker);
+
     auto storage_resource = 
DORIS_TRY(get_resource_by_storage_policy_id(storage_policy_id()));
 
     std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 33253e82ced..71af08e61cd 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -46,6 +46,7 @@
 #include "olap/rowset/rowset_reader.h"
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/version_graph.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "segment_loader.h"
 #include "util/metrics.h"
 #include "util/once.h"
@@ -608,6 +609,8 @@ private:
     std::shared_ptr<const VersionWithTime> _visible_version;
 
     std::atomic_bool _is_full_compaction_running = false;
+
+    std::shared_ptr<MemTrackerLimiter> _upload_cooldown_meta_tracker;
 };
 
 inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index d04a5463879..0a13ac6085c 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -52,6 +52,7 @@
 #include "olap/tablet_manager.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "util/s3_uri.h"
 #include "util/s3_util.h"
 #include "util/thrift_rpc_helper.h"
@@ -115,6 +116,9 @@ Status SnapshotLoader::init(TStorageBackendType::type type, 
const std::string& l
     } else {
         return Status::InternalError("Unknown storage type: {}", type);
     }
+    _mem_tracker = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER,
+            fmt::format("SnapShotLoader#job_id={}#task_id={}", _job_id, 
_task_id));
     return Status::OK();
 }
 
@@ -125,6 +129,7 @@ Status SnapshotLoader::upload(const std::map<std::string, 
std::string>& src_to_d
     if (!_remote_fs) {
         return Status::InternalError("Storage backend not initialized.");
     }
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
     LOG(INFO) << "begin to upload snapshot files. num: " << 
src_to_dest_path.size()
               << ", broker addr: " << _broker_addr << ", job: " << _job_id << 
", task" << _task_id;
 
diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h
index 7b1d5a0d942..b6da1a2adae 100644
--- a/be/src/runtime/snapshot_loader.h
+++ b/be/src/runtime/snapshot_loader.h
@@ -26,6 +26,7 @@
 
 #include "common/status.h"
 #include "olap/tablet_fwd.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 
 namespace doris {
 namespace io {
@@ -111,6 +112,7 @@ private:
     const TNetworkAddress _broker_addr;
     const std::map<std::string, std::string> _prop;
     std::shared_ptr<io::RemoteFileSystem> _remote_fs;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 };
 
 } // end namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to