This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 1f10ce014a7 branch-3.1: [opt](metrics) add metrics 
pipeline_task_queue_size #51878 (#52140)
1f10ce014a7 is described below

commit 1f10ce014a7951c27664260e030115d05dfba80d
Author: camby <[email protected]>
AuthorDate: Tue Jun 24 10:49:34 2025 +0800

    branch-3.1: [opt](metrics) add metrics pipeline_task_queue_size #51878 
(#52140)
    
    Cherry-pick from #51878
---
 be/src/pipeline/task_queue.cpp | 5 ++++-
 be/src/util/doris_metrics.cpp  | 3 +++
 be/src/util/doris_metrics.h    | 1 +
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index f59707d872f..b91b74dfdef 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -53,6 +53,7 @@ void PriorityTaskQueue::close() {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     _closed = true;
     _wait_task.notify_all();
+    
DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
 }
 
 PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
@@ -78,6 +79,7 @@ PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool 
is_steal) {
     if (task) {
         task->update_queue_level(level);
         _total_task_size--;
+        DorisMetrics::instance()->pipeline_task_queue_size->increment(-1);
     }
     return task;
 }
@@ -127,6 +129,7 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
 
     _sub_queues[level].push_back(task);
     _total_task_size++;
+    DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
     _wait_task.notify_one();
     return Status::OK();
 }
@@ -217,4 +220,4 @@ void MultiCoreTaskQueue::update_statistics(PipelineTask* 
task, int64_t time_spen
                                                                          
time_spent);
 }
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 8ec906cb630..e7e700bfaf3 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -227,6 +227,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, 
MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(pipeline_task_queue_size, 
MetricUnit::NOUNIT);
 
 const std::string DorisMetrics::_s_registry_name = "doris_be";
 const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -373,6 +374,8 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
+
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
pipeline_task_queue_size);
 }
 
 void DorisMetrics::initialize(bool init_system_metrics, const 
std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 5a4bef95a85..d95eee6800e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -246,6 +246,7 @@ public:
     IntCounter* scanner_ctx_cnt = nullptr;
     IntCounter* scanner_cnt = nullptr;
     IntCounter* scanner_task_cnt = nullptr;
+    IntCounter* pipeline_task_queue_size = nullptr;
 
     static DorisMetrics* instance() {
         static DorisMetrics instance;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to