This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 2b1ec70afd0 [Feature](executor)Workload Group support Non-Pipeline Execution (#30164)
2b1ec70afd0 is described below

commit 2b1ec70afd045ed421fcb346c1597d3e4931840b
Author: wangbo <wan...@apache.org>
AuthorDate: Mon Jan 22 20:25:28 2024 +0800

    [Feature](executor)Workload Group support Non-Pipeline Execution (#30164)
---
 be/src/agent/workload_group_listener.cpp | 6 ++-
 be/src/common/config.cpp | 2 +-
 be/src/pipeline/pipeline_fragment_context.cpp | 7 +--
 .../pipeline_x/pipeline_x_fragment_context.cpp | 5 +-
 be/src/runtime/fragment_mgr.cpp | 61 +++++++++-------------
 be/src/runtime/query_context.cpp | 25 +++++++++
 be/src/runtime/query_context.h | 13 +++--
 be/src/runtime/task_group/task_group_manager.cpp | 57 ++++++++++++++++----
 be/src/runtime/task_group/task_group_manager.h | 9 ++--
 .../workload_sched_policy_mgr.cpp | 1 -
 be/src/vec/exec/scan/scanner_context.h | 2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 3 ++
 .../java/org/apache/doris/qe/StmtExecutor.java | 2 +-
 gensrc/thrift/PaloInternalService.thrift | 16 +++---
 14 files changed, 132 insertions(+), 77 deletions(-)

diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 6ea7c28669c..237d6c77274 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -26,6 +26,7 @@ namespace doris { void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) { std::set<uint64_t> current_wg_ids; + bool is_set_cgroup_path = config::doris_cgroup_cpu_path != ""; for (const TopicInfo& topic_info : topic_info_list) { if (!topic_info.__isset.workload_group_info) { continue; } @@ -52,7 +53,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi // 4 create and update task scheduler Status ret2 = _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, _exec_env); - if (!ret2.ok()) { + if (is_set_cgroup_path && !ret2.ok()) { LOG(INFO) << "upsert task sche failed, tg_id=" << task_group_info.id << ", reason=" << ret2.to_string(); } @@ -63,7 +64,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi << ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares << ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit << ", enable_cgroup_cpu_soft_limit=" - << (config::enable_cgroup_cpu_soft_limit ? "true" : "false") + << (config::enable_cgroup_cpu_soft_limit ? "true" : "false") + << ", is set cgroup path=" << (is_set_cgroup_path ?
"true" : "flase"); } _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5468ac29a02..271ad72410d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1133,7 +1133,7 @@ DEFINE_Bool(enable_flush_file_cache_async, "true"); // cgroup DEFINE_mString(doris_cgroup_cpu_path, ""); -DEFINE_mBool(enable_cgroup_cpu_soft_limit, "false"); +DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true"); DEFINE_Bool(ignore_always_true_predicate_for_segment, "true"); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 538a2ce1bdb..909039b23fb 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -718,12 +718,7 @@ Status PipelineFragmentContext::submit() { int submit_tasks = 0; Status st; - auto* scheduler = _exec_env->pipeline_task_scheduler(); - if (_query_ctx->get_task_scheduler()) { - scheduler = _query_ctx->get_task_scheduler(); - } else if (_task_group_entity && _query_ctx->use_task_group_for_cpu_limit.load()) { - scheduler = _exec_env->pipeline_task_group_scheduler(); - } + auto* scheduler = _query_ctx->get_pipe_exec_scheduler(); for (auto& task : _tasks) { st = scheduler->schedule_task(task.get()); if (!st) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index a44db667450..4a16b97b2f3 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1219,10 +1219,7 @@ Status PipelineXFragmentContext::submit() { int submit_tasks = 0; Status st; - auto* scheduler = _exec_env->pipeline_task_scheduler(); - if (_task_group_entity) { - scheduler = _exec_env->pipeline_task_group_scheduler(); - } + auto* scheduler = _query_ctx->get_pipe_exec_scheduler(); for (auto& task : _tasks) { for (auto& t : task) { st = scheduler->schedule_task(t.get()); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 0fbaacb4fa2..a76c7687f02 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -677,41 +677,27 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->register_memory_statistics(); query_ctx->register_cpu_statistics(); + bool is_pipeline = false; if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) { - if (params.__isset.workload_groups && !params.workload_groups.empty()) { - uint64_t tg_id = params.workload_groups[0].id; - auto* tg_mgr = _exec_env->task_group_manager(); - if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) { - task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker); - // set task group to queryctx for memory tracker can be removed, see QueryContext's destructor - query_ctx->set_task_group(task_group_ptr); - stringstream ss; - ss << "Query/load id: " << print_id(query_ctx->query_id()) - << ", use task group:" << task_group_ptr->debug_string() - << ", enable cpu hard limit:" - << (tg_mgr->enable_cpu_hard_limit() ? 
"true" : "false"); - bool ret = false; - if (tg_mgr->enable_cgroup()) { - ret = tg_mgr->set_cg_task_sche_for_query_ctx(tg_id, query_ctx.get()); - if (ret) { - ss << ", use cgroup for cpu limit."; - } else { - ss << ", not found cgroup sche, no limit for cpu."; - } - } else { - ss << ", use doris sche for cpu limit."; - query_ctx->use_task_group_for_cpu_limit.store(true); - } - LOG(INFO) << ss.str(); - _exec_env->runtime_query_statistics_mgr()->set_workload_group_id( - print_id(query_id), tg_id); - } else { - VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id()) - << " no task group found, does not use task group."; - } - } else { - VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id()) - << " does not use task group."; + is_pipeline = true; + } + + if (params.__isset.workload_groups && !params.workload_groups.empty()) { + uint64_t tg_id = params.workload_groups[0].id; + auto* tg_mgr = _exec_env->task_group_manager(); + if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) { + task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker); + // set task group to queryctx for memory tracker can be removed, see QueryContext's destructor + query_ctx->set_task_group(task_group_ptr); + _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), + tg_id); + query_ctx->set_query_scheduler(tg_id); + + LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) + << ", use task group: " << task_group_ptr->debug_string() + << ", is pipeline: " << ((int)is_pipeline) + << ", enable cgroup soft limit: " + << ((int)config::enable_cgroup_cpu_soft_limit); } } @@ -795,7 +781,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, std::make_pair(params.params.fragment_instance_id, fragment_executor)); _cv.notify_all(); } - auto st = _thread_pool->submit_func( + + auto* current_thread_pool = query_ctx->get_non_pipe_exec_thread_pool(); + if (!current_thread_pool) { + current_thread_pool = _thread_pool.get(); + } + auto st = current_thread_pool->submit_func( [this, fragment_executor, cb] { _exec_actual(fragment_executor, cb); }); if (!st.ok()) { { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index fffb5ad57a9..a70bf6695ac 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -20,6 +20,7 @@ #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_x/dependency.h" #include "runtime/runtime_query_statistics_mgr.h" +#include "runtime/task_group/task_group_manager.h" namespace doris { @@ -152,4 +153,28 @@ void QueryContext::register_cpu_statistics() { } } +void QueryContext::set_query_scheduler(uint64_t tg_id) { + auto* tg_mgr = _exec_env->task_group_manager(); + tg_mgr->get_query_scheduler(tg_id, &_task_scheduler, &_scan_task_scheduler, + &_non_pipe_thread_pool); +} + +doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() { + if (!config::enable_cgroup_cpu_soft_limit) { + return _exec_env->pipeline_task_group_scheduler(); + } else if (_task_scheduler) { + return _task_scheduler; + } else { + return _exec_env->pipeline_task_scheduler(); + } +} + +ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() { + if (_task_group) { + return _non_pipe_thread_pool; + } else { + return nullptr; + } +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index dd24206c415..d5a8f12cee1 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -218,6 
+218,12 @@ public: std::shared_ptr<QueryStatistics> get_cpu_statistics() { return _cpu_statistics; } + void set_query_scheduler(uint64_t wg_id); + + doris::pipeline::TaskScheduler* get_pipe_exec_scheduler(); + + ThreadPool* get_non_pipe_exec_thread_pool(); + public: DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; @@ -247,8 +253,6 @@ public: // only for file scan node std::map<int, TFileScanRangeParams> file_scan_range_params_map; - std::atomic<bool> use_task_group_for_cpu_limit = false; - private: TUniqueId _query_id; ExecEnv* _exec_env = nullptr; @@ -272,7 +276,7 @@ private: std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller; vectorized::RuntimePredicate _runtime_predicate; - taskgroup::TaskGroupPtr _task_group; + taskgroup::TaskGroupPtr _task_group = nullptr; std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr; const TQueryOptions _query_options; @@ -281,8 +285,9 @@ private: // to report the real message if failed. Status _exec_status = Status::OK(); - pipeline::TaskScheduler* _task_scheduler = nullptr; + doris::pipeline::TaskScheduler* _task_scheduler = nullptr; vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr; + ThreadPool* _non_pipe_thread_pool = nullptr; std::unique_ptr<pipeline::Dependency> _execution_dependency; std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr; diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index cbd7b5f73f6..74694baa9fc 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -67,18 +67,25 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) { return nullptr; } -bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) { - std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock); +void TaskGroupManager::get_query_scheduler(uint64_t tg_id, + doris::pipeline::TaskScheduler** exec_sched, + vectorized::SimplifiedScanScheduler** scan_sched, + ThreadPool** non_pipe_thread_pool) { + std::shared_lock<std::shared_mutex> r_lock(_task_scheduler_lock); auto tg_sche_it = _tg_sche_map.find(tg_id); if (tg_sche_it != _tg_sche_map.end()) { - query_ctx_ptr->set_task_scheduler(tg_sche_it->second.get()); - auto _tg_scan_sche_it = _tg_scan_sche_map.find(tg_id); - if (_tg_scan_sche_it != _tg_scan_sche_map.end()) { - query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_it->second.get()); - return true; - } + *exec_sched = tg_sche_it->second.get(); + } + + auto tg_scan_sche_it = _tg_scan_sche_map.find(tg_id); + if (tg_scan_sche_it != _tg_scan_sche_map.end()) { + *scan_sched = tg_scan_sche_it->second.get(); + } + + auto non_pipe_thread_pool_iter = _non_pipe_thread_pool_map.find(tg_id); + if (non_pipe_thread_pool_iter != _non_pipe_thread_pool_map.end()) { + *non_pipe_thread_pool = non_pipe_thread_pool_iter->second.get(); } - return false; } Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, @@ -135,7 +142,23 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i } } - // step 4 update cgroup cpu if needed + // step 4: init non-pipe scheduler + if (_non_pipe_thread_pool_map.find(tg_id) == _non_pipe_thread_pool_map.end()) { + std::unique_ptr<ThreadPool> thread_pool = nullptr; + auto ret = ThreadPoolBuilder("nonPip_" + tg_name) + .set_min_threads(1) + .set_max_threads(config::fragment_pool_thread_num_max) + .set_max_queue_size(config::fragment_pool_queue_size) + 
.set_cgroup_cpu_ctl(cg_cu_ctl_ptr) + .build(&thread_pool); + if (!ret.ok()) { + LOG(INFO) << "create non-pipline thread pool failed"; + } else { + _non_pipe_thread_pool_map.emplace(tg_id, std::move(thread_pool)); + } + } + + // step 5: update cgroup cpu if needed if (enable_cpu_hard_limit) { if (cpu_hard_limit > 0) { _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit); @@ -160,6 +183,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) { // stop task sche may cost some time, so it should not be locked std::set<doris::pipeline::TaskScheduler*> task_sche_to_del; std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del; + std::set<ThreadPool*> non_pip_thread_pool_to_del; std::set<uint64_t> deleted_tg_ids; { std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock); @@ -177,6 +201,13 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) { scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get()); } } + for (auto iter = _non_pipe_thread_pool_map.begin(); iter != _non_pipe_thread_pool_map.end(); + iter++) { + uint64_t tg_id = iter->first; + if (used_wg_id.find(tg_id) == used_wg_id.end()) { + non_pip_thread_pool_to_del.insert(_non_pipe_thread_pool_map[tg_id].get()); + } + } } // 1 stop all threads for (auto* ptr1 : task_sche_to_del) { @@ -185,6 +216,9 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) { for (auto* ptr2 : scan_task_sche_to_del) { ptr2->stop(); } + for (auto& ptr3 : non_pip_thread_pool_to_del) { + ptr3->shutdown(); + } // 2 release resource in memory { std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock); @@ -242,6 +276,9 @@ void TaskGroupManager::stop() { for (auto& task_sche : _tg_scan_sche_map) { task_sche.second->stop(); } + for (auto& no_pip_sche : _non_pipe_thread_pool_map) { + no_pip_sche.second->shutdown(); + } } } // namespace doris::taskgroup diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 08968b6fe99..a7ccb52f00e 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -65,11 +65,9 @@ public: bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); } - bool set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr); - - // currently cgroup both support cpu soft limit and cpu hard limit - // doris task group only support cpu soft limit - bool enable_cgroup() { return enable_cpu_hard_limit() || config::enable_cgroup_cpu_soft_limit; } + void get_query_scheduler(uint64_t tg_id, doris::pipeline::TaskScheduler** exec_sched, + vectorized::SimplifiedScanScheduler** scan_sched, + ThreadPool** non_pipe_thread_pool); private: std::shared_mutex _group_mutex; @@ -81,6 +79,7 @@ private: std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map; std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map; std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map; + std::map<uint64_t, std::unique_ptr<ThreadPool>> _non_pipe_thread_pool_map; std::shared_mutex _init_cg_ctl_lock; std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl; diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp index 2398bff465c..731dd0c8661 100644 --- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp +++ 
b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp @@ -81,7 +81,6 @@ void WorkloadSchedPolicyMgr::_schedule_workload() { if (list.size() == 0) { continue; } - LOG(INFO) << "[workload_schedule] get query list size=" << list.size(); for (int i = 0; i < list.size(); i++) { WorkloadQueryInfo* query_info_ptr = &(list[i]); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 1dd9966b162..28aec83d6a2 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -253,7 +253,7 @@ protected: const int64_t _max_bytes_in_queue; doris::vectorized::ScannerScheduler* _scanner_scheduler; - SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; // used for cpu hard limit + SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; // List "scanners" saves all "unfinished" scanners. // The scanner scheduler will pop scanners from this list, run scanner, // and then if the scanner is not finished, will be pushed back to this list. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 586a27ee103..8581980da2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3639,6 +3639,9 @@ public class Coordinator implements CoordInterface { params.params.setPerNodeScanRanges(scanRanges); params.params.setPerExchNumSenders(perExchNumSenders); + if (tWorkloadGroups != null) { + params.setWorkloadGroups(tWorkloadGroups); + } params.params.setDestinations(destinations); params.params.setSenderId(i); params.params.setNumSenders(instanceExecParams.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 767d03f43d7..8b72ce3eb2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1519,7 +1519,7 @@ public class StmtExecutor { coordBase = new PointQueryExec(planner, analyzer); } else { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); - if (Config.enable_workload_group && context.sessionVariable.getEnablePipelineEngine()) { + if (Config.enable_workload_group) { coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context)); } else { context.setWorkloadGroupName(""); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index fb9a1888f6c..10f70bd7f86 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -400,6 +400,13 @@ struct TGlobalDict { 2: optional map<i32, i32> slot_dicts // map from slot id to column dict id, because 2 or more column may share the dict } +struct TPipelineWorkloadGroup { + 1: optional i64 id + 2: optional string name + 3: optional map<string, string> properties + 4: optional i64 version +} + // ExecPlanFragment struct TExecPlanFragmentParams { 1: required PaloInternalServiceVersion protocol_version @@ -483,6 +490,8 @@ struct TExecPlanFragmentParams { 29: optional i64 content_length + 30: optional list<TPipelineWorkloadGroup> workload_groups + // For cloud 1000: optional bool is_mow_table; } @@ -670,13 +679,6 @@ struct TPipelineInstanceParams { 7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans } -struct TPipelineWorkloadGroup { - 1: optional i64 id - 2: optional 
string name - 3: optional map<string, string> properties - 4: optional i64 version -} - // ExecPlanFragment struct TPipelineFragmentParams { 1: required PaloInternalServiceVersion protocol_version --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org