This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bb0c62696f0 [Refactor](pipeline) Remove some unused code in pipeline query engine (#42929)
bb0c62696f0 is described below

commit bb0c62696f0fc7b150af1cc500d96f6de01dc8ce
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri Nov 1 19:58:28 2024 +0800

    [Refactor](pipeline) Remove some unused code in pipeline query engine (#42929)
    
    1. Unify some API signatures
    2. Remove some unused code
---
 be/src/pipeline/dependency.cpp                   | 15 +++------------
 be/src/pipeline/dependency.h                     | 12 ++++--------
 be/src/pipeline/pipeline.h                       |  7 ++++---
 be/src/pipeline/pipeline_fragment_context.cpp    |  1 -
 be/src/pipeline/task_scheduler.h                 |  7 ++-----
 be/src/runtime/exec_env_init.cpp                 |  2 +-
 be/src/runtime/workload_group/workload_group.cpp |  4 ++--
 7 files changed, 16 insertions(+), 32 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 8d82c340e2d..5fef018423d 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -34,13 +34,14 @@
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 Dependency* BasicSharedState::create_source_dependency(int operator_id, int 
node_id,
-                                                       std::string name) {
+                                                       const std::string& 
name) {
     source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, 
name + "_DEPENDENCY"));
     source_deps.back()->set_shared_state(this);
     return source_deps.back().get();
 }
 
-Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, 
std::string name) {
+Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id,
+                                                     const std::string& name) {
     sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name + 
"_DEPENDENCY", true));
     sink_deps.back()->set_shared_state(this);
     return sink_deps.back().get();
@@ -105,16 +106,6 @@ std::string RuntimeFilterDependency::debug_string(int 
indentation_level) {
     return fmt::to_string(debug_string_buffer);
 }
 
-Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) {
-    std::unique_lock<std::mutex> lc(_task_lock);
-    auto ready = _ready.load();
-    if (!ready && task) {
-        _add_block_task(task);
-        task->_blocked_dep = this;
-    }
-    return ready ? nullptr : this;
-}
-
 void RuntimeFilterTimer::call_timeout() {
     _parent->set_ready();
 }
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index c6100fe0d8b..4cc3aceaeeb 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <concurrentqueue.h>
 #include <sqltypes.h>
 
 #include <atomic>
@@ -27,7 +28,6 @@
 #include <utility>
 
 #include "common/logging.h"
-#include "concurrentqueue.h"
 #include "gutil/integral_types.h"
 #include "pipeline/common/agg_utils.h"
 #include "pipeline/common/join_utils.h"
@@ -81,17 +81,15 @@ struct BasicSharedState {
 
     virtual ~BasicSharedState() = default;
 
-    Dependency* create_source_dependency(int operator_id, int node_id, 
std::string name);
+    Dependency* create_source_dependency(int operator_id, int node_id, const 
std::string& name);
 
-    Dependency* create_sink_dependency(int dest_id, int node_id, std::string 
name);
+    Dependency* create_sink_dependency(int dest_id, int node_id, const 
std::string& name);
 };
 
 class Dependency : public std::enable_shared_from_this<Dependency> {
 public:
     ENABLE_FACTORY_CREATOR(Dependency);
-    Dependency(int id, int node_id, std::string name)
-            : _id(id), _node_id(node_id), _name(std::move(name)), 
_ready(false) {}
-    Dependency(int id, int node_id, std::string name, bool ready)
+    Dependency(int id, int node_id, std::string name, bool ready = false)
             : _id(id), _node_id(node_id), _name(std::move(name)), 
_ready(ready) {}
     virtual ~Dependency() = default;
 
@@ -278,8 +276,6 @@ public:
             : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
     std::string debug_string(int indentation_level = 0) override;
 
-    Dependency* is_blocked_by(PipelineTask* task) override;
-
 private:
     const IRuntimeFilter* _runtime_filter = nullptr;
 };
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 98e52ec5271..b969186b178 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -44,8 +44,7 @@ class Pipeline : public 
std::enable_shared_from_this<Pipeline> {
     friend class PipelineFragmentContext;
 
 public:
-    explicit Pipeline(PipelineId pipeline_id, int num_tasks,
-                      std::weak_ptr<PipelineFragmentContext> context, int 
num_tasks_of_parent)
+    explicit Pipeline(PipelineId pipeline_id, int num_tasks, int 
num_tasks_of_parent)
             : _pipeline_id(pipeline_id),
               _num_tasks(num_tasks),
               _num_tasks_of_parent(num_tasks_of_parent) {
@@ -86,7 +85,9 @@ public:
 
     std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
     void set_children(std::shared_ptr<Pipeline> child) { 
_children.push_back(child); }
-    void set_children(std::vector<std::shared_ptr<Pipeline>> children) { 
_children = children; }
+    void set_children(std::vector<std::shared_ptr<Pipeline>> children) {
+        _children = std::move(children);
+    }
 
     void incr_created_tasks(int i, PipelineTask* task) {
         _num_tasks_created++;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index bd45016adf5..76d9c347c38 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -215,7 +215,6 @@ PipelinePtr 
PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     PipelineId id = _next_pipeline_id++;
     auto pipeline = std::make_shared<Pipeline>(
             id, parent ? std::min(parent->num_tasks(), _num_instances) : 
_num_instances,
-            
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
             parent ? parent->num_tasks() : _num_instances);
     if (idx >= 0) {
         _pipelines.insert(_pipelines.begin() + idx, pipeline);
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 6fc6ad8d6f2..4caceca20d4 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -36,17 +36,14 @@
 namespace doris {
 class ExecEnv;
 class ThreadPool;
-
-namespace pipeline {
-class TaskQueue;
-} // namespace pipeline
 } // namespace doris
 
 namespace doris::pipeline {
+class TaskQueue;
 
 class TaskScheduler {
 public:
-    TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, 
std::string name,
+    TaskScheduler(std::shared_ptr<TaskQueue> task_queue, std::string name,
                   CgroupCpuCtl* cgroup_cpu_ctl)
             : _task_queue(std::move(task_queue)),
               _shutdown(false),
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 3d8affade82..3cddcd60b8b 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -383,7 +383,7 @@ Status ExecEnv::init_pipeline_task_scheduler() {
     // TODO pipeline workload group combie two blocked schedulers.
     auto t_queue = 
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
     _without_group_task_scheduler =
-            new pipeline::TaskScheduler(this, t_queue, "PipeNoGSchePool", 
nullptr);
+            new pipeline::TaskScheduler(t_queue, "PipeNoGSchePool", nullptr);
     RETURN_IF_ERROR(_without_group_task_scheduler->start());
 
     _runtime_filter_timer_queue = new 
doris::pipeline::RuntimeFilterTimerQueue();
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 0488e9ec83c..84016132da9 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -470,8 +470,8 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         }
         auto task_queue = 
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
         std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
-                std::make_unique<pipeline::TaskScheduler>(exec_env, 
std::move(task_queue),
-                                                          "Pipe_" + tg_name, 
cg_cpu_ctl_ptr);
+                
std::make_unique<pipeline::TaskScheduler>(std::move(task_queue), "Pipe_" + 
tg_name,
+                                                          cg_cpu_ctl_ptr);
         Status ret = pipeline_task_scheduler->start();
         if (ret.ok()) {
             _task_sched = std::move(pipeline_task_scheduler);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to