This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 0756e28d7df only spill largest task (#41518)
0756e28d7df is described below

commit 0756e28d7dfebcbce36da1531f6e8945092329cc
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Oct 2 12:28:37 2024 +0800

    only spill largest task (#41518)
    
    ## Proposed changes
    
    should decrease the revoking task count for every spill stream.
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 .../exec/partitioned_hash_join_sink_operator.cpp   |  5 +++--
 be/src/pipeline/pipeline_task.cpp                  |  3 ++-
 be/src/runtime/query_context.cpp                   | 24 ++++++++++++----------
 .../workload_group/workload_group_manager.cpp      |  4 ----
 .../doris/resource/workloadgroup/QueryQueue.java   |  3 ++-
 5 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 136466aa6b2..f4f03cdb52f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -327,7 +327,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
             st = Status::Error<INTERNAL_ERROR>(
                     "fault_inject partitioned_hash_join_sink revoke_memory 
submit_func failed");
         });
-
+        // For every stream, the task counter is increased by 1,
+        // so that when a stream finishes, it should be decreased by 1.
         state->get_query_ctx()->increase_revoking_tasks_count();
         auto spill_runnable = std::make_shared<SpillRunnable>(
                 state, _shared_state->shared_from_this(),
@@ -349,6 +350,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
                         return Status::OK();
                     }();
 
+                    _state->get_query_ctx()->decrease_revoking_tasks_count();
                     if (!status.ok()) {
                         std::unique_lock<std::mutex> lock(_spill_lock);
                         _spill_dependency->set_ready();
@@ -458,7 +460,6 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
 
     if (num == 1) {
         std::unique_lock<std::mutex> lock(_spill_lock);
-        _state->get_query_ctx()->decrease_revoking_tasks_count();
         _spill_dependency->set_ready();
         if (_child_eos) {
             VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << 
", hash join sink "
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 4d7fcc4b53b..400903add18 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -443,7 +443,8 @@ Status PipelineTask::execute(bool* eos) {
             COUNTER_UPDATE(_yield_counts, 1);
 
             LOG(INFO) << "query: " << print_id(query_id) << ", task: " << 
(void*)this
-                      << ", insufficient memory. reserve_size: " << 
reserve_size;
+                      << ", insufficient memory. reserve_size: "
+                      << PrettyPrinter::print(reserve_size, TUnit::BYTES);
             _memory_sufficient_dependency->block();
             ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                     _state->get_query_ctx()->shared_from_this(), reserve_size);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 5bba3d96642..3a40020677c 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -470,7 +470,7 @@ Status QueryContext::revoke_memory() {
     // Do not use memlimit, use current memory usage.
     // For example, if current limit is 1.6G, but current used is 1G, if 
reserve failed
     // should free 200MB memory, not 300MB
-    const int64_t target_revoking_size = 
(int64_t)(query_mem_tracker->consumption() * 0.2);
+    //const int64_t target_revoking_size = 
(int64_t)(query_mem_tracker->consumption() * 0.2);
     size_t revoked_size = 0;
 
     std::vector<pipeline::PipelineTask*> chosen_tasks;
@@ -478,9 +478,11 @@ Status QueryContext::revoke_memory() {
         chosen_tasks.emplace_back(task);
 
         revoked_size += revocable_size;
-        if (revoked_size >= target_revoking_size) {
-            break;
-        }
+        // Only revoke the largest task to ensure memory is used as much as 
possible
+        break;
+        //if (revoked_size >= target_revoking_size) {
+        //    break;
+        //}
     }
 
     std::weak_ptr<QueryContext> this_ctx = shared_from_this();
@@ -491,18 +493,18 @@ Status QueryContext::revoke_memory() {
                     return;
                 }
 
-                LOG(INFO) << "query: " << print_id(query_context->_query_id)
-                          << ", context: " << ((void*)context)
-                          << " all revoking tasks done, resumt it.";
+                LOG(INFO) << query_context->debug_string() << ", context: " << 
((void*)context)
+                          << " all spill tasks done, resume it.";
                 query_context->set_memory_sufficient(true);
             });
 
-    LOG(INFO) << "query: " << print_id(_query_id) << ", context: " << 
((void*)spill_context.get())
-              << " total revoked size: " << revoked_size << ", tasks count: " 
<< chosen_tasks.size()
-              << "/" << tasks.size();
     for (auto* task : chosen_tasks) {
         RETURN_IF_ERROR(task->revoke_memory(spill_context));
     }
+
+    LOG(INFO) << this->debug_string() << ", context: " << 
((void*)spill_context.get())
+              << " total revoked size: " << PrettyPrinter::print(revoked_size, 
TUnit::BYTES)
+              << ", tasks count: " << chosen_tasks.size() << "/" << 
tasks.size();
     return Status::OK();
 }
 
@@ -526,7 +528,7 @@ std::vector<pipeline::PipelineTask*> 
QueryContext::get_revocable_tasks() const {
 std::string QueryContext::debug_string() {
     std::lock_guard l(_paused_mutex);
     return fmt::format(
-            "MemTracker Label={}, Used={}, Limit={}, Peak={}, running revoke 
task count {}, "
+            "Label={}, Used={}, Limit={}, Peak={}, running revoke task count 
{}, "
             "MemorySufficient={}, PausedReason={}",
             query_mem_tracker->label(),
             PrettyPrinter::print(query_mem_tracker->consumption(), 
TUnit::BYTES),
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index bc87f342ac0..947476faeb1 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -285,8 +285,6 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
         auto& queries_list = it->second;
         const auto& wg = it->first;
         if (queries_list.empty()) {
-            LOG(INFO) << "wg: " << wg->debug_string()
-                      << " has no paused query, update it to memory sufficent";
             it = _paused_queries_list.erase(it);
             continue;
         }
@@ -639,8 +637,6 @@ void 
WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
         int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft 
limit
         if (query_ctx->enable_query_slot_hard_limit()) {
-            LOG(INFO) << "query info " << wg_high_water_mark_except_load << ","
-                      << query_ctx->get_slot_count() << "," << 
total_slot_count;
             if (total_slot_count < 1) {
                 LOG(WARNING)
                         << "query " << print_id(query_ctx->query_id())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 0da702ad3f9..82822c05a0d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -130,7 +130,8 @@ public class QueryQueue {
                 queueToken.complete();
                 return queueToken;
             } else if (waitingQueryQueue.size() >= maxQueueSize) {
-                throw new UserException("query waiting queue is full, queue 
length=" + maxQueueSize);
+                throw new UserException("query waiting queue is full, queue 
capacity=" + maxQueueSize
+                        + ", waiting num=" + waitingQueryQueue.size());
             } else {
                 if (!hasFreeSlot) {
                     queueToken.setQueueMsg("NO_FREE_SLOT");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to