This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 41bb04b9e9e refactor logic of revoking and low memory mode in local exchange oper… (#41264)
41bb04b9e9e is described below

commit 41bb04b9e9e237fd5680674d924228be7aa3b0e6
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Thu Sep 26 10:12:36 2024 +0800

    refactor logic of revoking and low memory mode in local exchange oper… (#41264)
    
    …ator
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/dependency.h                       | 21 ++++++-
 be/src/pipeline/exec/operator.h                    | 37 ++++++++++++-
 .../exec/partitioned_aggregation_sink_operator.cpp | 29 ++++++++--
 .../exec/partitioned_aggregation_sink_operator.h   |  8 ++-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  6 +-
 .../exec/partitioned_hash_join_probe_operator.h    |  3 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 59 ++++++++++++++------
 .../exec/partitioned_hash_join_sink_operator.h     | 11 ++--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 19 ++++---
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  7 ++-
 be/src/pipeline/exec/spill_utils.h                 | 32 +++++++++++
 .../local_exchange_sink_operator.cpp               |  8 ++-
 be/src/pipeline/local_exchange/local_exchanger.cpp | 49 +++++++++++++----
 be/src/pipeline/pipeline_task.cpp                  |  9 ++-
 be/src/pipeline/pipeline_task.h                    |  2 +-
 be/src/runtime/query_context.cpp                   | 64 ++++++++++++++++++++++
 be/src/runtime/query_context.h                     |  7 ++-
 .../workload_group/workload_group_manager.cpp      | 38 ++++++-------
 18 files changed, 325 insertions(+), 84 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 8cb479ccbb0..5f030dda5d2 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -26,6 +26,7 @@
 #include <thread>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "concurrentqueue.h"
 #include "gutil/integral_types.h"
@@ -832,6 +833,7 @@ public:
     std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<MemTracker*> mem_trackers;
     std::atomic<int64_t> mem_usage = 0;
+    size_t _buffer_mem_limit = config::local_exchange_buffer_mem_limit;
     // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
     virtual void create_dependencies(int local_exchange_id) {
@@ -875,7 +877,7 @@ public:
     void sub_mem_usage(int channel_id, size_t delta) { mem_trackers[channel_id]->release(delta); }
 
     virtual void add_total_mem_usage(size_t delta, int channel_id) {
-        if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) {
+        if (mem_usage.fetch_add(delta) + delta > _buffer_mem_limit) {
             sink_deps.front()->block();
         }
     }
@@ -884,10 +886,15 @@ public:
         auto prev_usage = mem_usage.fetch_sub(delta);
         DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta
                                          << " channel_id: " << channel_id;
-        if (prev_usage - delta <= config::local_exchange_buffer_mem_limit) {
+        if (prev_usage - delta <= _buffer_mem_limit) {
             sink_deps.front()->set_ready();
         }
     }
+
+    virtual void set_low_memory_mode() {
+        _buffer_mem_limit =
+                std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 * 1024 * 1024);
+    }
 };
 
 struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
@@ -933,6 +940,14 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
         source_deps[channel_id]->set_ready();
     }
 
+    void set_low_memory_mode() override {
+        _buffer_mem_limit =
+                std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 * 1024 * 1024);
+        DCHECK(!_queues_mem_usage.empty());
+        _each_queue_limit =
+                std::max<int64_t>(64 * 1024, _buffer_mem_limit / _queues_mem_usage.size());
+    }
+
     Dependency* get_sink_dep_by_channel_id(int channel_id) override {
         return sink_deps[channel_id].get();
     }
@@ -943,7 +958,7 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
 
 private:
     std::vector<std::atomic_int64_t> _queues_mem_usage;
-    const int64_t _each_queue_limit;
+    int64_t _each_queue_limit;
 };
 
 } // namespace doris::pipeline
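
A note on the low-memory hooks above: their effect on exchange backpressure can be sketched outside Doris. The standalone C++ model below mirrors the add/sub accounting in this hunk; the ExchangeBufferModel type, the plain bool standing in for the sink Dependency, and the 100 MB starting limit are illustrative assumptions, not the real classes or defaults.

    #include <algorithm>
    #include <atomic>
    #include <cstdint>
    #include <iostream>

    // Minimal model of the shared state's backpressure: block the sink once
    // buffered bytes exceed the limit, unblock when usage drops back under it.
    struct ExchangeBufferModel {
        std::atomic<int64_t> mem_usage {0};
        int64_t buffer_mem_limit = 100 * 1024 * 1024; // stand-in for the config value
        bool sink_blocked = false;

        void add_total_mem_usage(int64_t delta) {
            if (mem_usage.fetch_add(delta) + delta > buffer_mem_limit) {
                sink_blocked = true; // sink_deps.front()->block() in the real code
            }
        }

        void sub_total_mem_usage(int64_t delta) {
            if (mem_usage.fetch_sub(delta) - delta <= buffer_mem_limit) {
                sink_blocked = false; // sink_deps.front()->set_ready()
            }
        }

        // Mirrors set_low_memory_mode(): clamp the limit to 10 MB so the
        // sink blocks (and memory is revoked upstream) much earlier.
        void set_low_memory_mode() {
            buffer_mem_limit = std::min<int64_t>(buffer_mem_limit, 10 * 1024 * 1024);
        }
    };

    int main() {
        ExchangeBufferModel buf;
        buf.add_total_mem_usage(32 * 1024 * 1024); // 32 MB buffered: under the limit
        std::cout << "blocked: " << buf.sink_blocked << '\n'; // 0
        buf.set_low_memory_mode();                 // limit is now 10 MB
        buf.add_total_mem_usage(1);                // next enqueue trips the check
        std::cout << "blocked: " << buf.sink_blocked << '\n'; // 1
    }

The point of the 10 MB clamp is visible in the output: a buffer that was comfortably under the normal limit trips the block as soon as low memory mode is set.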
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 3a644eb4f02..1d2dbcc3592 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -32,6 +32,7 @@
 #include "common/status.h"
 #include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
 #include "pipeline/local_exchange/local_exchanger.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/query_context.h"
@@ -113,7 +114,11 @@ public:
         return state->minimum_operator_memory_required_bytes();
     }
 
-    virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
+    virtual Status revoke_memory(RuntimeState* state,
+                                 const std::shared_ptr<SpillContext>& spill_context) {
+        return Status::OK();
+    }
+
     [[nodiscard]] virtual bool require_data_distribution() const { return false; }
     OperatorPtr child() { return _child; }
     [[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
@@ -603,6 +608,10 @@ public:
         _spill_write_wait_io_timer =
                 ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 1);
         _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", 1);
+        _spill_max_rows_of_partition =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMaxRowsOfPartition", TUnit::UNIT, 1);
+        _spill_min_rows_of_partition =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMinRowsOfPartition", TUnit::UNIT, 1);
         return Status::OK();
     }
 
@@ -611,6 +620,25 @@ public:
         return dependencies;
     }
 
+    void update_max_min_rows_counter() {
+        int64_t max_rows = 0;
+        int64_t min_rows = std::numeric_limits<int64_t>::max();
+
+        for (auto rows : _rows_in_partitions) {
+            if (rows > max_rows) {
+                max_rows = rows;
+            }
+            if (rows < min_rows) {
+                min_rows = rows;
+            }
+        }
+
+        COUNTER_SET(_spill_max_rows_of_partition, max_rows);
+        COUNTER_SET(_spill_min_rows_of_partition, min_rows);
+    }
+
+    std::vector<int64_t> _rows_in_partitions;
+
     RuntimeProfile::Counter* _spill_timer = nullptr;
     RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
     RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
@@ -619,6 +647,8 @@ public:
     RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
     RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
     RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
+    RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr;
+    RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr;
 };
 
 class OperatorXBase : public OperatorBase {
@@ -718,9 +748,10 @@ public:
         return (_child and !is_source()) ? _child->revocable_mem_size(state) : 0;
     }
 
-    Status revoke_memory(RuntimeState* state) override {
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override {
         if (_child and !is_source()) {
-            return _child->revoke_memory(state);
+            return _child->revoke_memory(state, spill_context);
         }
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 39311a62dcc..6d84b8e8bb5 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "partitioned_aggregation_sink_operator.h"
 
 #include <cstdint>
+#include <limits>
 #include <memory>
 
 #include "aggregation_sink_operator.h"
@@ -26,6 +27,8 @@
 #include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
+#include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -60,6 +63,8 @@ Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
         value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column());
     }
 
+    _rows_in_partitions.assign(Base::_shared_state->partition_count, 0);
+
     _spill_dependency = Dependency::create_shared(parent.operator_id(), parent.node_id(),
                                                   "AggSinkSpillDependency", true);
     state->get_task()->add_spill_dependency(_spill_dependency.get());
@@ -128,6 +133,8 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
     UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime");
     UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount");
     UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes");
+
+    update_max_min_rows_counter();
 }
 
 PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
@@ -172,7 +179,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
     if (eos) {
         if (local_state._shared_state->is_spilled) {
             if (revocable_mem_size(state) > 0) {
-                RETURN_IF_ERROR(revoke_memory(state));
+                RETURN_IF_ERROR(revoke_memory(state, nullptr));
             } else {
                 for (auto& partition : local_state._shared_state->spill_partitions) {
                     RETURN_IF_ERROR(partition->finish_current_spilling(eos));
@@ -184,6 +191,10 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
             local_state._dependency->set_ready_to_read();
             local_state._finish_dependency->set_ready();
         }
+    } else if (local_state._shared_state->is_spilled) {
+        if (revocable_mem_size(state) >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+            return revoke_memory(state, nullptr);
+        }
     }
     if (local_state._runtime_state) {
         auto* sink_local_state = local_state._runtime_state->get_sink_local_state();
@@ -191,9 +202,10 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
     }
     return Status::OK();
 }
-Status PartitionedAggSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedAggSinkOperatorX::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
-    return local_state.revoke_memory(state);
+    return local_state.revoke_memory(state, spill_context);
 }
 
 size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
@@ -240,7 +252,8 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
     return _agg_sink_operator->get_reserve_mem_size(runtime_state);
 }
 
-Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
+Status PartitionedAggSinkLocalState::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     const auto size_to_revoke = _parent->revocable_mem_size(state);
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
                << Base::_parent->node_id()
@@ -279,7 +292,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     state->get_query_ctx()->increase_revoking_tasks_count();
     auto spill_runnable = std::make_shared<SpillRunnable>(
             state, _shared_state->shared_from_this(),
-            [this, &parent, state, query_id, size_to_revoke, submit_timer] {
+            [this, &parent, state, query_id, size_to_revoke, spill_context, submit_timer] {
                 DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
                     auto st = Status::InternalError(
                             "fault_inject partitioned_agg_sink "
@@ -308,8 +321,12 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                         Base::_dependency->set_ready_to_read();
                         _finish_dependency->set_ready();
                     }
-                    Base::_spill_dependency->Dependency::set_ready();
                     state->get_query_ctx()->decrease_revoking_tasks_count();
+                    Base::_spill_dependency->Dependency::set_ready();
+
+                    if (spill_context) {
+                        spill_context->on_task_finished();
+                    }
                 }};
                 auto* runtime_state = _runtime_state.get();
                 auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 756b686a5b3..0027754cde0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -16,6 +16,7 @@
 // under the License.
 
 #pragma once
+#include <limits>
 #include <memory>
 
 #include "aggregation_sink_operator.h"
@@ -45,7 +46,7 @@ public:
     Status close(RuntimeState* state, Status exec_status) override;
     Dependency* finishdependency() override { return _finish_dependency.get(); }
 
-    Status revoke_memory(RuntimeState* state);
+    Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
 
     Status setup_in_memory_agg_op(RuntimeState* state);
 
@@ -102,6 +103,7 @@ public:
                 for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled();
                      ++i) {
                     if (spill_infos[i].keys_.size() >= spill_batch_rows) {
+                        _rows_in_partitions[i] += spill_infos[i].keys_.size();
                         status = _spill_partition(
                                 state, context, Base::_shared_state->spill_partitions[i],
                                 spill_infos[i].keys_, spill_infos[i].values_, nullptr, false);
@@ -117,6 +119,7 @@ public:
             auto spill_null_key_data =
                     (hash_null_key_data && i == Base::_shared_state->partition_count - 1);
             if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) {
+                _rows_in_partitions[i] += spill_infos[i].keys_.size();
                 status = _spill_partition(state, context, Base::_shared_state->spill_partitions[i],
                                           spill_infos[i].keys_, spill_infos[i].values_,
                                           spill_null_key_data
@@ -338,7 +341,8 @@ public:
     }
     size_t revocable_mem_size(RuntimeState* state) const override;
 
-    Status revoke_memory(RuntimeState* state) override;
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override;
 
     size_t get_reserve_mem_size(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f95cca1f6af..fa9e3ff23b7 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -167,6 +167,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
+
     auto spill_func = [query_id, state, submit_timer, spill_size_threshold, this] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_spill_probe_timer);
@@ -791,7 +792,8 @@ size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat
     return mem_size;
 }
 
-Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinProbeOperatorX::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe 
node: " << node_id()
                << ", task: " << state->task_id() << ", child eos: " << 
local_state._child_eos;
@@ -806,7 +808,7 @@ Status 
PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
     RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
 
     if (_child) {
-        return _child->revoke_memory(state);
+        return _child->revoke_memory(state, nullptr);
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index b611fb661af..621681ca4cf 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -186,7 +186,8 @@ public:
         return _inner_probe_operator->require_data_distribution();
     }
 
-    Status revoke_memory(RuntimeState* state) override;
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override;
 
 private:
     Status _revoke_memory(RuntimeState* state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index f2375632d2a..5323d74341a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -20,9 +20,11 @@
 #include <glog/logging.h>
 
 #include <algorithm>
+#include <memory>
 
 #include "common/logging.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
@@ -41,6 +43,8 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _shared_state->partitioned_build_blocks.resize(p._partition_count);
     _shared_state->spilled_streams.resize(p._partition_count);
 
+    _rows_in_partitions.assign(p._partition_count, 0);
+
     _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                                   "HashJoinBuildSpillDependency", true);
     state->get_task()->add_spill_dependency(_spill_dependency.get());
@@ -125,16 +129,17 @@ size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
     return size_to_reserve;
 }
 
-Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     _shared_state->inner_shared_state->hash_table_variants.reset();
     auto row_desc = p._child->row_desc();
     const auto num_slots = row_desc.num_slots();
     vectorized::Block build_block;
     size_t block_old_mem = 0;
-    auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
+    auto* inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
     if (inner_sink_state_) {
-        auto inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+        auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
         build_block = inner_sink_state->_build_side_mutable_block.to_block();
         block_old_mem = build_block.allocated_bytes();
     }
@@ -142,6 +147,9 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     if (build_block.rows() <= 1) {
         LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
                      << ", task: " << state->task_id();
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
         return Status::OK();
     }
 
@@ -243,7 +251,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         _spill_dependency->set_ready();
     };
 
-    auto exception_catch_func = [spill_func, this]() mutable {
+    auto exception_catch_func = [spill_func, spill_context, this]() mutable {
         SCOPED_TIMER(_spill_timer);
         auto status = [&]() {
             RETURN_IF_CATCH_EXCEPTION(spill_func());
@@ -256,6 +264,10 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
             _spill_status_ok = false;
             _spill_dependency->set_ready();
         }
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
     };
 
     auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(),
@@ -273,7 +285,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     return thread_pool->submit(std::move(spill_runnable));
 }
 
-Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinSinkLocalState::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory"
               << ", eos: " << _child_eos;
     DCHECK_EQ(_spilling_streams_count, 0);
@@ -282,7 +295,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
     if (!_shared_state->need_to_spill) {
         profile()->add_info_string("Spilled", "true");
         _shared_state->need_to_spill = true;
-        return _revoke_unpartitioned_block(state);
+        return _revoke_unpartitioned_block(state, spill_context);
     }
 
     _spilling_streams_count = _shared_state->partitioned_build_blocks.size();
@@ -317,8 +330,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         state->get_query_ctx()->increase_revoking_tasks_count();
         auto spill_runnable = std::make_shared<SpillRunnable>(
                state, _shared_state->shared_from_this(),
-                [this, state, query_id, spilling_stream, i, submit_timer] {
-                    SCOPED_TIMER(_spill_timer);
+                [this, query_id, spilling_stream, i, submit_timer, spill_context] {
                    DBUG_EXECUTE_IF(
                            "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
                                ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
@@ -331,7 +343,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                     SCOPED_TIMER(_spill_build_timer);
 
                     auto status = [&]() {
-                        RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, spilling_stream));
+                        RETURN_IF_CATCH_EXCEPTION(
+                                _spill_to_disk(i, spilling_stream, spill_context));
                         return Status::OK();
                     }();
 
@@ -341,8 +354,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                         _spill_status_ok = false;
                         _spill_status = std::move(status);
                     }
-
-                    state->get_query_ctx()->decrease_revoking_tasks_count();
                 });
         if (st.ok()) {
             st = spill_io_pool->submit(std::move(spill_runnable));
@@ -368,6 +379,10 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                           }
                       });
         _dependency->set_ready_to_read();
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
     }
     return Status::OK();
 }
@@ -408,13 +423,17 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
         }
         RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(in_block, partition_indexes[i].data(),
                                                         partition_indexes[i].data() + count));
+        _rows_in_partitions[i] += count;
     }
 
+    update_max_min_rows_counter();
+
     return Status::OK();
 }
 
 void PartitionedHashJoinSinkLocalState::_spill_to_disk(
-        uint32_t partition_index, const vectorized::SpillStreamSPtr& spilling_stream) {
+        uint32_t partition_index, const vectorized::SpillStreamSPtr& spilling_stream,
+        const std::shared_ptr<SpillContext>& spill_context) {
     auto& partitioned_block = _shared_state->partitioned_build_blocks[partition_index];
 
     if (_spill_status_ok) {
@@ -436,6 +455,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
 
     if (num == 1) {
         std::unique_lock<std::mutex> lock(_spill_lock);
+        _state->get_query_ctx()->decrease_revoking_tasks_count();
         _spill_dependency->set_ready();
         if (_child_eos) {
             VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << 
", hash join sink "
@@ -449,6 +469,10 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
                           });
             _dependency->set_ready_to_read();
         }
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
     }
 }
 
@@ -575,7 +599,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                           << _inner_sink_operator->get_memory_usage(
                                      local_state._shared_state->inner_runtime_state.get());
             } else {
-                return revoke_memory(state);
+                return revoke_memory(state, nullptr);
            }
 
            std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
@@ -594,9 +618,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
     if (need_to_spill) {
        RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows));
         if (eos) {
-            return revoke_memory(state);
+            return revoke_memory(state, nullptr);
        } else if (revocable_mem_size(state) > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
-            return revoke_memory(state);
+            return revoke_memory(state, nullptr);
         }
     } else {
         if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -630,10 +654,11 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
     return local_state.revocable_mem_size(state);
 }
 
-Status PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinSinkOperatorX::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    return local_state.revoke_memory(state);
+    return local_state.revoke_memory(state, spill_context);
 }
 
 size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index b2c79967b97..a2d75cf2f9b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -43,7 +43,7 @@ public:
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
-    Status revoke_memory(RuntimeState* state);
+    Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
     size_t revocable_mem_size(RuntimeState* state) const;
     [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
 
@@ -52,12 +52,14 @@ protected:
            : PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
 
     void _spill_to_disk(uint32_t partition_index,
-                        const vectorized::SpillStreamSPtr& spilling_stream);
+                        const vectorized::SpillStreamSPtr& spilling_stream,
+                        const std::shared_ptr<SpillContext>& spill_context);
 
     Status _partition_block(RuntimeState* state, vectorized::Block* in_block, size_t begin,
                             size_t end);
 
-    Status _revoke_unpartitioned_block(RuntimeState* state);
+    Status _revoke_unpartitioned_block(RuntimeState* state,
+                                       const std::shared_ptr<SpillContext>& spill_context);
 
     friend class PartitionedHashJoinSinkOperatorX;
 
@@ -102,7 +104,8 @@ public:
 
     size_t revocable_mem_size(RuntimeState* state) const override;
 
-    Status revoke_memory(RuntimeState* state) override;
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override;
 
     size_t get_reserve_mem_size(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index e55982bcb3b..e83ad897257 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -131,9 +131,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
     return _sort_sink_operator->open(state);
 }
 
-Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state,
+                                             const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
-    return local_state.revoke_memory(state);
+    return local_state.revoke_memory(state, spill_context);
 }
 
 size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
@@ -163,7 +164,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
     if (eos) {
         if (local_state._shared_state->is_spilled) {
             if (revocable_mem_size(state) > 0) {
-                RETURN_IF_ERROR(revoke_memory(state));
+                RETURN_IF_ERROR(revoke_memory(state, nullptr));
             } else {
                 local_state._dependency->set_ready_to_read();
                 local_state._finish_dependency->set_ready();
@@ -178,7 +179,8 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
     return Status::OK();
 }
 
-Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
+                                              const std::shared_ptr<SpillContext>& spill_context) {
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
         profile()->add_info_string("Spilled", "true");
@@ -234,14 +236,13 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
             }
 
             _spilling_stream.reset();
+            state->get_query_ctx()->decrease_revoking_tasks_count();
             if (_eos) {
                 _dependency->set_ready_to_read();
                 _finish_dependency->set_ready();
             } else {
                 _spill_dependency->Dependency::set_ready();
             }
-
-            state->get_query_ctx()->decrease_revoking_tasks_count();
         }};
 
         _shared_state->sink_status =
@@ -273,7 +274,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
         return Status::OK();
     };
 
-    auto exception_catch_func = [this, query_id, spill_func]() {
+    auto exception_catch_func = [this, query_id, spill_context, spill_func]() {
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", 
{
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
spill_sort_sink "
@@ -284,6 +285,10 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
         _shared_state->sink_status = [&]() {
             RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
         }();
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
    };
 
     DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index e74b5d2a414..173e3c7847c 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <memory>
+
 #include "operator.h"
 #include "sort_sink_operator.h"
 
@@ -38,7 +40,7 @@ public:
     Dependency* finishdependency() override { return _finish_dependency.get(); }
 
     Status setup_in_memory_sort_op(RuntimeState* state);
-    Status revoke_memory(RuntimeState* state);
+    Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
 
 private:
     void _init_counters();
@@ -86,7 +88,8 @@ public:
 
     size_t revocable_mem_size(RuntimeState* state) const override;
 
-    Status revoke_memory(RuntimeState* state) override;
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override;
 
     using DataSinkOperatorX<LocalStateType>::node_id;
     using DataSinkOperatorX<LocalStateType>::operator_id;
diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index 925e7df44e6..086a6881fcd 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -17,6 +17,13 @@
 
 #pragma once
 
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
+
+#include <atomic>
+#include <functional>
+#include <utility>
+
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
@@ -28,6 +35,31 @@
 namespace doris::pipeline {
 using SpillPartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
 
+struct SpillContext {
+    std::atomic_int running_tasks_count;
+    TUniqueId query_id;
+    std::function<void()> all_tasks_finished_callback;
+
+    SpillContext(int running_tasks_count_, TUniqueId query_id_,
+                 std::function<void()> all_tasks_finished_callback_)
+            : running_tasks_count(running_tasks_count_),
+              query_id(std::move(query_id_)),
+              all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
+
+    ~SpillContext() {
+        LOG_IF(WARNING, running_tasks_count.load() != 0)
+                << "query: " << print_id(query_id)
+                << " not all spill tasks finished, remaining tasks: " << running_tasks_count.load();
+    }
+
+    void on_task_finished() {
+        auto count = running_tasks_count.fetch_sub(1);
+        if (count == 1) {
+            all_tasks_finished_callback();
+        }
+    }
+};
+
 class SpillRunnable : public Runnable {
 public:
     SpillRunnable(RuntimeState* state, const std::shared_ptr<BasicSharedState>& shared_state,
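
The SpillContext above is a last-one-out latch: every spill task calls on_task_finished(), and the task whose fetch_sub(1) observes 1 fires the callback exactly once. A minimal standalone sketch of the same pattern (TaskGroup and the worker threads are stand-ins for illustration, not the real types):

    #include <atomic>
    #include <functional>
    #include <iostream>
    #include <thread>
    #include <vector>

    // Last-one-out: N workers share a counter; the worker that brings it from
    // 1 to 0 runs the callback exactly once, with no lock.
    struct TaskGroup {
        std::atomic_int remaining;
        std::function<void()> on_all_done;

        void on_task_finished() {
            if (remaining.fetch_sub(1) == 1) { // this was the last running task
                on_all_done();
            }
        }
    };

    int main() {
        TaskGroup group {4, [] { std::cout << "all spill tasks done, resume the query\n"; }};
        std::vector<std::thread> workers;
        for (int i = 0; i < 4; ++i) {
            workers.emplace_back([&group] { group.on_task_finished(); });
        }
        for (auto& w : workers) {
            w.join();
        }
    }

Later in this commit, QueryContext::revoke_memory passes a callback of exactly this shape to resume the paused query once the last chosen task reports in.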
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 19c37f3649b..ec0d1a40f5e 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -28,7 +28,7 @@ LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() = default;
 std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
     auto deps = Base::dependencies();
 
-    auto dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
+    auto* dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
     if (dep != nullptr) {
         deps.push_back(dep);
     }
@@ -136,7 +136,13 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+
+    if (state->get_query_ctx()->low_memory_mode()) {
+        local_state._shared_state->set_low_memory_mode();
+    }
+
     RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state));
+    local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
 
     // If all exchange sources ended due to limit reached, current task should also finish
     if (local_state._exchanger->_running_source_operators == 0) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index f4630f328bb..50f359922a1 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -43,13 +43,17 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
         allocated_bytes = block->data_block.allocated_bytes();
     }
     std::unique_lock l(_m);
-    local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
-                                             !std::is_same_v<PartitionedBlock, BlockType> &&
-                                                     !std::is_same_v<BroadcastBlock, BlockType>);
+    constexpr bool update_mem_usage = !std::is_same_v<PartitionedBlock, BlockType> &&
+                                      !std::is_same_v<BroadcastBlock, BlockType>;
+    local_state._shared_state->add_mem_usage(channel_id, allocated_bytes, update_mem_usage);
+    if constexpr (update_mem_usage) {
+        local_state._mem_tracker->consume(allocated_bytes);
+    }
     if (_data_queue[channel_id].enqueue(std::move(block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
     } else {
         local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes);
+        local_state._mem_tracker->release(allocated_bytes);
        // `enqueue(block)` return false iff this queue's source operator is already closed so we
         // just unref the block.
         if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
@@ -77,11 +81,13 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
     if (_data_queue[channel_id].try_dequeue(block)) {
         if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
                       std::is_same_v<BroadcastBlock, BlockType>) {
-            local_state._shared_state->sub_mem_usage(channel_id,
-                                                     block.first->data_block.allocated_bytes());
+            const auto bytes = block.first->data_block.allocated_bytes();
+            local_state._shared_state->sub_mem_usage(channel_id, bytes);
+            local_state._mem_tracker->release(bytes);
         } else {
-            local_state._shared_state->sub_mem_usage(channel_id,
-                                                     block->data_block.allocated_bytes());
+            const auto bytes = block->data_block.allocated_bytes();
+            local_state._shared_state->sub_mem_usage(channel_id, bytes);
+            local_state._mem_tracker->release(bytes);
             data_block->swap(block->data_block);
             block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id);
             DCHECK_EQ(block->ref_value(), 0);
@@ -94,11 +100,15 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
         if (_data_queue[channel_id].try_dequeue(block)) {
             if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
                           std::is_same_v<BroadcastBlock, BlockType>) {
+                const auto bytes = block.first->data_block.allocated_bytes();
                local_state._shared_state->sub_mem_usage(channel_id,
                                                         block.first->data_block.allocated_bytes());
+                local_state._mem_tracker->release(bytes);
             } else {
+                const auto bytes = block->data_block.allocated_bytes();
                local_state._shared_state->sub_mem_usage(channel_id,
                                                         block->data_block.allocated_bytes());
+                local_state._mem_tracker->release(bytes);
                 data_block->swap(block->data_block);
                block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id);
                 DCHECK_EQ(block->ref_value(), 0);
@@ -128,6 +138,8 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
                                     in_block, local_state));
     }
 
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
     return Status::OK();
 }
 
@@ -200,8 +212,10 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     if (new_block_wrapper->data_block.empty()) {
         return Status::OK();
     }
-    local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(),
-                                                   local_state._channel_id);
+
+    const auto block_bytes = new_block_wrapper->data_block.allocated_bytes();
+    local_state._shared_state->add_total_mem_usage(block_bytes, local_state._channel_id);
+    local_state._mem_tracker->consume(block_bytes);
     auto bucket_seq_to_instance_idx =
             local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
@@ -272,6 +286,8 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     _enqueue_data_and_set_ready(channel_id, local_state, std::move(wrapper));
 
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
     return Status::OK();
 }
 
@@ -316,6 +332,8 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     BlockWrapperSPtr wrapper = BlockWrapper::create_shared(std::move(new_block));
     _enqueue_data_and_set_ready(0, local_state, std::move(wrapper));
 
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
     return Status::OK();
 }
 
@@ -346,6 +364,8 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
     if (eos) {
         local_state._shared_state->source_deps[local_state._channel_id]->set_always_ready();
     }
+
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
     return Status::OK();
 }
 
@@ -424,8 +444,9 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     }
     new_block.swap(*in_block);
     auto wrapper = BlockWrapper::create_shared(std::move(new_block));
-    local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes(),
-                                                   local_state._channel_id);
+    const auto block_bytes = wrapper->data_block.allocated_bytes();
+    local_state._shared_state->add_total_mem_usage(block_bytes, local_state._channel_id);
+    local_state._mem_tracker->consume(block_bytes);
     wrapper->ref(_num_partitions);
     for (size_t i = 0; i < _num_partitions; i++) {
         _enqueue_data_and_set_ready(i, local_state, {wrapper, {0, wrapper->data_block.rows()}});
@@ -475,6 +496,7 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     _enqueue_data_and_set_ready(channel_id, local_state,
                                 BlockWrapper::create_shared(std::move(new_block)));
 
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
     return Status::OK();
 }
 
@@ -494,7 +516,10 @@ Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz
             std::iota(channel_ids.begin() + i, channel_ids.end(), 0);
         }
     }
-    return _split_rows(state, channel_ids.data(), block, local_state);
+
+    RETURN_IF_ERROR(_split_rows(state, channel_ids.data(), block, local_state));
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+    return Status::OK();
 }
 
 Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 356feea9c36..4a1e4536373 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -553,11 +553,14 @@ size_t PipelineTask::get_revocable_size() const {
     return revocable_size;
 }
 
-Status PipelineTask::revoke_memory() {
+Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
     if (_sink->revocable_mem_size(_state) >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-        RETURN_IF_ERROR(_sink->revoke_memory(_state));
+        RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context));
+    } else if (spill_context) {
+        spill_context->on_task_finished();
     }
-    return _root->revoke_memory(_state);
+
+    return Status::OK();
 }
 
 void PipelineTask::wake_up() {
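
One subtlety in the revoke_memory() change above: a task whose sink has too little revocable memory is skipped, but it still must report completion on the context, otherwise the running-task counter would never reach zero and the paused query would never resume. A sketch of that contract, with placeholder names rather than the real API:

    #include <cstddef>
    #include <iostream>
    #include <memory>

    // Placeholder group: counts outstanding tasks like SpillContext does.
    struct SpillTaskGroup {
        int remaining;
        void on_task_finished() {
            if (--remaining == 0) {
                std::cout << "all tasks reported, query can resume\n";
            }
        }
    };

    constexpr size_t kMinSpillBatch = 32 * 1024 * 1024; // illustrative threshold

    void revoke_or_skip(size_t revocable_bytes, const std::shared_ptr<SpillTaskGroup>& group) {
        if (revocable_bytes >= kMinSpillBatch) {
            std::cout << "spilling " << revocable_bytes << " bytes\n";
            group->on_task_finished(); // stands in for the async spill task's completion
        } else if (group) {
            // Nothing worth spilling here: report completion immediately so the
            // group's counter still reaches zero.
            group->on_task_finished();
        }
    }

    int main() {
        auto group = std::make_shared<SpillTaskGroup>(SpillTaskGroup {2});
        revoke_or_skip(64 * 1024 * 1024, group); // large enough: "spills"
        revoke_or_skip(1024, group);             // too small: reports immediately
    }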
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 943366b4b70..633cca93f46 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -248,7 +248,7 @@ public:
     }
 
     [[nodiscard]] size_t get_revocable_size() const;
-    [[nodiscard]] Status revoke_memory();
+    [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
 
     void add_spill_dependency(Dependency* dependency) {
         _spill_dependencies.emplace_back(dependency);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 6a0cf20a4dd..45fd5562a93 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -23,13 +23,17 @@
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
 
+#include <algorithm>
 #include <exception>
 #include <memory>
 #include <mutex>
+#include <shared_mutex>
 #include <sstream>
 #include <utility>
+#include <vector>
 
 #include "common/logging.h"
+#include "common/status.h"
 #include "olap/olap_common.h"
 #include "pipeline/dependency.h"
 #include "pipeline/pipeline_fragment_context.h"
@@ -442,6 +446,66 @@ size_t QueryContext::get_revocable_size() const {
     return revocable_size;
 }
 
+Status QueryContext::revoke_memory() {
+    std::vector<std::pair<size_t, pipeline::PipelineTask*>> tasks;
+    std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> fragments;
+    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
+        auto fragment_ctx = fragment_wptr.lock();
+        if (!fragment_ctx) {
+            continue;
+        }
+
+        auto tasks_of_fragment = fragment_ctx->get_revocable_tasks();
+        for (auto* task : tasks_of_fragment) {
+            tasks.emplace_back(task->get_revocable_size(), task);
+        }
+        fragments.emplace_back(std::move(fragment_ctx));
+    }
+
+    std::sort(tasks.begin(), tasks.end(), [](auto&& l, auto&& r) { return l.first > r.first; });
+
+    const auto mem_limit = query_mem_tracker->limit();
+    const auto target_revoking_size = mem_limit * 0.2;
+    size_t revoked_size = 0;
+
+    std::vector<pipeline::PipelineTask*> chosen_tasks;
+    for (auto&& [revocable_size, task] : tasks) {
+        chosen_tasks.emplace_back(task);
+
+        revoked_size += revocable_size;
+        if (revoked_size >= target_revoking_size) {
+            break;
+        }
+    }
+
+    std::weak_ptr<QueryContext> this_ctx = shared_from_this();
+    auto spill_context =
+            std::make_shared<pipeline::SpillContext>(chosen_tasks.size(), _query_id, [this_ctx] {
+                auto query_context = this_ctx.lock();
+                if (!query_context) {
+                    return;
+                }
+
+                LOG(INFO) << "query: " << print_id(query_context->_query_id)
+                          << " all revoking tasks done, resume it.";
+                query_context->set_memory_sufficient(true);
+            });
+
+    for (auto* task : chosen_tasks) {
+        RETURN_IF_ERROR(task->revoke_memory(spill_context));
+    }
+
+    LOG(INFO) << "query: " << print_id(_query_id) << " total revoked size: " 
<< revoked_size
+              << ", target_size: " << target_revoking_size
+              << ", tasks count: " << chosen_tasks.size() << "/" << 
tasks.size();
+
+    return Status::OK();
+}
+
+void QueryContext::decrease_revoking_tasks_count() {
+    _revoking_tasks_count.fetch_sub(1);
+}
+
 std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const {
     std::vector<pipeline::PipelineTask*> tasks;
     for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
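
The selection policy in QueryContext::revoke_memory above is worth spelling out: candidates are sorted by revocable size, largest first, and tasks are taken until roughly 20% of the query's memory limit is covered. A standalone sketch of just that loop, with made-up task ids and sizes:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <utility>
    #include <vector>

    int main() {
        const int64_t mem_limit = 1000;                 // query memory limit
        const double target = mem_limit * 0.2;          // revoke ~20% of the limit
        std::vector<std::pair<int64_t, int>> tasks = {  // {revocable_bytes, task_id}
                {40, 1}, {120, 2}, {15, 3}, {90, 4}};

        // Largest revocable size first, mirroring the sort in the hunk above.
        std::sort(tasks.begin(), tasks.end(),
                  [](auto&& l, auto&& r) { return l.first > r.first; });

        int64_t revoked = 0;
        std::vector<int> chosen;
        for (auto&& [bytes, id] : tasks) {
            chosen.push_back(id);
            revoked += bytes;
            if (revoked >= target) {
                break; // enough memory queued for revocation
            }
        }

        // With these inputs: task 2 (120) alone is under the 200 target, so
        // task 4 is added; 120 + 90 = 210 >= 200 and selection stops.
        std::cout << "chosen tasks: ";
        for (int id : chosen) std::cout << id << ' ';
        std::cout << "\nrevoked: " << revoked << " / target: " << target << '\n';
    }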
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 96b18eedfb9..b74b835af63 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -22,6 +22,7 @@
 #include <gen_cpp/Types_types.h>
 
 #include <atomic>
+#include <cstdint>
 #include <memory>
 #include <mutex>
 #include <string>
@@ -191,6 +192,8 @@ public:
 
     std::vector<pipeline::PipelineTask*> get_revocable_tasks() const;
 
+    Status revoke_memory();
+
     void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
 
     std::shared_ptr<QueryStatistics> get_query_statistics();
@@ -228,7 +231,7 @@ public:
 
     void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1); }
 
-    void decrease_revoking_tasks_count() { _revoking_tasks_count.fetch_sub(1); }
+    void decrease_revoking_tasks_count();
 
     int get_revoking_tasks_count() const { return _revoking_tasks_count.load(); }
 
@@ -354,6 +357,8 @@ private:
     bool _is_pipeline = false;
     bool _is_nereids = false;
     std::atomic<int> _running_big_mem_op_num = 0;
+
+    std::mutex _revoking_tasks_mutex;
     std::atomic<int> _revoking_tasks_count = 0;
 
     // A token used to submit olap scanner to the "_limited_scan_thread_pool",
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 290d1fe1d5b..92235c1ded7 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -17,13 +17,17 @@
 
 #include "workload_group_manager.h"
 
+#include <glog/logging.h>
+
 #include <algorithm>
 #include <memory>
 #include <mutex>
 #include <unordered_map>
 
+#include "common/status.h"
 #include "exec/schema_scanner/schema_scanner_helper.h"
 #include "pipeline/task_scheduler.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/workload_group/workload_group.h"
 #include "util/mem_info.h"
@@ -198,8 +202,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
             doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
             PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
             weighted_memory_limit_ratio);
-    // LOG_EVERY_T(INFO, 60) << debug_msg;
-    LOG(INFO) << debug_msg;
+    LOG_EVERY_T(INFO, 60) << debug_msg;
     for (auto& wg : _workload_groups) {
         auto wg_mem_limit = wg.second->memory_limit();
         auto wg_weighted_mem_limit = int64_t(wg_mem_limit * weighted_memory_limit_ratio);
@@ -299,8 +302,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
         }
         // During the memory-insufficient stage, we already set every query's memlimit, so the flag is no longer needed.
         wg.second->update_memory_sufficent(true);
-        // LOG_EVERY_T(INFO, 60) << debug_msg;
-        LOG(INFO) << debug_msg;
+        LOG_EVERY_T(INFO, 60) << debug_msg;
     }
 }
 
@@ -348,6 +350,7 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
  * strategy 5: If any query exceeds the process's memlimit and cache is zero, then spill to disk or cancel it.
  */
 void WorkloadGroupMgr::handle_paused_queries() {
+    const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
     for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
         auto& queries_list = it->second;
@@ -376,9 +379,13 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 continue;
             }
             if (query_ctx->is_cancelled()) {
-                LOG(INFO) << "query: " << print_id(query_ctx->query_id())
-                          << "was canceled, remove from paused list";
-                query_it = queries_list.erase(query_it);
+                /// Memory may not be released immediately after a query is canceled,
+                /// so wait for a while here.
+                if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
+                    LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                              << " was canceled, remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                }
                 continue;
             }
            if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
@@ -401,8 +408,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 }
                // Should not put the query back to the task scheduler immediately: when the wg's
                // memory is insufficient and its flag was just set, other queries may not free memory very quickly.
-                if (query_it->elapsed_time() > 1000) {
+                if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
                    // set wg's memory to insufficient, then add it back to the task scheduler to run.
+                    LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << " will be resumed.";
                     query_ctx->set_memory_sufficient(true);
                     query_it = queries_list.erase(query_it);
                 } else {
@@ -437,6 +445,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
                if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
                             0.001 &&
                     query_it->cache_ratio_ > 0.001) {
+                    LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                              << " will be resumed after cache adjust.";
                     query_ctx->set_memory_sufficient(true);
                     query_it = queries_list.erase(query_it);
                     continue;
@@ -479,17 +489,7 @@ bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> query
         }
     } else {
         SCOPED_ATTACH_TASK(query_ctx.get());
-        // TODO, should spill the task that has max memory, not all
-        for (auto* task : revocable_tasks) {
-            auto st = task->revoke_memory();
-            if (!st.ok()) {
-                query_ctx->cancel(st);
-                break;
-            }
-        }
-        LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has "
-                  << revocable_tasks.size()
-                  << " tasks to revoke memory, revocable size: " << revocable_size;
+        RETURN_IF_ERROR(query_ctx->revoke_memory());
     }
     return true;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
