This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bb0c62696f0 [Refactor](pipeline) Remove some unused code in pipeline query engine (#42929) bb0c62696f0 is described below commit bb0c62696f0fc7b150af1cc500d96f6de01dc8ce Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri Nov 1 19:58:28 2024 +0800 [Refactor](pipeline) Remove some unused code in pipeline query engine (#42929) 1. change some api unify 2. remove some unused code --- be/src/pipeline/dependency.cpp | 15 +++------------ be/src/pipeline/dependency.h | 12 ++++-------- be/src/pipeline/pipeline.h | 7 ++++--- be/src/pipeline/pipeline_fragment_context.cpp | 1 - be/src/pipeline/task_scheduler.h | 7 ++----- be/src/runtime/exec_env_init.cpp | 2 +- be/src/runtime/workload_group/workload_group.cpp | 4 ++-- 7 files changed, 16 insertions(+), 32 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 8d82c340e2d..5fef018423d 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -34,13 +34,14 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id, - std::string name) { + const std::string& name) { source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY")); source_deps.back()->set_shared_state(this); return source_deps.back().get(); } -Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name) { +Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, + const std::string& name) { sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name + "_DEPENDENCY", true)); sink_deps.back()->set_shared_state(this); return sink_deps.back().get(); @@ -105,16 +106,6 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) { return fmt::to_string(debug_string_buffer); } -Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) { - std::unique_lock<std::mutex> lc(_task_lock); - auto ready = _ready.load(); - if (!ready && task) { - _add_block_task(task); - task->_blocked_dep = this; - } - return ready ? nullptr : this; -} - void RuntimeFilterTimer::call_timeout() { _parent->set_ready(); } diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index c6100fe0d8b..4cc3aceaeeb 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -17,6 +17,7 @@ #pragma once +#include <concurrentqueue.h> #include <sqltypes.h> #include <atomic> @@ -27,7 +28,6 @@ #include <utility> #include "common/logging.h" -#include "concurrentqueue.h" #include "gutil/integral_types.h" #include "pipeline/common/agg_utils.h" #include "pipeline/common/join_utils.h" @@ -81,17 +81,15 @@ struct BasicSharedState { virtual ~BasicSharedState() = default; - Dependency* create_source_dependency(int operator_id, int node_id, std::string name); + Dependency* create_source_dependency(int operator_id, int node_id, const std::string& name); - Dependency* create_sink_dependency(int dest_id, int node_id, std::string name); + Dependency* create_sink_dependency(int dest_id, int node_id, const std::string& name); }; class Dependency : public std::enable_shared_from_this<Dependency> { public: ENABLE_FACTORY_CREATOR(Dependency); - Dependency(int id, int node_id, std::string name) - : _id(id), _node_id(node_id), _name(std::move(name)), _ready(false) {} - Dependency(int id, int node_id, std::string name, bool ready) + Dependency(int id, int node_id, std::string name, bool ready = false) : _id(id), _node_id(node_id), _name(std::move(name)), _ready(ready) {} virtual ~Dependency() = default; @@ -278,8 +276,6 @@ public: : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {} std::string debug_string(int indentation_level = 0) override; - Dependency* is_blocked_by(PipelineTask* task) override; - private: const IRuntimeFilter* _runtime_filter = nullptr; }; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 98e52ec5271..b969186b178 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -44,8 +44,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> { friend class PipelineFragmentContext; public: - explicit Pipeline(PipelineId pipeline_id, int num_tasks, - std::weak_ptr<PipelineFragmentContext> context, int num_tasks_of_parent) + explicit Pipeline(PipelineId pipeline_id, int num_tasks, int num_tasks_of_parent) : _pipeline_id(pipeline_id), _num_tasks(num_tasks), _num_tasks_of_parent(num_tasks_of_parent) { @@ -86,7 +85,9 @@ public: std::vector<std::shared_ptr<Pipeline>>& children() { return _children; } void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); } - void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; } + void set_children(std::vector<std::shared_ptr<Pipeline>> children) { + _children = std::move(children); + } void incr_created_tasks(int i, PipelineTask* task) { _num_tasks_created++; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index bd45016adf5..76d9c347c38 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -215,7 +215,6 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { PipelineId id = _next_pipeline_id++; auto pipeline = std::make_shared<Pipeline>( id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances, - std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()), parent ? parent->num_tasks() : _num_instances); if (idx >= 0) { _pipelines.insert(_pipelines.begin() + idx, pipeline); diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 6fc6ad8d6f2..4caceca20d4 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -36,17 +36,14 @@ namespace doris { class ExecEnv; class ThreadPool; - -namespace pipeline { -class TaskQueue; -} // namespace pipeline } // namespace doris namespace doris::pipeline { +class TaskQueue; class TaskScheduler { public: - TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, std::string name, + TaskScheduler(std::shared_ptr<TaskQueue> task_queue, std::string name, CgroupCpuCtl* cgroup_cpu_ctl) : _task_queue(std::move(task_queue)), _shutdown(false), diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 3d8affade82..3cddcd60b8b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -383,7 +383,7 @@ Status ExecEnv::init_pipeline_task_scheduler() { // TODO pipeline workload group combie two blocked schedulers. auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size); _without_group_task_scheduler = - new pipeline::TaskScheduler(this, t_queue, "PipeNoGSchePool", nullptr); + new pipeline::TaskScheduler(t_queue, "PipeNoGSchePool", nullptr); RETURN_IF_ERROR(_without_group_task_scheduler->start()); _runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue(); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 0488e9ec83c..84016132da9 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -470,8 +470,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e } auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size); std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler = - std::make_unique<pipeline::TaskScheduler>(exec_env, std::move(task_queue), - "Pipe_" + tg_name, cg_cpu_ctl_ptr); + std::make_unique<pipeline::TaskScheduler>(std::move(task_queue), "Pipe_" + tg_name, + cg_cpu_ctl_ptr); Status ret = pipeline_task_scheduler->start(); if (ret.ok()) { _task_sched = std::move(pipeline_task_scheduler); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org