This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit e2a2ef852c963effd6ccd29d68ac39aed340865b Author: yiguolei <yiguo...@gmail.com> AuthorDate: Fri Oct 11 14:34:50 2024 +0800 [improvement] add detailed paused information in log f --- be/src/pipeline/pipeline_task.cpp | 2 -- be/src/runtime/query_context.cpp | 11 ++++++++--- be/src/runtime/query_context.h | 8 +++++--- be/src/runtime/workload_group/workload_group_manager.cpp | 3 ++- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 9ae848bd6c9..5509fcb19ed 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -426,7 +426,6 @@ Status PipelineTask::execute(bool* eos) { bool is_low_wartermark = false; workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark); if (is_low_wartermark || is_high_wartermark) { - _memory_sufficient_dependency->block(); ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( _state->get_query_ctx()->shared_from_this(), reserve_size); continue; @@ -465,7 +464,6 @@ Status PipelineTask::execute(bool* eos) { LOG(INFO) << "query: " << print_id(query_id) << ", task: " << (void*)this << ", insufficient memory. 
reserve_size: " << PrettyPrinter::print(reserve_size, TUnit::BYTES); - _memory_sufficient_dependency->block(); ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( _state->get_query_ctx()->shared_from_this(), reserve_size); break; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 9717a969d47..2ea548fd6f5 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -236,10 +236,14 @@ void QueryContext::set_memory_sufficient(bool sufficient) { { std::lock_guard l(_paused_mutex); _paused_reason = Status::OK(); + _paused_timer.stop(); + _paused_period_secs += _paused_timer.elapsed_time() / (1000L * 1000L * 1000L); } _memory_sufficient_dependency->set_ready(); } else { _memory_sufficient_dependency->block(); + _paused_timer.start(); + ++_paused_count; } } @@ -526,13 +530,14 @@ std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const { std::string QueryContext::debug_string() { std::lock_guard l(_paused_mutex); return fmt::format( - "Label={}, Used={}, Limit={}, Peak={}, running revoke task count {}, " - "MemorySufficient={}, PausedReason={}", + "QueryId={}, Memory [Used={}, Limit={}, Peak={}], " + "Spill[RunningSpillTaskCnt={}, TotalPausedPeriodSecs={}, " + "MemorySufficient={}, LatestPausedReason={}]", query_mem_tracker->label(), PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES), PrettyPrinter::print(query_mem_tracker->limit(), TUnit::BYTES), PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES), - _revoking_tasks_count, _memory_sufficient_dependency->ready(), + _revoking_tasks_count, _memory_sufficient_dependency->ready(), _paused_period_secs, _paused_reason.to_string()); } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 864ea1f9ce1..9dd75cb340d 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -413,6 +413,11 @@ private: std::map<int, 
std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx; std::mutex _pipeline_map_write_lock; + std::mutex _paused_mutex; + Status _paused_reason; + std::atomic<int64_t> _paused_count = 0; + MonotonicStopWatch _paused_timer; + std::atomic<int64_t> _paused_period_secs = 0; std::atomic<bool> _low_memory_mode = false; int64_t _user_set_mem_limit = 0; std::atomic<int64_t> _expected_mem_limit = 0; @@ -423,9 +428,6 @@ private: // help us manage the query. QuerySource _query_source; - Status _paused_reason; - std::mutex _paused_mutex; - // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile // flatten profile of one fragment: // Pipeline 0 diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 87a8fc5be0b..c2c51a35429 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -258,8 +258,9 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit // if hard limit is enabled, then not need enable other queries hard limit. if (inserted) { + query_ctx->set_memory_sufficient(false); LOG(INFO) << "workload group " << wg->debug_string() - << " insert one new paused query: " << it->query_id(); + << " insert one new paused query: " << query_ctx->debug_string(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org