This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 02e3de35f0d [Improvement] add internal workload group (#42006)
02e3de35f0d is described below

commit 02e3de35f0d1a2ae6014c3056548639eb951920f
Author: wangbo <wan...@selectdb.com>
AuthorDate: Thu Nov 14 13:12:02 2024 +0800

    [Improvement] add internal workload group (#42006)

    ## Proposed changes

    Add an internal workload group when Doris starts; currently it is mainly
    used to manage the CPU usage of compaction workloads.
---
 be/src/agent/cgroup_cpu_ctl.cpp                    |   6 +-
 be/src/agent/cgroup_cpu_ctl.h                      |   2 +-
 be/src/agent/topic_subscriber.cpp                  |   6 +-
 be/src/agent/workload_group_listener.cpp           |   2 +-
 be/src/cloud/cloud_storage_engine.cpp              |  31 ++-
 be/src/cloud/cloud_storage_engine.h                |   2 +-
 be/src/olap/olap_server.cpp                        |  77 ++++---
 be/src/olap/storage_engine.h                       |   5 +-
 be/src/pipeline/task_scheduler.h                   |   4 +-
 be/src/runtime/exec_env_init.cpp                   |   4 +-
 be/src/runtime/workload_group/workload_group.cpp   |  85 +++++---
 be/src/runtime/workload_group/workload_group.h     |  25 ++-
 .../workload_group/workload_group_manager.cpp      |  23 +++
 .../workload_group/workload_group_manager.h        |  12 ++
 be/src/util/threadpool.cpp                         |   7 +-
 be/src/util/threadpool.h                           |   6 +-
 be/src/vec/exec/scan/scanner_scheduler.h           |   9 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |  11 +-
 .../doris/analysis/AlterWorkloadGroupStmt.java     |  20 +-
 .../doris/analysis/CreateWorkloadGroupStmt.java    |  18 +-
 .../doris/analysis/DropWorkloadGroupStmt.java      |   3 -
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +
 .../doris/catalog/InternalSchemaInitializer.java   |   5 +-
 .../java/org/apache/doris/common/FeConstants.java  |   3 +
 .../CreateInternalWorkloadGroupThread.java         |  55 +++++
 .../resource/workloadgroup/WorkloadGroup.java      |  59 +++++-
 .../resource/workloadgroup/WorkloadGroupMgr.java   | 109 +++++++---
 .../workloadgroup/WorkloadGroupMgrTest.java        | 222 +++++++++++++++++++++
 .../apache/doris/utframe/TestWithFeService.java    |   1 +
 gensrc/thrift/BackendService.thrift                |   4 +
 .../workload_manager_p0/test_curd_wlg.groovy       |  29 +++
 31 files changed, 698 insertions(+), 149 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index e68535a708c..76b72f2c9d0 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
     return _is_enable_cgroup_v2_in_env ? 100 : 1024;
 }
 
-std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
+std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
     if (_is_enable_cgroup_v2_in_env) {
-        return std::make_unique<CgroupV2CpuCtl>(wg_id);
+        return std::make_shared<CgroupV2CpuCtl>(wg_id);
     } else if (_is_enable_cgroup_v1_in_env) {
-        return std::make_unique<CgroupV1CpuCtl>(wg_id);
+        return std::make_shared<CgroupV1CpuCtl>(wg_id);
     }
     return nullptr;
 }
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 84e191159f1..b23f1f4dd9c 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -52,7 +52,7 @@ public:
 
     static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);
 
-    static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
+    static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
 
     static bool is_a_valid_cgroup_path(std::string cg_path);
diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index f62bdaef099..b470e1534e1 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
     // eg, update workload info may delay other listener, then we need add a thread here
     // to handle_topic_info asynchronous
     std::shared_lock lock(_listener_mtx);
-    LOG(INFO) << "[topic_publish]begin handle topic info";
     for (auto& listener_pair : _registered_listeners) {
         if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
-            LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
-                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
             listener_pair.second->handle_topic_info(
                     topic_request.topic_map.at(listener_pair.first));
-            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
+            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first
+                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
         }
     }
 }
diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index f0f57869f25..7b688b7dcdf 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -59,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
                 workload_group_info.enable_cpu_hard_limit);
 
         // 4 create and update task scheduler
-        wg->upsert_task_scheduler(&workload_group_info, _exec_env);
+        wg->upsert_task_scheduler(&workload_group_info);
 
         // 5 upsert io throttle
         wg->upsert_scan_io_throttle(&workload_group_info);
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 5d7b445917a..dc6abbac31b 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -231,7 +231,7 @@ Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id) {
     });
 }
 
-Status CloudStorageEngine::start_bg_threads() {
+Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
     RETURN_IF_ERROR(Thread::create(
             "CloudStorageEngine", "refresh_s3_info_thread",
             [this]() { this->_refresh_storage_vault_info_thread_callback(); },
@@ -266,14 +266,27 @@ Status CloudStorageEngine::start_bg_threads() {
     // compaction tasks producer thread
     int base_thread_num = get_base_thread_num();
     int cumu_thread_num = get_cumu_thread_num();
-    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
-                            .set_min_threads(base_thread_num)
-                            .set_max_threads(base_thread_num)
-                            .build(&_base_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
-                            .set_min_threads(cumu_thread_num)
-                            .set_max_threads(cumu_thread_num)
-                            .build(&_cumu_compaction_thread_pool));
+    if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_thread_num)
+                                .set_max_threads(base_thread_num)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_thread_num)
+                                .set_max_threads(cumu_thread_num)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cumu_compaction_thread_pool));
+    } else {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_thread_num)
+                                .set_max_threads(base_thread_num)
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_thread_num)
+                                .set_max_threads(cumu_thread_num)
+                                .build(&_cumu_compaction_thread_pool));
+    }
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "compaction_tasks_producer_thread",
             [this]() { this->_compaction_tasks_producer_callback(); },
diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h
index 92d2917a916..072b8366542 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -57,7 +57,7 @@ public:
 
     Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) override;
 
-    Status start_bg_threads() override;
+    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
 
     Status set_cluster_id(int32_t cluster_id) override {
         _effective_cluster_id = cluster_id;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index a0c5a05636b..736bdaa9930 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -210,7 +210,7 @@ static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
     return threads_num;
 }
 
-Status StorageEngine::start_bg_threads() {
+Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "unused_rowset_monitor_thread",
             [this]() { this->_unused_rowset_monitor_thread_callback(); },
@@ -243,29 +243,60 @@ Status StorageEngine::start_bg_threads() {
     auto single_replica_compaction_threads =
             get_single_replica_compaction_threads_num(data_dirs.size());
 
-    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
-                            .set_min_threads(base_compaction_threads)
-                            .set_max_threads(base_compaction_threads)
-                            .build(&_base_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
-                            .set_min_threads(cumu_compaction_threads)
-                            .set_max_threads(cumu_compaction_threads)
-                            .build(&_cumu_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
-                            .set_min_threads(single_replica_compaction_threads)
-                            .set_max_threads(single_replica_compaction_threads)
-                            .build(&_single_replica_compaction_thread_pool));
-
-    if (config::enable_segcompaction) {
-        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
-                                .set_min_threads(config::segcompaction_num_threads)
-                                .set_max_threads(config::segcompaction_num_threads)
-                                .build(&_seg_compaction_thread_pool));
+    if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+        RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cumu_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool")
+                                .set_min_threads(single_replica_compaction_threads)
+                                .set_max_threads(single_replica_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool")
+                                    .set_min_threads(config::segcompaction_num_threads)
+                                    .set_max_threads(config::segcompaction_num_threads)
+                                    .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool")
+                                .set_min_threads(config::cold_data_compaction_thread_num)
+                                .set_max_threads(config::cold_data_compaction_thread_num)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cold_data_compaction_thread_pool));
+    } else {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .build(&_cumu_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
+                                .set_min_threads(single_replica_compaction_threads)
+                                .set_max_threads(single_replica_compaction_threads)
+                                .build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
+                                    .set_min_threads(config::segcompaction_num_threads)
+                                    .set_max_threads(config::segcompaction_num_threads)
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
+                                .set_min_threads(config::cold_data_compaction_thread_num)
+                                .set_max_threads(config::cold_data_compaction_thread_num)
+                                .build(&_cold_data_compaction_thread_pool));
     }
-    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
-                            .set_min_threads(config::cold_data_compaction_thread_num)
-                            .set_max_threads(config::cold_data_compaction_thread_num)
-                            .build(&_cold_data_compaction_thread_pool));
 
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 421c0eb352d..a2201589898 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -72,6 +72,7 @@ class ReportWorker;
 class CreateTabletRRIdxCache;
 struct DirInfo;
 class SnapshotManager;
+class WorkloadGroup;
 
 using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
 using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
@@ -105,7 +106,7 @@ public:
     virtual bool stopped() = 0;
 
     // start all background threads. This should be call after env is ready.
-    virtual Status start_bg_threads() = 0;
+    virtual Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) = 0;
 
     virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) = 0;
 
@@ -278,7 +279,7 @@ public:
         return _default_rowset_type;
     }
 
-    Status start_bg_threads() override;
+    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
 
     // clear trash and snapshot file
     // option: update disk usage after sweep
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index bdb5bec1776..3c1b08063df 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -43,7 +43,7 @@ namespace doris::pipeline {
 
 class TaskScheduler {
 public:
-    TaskScheduler(int core_num, std::string name, CgroupCpuCtl* cgroup_cpu_ctl)
+    TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
             : _task_queue(core_num),
               _shutdown(false),
               _name(std::move(name)),
@@ -65,7 +65,7 @@ private:
     std::vector<bool> _markers;
     bool _shutdown;
     std::string _name;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     void _do_work(int index);
 };
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 75ec588aa50..0f0b677bb1c 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -276,6 +276,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
     _pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _workload_group_manager = new WorkloadGroupMgr();
+    _workload_group_manager->init_internal_workload_group();
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
     _fragment_mgr = new FragmentMgr(this);
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -364,7 +365,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
         return st;
     }
     _storage_engine->set_heartbeat_flags(this->heartbeat_flags());
-    if (st = _storage_engine->start_bg_threads(); !st.ok()) {
+    WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
+    if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
         LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st;
         return st;
     }
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index c6a3c07adda..f62179273cf 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -50,7 +50,9 @@ const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
 const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
 const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
 
-WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : WorkloadGroup(wg_info, true) {}
+
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool)
         : _id(tg_info.id),
           _name(tg_info.name),
           _version(tg_info.version),
@@ -65,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
           _spill_low_watermark(tg_info.spill_low_watermark),
           _spill_high_watermark(tg_info.spill_high_watermark),
           _scan_bytes_per_second(tg_info.read_bytes_per_second),
-          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
+          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
+          _need_create_query_thread_pool(need_create_query_thread_pool) {
     std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
        _scan_io_throttle_map[data_dir.path] =
@@ -434,54 +437,60 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
             .remote_read_bytes_per_second = remote_read_bytes_per_second};
 }
 
-void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
-    uint64_t tg_id = tg_info->id;
-    std::string tg_name = tg_info->name;
-    int cpu_hard_limit = tg_info->cpu_hard_limit;
-    uint64_t cpu_shares = tg_info->cpu_share;
-    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
-    int scan_thread_num = tg_info->scan_thread_num;
-    int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
-    int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+std::weak_ptr<CgroupCpuCtl> WorkloadGroup::get_cgroup_cpu_ctl_wptr() {
+    std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+    return _cgroup_cpu_ctl;
+}
 
+void WorkloadGroup::create_cgroup_cpu_ctl() {
     std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    create_cgroup_cpu_ctl_no_lock();
+}
+
+void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() {
     if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
-        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
+        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(_id);
         if (cgroup_cpu_ctl) {
             Status ret = cgroup_cpu_ctl->init();
             if (ret.ok()) {
                 _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
-                LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
+                LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << _id;
             } else {
-                LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << tg_id
+                LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << _id
                           << ", reason=" << ret.to_string();
             }
         } else {
-            LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed";
+            LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl wg_id=" << _id << " failed";
         }
     }
+}
 
-    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
-
+void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                               std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr) {
+    uint64_t wg_id = wg_info->id;
+    std::string wg_name = wg_info->name;
+    int scan_thread_num = wg_info->scan_thread_num;
+    int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
+    int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
     if (_task_sched == nullptr) {
         int32_t executors_size = config::pipeline_executor_size;
         if (executors_size <= 0) {
             executors_size = CpuInfo::num_cores();
         }
         std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
-                std::make_unique<pipeline::TaskScheduler>(executors_size, "Pipe_" + tg_name,
+                std::make_unique<pipeline::TaskScheduler>(executors_size, "Pipe_" + wg_name,
                                                           cg_cpu_ctl_ptr);
         Status ret = pipeline_task_scheduler->start();
         if (ret.ok()) {
             _task_sched = std::move(pipeline_task_scheduler);
         } else {
-            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id;
+            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << wg_id;
         }
     }
 
     if (_scan_task_sched == nullptr) {
         std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + tg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + wg_name,
                                                                       cg_cpu_ctl_ptr);
         Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
                                            config::doris_scanner_thread_pool_thread_num,
@@ -489,7 +498,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         if (ret.ok()) {
             _scan_task_sched = std::move(scan_scheduler);
         } else {
-            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id;
+            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << wg_id;
         }
     }
     if (scan_thread_num > 0 && _scan_task_sched) {
@@ -501,7 +510,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         int remote_scan_thread_queue_size =
                 vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
         std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + wg_name,
                                                                       cg_cpu_ctl_ptr);
         Status ret = remote_scan_scheduler->start(remote_max_thread_num,
                                                   config::doris_scanner_min_thread_pool_thread_num,
@@ -510,7 +519,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
         } else {
             LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start failed, gid="
-                      << tg_id;
+                      << wg_id;
         }
     }
     if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) {
@@ -532,7 +541,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                         : std::min(num_disk * min_threads,
                                    num_cpus * config::wg_flush_thread_num_per_cpu);
 
-        std::string pool_name = "wg_flush_" + tg_name;
+        std::string pool_name = "wg_flush_" + wg_name;
         auto ret = ThreadPoolBuilder(pool_name)
                            .set_min_threads(min_threads)
                            .set_max_threads(max_threads)
@@ -540,17 +549,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                            .build(&thread_pool);
         if (!ret.ok()) {
             LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid="
-                      << tg_id;
+                      << wg_id;
         } else {
             _memtable_flush_pool = std::move(thread_pool);
-            LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id
+            LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << wg_id
                       << ", max thread num=" << max_threads
                       << ", min thread num=" << min_threads;
         }
     }
+}
+
+void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) {
+    uint64_t wg_id = wg_info->id;
+    int cpu_hard_limit = wg_info->cpu_hard_limit;
+    uint64_t cpu_shares = wg_info->cpu_share;
+    bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit;
+    create_cgroup_cpu_ctl_no_lock();
 
-    // step 6: update cgroup cpu if needed
     if (_cgroup_cpu_ctl) {
         if (enable_cpu_hard_limit) {
             if (cpu_hard_limit > 0) {
@@ -560,15 +576,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
             } else {
                 LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is "
                              "illegal: "
-                          << cpu_hard_limit << ", gid=" << tg_id;
+                          << cpu_hard_limit << ", gid=" << wg_id;
             }
         } else {
             _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
             _cgroup_cpu_ctl->update_cpu_hard_limit(
                     CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
         }
-        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
-                                             &(tg_info->cgroup_cpu_hard_limit));
+        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares),
+                                             &(wg_info->cgroup_cpu_hard_limit));
+    }
+}
+
+void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info) {
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    upsert_cgroup_cpu_ctl_no_lock(wg_info);
+
+    if (_need_create_query_thread_pool) {
+        upsert_thread_pool_no_lock(wg_info, _cgroup_cpu_ctl);
     }
 }
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 2ba84ce982b..96b8a36df1c 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -58,6 +58,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
 public:
     explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
 
+    explicit WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool);
+
     int64_t version() const { return _version; }
 
     uint64_t cpu_share() const { return _cpu_share.load(); }
@@ -165,7 +167,7 @@ public:
 
     int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);
 
-    void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
+    void upsert_task_scheduler(WorkloadGroupInfo* tg_info);
 
     void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
                              vectorized::SimplifiedScanScheduler** scan_sched,
@@ -198,18 +200,21 @@ public:
     }
 
     int64_t get_remote_scan_bytes_per_second();
 
-    CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() {
-        std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
-        return _cgroup_cpu_ctl.get();
-    }
-
     ThreadPool* get_memtable_flush_pool_ptr() {
         // no lock here because this is called by memtable flush,
         // to avoid lock competition with the workload thread pool's update
         return _memtable_flush_pool.get();
     }
 
+    void create_cgroup_cpu_ctl();
+
+    std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
+
 private:
+    void create_cgroup_cpu_ctl_no_lock();
+    void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
+    void upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                    std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr);
+
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
     std::string _name;
@@ -240,7 +245,10 @@ private:
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
 
     std::shared_mutex _task_sched_lock;
-    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
+    // _cgroup_cpu_ctl not only used by threadpool which managed by WorkloadGroup,
+    // but also some global background threadpool which not owned by WorkloadGroup,
+    // so it should be shared ptr;
+    std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
     std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
@@ -249,6 +257,9 @@ private:
     std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
     std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
 
+    // for some background workload, it doesn't need to create query thread pool
+    const bool _need_create_query_thread_pool;
+
     // bvar metric
     std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
     std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 927d4d13814..4d32fc8700e 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -34,6 +34,25 @@ namespace doris {
 
+void WorkloadGroupMgr::init_internal_workload_group() {
+    WorkloadGroupPtr internal_wg = nullptr;
+    {
+        std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+        if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) == _workload_groups.end()) {
+            WorkloadGroupInfo internal_wg_info {
+                    .id = INTERNAL_WORKLOAD_GROUP_ID,
+                    .name = INTERNAL_WORKLOAD_GROUP_NAME,
+                    .cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()};
+            internal_wg = std::make_shared<WorkloadGroup>(internal_wg_info, false);
+            _workload_groups[internal_wg_info.id] = internal_wg;
+        }
+    }
+    DCHECK(internal_wg != nullptr);
+    if (internal_wg) {
+        internal_wg->create_cgroup_cpu_ctl();
+    }
+}
+
 WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
         const WorkloadGroupInfo& workload_group_info) {
     {
@@ -86,6 +105,10 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
         old_wg_size = _workload_groups.size();
         for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
             uint64_t wg_id = iter->first;
+            // internal workload group created by BE can not be dropped
+            if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) {
+                continue;
+            }
             auto workload_group_ptr = iter->second;
             if (used_wg_id.find(wg_id) == used_wg_id.end()) {
                 workload_group_ptr->shutdown();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index f76e98d2606..18a0687b373 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -36,11 +36,18 @@ class TaskScheduler;
 class MultiCoreTaskQueue;
 } // namespace pipeline
 
+// internal_group is used for doris internal workload, currently is mainly compaction
+const static uint64_t INTERNAL_WORKLOAD_GROUP_ID =
+        static_cast<uint64_t>(TWorkloadType::type::INTERNAL);
+const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal";
+
 class WorkloadGroupMgr {
 public:
     WorkloadGroupMgr() = default;
     ~WorkloadGroupMgr() = default;
 
+    void init_internal_workload_group();
+
    WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info);
 
     void get_related_workload_groups(const std::function<bool(const WorkloadGroupPtr& ptr)>& pred,
@@ -64,6 +71,11 @@ public:
 
     void get_wg_resource_usage(vectorized::Block* block);
 
+    WorkloadGroupPtr get_internal_wg() {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
+    }
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 15fb36181d4..f5ea38515de 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -75,7 +75,8 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
     return *this;
 }
 
-ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) {
+ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
+        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
     _cgroup_cpu_ctl = cgroup_cpu_ctl;
     return *this;
 }
@@ -476,8 +477,8 @@ void ThreadPool::dispatch_thread() {
     _num_threads++;
     _num_threads_pending_start--;
 
-    if (_cgroup_cpu_ctl != nullptr) {
-        static_cast<void>(_cgroup_cpu_ctl->add_thread_to_cgroup());
+    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
+        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
     }
 
     // Owned by this worker thread and added/removed from _idle_threads as needed.
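For readers unfamiliar with the weak_ptr handoff used above: a BE-global compaction
thread pool can outlive the workload group that owns the CgroupCpuCtl, so the builder
takes a weak_ptr and each worker locks it before joining the cgroup. A minimal
standalone sketch of the pattern, using simplified stand-in types rather than the
actual Doris classes:

    #include <iostream>
    #include <memory>

    // Stand-in for CgroupCpuCtl; the real class registers the calling
    // thread with a cgroup so the kernel enforces the CPU limit.
    struct FakeCgroupCtl {
        void add_thread_to_cgroup() { std::cout << "thread added to cgroup\n"; }
    };

    // Stand-in for a thread-pool worker that only holds a weak reference.
    struct Worker {
        std::weak_ptr<FakeCgroupCtl> cg_ctl;

        void dispatch() {
            // lock() yields a shared_ptr only while the owner is alive;
            // if the workload group was dropped, the worker skips the cgroup step.
            if (auto sptr = cg_ctl.lock()) {
                sptr->add_thread_to_cgroup();
            } else {
                std::cout << "cgroup controller gone, skipping\n";
            }
        }
    };

    int main() {
        auto ctl = std::make_shared<FakeCgroupCtl>();
        Worker w{ctl};
        w.dispatch(); // controller alive: thread joins the cgroup
        ctl.reset();  // workload group dropped, controller destroyed
        w.dispatch(); // weak_ptr lock fails safely, no dangling pointer
    }

If the workload group is deleted while the pool is still running, lock() simply fails
and the worker carries on outside the cgroup instead of dereferencing a freed object.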
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 5ce27e2f27b..9bd4a7246fb 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -107,7 +107,7 @@ public:
     ThreadPoolBuilder& set_min_threads(int min_threads);
     ThreadPoolBuilder& set_max_threads(int max_threads);
     ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
-    ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl);
+    ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl);
     template <class Rep, class Period>
     ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
         _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
@@ -133,7 +133,7 @@ private:
     int _min_threads;
     int _max_threads;
     int _max_queue_size;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
     std::chrono::milliseconds _idle_timeout;
 
     ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
@@ -345,7 +345,7 @@ private:
     // Protected by _lock.
     int _total_queued_tasks;
 
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     // All allocated tokens.
     //
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 56c49368598..7731b3ba8f9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -114,11 +114,8 @@ struct SimplifiedScanTask {
 
 class SimplifiedScanScheduler {
 public:
-    SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) {
-        _is_stop.store(false);
-        _cgroup_cpu_ctl = cgroup_cpu_ctl;
-        _sched_name = sched_name;
-    }
+    SimplifiedScanScheduler(std::string sched_name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {}
 
     ~SimplifiedScanScheduler() {
         stop();
@@ -217,7 +214,7 @@ public:
 private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
     std::atomic<bool> _is_stop;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
     std::string _sched_name;
 };
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 432ec1c54b5..c17b84b2dbe 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -107,12 +107,13 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
         force_close(status);
     }
 
-    if (state && state->get_query_ctx()) {
-        WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group();
-        if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) {
-            Status ret = wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup();
+    if (state && state->get_query_ctx() && state->get_query_ctx()->workload_group()) {
+        if (auto cg_ctl_sptr =
+                    state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) {
+            Status ret = cg_ctl_sptr->add_thread_to_cgroup();
             if (ret.ok()) {
-                std::string wg_tname = "asyc_wr_" + wg_ptr->name();
+                std::string wg_tname =
+                        "asyc_wr_" + state->get_query_ctx()->workload_group()->name();
                 Thread::set_self_name(wg_tname);
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
index 8cba792dd39..4405da6ce13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
@@ -25,6 +25,10 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 
@@ -55,14 +59,26 @@ public class AlterWorkloadGroupStmt extends DdlStmt implements NotFallbackInPars
         }
 
         if (properties == null || properties.isEmpty()) {
-            throw new AnalysisException("Resource group properties can't be null");
+            throw new AnalysisException("Workload Group properties can't be empty");
+        }
+
+        if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
+            throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified ");
+        }
+
+        String tagStr = properties.get(WorkloadGroup.TAG);
+        if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
+                || WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+            throw new AnalysisException(
+                    WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                            + " group can not set tag");
         }
     }
 
     @Override
     public String toSql() {
         StringBuilder sb = new StringBuilder();
-        sb.append("ALTER RESOURCE GROUP '").append(workloadGroupName).append("' ");
+        sb.append("ALTER WORKLOAD GROUP '").append(workloadGroupName).append("' ");
         sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
         return sb.toString();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
index fc4f99046d5..dd13542a836 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
@@ -27,6 +27,9 @@ import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 
@@ -68,12 +71,19 @@ public class CreateWorkloadGroupStmt extends DdlStmt implements NotFallbackInPar
         FeNameFormat.checkWorkloadGroupName(workloadGroupName);
 
         if (properties == null || properties.isEmpty()) {
-            throw new AnalysisException("Resource group properties can't be null");
+            throw new AnalysisException("Workload Group properties can't be empty");
+        }
+
+        if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
+            throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified ");
         }
 
-        String wgTag = properties.get(WorkloadGroup.TAG);
-        if (wgTag != null) {
-            FeNameFormat.checkCommonName("workload group tag", wgTag);
+        String tagStr = properties.get(WorkloadGroup.TAG);
+        if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
+                || WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+            throw new AnalysisException(
+                    WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                            + " group can not set tag");
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
index e4e3055f128..9356c6b5c4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
@@ -20,7 +20,6 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
@@ -50,8 +49,6 @@ public class DropWorkloadGroupStmt extends DdlStmt implements NotFallbackInParse
         if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
         }
-
-        FeNameFormat.checkWorkloadGroupName(workloadGroupName);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 12ac201eb65..7fb12b9621a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -250,6 +250,7 @@ import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.AdmissionControl;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
@@ -1874,6 +1875,7 @@ public class Env {
         WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
         topicPublisherThread.addToTopicPublisherList(wpPublisher);
         topicPublisherThread.start();
+        new CreateInternalWorkloadGroupThread().start();
 
         // auto analyze related threads.
         statisticsCleaner.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 2dc6a90d593..cfc9d85c6a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -244,10 +244,11 @@ public class InternalSchemaInitializer extends Thread {
         // statistics
         Env.getCurrentEnv().getInternalCatalog().createTable(
                 buildStatisticsTblStmt(StatisticConstants.TABLE_STATISTIC_TBL_NAME,
-                Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", "idx_id", "col_id", "part_id")));
+                        Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", "idx_id", "col_id", "part_id")));
         Env.getCurrentEnv().getInternalCatalog().createTable(
                 buildStatisticsTblStmt(StatisticConstants.PARTITION_STATISTIC_TBL_NAME,
-                Lists.newArrayList("catalog_id", "db_id", "tbl_id", "idx_id", "part_name", "part_id", "col_id")));
+                        Lists.newArrayList("catalog_id", "db_id", "tbl_id", "idx_id", "part_name", "part_id",
+                                "col_id")));
         // audit table
         Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 1c24ca69d4f..214fbe0e410 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -50,6 +50,9 @@ public class FeConstants {
     // set to false to disable internal schema db
     public static boolean enableInternalSchemaDb = true;
 
+    // for UT, create internal workload group thread can not start
+    public static boolean shouldCreateInternalWorkloadGroup = true;
+
     // default scheduler interval is 10 seconds
     public static int default_scheduler_interval_millisecond = 10000;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
new file mode 100644
index 00000000000..7c6d0e3a080
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadgroup;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CreateInternalWorkloadGroupThread extends Thread {
+
+    private static final Logger LOG = LogManager.getLogger(CreateInternalWorkloadGroupThread.class);
+
+    public CreateInternalWorkloadGroupThread() {
+        super("CreateInternalWorkloadGroupThread");
+    }
+
+    public void run() {
+        if (!FeConstants.shouldCreateInternalWorkloadGroup) {
+            return;
+        }
+        try {
+            Env env = Env.getCurrentEnv();
+            while (!env.isReady()) {
+                Thread.sleep(5000);
+            }
+            if (!env.getWorkloadGroupMgr()
+                    .isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) {
+                env.getWorkloadGroupMgr().createInternalWorkloadGroup();
+                LOG.info("create internal workload group succ");
+            } else {
+                LOG.info("internal workload group already exists.");
+            }
+        } catch (Throwable t) {
+            LOG.warn("create internal workload group failed. ", t);
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 44fb98e10ef..7d5e792ef71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -30,8 +30,10 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TWorkloadGroupInfo;
+import org.apache.doris.thrift.TWorkloadType;
 import org.apache.doris.thrift.TopicInfo;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang3.StringUtils;
@@ -43,8 +45,11 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 public class WorkloadGroup implements Writable, GsonPostProcessable {
     private static final Logger LOG = LogManager.getLogger(WorkloadGroup.class);
@@ -79,6 +84,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
 
     public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second";
 
+    // it's used to define Doris's internal workload group,
+    // currently it is internal, only contains compaction
+    // later more type and workload may be included in the future.
+    public static final String INTERNAL_TYPE = "internal_type";
+
     // NOTE(wb): all property is not required, some properties default value is set in be
     // default value is as followed
     // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
@@ -87,7 +97,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
             .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
             .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
-            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
+            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build();
+
+    public static final ImmutableMap<String, Integer> WORKLOAD_TYPE_MAP = new ImmutableMap.Builder<String, Integer>()
+            .put(TWorkloadType.INTERNAL.toString().toLowerCase(), TWorkloadType.INTERNAL.getValue()).build();
 
     public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
     public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
@@ -420,13 +433,31 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             String[] tagArr = tagStr.split(",");
             for (String tag : tagArr) {
                 try {
-                    FeNameFormat.checkCommonName("workload group tag name", tag);
+                    FeNameFormat.checkCommonName("workload group tag", tag);
                 } catch (AnalysisException e) {
-                    throw new DdlException("workload group tag name format is illegal, " + tagStr);
+                    throw new DdlException("tag format is illegal, " + tagStr);
                 }
             }
         }
 
+        // internal workload group is usually created by Doris.
+        // If exception happens here, it means thrift not match WORKLOAD_TYPE_MAP.
+        String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE);
+        if (!StringUtils.isEmpty(interTypeId)) {
+            int wid = Integer.valueOf(interTypeId);
+            if (TWorkloadType.findByValue(wid) == null) {
+                throw new DdlException("error internal type id: " + wid + ", current id map:" + WORKLOAD_TYPE_MAP);
+            }
+        }
+
+    }
+
+    Optional<Integer> getInternalTypeId() {
+        String typeIdStr = this.properties.get(INTERNAL_TYPE);
+        if (StringUtils.isEmpty(typeIdStr)) {
+            return Optional.empty();
+        }
+        return Optional.of(Integer.valueOf(typeIdStr));
     }
 
     public long getId() {
@@ -535,8 +566,18 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
         return cpuHardLimit;
     }
 
-    public String getTag() {
-        return properties.get(TAG);
+    public Optional<Set<String>> getTag() {
+        String tagStr = properties.get(TAG);
+        if (StringUtils.isEmpty(tagStr)) {
+            return Optional.empty();
+        }
+
+        Set<String> tagSet = new HashSet<>();
+        String[] ss = tagStr.split(",");
+        for (String str : ss) {
+            tagSet.add(str);
+        }
+        return Optional.of(tagSet);
     }
 
     @Override
@@ -550,7 +591,13 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
 
     public TopicInfo toTopicInfo() {
         TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
-        tWorkloadGroupInfo.setId(id);
+        long wgId = this.id;
+        Optional<Integer> internalTypeId = getInternalTypeId();
+        if (internalTypeId.isPresent()) {
+            wgId = internalTypeId.get();
+        }
+        tWorkloadGroupInfo.setId(wgId);
+
         tWorkloadGroupInfo.setName(name);
         tWorkloadGroupInfo.setVersion(version);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 8464d83bdbc..26798bb1ec3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -42,6 +42,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TWorkloadType;
 import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Strings;
@@ -49,7 +50,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -62,6 +62,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -71,6 +72,12 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
 
     public static final Long DEFAULT_GROUP_ID = 1L;
 
+    public static final String INTERNAL_GROUP_NAME = "_internal";
+
+    // internal_type_id could be converted to workload group id when Workload published to BE
+    // refer WorkloadGroup.toTopicInfo
+    public static final Long INTERNAL_TYPE_ID = Long.valueOf(TWorkloadType.INTERNAL.getValue());
+
     public static final ImmutableList<String> WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
             .add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
             .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
@@ -375,44 +382,84 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
         LOG.info("Create workload group success: {}", workloadGroup);
     }
 
+    public void createInternalWorkloadGroup() {
+        Map<String, String> properties = Maps.newHashMap();
+        // 100 is cgroup v2 default cpu_share value
+        properties.put(WorkloadGroup.CPU_SHARE, "100");
+        properties.put(WorkloadGroup.INTERNAL_TYPE, String.valueOf(INTERNAL_TYPE_ID));
+        WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), INTERNAL_GROUP_NAME, properties);
+        writeLock();
+        try {
+            if (!nameToWorkloadGroup.containsKey(wg.getName())) {
+                nameToWorkloadGroup.put(wg.getName(), wg);
+                idToWorkloadGroup.put(wg.getId(), wg);
+                Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg);
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
     // NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit
     // when create/alter workload group with same tag.
     // when oldWg is null it means caller is an alter stmt.
     private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
-        String wgTag = newWg.getTag();
-        double sumOfAllMemLimit = 0;
-        int sumOfAllCpuHardLimit = 0;
-        for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
-            WorkloadGroup wg = entry.getValue();
-            if (!StringUtils.equals(wgTag, wg.getTag())) {
-                continue;
-            }
+        Optional<Set<String>> newWgTag = newWg.getTag();
+        Set<String> newWgTagSet = null;
+        if (newWgTag.isPresent()) {
+            newWgTagSet = newWgTag.get();
+        } else {
+            newWgTagSet = new HashSet<>();
+            newWgTagSet.add(null);
+        }
 
-            if (oldWg != null && entry.getKey() == oldWg.getId()) {
-                continue;
-            }
+        for (String newWgOneTag : newWgTagSet) {
+            double sumOfAllMemLimit = 0;
+            int sumOfAllCpuHardLimit = 0;
 
-            if (wg.getCpuHardLimit() > 0) {
-                sumOfAllCpuHardLimit += wg.getCpuHardLimit();
-            }
-            if (wg.getMemoryLimitPercent() > 0) {
-                sumOfAllMemLimit += wg.getMemoryLimitPercent();
+            // 1 get sum value of all wg which has same tag without current wg
+            for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
+                WorkloadGroup wg = entry.getValue();
+                Optional<Set<String>> wgTag = wg.getTag();
+
+                if (oldWg != null && entry.getKey() == oldWg.getId()) {
+                    continue;
+                }
+
+                if (newWgOneTag == null) {
+                    if (wgTag.isPresent()) {
+                        continue;
+                    }
+                } else if (!wgTag.isPresent() || (!wgTag.get().contains(newWgOneTag))) {
+                    continue;
+                }
+
+                if (wg.getCpuHardLimit() > 0) {
+                    sumOfAllCpuHardLimit += wg.getCpuHardLimit();
+                }
+                if (wg.getMemoryLimitPercent() > 0) {
+                    sumOfAllMemLimit += wg.getMemoryLimitPercent();
+                }
             }
-        }
 
-        sumOfAllMemLimit += newWg.getMemoryLimitPercent();
-        sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
+            // 2 sum current wg value
+            sumOfAllMemLimit += newWg.getMemoryLimitPercent();
+            sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
 
-        if (sumOfAllMemLimit > 100.0 + 1e-6) {
-            throw new DdlException(
-                    "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag
-                            + " cannot be greater than 100.0%.");
-        }
+            // 3 check total sum
+            if (sumOfAllMemLimit > 100.0 + 1e-6) {
+                throw new DdlException(
+                        "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + (
+                                newWgTag.isPresent() ? newWgTag.get() : "")
+                                + " cannot be greater than 100.0%. current sum val:" + sumOfAllMemLimit);
+            }
 
-        if (sumOfAllCpuHardLimit > 100) {
-            throw new DdlException(
-                    "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag "
-                            + wgTag + " can not be greater than 100% ");
+            if (sumOfAllCpuHardLimit > 100) {
+                throw new DdlException(
+                        "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " + (
+                                newWgTag.isPresent()
+                                        ? newWgTag.get() : "") + " can not be greater than 100% ");
+            }
         }
     }
 
@@ -446,8 +493,8 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
     public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
         String workloadGroupName = stmt.getWorkloadGroupName();
-        if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
-            throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed");
+        if (DEFAULT_GROUP_NAME.equals(workloadGroupName) || INTERNAL_GROUP_NAME.equals(workloadGroupName)) {
+            throw new DdlException("Dropping workload group " + workloadGroupName + " is not allowed");
         }
 
         // if a workload group exists in user property, it should not be dropped
diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 5f1e3565966..d729881358e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -235,4 +235,226 @@ public class WorkloadGroupMgrTest {
         }
         Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 5);
     }
+
+    @Test
+    public void testMultiTagCreateWorkloadGroup() throws UserException {
+        Config.enable_workload_group = true;
+        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+
+        {
+            String name = "empty_g1";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
+            properties.put(WorkloadGroup.TAG, "");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "empty_g2";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g1";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+            properties.put(WorkloadGroup.TAG, "cn1,cn2");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g2";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+            properties.put(WorkloadGroup.TAG, "cn3,cn2");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+
+        {
+            String name = "not_empty_g3";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+            properties.put(WorkloadGroup.TAG, "cn2,cn100");
+            try {
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            } catch (DdlException e) {
+                Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
+            }
+        }
+
+        {
+            String name = "not_empty_g3";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+            properties.put(WorkloadGroup.TAG, "cn3,cn100");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g5";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+            properties.put(WorkloadGroup.TAG, "cn5");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g6";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+            properties.put(WorkloadGroup.TAG, "cn5");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g7";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+            properties.put(WorkloadGroup.TAG, "cn5");
+            try {
+                CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            } catch (DdlException e) {
+                Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
+            }
+        }
+
+    }
+
+
+    @Test
+    public void testMultiTagAlterWorkloadGroup() throws UserException {
+        Config.enable_workload_group = true;
+        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+        {
+            String name = "empty_g1";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
+            properties.put(WorkloadGroup.TAG, "");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "empty_g2";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g1";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+            properties.put(WorkloadGroup.TAG, "cn1,cn2");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g2";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+            properties.put(WorkloadGroup.TAG, "cn3,cn2");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g3";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+            properties.put(WorkloadGroup.TAG, "cn2,cn100");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+            workloadGroupMgr.createWorkloadGroup(createStmt);
+        }
+
+        {
+            String name = "not_empty_g3";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+            properties.put(WorkloadGroup.TAG, "cn2,cn100");
+            AlterWorkloadGroupStmt alterStmt = new AlterWorkloadGroupStmt(name, properties);
+            try {
+                workloadGroupMgr.alterWorkloadGroup(alterStmt);
+            } catch (DdlException e) {
+                Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT));
+            }
} + } + + + @Test + public void testMultiTagCreateWorkloadGroupWithNoTag() throws UserException { + Config.enable_workload_group = true; + WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr(); + + { + String name = "not_empty_g1"; + Map<String, String> properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn1,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "not_empty_g2"; + Map<String, String> properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + properties.put(WorkloadGroup.TAG, "cn3,cn2"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + // create not tag workload group + { + String name = "no_tag_g1"; + Map<String, String> properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "10%"); + properties.put(WorkloadGroup.TAG, ""); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "no_tag_g2"; + Map<String, String> properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + + { + String name = "no_tag_g3"; + Map<String, String> properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "70%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + try { + workloadGroupMgr.createWorkloadGroup(createStmt); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT)); + } + } + + { + String name = "no_tag_g3"; + Map<String, String> properties = Maps.newHashMap(); + properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); + CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); + workloadGroupMgr.createWorkloadGroup(createStmt); + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 8e25efdfada..70adbbd7f99 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -153,6 +153,7 @@ public abstract class TestWithFeService { @BeforeAll public final void beforeAll() throws Exception { FeConstants.enableInternalSchemaDb = false; + FeConstants.shouldCreateInternalWorkloadGroup = false; beforeCreatingConnectContext(); connectContext = createDefaultCtx(); beforeCluster(); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index ed0ae243a1d..533999a853f 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -327,6 +327,10 @@ struct TPublishTopicResult { 1: required Status.TStatus status } +enum TWorkloadType { + INTERNAL = 2 +} + struct TGetRealtimeExecStatusRequest { // maybe query id or other unique id 1: optional Types.TUniqueId id diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 41cc190a017..7807578ea81 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -176,6 +176,30 @@ suite("test_crud_wlg") {
         exception "can not be greater than 100%"
     }
 
+    // test alter tag and type
+    test {
+        sql "alter workload group test_group properties ( 'internal_type'='13' );"
+
+        exception "internal_type can not be create or modified"
+    }
+
+    test {
+        sql "create workload group inter_wg properties('internal_type'='123');"
+        exception "internal_type can not be create or modified"
+    }
+
+    test {
+        sql "alter workload group normal properties ('tag'='123')"
+
+        exception "_internal and normal group can not set tag"
+    }
+
+    test {
+        sql "alter workload group _internal properties ('tag'='123')"
+
+        exception "_internal and normal group can not set tag"
+    }
+
     sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );"
     qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
     qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;"
@@ -492,6 +516,11 @@
 
     // test workload group's tag property, cpu_hard_limit
 
+    test {
+        sql "create workload group tag_test properties('tag'=' a, b , c ');"
+        exception "tag format is illegal"
+    }
+
     test {
         sql "create workload group if not exists tag1_wg1 properties ( 'cpu_hard_limit'='101%', 'tag'='tag1')"
         exception "must be a positive integer"
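The regression cases above pin down two FE-side guards that arrive with the internal group: internal_type is reserved for groups Doris creates itself, and neither the built-in normal group nor the new _internal group may be assigned a tag. A minimal sketch of such a guard follows, with hypothetical class and method names; only the error strings are taken from the test expectations, and this is not the actual Doris code path.

import java.util.Map;
import java.util.Set;

// Hypothetical validator mirroring the error messages asserted in
// test_curd_wlg.groovy; not the actual Doris FE implementation.
public class WorkloadGroupPropertyGuard {
    private static final Set<String> BUILTIN_GROUPS = Set.of("normal", "_internal");

    static void validate(String groupName, Map<String, String> properties) {
        if (properties.containsKey("internal_type")) {
            // reserved for groups Doris creates itself at startup
            throw new IllegalArgumentException("internal_type can not be create or modified");
        }
        if (properties.containsKey("tag") && BUILTIN_GROUPS.contains(groupName)) {
            throw new IllegalArgumentException("_internal and normal group can not set tag");
        }
    }

    public static void main(String[] args) {
        try {
            validate("_internal", Map.of("tag", "123"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // _internal and normal group can not set tag
        }
    }
}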