This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7a663465534 [improve] Make the schema change memory space adaptive (#34350) 7a663465534 is described below commit 7a6634655341066ca4c166ac16e1740a69fc5351 Author: Lightman <31928846+lchangli...@users.noreply.github.com> AuthorDate: Wed May 1 09:11:58 2024 +0800 [improve] Make the schema change memory space adaptive (#34350) --- be/src/agent/task_worker_pool.cpp | 16 ++++++++++++++-- be/src/cloud/cloud_schema_change_job.cpp | 11 ++++++----- be/src/common/config.cpp | 3 +++ be/src/common/config.h | 3 +++ be/src/olap/schema_change.cpp | 4 +++- be/src/olap/schema_change.h | 7 +++---- be/src/olap/storage_engine.cpp | 11 ++++++++++- be/src/olap/storage_engine.h | 5 +++++ be/src/olap/task/engine_index_change_task.cpp | 2 +- 9 files changed, 48 insertions(+), 14 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 83e97c5d7d0..3fb5f3dfcf2 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -185,11 +185,17 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id), std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id), - config::memory_limitation_per_thread_for_schema_change_bytes)); + engine.memory_limitation_bytes_per_thread_for_schema_change())); SCOPED_ATTACH_TASK(mem_tracker); DorisMetrics::instance()->create_rollup_requests_total->increment(1); Status res = Status::OK(); try { + LOG_INFO("start {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id) + .tag("mem_limit", + engine.memory_limitation_bytes_per_thread_for_schema_change()); DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id); SchemaChangeJob job(engine, 
agent_task_req.alter_tablet_req_v2, std::to_string(agent_task_req.alter_tablet_req_v2.job_id)); @@ -254,11 +260,17 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id), std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id), - config::memory_limitation_per_thread_for_schema_change_bytes)); + engine.memory_limitation_bytes_per_thread_for_schema_change())); SCOPED_ATTACH_TASK(mem_tracker); DorisMetrics::instance()->create_rollup_requests_total->increment(1); Status res = Status::OK(); try { + LOG_INFO("start {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id) + .tag("mem_limit", + engine.memory_limitation_bytes_per_thread_for_schema_change()); DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id); CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id), diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 791c67256cf..77034845028 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -41,11 +41,10 @@ using namespace ErrorCode; static constexpr int ALTER_TABLE_BATCH_SIZE = 4096; static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2; -static std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer, - bool sc_sorting) { +std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer, bool sc_sorting, + int64_t mem_limit) { if (sc_sorting) { - return std::make_unique<VBaseSchemaChangeWithSorting>( - changer, config::memory_limitation_per_thread_for_schema_change_bytes); + return std::make_unique<VBaseSchemaChangeWithSorting>(changer, mem_limit); } // else sc_directly return 
std::make_unique<VSchemaChangeDirectly>(changer); @@ -207,7 +206,9 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam << ", new_tablet=" << _new_tablet->tablet_id(); // 2. Generate historical data converter - auto sc_procedure = get_sc_procedure(changer, sc_sorting); + auto sc_procedure = get_sc_procedure( + changer, sc_sorting, + _cloud_storage_engine.memory_limitation_bytes_per_thread_for_schema_change()); cloud::TabletJobInfoPB job; auto* idx = job.mutable_idx(); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index b0612958d35..5f2d12d740f 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -89,6 +89,9 @@ DEFINE_String(mem_limit, "90%"); // Soft memory limit as a fraction of hard memory limit. DEFINE_Double(soft_mem_limit_frac, "0.9"); +// Schema change memory limit as a fraction of soft memory limit. +DEFINE_Double(schema_change_mem_limit_frac, "0.6"); + // Many modern allocators (for example, tcmalloc) do not do a mremap for // realloc, even in case of large enough chunks of memory. Although this allows // you to increase performance and reduce memory consumption during realloc. diff --git a/be/src/common/config.h b/be/src/common/config.h index f2cdc35ed07..73037ac10e9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -129,6 +129,9 @@ DECLARE_String(mem_limit); // Soft memory limit as a fraction of hard memory limit. DECLARE_Double(soft_mem_limit_frac); +// Schema change memory limit as a fraction of soft memory limit. +DECLARE_Double(schema_change_mem_limit_frac); + // Many modern allocators (for example) do not do a mremap for // realloc, even in case of large enough chunks of memory. Although this allows // you to increase performance and reduce memory consumption during realloc. 
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 0f80bfb3085..3fac0ce831f 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1094,7 +1094,9 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc } // b. Generate historical data converter - auto sc_procedure = _get_sc_procedure(changer, sc_sorting, sc_directly); + auto sc_procedure = _get_sc_procedure( + changer, sc_sorting, sc_directly, + _local_storage_engine.memory_limitation_bytes_per_thread_for_schema_change()); // c.Convert historical data bool have_failure_rowset = false; diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 9d67bdbf5ee..ae4093063fd 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -293,11 +293,10 @@ public: private: std::unique_ptr<SchemaChange> _get_sc_procedure(const BlockChanger& changer, bool sc_sorting, - bool sc_directly) { + bool sc_directly, int64_t mem_limit) { if (sc_sorting) { - return std::make_unique<VLocalSchemaChangeWithSorting>( - changer, config::memory_limitation_per_thread_for_schema_change_bytes, - _local_storage_engine); + return std::make_unique<VLocalSchemaChangeWithSorting>(changer, mem_limit, + _local_storage_engine); } if (sc_directly) { diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index d549e17b1bf..7ab14181f18 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -77,6 +77,7 @@ #include "olap/txn_manager.h" #include "runtime/stream_load/stream_load_recorder.h" #include "util/doris_metrics.h" +#include "util/mem_info.h" #include "util/metrics.h" #include "util/spinlock.h" #include "util/stopwatch.hpp" @@ -107,7 +108,10 @@ bvar::Adder<uint64_t> unused_rowsets_counter("ununsed_rowsets_counter"); BaseStorageEngine::BaseStorageEngine(Type type, const UniqueId& backend_uid) : _type(type), _rowset_id_generator(std::make_unique<UniqueRowsetIdGenerator>(backend_uid)), - 
_stop_background_threads_latch(1) {} + _stop_background_threads_latch(1) { + _memory_limitation_bytes_for_schema_change = + static_cast<int64_t>(MemInfo::soft_mem_limit() * config::schema_change_mem_limit_frac); +} BaseStorageEngine::~BaseStorageEngine() = default; @@ -125,6 +129,11 @@ CloudStorageEngine& BaseStorageEngine::to_cloud() { return *static_cast<CloudStorageEngine*>(this); } +int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change() const { + return std::max(_memory_limitation_bytes_for_schema_change / config::alter_tablet_worker_count, + config::memory_limitation_per_thread_for_schema_change_bytes); +} + static Status _validate_options(const EngineOptions& options) { if (options.store_paths.empty()) { return Status::InternalError("store paths is empty"); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 63234047305..60ef681c3c5 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -36,6 +36,7 @@ #include <unordered_set> #include <vector> +#include "common/config.h" #include "common/status.h" #include "gutil/ref_counted.h" #include "olap/calc_delete_bitmap_executor.h" @@ -128,6 +129,8 @@ public: RowsetSharedPtr get_quering_rowset(RowsetId rs_id); + int64_t memory_limitation_bytes_per_thread_for_schema_change() const; + protected: void _evict_querying_rowset(); void _evict_quring_rowset_thread_callback(); @@ -148,6 +151,8 @@ protected: std::mutex _quering_rowsets_mutex; std::unordered_map<RowsetId, RowsetSharedPtr> _querying_rowsets; scoped_refptr<Thread> _evict_quering_rowset_thread; + + int64_t _memory_limitation_bytes_for_schema_change; }; class StorageEngine final : public BaseStorageEngine { diff --git a/be/src/olap/task/engine_index_change_task.cpp b/be/src/olap/task/engine_index_change_task.cpp index 91c91fdf3f3..6fa0a63865a 100644 --- a/be/src/olap/task/engine_index_change_task.cpp +++ b/be/src/olap/task/engine_index_change_task.cpp @@ -31,7 +31,7 @@ 
EngineIndexChangeTask::EngineIndexChangeTask( MemTrackerLimiter::Type::SCHEMA_CHANGE, fmt::format("EngineIndexChangeTask#tabletId={}", std::to_string(_alter_inverted_index_req.tablet_id)), - config::memory_limitation_per_thread_for_schema_change_bytes); + engine.memory_limitation_bytes_per_thread_for_schema_change()); } EngineIndexChangeTask::~EngineIndexChangeTask() = default; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org