This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 1f10ce014a7 branch-3.1: [opt](metrics) add metrics pipeline_task_queue_size #51878 (#52140)
1f10ce014a7 is described below
commit 1f10ce014a7951c27664260e030115d05dfba80d
Author: camby <[email protected]>
AuthorDate: Tue Jun 24 10:49:34 2025 +0800
branch-3.1: [opt](metrics) add metrics pipeline_task_queue_size #51878 (#52140)
Cherry-pick from #51878
---
be/src/pipeline/task_queue.cpp | 5 ++++-
be/src/util/doris_metrics.cpp | 3 +++
be/src/util/doris_metrics.h | 1 +
3 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index f59707d872f..b91b74dfdef 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -53,6 +53,7 @@ void PriorityTaskQueue::close() {
std::unique_lock<std::mutex> lock(_work_size_mutex);
_closed = true;
_wait_task.notify_all();
+ DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
}
PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
@@ -78,6 +79,7 @@ PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
if (task) {
task->update_queue_level(level);
_total_task_size--;
+ DorisMetrics::instance()->pipeline_task_queue_size->increment(-1);
}
return task;
}
@@ -127,6 +129,7 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
_sub_queues[level].push_back(task);
_total_task_size++;
+ DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
_wait_task.notify_one();
return Status::OK();
}
@@ -217,4 +220,4 @@ void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen
time_spent);
}
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 8ec906cb630..e7e700bfaf3 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -227,6 +227,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(pipeline_task_queue_size, MetricUnit::NOUNIT);
const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -373,6 +374,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
+
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity, pipeline_task_queue_size);
}
void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 5a4bef95a85..d95eee6800e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -246,6 +246,7 @@ public:
IntCounter* scanner_ctx_cnt = nullptr;
IntCounter* scanner_cnt = nullptr;
IntCounter* scanner_task_cnt = nullptr;
+ IntCounter* pipeline_task_queue_size = nullptr;
static DorisMetrics* instance() {
static DorisMetrics instance;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]