This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e819ab64b0 [minor](task) Complete debug string if task is finalized (#49950)
3e819ab64b0 is described below

commit 3e819ab64b00a52ce622aa315f6fc81fe92f692e
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Fri Apr 11 17:29:18 2025 +0800

    [minor](task) Complete debug string if task is finalized (#49950)
    
    1. Add spill dependencies to the dummy operators so that the BE unit tests can run.
    2. Include the pipeline name in the debug string when the task is finalized.
---
 be/src/pipeline/exec/operator.h         |  6 +++++
 be/src/pipeline/pipeline.h              |  1 +
 be/src/pipeline/pipeline_task.cpp       | 43 +++++++++++++++------------------
 be/src/pipeline/pipeline_task.h         |  1 +
 be/test/pipeline/pipeline_task_test.cpp |  4 +--
 5 files changed, 29 insertions(+), 26 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0625bcc0ad0..75a767aaa83 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -1100,12 +1100,15 @@ public:
                                                        
"DummyOperatorDependency", true);
         _filter_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                                        
"DummyOperatorDependency", true);
+        _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                      
"DummyOperatorDependency", true);
     }
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
     ~DummyOperatorLocalState() = default;
 
     std::vector<Dependency*> dependencies() const override { return 
{_tmp_dependency.get()}; }
     std::vector<Dependency*> filter_dependencies() override { return 
{_filter_dependency.get()}; }
+    Dependency* spill_dependency() const override { return 
_spill_dependency.get(); }
 
 private:
     std::shared_ptr<Dependency> _tmp_dependency;
@@ -1145,10 +1148,13 @@ public:
                                                     "DummyOperatorDependency", 
true);
         _finish_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                                        
"DummyOperatorDependency", true);
+        _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                      
"DummyOperatorDependency", true);
     }
 
     std::vector<Dependency*> dependencies() const override { return 
{_tmp_dependency.get()}; }
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
+    Dependency* spill_dependency() const override { return 
_spill_dependency.get(); }
     bool is_finished() const override { return _is_finished; }
 
 private:
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 7bde9323e94..7fff24cf8d9 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -138,6 +138,7 @@ public:
     }
 
     int num_tasks_of_parent() const { return _num_tasks_of_parent; }
+    std::string& name() { return _name; }
 
 private:
     void _init_profile();
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 94c5c6f7c75..852174137de 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -79,8 +79,8 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t 
task_id, RuntimeState
           _shared_state_map(std::move(shared_state_map)),
           _task_idx(task_idx),
           _execution_dep(state->get_query_ctx()->get_execution_dependency()),
-          _memory_sufficient_dependency(
-                  state->get_query_ctx()->get_memory_sufficient_dependency()) {
+          
_memory_sufficient_dependency(state->get_query_ctx()->get_memory_sufficient_dependency()),
+          _pipeline_name(_pipeline->name()) {
     _pipeline_task_watcher.start();
 
     if (!_shared_state_map.contains(_sink->dests_id().front())) {
@@ -291,24 +291,19 @@ void PipelineTask::terminate() {
     auto fragment = _fragment_context.lock();
     if (!is_finalized() && fragment) {
         DCHECK(_wake_up_early || fragment->is_canceled());
-        for (auto* dep : _spill_dependencies) {
-            dep->set_always_ready();
-        }
-
-        for (auto* dep : _filter_dependencies) {
-            dep->set_always_ready();
-        }
-        for (auto& deps : _read_dependencies) {
-            for (auto* dep : deps) {
-                dep->set_always_ready();
-            }
-        }
-        for (auto* dep : _write_dependencies) {
-            dep->set_always_ready();
-        }
-        for (auto* dep : _finish_dependencies) {
-            dep->set_always_ready();
-        }
+        std::for_each(_spill_dependencies.begin(), _spill_dependencies.end(),
+                      [&](Dependency* dep) { dep->set_always_ready(); });
+        std::for_each(_filter_dependencies.begin(), _filter_dependencies.end(),
+                      [&](Dependency* dep) { dep->set_always_ready(); });
+        std::for_each(_write_dependencies.begin(), _write_dependencies.end(),
+                      [&](Dependency* dep) { dep->set_always_ready(); });
+        std::for_each(_finish_dependencies.begin(), _finish_dependencies.end(),
+                      [&](Dependency* dep) { dep->set_always_ready(); });
+        std::for_each(_read_dependencies.begin(), _read_dependencies.end(),
+                      [&](std::vector<Dependency*>& deps) {
+                          std::for_each(deps.begin(), deps.end(),
+                                        [&](Dependency* dep) { 
dep->set_always_ready(); });
+                      });
         _execution_dep->set_ready();
         _memory_sufficient_dependency->set_ready();
     }
@@ -696,16 +691,16 @@ std::string PipelineTask::debug_string() {
                    print_id(_state->fragment_instance_id()));
 
     fmt::format_to(debug_string_buffer,
-                   "PipelineTask[this = {}, id = {}, open = {}, eos = {}, 
state = {}, dry run = "
+                   "PipelineTask[id = {}, open = {}, eos = {}, state = {}, dry 
run = "
                    "{}, _wake_up_early = {}, time elapsed since last state 
changing = {}s, spilling"
                    " = {}, is running = {}]",
-                   (void*)this, _index, _opened, _eos, 
_to_string(_exec_state), _dry_run,
-                   _wake_up_early.load(), _state_change_watcher.elapsed_time() 
/ NANOS_PER_SEC,
-                   _spilling, is_running());
+                   _index, _opened, _eos, _to_string(_exec_state), _dry_run, 
_wake_up_early.load(),
+                   _state_change_watcher.elapsed_time() / NANOS_PER_SEC, 
_spilling, is_running());
     std::unique_lock<std::mutex> lc(_dependency_lock);
     auto* cur_blocked_dep = _blocked_dep;
     auto fragment = _fragment_context.lock();
     if (is_finalized() || !fragment) {
+        fmt::format_to(debug_string_buffer, " pipeline name = {}", 
_pipeline_name);
         return fmt::to_string(debug_string_buffer);
     }
     auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 4dbaa58feec..36a85f7321e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -336,6 +336,7 @@ private:
     std::atomic<State> _exec_state = State::INITED;
     MonotonicStopWatch _state_change_watcher;
     std::atomic<bool> _spilling = false;
+    const std::string _pipeline_name;
 };
 
 using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp
index f5a68d2da1b..3e16f21568a 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -263,7 +263,7 @@ TEST_F(PipelineTaskTest, TEST_OPEN) {
         EXPECT_FALSE(task->_read_dependencies.empty());
         EXPECT_FALSE(task->_write_dependencies.empty());
         EXPECT_FALSE(task->_finish_dependencies.empty());
-        EXPECT_TRUE(task->_spill_dependencies.empty());
+        EXPECT_FALSE(task->_spill_dependencies.empty());
         EXPECT_TRUE(task->_opened);
     }
 }
@@ -360,7 +360,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) {
         EXPECT_FALSE(task->_read_dependencies.empty());
         EXPECT_FALSE(task->_write_dependencies.empty());
         EXPECT_FALSE(task->_finish_dependencies.empty());
-        EXPECT_TRUE(task->_spill_dependencies.empty());
+        EXPECT_FALSE(task->_spill_dependencies.empty());
         EXPECT_TRUE(task->_opened);
         EXPECT_FALSE(read_dep->ready());
         EXPECT_TRUE(write_dep->ready());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to