This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a663465534 [improve] Make the schema change memory space adaptive (#34350)
7a663465534 is described below

commit 7a6634655341066ca4c166ac16e1740a69fc5351
Author: Lightman <31928846+lchangli...@users.noreply.github.com>
AuthorDate: Wed May 1 09:11:58 2024 +0800

    [improve] Make the schema change memory space adaptive (#34350)
---
 be/src/agent/task_worker_pool.cpp             | 16 ++++++++++++++--
 be/src/cloud/cloud_schema_change_job.cpp      | 11 ++++++-----
 be/src/common/config.cpp                      |  3 +++
 be/src/common/config.h                        |  3 +++
 be/src/olap/schema_change.cpp                 |  4 +++-
 be/src/olap/schema_change.h                   |  7 +++----
 be/src/olap/storage_engine.cpp                | 11 ++++++++++-
 be/src/olap/storage_engine.h                  |  5 +++++
 be/src/olap/task/engine_index_change_task.cpp |  2 +-
 9 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 83e97c5d7d0..3fb5f3dfcf2 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -185,11 +185,17 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req
                 fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
                             std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
                             std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
-                            config::memory_limitation_per_thread_for_schema_change_bytes));
+                            engine.memory_limitation_bytes_per_thread_for_schema_change()));
         SCOPED_ATTACH_TASK(mem_tracker);
         DorisMetrics::instance()->create_rollup_requests_total->increment(1);
         Status res = Status::OK();
         try {
+            LOG_INFO("start {}", process_name)
+                    .tag("signature", agent_task_req.signature)
+                    .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
+                    .tag("new_tablet_id", new_tablet_id)
+                    .tag("mem_limit",
+                         engine.memory_limitation_bytes_per_thread_for_schema_change());
             DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
             SchemaChangeJob job(engine, agent_task_req.alter_tablet_req_v2,
                                 std::to_string(agent_task_req.alter_tablet_req_v2.job_id));
@@ -254,11 +260,17 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
                 fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
                             std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
                             std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
-                            config::memory_limitation_per_thread_for_schema_change_bytes));
+                            engine.memory_limitation_bytes_per_thread_for_schema_change()));
         SCOPED_ATTACH_TASK(mem_tracker);
         DorisMetrics::instance()->create_rollup_requests_total->increment(1);
         Status res = Status::OK();
         try {
+            LOG_INFO("start {}", process_name)
+                    .tag("signature", agent_task_req.signature)
+                    .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
+                    .tag("new_tablet_id", new_tablet_id)
+                    .tag("mem_limit",
+                         engine.memory_limitation_bytes_per_thread_for_schema_change());
             DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
             CloudSchemaChangeJob job(engine,
                                      std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 791c67256cf..77034845028 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -41,11 +41,10 @@ using namespace ErrorCode;
 static constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
 static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;
 
-static std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer,
-                                                      bool sc_sorting) {
+std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer, bool sc_sorting,
+                                               int64_t mem_limit) {
     if (sc_sorting) {
-        return std::make_unique<VBaseSchemaChangeWithSorting>(
-                changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+        return std::make_unique<VBaseSchemaChangeWithSorting>(changer, mem_limit);
     }
     // else sc_directly
     return std::make_unique<VSchemaChangeDirectly>(changer);
@@ -207,7 +206,9 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
               << ", new_tablet=" << _new_tablet->tablet_id();
 
     // 2. Generate historical data converter
-    auto sc_procedure = get_sc_procedure(changer, sc_sorting);
+    auto sc_procedure = get_sc_procedure(
+            changer, sc_sorting,
+            _cloud_storage_engine.memory_limitation_bytes_per_thread_for_schema_change());
 
     cloud::TabletJobInfoPB job;
     auto* idx = job.mutable_idx();
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b0612958d35..5f2d12d740f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -89,6 +89,9 @@ DEFINE_String(mem_limit, "90%");
 // Soft memory limit as a fraction of hard memory limit.
 DEFINE_Double(soft_mem_limit_frac, "0.9");
 
+// Schema change memory limit as a fraction of soft memory limit.
+DEFINE_Double(schema_change_mem_limit_frac, "0.6");
+
 // Many modern allocators (for example, tcmalloc) do not do a mremap for
 // realloc, even in case of large enough chunks of memory. Although this allows
 // you to increase performance and reduce memory consumption during realloc.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f2cdc35ed07..73037ac10e9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -129,6 +129,9 @@ DECLARE_String(mem_limit);
 // Soft memory limit as a fraction of hard memory limit.
 DECLARE_Double(soft_mem_limit_frac);
 
+// Schema change memory limit as a fraction of soft memory limit.
+DECLARE_Double(schema_change_mem_limit_frac);
+
 // Many modern allocators (for example) do not do a mremap for
 // realloc, even in case of large enough chunks of memory. Although this allows
 // you to increase performance and reduce memory consumption during realloc.
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 0f80bfb3085..3fac0ce831f 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1094,7 +1094,9 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc
     }
 
     // b. Generate historical data converter
-    auto sc_procedure = _get_sc_procedure(changer, sc_sorting, sc_directly);
+    auto sc_procedure = _get_sc_procedure(
+            changer, sc_sorting, sc_directly,
+            _local_storage_engine.memory_limitation_bytes_per_thread_for_schema_change());
 
     // c.Convert historical data
     bool have_failure_rowset = false;
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 9d67bdbf5ee..ae4093063fd 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -293,11 +293,10 @@ public:
 
 private:
     std::unique_ptr<SchemaChange> _get_sc_procedure(const BlockChanger& changer, bool sc_sorting,
-                                                    bool sc_directly) {
+                                                    bool sc_directly, int64_t mem_limit) {
         if (sc_sorting) {
-            return std::make_unique<VLocalSchemaChangeWithSorting>(
-                    changer, config::memory_limitation_per_thread_for_schema_change_bytes,
-                    _local_storage_engine);
+            return std::make_unique<VLocalSchemaChangeWithSorting>(changer, mem_limit,
+                                                                   _local_storage_engine);
         }
 
         if (sc_directly) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index d549e17b1bf..7ab14181f18 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -77,6 +77,7 @@
 #include "olap/txn_manager.h"
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/doris_metrics.h"
+#include "util/mem_info.h"
 #include "util/metrics.h"
 #include "util/spinlock.h"
 #include "util/stopwatch.hpp"
@@ -107,7 +108,10 @@ bvar::Adder<uint64_t> unused_rowsets_counter("ununsed_rowsets_counter");
 BaseStorageEngine::BaseStorageEngine(Type type, const UniqueId& backend_uid)
         : _type(type),
           _rowset_id_generator(std::make_unique<UniqueRowsetIdGenerator>(backend_uid)),
-          _stop_background_threads_latch(1) {}
+          _stop_background_threads_latch(1) {
+    _memory_limitation_bytes_for_schema_change =
+            static_cast<int64_t>(MemInfo::soft_mem_limit() * config::schema_change_mem_limit_frac);
+}
 
 BaseStorageEngine::~BaseStorageEngine() = default;
 
@@ -125,6 +129,11 @@ CloudStorageEngine& BaseStorageEngine::to_cloud() {
     return *static_cast<CloudStorageEngine*>(this);
 }
 
+int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change() const {
+    return std::max(_memory_limitation_bytes_for_schema_change / config::alter_tablet_worker_count,
+                    config::memory_limitation_per_thread_for_schema_change_bytes);
+}
+
 static Status _validate_options(const EngineOptions& options) {
     if (options.store_paths.empty()) {
         return Status::InternalError("store paths is empty");
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 63234047305..60ef681c3c5 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -36,6 +36,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include "common/config.h"
 #include "common/status.h"
 #include "gutil/ref_counted.h"
 #include "olap/calc_delete_bitmap_executor.h"
@@ -128,6 +129,8 @@ public:
 
     RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
 
+    int64_t memory_limitation_bytes_per_thread_for_schema_change() const;
+
 protected:
     void _evict_querying_rowset();
     void _evict_quring_rowset_thread_callback();
@@ -148,6 +151,8 @@ protected:
     std::mutex _quering_rowsets_mutex;
     std::unordered_map<RowsetId, RowsetSharedPtr> _querying_rowsets;
     scoped_refptr<Thread> _evict_quering_rowset_thread;
+
+    int64_t _memory_limitation_bytes_for_schema_change;
 };
 
 class StorageEngine final : public BaseStorageEngine {
diff --git a/be/src/olap/task/engine_index_change_task.cpp b/be/src/olap/task/engine_index_change_task.cpp
index 91c91fdf3f3..6fa0a63865a 100644
--- a/be/src/olap/task/engine_index_change_task.cpp
+++ b/be/src/olap/task/engine_index_change_task.cpp
@@ -31,7 +31,7 @@ EngineIndexChangeTask::EngineIndexChangeTask(
             MemTrackerLimiter::Type::SCHEMA_CHANGE,
             fmt::format("EngineIndexChangeTask#tabletId={}",
                         std::to_string(_alter_inverted_index_req.tablet_id)),
-            config::memory_limitation_per_thread_for_schema_change_bytes);
+            engine.memory_limitation_bytes_per_thread_for_schema_change());
 }
 
 EngineIndexChangeTask::~EngineIndexChangeTask() = default;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to