This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e2a2ef852c963effd6ccd29d68ac39aed340865b
Author: yiguolei <yiguo...@gmail.com>
AuthorDate: Fri Oct 11 14:34:50 2024 +0800

    [improvement] add detailed paused information in log
    
    f
---
 be/src/pipeline/pipeline_task.cpp                        |  2 --
 be/src/runtime/query_context.cpp                         | 11 ++++++++---
 be/src/runtime/query_context.h                           |  8 +++++---
 be/src/runtime/workload_group/workload_group_manager.cpp |  3 ++-
 4 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 9ae848bd6c9..5509fcb19ed 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,7 +426,6 @@ Status PipelineTask::execute(bool* eos) {
                     bool is_low_wartermark = false;
                     workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
                     if (is_low_wartermark || is_high_wartermark) {
-                        _memory_sufficient_dependency->block();
                         ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                                 _state->get_query_ctx()->shared_from_this(), reserve_size);
                         continue;
@@ -465,7 +464,6 @@ Status PipelineTask::execute(bool* eos) {
             LOG(INFO) << "query: " << print_id(query_id) << ", task: " << (void*)this
                       << ", insufficient memory. reserve_size: "
                       << PrettyPrinter::print(reserve_size, TUnit::BYTES);
-            _memory_sufficient_dependency->block();
             ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                     _state->get_query_ctx()->shared_from_this(), reserve_size);
             break;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 9717a969d47..2ea548fd6f5 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -236,10 +236,14 @@ void QueryContext::set_memory_sufficient(bool sufficient) {
         {
             std::lock_guard l(_paused_mutex);
             _paused_reason = Status::OK();
+            _paused_timer.stop();
+            _paused_period_secs += _paused_timer.elapsed_time() / (1000L * 1000L * 1000L);
         }
         _memory_sufficient_dependency->set_ready();
     } else {
         _memory_sufficient_dependency->block();
+        _paused_timer.start();
+        ++_paused_count;
     }
 }
 
@@ -526,13 +530,14 @@ std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const {
 std::string QueryContext::debug_string() {
     std::lock_guard l(_paused_mutex);
     return fmt::format(
-            "Label={}, Used={}, Limit={}, Peak={}, running revoke task count {}, "
-            "MemorySufficient={}, PausedReason={}",
+            "QueryId={}, Memory [Used={}, Limit={}, Peak={}], "
+            "Spill[RunningSpillTaskCnt={}, TotalPausedPeriodSecs={}, "
+            "MemorySufficient={}, LatestPausedReason={}]",
             query_mem_tracker->label(),
             PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
             PrettyPrinter::print(query_mem_tracker->limit(), TUnit::BYTES),
             PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES),
-            _revoking_tasks_count, _memory_sufficient_dependency->ready(),
+            _revoking_tasks_count, _memory_sufficient_dependency->ready(), _paused_period_secs,
             _paused_reason.to_string());
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 864ea1f9ce1..9dd75cb340d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -413,6 +413,11 @@ private:
     std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
     std::mutex _pipeline_map_write_lock;
 
+    std::mutex _paused_mutex;
+    Status _paused_reason;
+    std::atomic<int64_t> _paused_count = 0;
+    MonotonicStopWatch _paused_timer;
+    std::atomic<int64_t> _paused_period_secs = 0;
     std::atomic<bool> _low_memory_mode = false;
     int64_t _user_set_mem_limit = 0;
     std::atomic<int64_t> _expected_mem_limit = 0;
@@ -423,9 +428,6 @@ private:
     // help us manage the query.
     QuerySource _query_source;
 
-    Status _paused_reason;
-    std::mutex _paused_mutex;
-
     // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
     // flatten profile of one fragment:
     // Pipeline 0
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 87a8fc5be0b..c2c51a35429 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -258,8 +258,9 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
     // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit
     // if hard limit is enabled, then not need enable other queries hard limit.
     if (inserted) {
+        query_ctx->set_memory_sufficient(false);
         LOG(INFO) << "workload group " << wg->debug_string()
-                  << " insert one new paused query: " << it->query_id();
+                  << " insert one new paused query: " << query_ctx->debug_string();
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to