This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new ffcf519d841 memtable flush reserve process mem and improve logs (#45743)
ffcf519d841 is described below

commit ffcf519d841d295285cc5caef15b47a15acb7d20
Author: TengJianPing <tengjianp...@selectdb.com>
AuthorDate: Tue Dec 24 16:29:15 2024 +0800

    memtable flush reserve process mem and improve logs (#45743)
---
 be/src/olap/memtable.cpp                           |  14 +++
 be/src/olap/memtable.h                             |   1 +
 be/src/olap/memtable_flush_executor.cpp            |  40 ++++++++
 be/src/olap/memtable_flush_executor.h              |  19 ++++
 be/src/olap/memtable_memory_limiter.cpp            |  28 ++++--
 be/src/olap/memtable_memory_limiter.h              |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   5 +-
 be/src/pipeline/pipeline_task.cpp                  |  10 +-
 be/src/pipeline/pipeline_task.h                    |   2 +
 be/src/runtime/memory/global_memory_arbitrator.h   |  14 +--
 be/src/runtime/memory/mem_tracker_limiter.cpp      |   3 +-
 be/src/runtime/memory/memory_profile.cpp           |  13 ++-
 be/src/runtime/memory/memory_profile.h             |   1 +
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |  48 ++++++----
 be/src/runtime/runtime_state.h                     |  14 ---
 be/src/runtime/thread_context.h                    |  11 ++-
 be/src/runtime/workload_group/workload_group.cpp   |  20 ++--
 be/src/runtime/workload_group/workload_group.h     |   6 +-
 .../workload_group/workload_group_manager.cpp      | 101 ++++++++++++---------
 .../workload_group/workload_group_manager.h        |   3 +-
 be/src/vec/exec/scan/scanner_context.h             |   2 +
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  69 ++++++++++----
 be/src/vec/exec/scan/vscanner.h                    |  11 ++-
 .../java/org/apache/doris/qe/SessionVariable.java  |  15 ---
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 25 files changed, 303 insertions(+), 150 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 995402bc273..bf7c5d53d25 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -611,6 +611,20 @@ bool MemTable::need_agg() const {
     return false;
 }
 
+size_t MemTable::get_flush_reserve_memory_size() const {
+    size_t reserve_size = 0;
+    if (_keys_type == KeysType::DUP_KEYS) {
+        if (_tablet_schema->num_key_columns() == 0) {
+            // no need to reserve
+        } else {
+            reserve_size = _input_mutable_block.allocated_bytes();
+        }
+    } else {
+        reserve_size = _input_mutable_block.allocated_bytes();
+    }
+    return reserve_size;
+}
+
 Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
     size_t same_keys_num = _sort();
     if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 77ff2e886bf..09591df2745 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -181,6 +181,7 @@ public:
 
     int64_t tablet_id() const { return _tablet_id; }
     size_t memory_usage() const { return _mem_tracker->consumption(); }
+    size_t get_flush_reserve_memory_size() const;
     // insert tuple from (row_pos) to (row_pos+num_rows)
     Status insert(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
 
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 50ccbb25958..5533a360fac 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -28,6 +28,7 @@
 #include "common/signal_handler.h"
 #include "olap/memtable.h"
 #include "olap/rowset/rowset_writer.h"
+#include "olap/storage_engine.h"
 #include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/metrics.h"
@@ -140,6 +141,36 @@ Status FlushToken::wait() {
     return Status::OK();
 }
 
+Status FlushToken::_try_reserve_memory(QueryThreadContext 
query_thread_context, int64_t size) {
+    auto* thread_context = doris::thread_context();
+    auto* memtable_flush_executor =
+            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+    Status st;
+    do {
+        // only try to reserve process memory
+        st = thread_context->try_reserve_process_memory(size);
+        if (st.ok()) {
+            memtable_flush_executor->inc_flushing_task();
+            break;
+        }
+        if (_is_shutdown() || 
query_thread_context.get_memory_tracker()->is_query_cancelled()) {
+            st = Status::Cancelled("flush memtable already cancelled");
+            break;
+        }
+        // Make sure at least one memtable is flushing even reserve memory 
failed.
+        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
+            // If there are already any flushing task, Wait for some time and 
retry.
+            LOG_EVERY_T(INFO, 60) << fmt::format(
+                    "Failed to reserve memory {} for flush memtable, retry 
after 100ms", size);
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        } else {
+            st = Status::OK();
+            break;
+        }
+    } while (true);
+    return st;
+}
+
 Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, 
int64_t* flush_size) {
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << 
memtable->tablet_id()
                   << ", memsize: " << memtable->memory_usage()
@@ -150,10 +181,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
     SCOPED_ATTACH_TASK(memtable->query_thread_context());
     signal::set_signal_task_id(_rowset_writer->load_id());
     signal::tablet_id = memtable->tablet_id();
+
+    DEFER_RELEASE_RESERVED();
+
+    auto reserve_size = memtable->get_flush_reserve_memory_size();
+    RETURN_IF_ERROR(_try_reserve_memory(memtable->query_thread_context(), 
reserve_size));
     {
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                 
memtable->query_thread_context().query_mem_tracker->write_tracker());
         SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
+
+        Defer defer {[&]() {
+            
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
+        }};
         std::unique_ptr<vectorized::Block> block;
         RETURN_IF_ERROR(memtable->to_block(&block));
         RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), 
segment_id, flush_size));
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 27e8e8a9b0e..040a8fa5449 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -94,6 +94,8 @@ private:
 
     Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* 
flush_size);
 
+    Status _try_reserve_memory(QueryThreadContext query_thread_context, 
int64_t size);
+
     // Records the current flush status of the tablet.
     // Note: Once its value is set to Failed, it cannot return to SUCCESS.
     std::shared_mutex _flush_status_lock;
@@ -140,12 +142,29 @@ public:
                               std::shared_ptr<RowsetWriter> rowset_writer, 
bool is_high_priority,
                               std::shared_ptr<WorkloadGroup> wg_sptr);
 
+    // return true if it already has any flushing task
+    bool check_and_inc_has_any_flushing_task() {
+        // need to use CAS instead of only `if (0 == _flushing_task_count)` 
statement,
+        // to avoid concurrent entries both pass the if statement
+        int expected_count = 0;
+        if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
+            return true;
+        }
+        DCHECK(expected_count == 0 && _flushing_task_count == 1);
+        return false;
+    }
+
+    void inc_flushing_task() { _flushing_task_count++; }
+
+    void dec_flushing_task() { _flushing_task_count--; }
+
 private:
     void _register_metrics();
     static void _deregister_metrics();
 
     std::unique_ptr<ThreadPool> _flush_pool;
     std::unique_ptr<ThreadPool> _high_prio_flush_pool;
+    std::atomic<int> _flushing_task_count = 0;
 };
 
 } // namespace doris
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index b222c041b34..22b842ec672 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -124,10 +124,10 @@ void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPt
         --sleep_times;
     }
     // Check process memory again.
-    handle_memtable_flush();
+    handle_memtable_flush(wg);
 }
 
-void MemTableMemoryLimiter::handle_memtable_flush() {
+void MemTableMemoryLimiter::handle_memtable_flush(WorkloadGroupPtr wg) {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
     if (!_soft_limit_reached() || _load_usage_low()) {
@@ -150,12 +150,17 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
         if (need_flush > 0) {
             auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
             LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? 
"hard" : "soft")
-                      << ", " << 
GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", " << 
GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
+                      << 
GlobalMemoryArbitrator::sys_mem_available_details_str()
                       << ", load mem: " << 
PrettyPrinter::print_bytes(_mem_tracker->consumption())
                       << ", memtable writers num: " << _writers.size()
                       << ", active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
                       << ", queue: " << 
PrettyPrinter::print_bytes(_queue_mem_usage)
-                      << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage);
+                      << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage)
+                      << ", wg: " << (wg ? wg->debug_string() : "null\n")
+                      << doris::ProcessProfile::instance()
+                                 ->memory_profile()
+                                 ->process_memory_detail_str();
             _flush_active_memtables(0, need_flush);
         }
     } while (_hard_limit_reached() && !_load_usage_low());
@@ -163,7 +168,16 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
     timer.stop();
     int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
     g_memtable_memory_limit_latency_ms << time_ms;
-    LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
+    LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit"
+              << ", " << 
GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
+              << GlobalMemoryArbitrator::sys_mem_available_details_str()
+              << ", load mem: " << 
PrettyPrinter::print_bytes(_mem_tracker->consumption())
+              << ", memtable writers num: " << _writers.size()
+              << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
+              << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
+              << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
+              << ", wg: " << (wg ? wg->debug_string() : "null.\n")
+              << 
doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
 }
 
 int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, 
int64_t need_flush) {
@@ -270,11 +284,13 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
     _last_limit = limit;
     _log_timer.reset();
     LOG(INFO) << ss.str() << ", " << 
GlobalMemoryArbitrator::process_memory_used_details_str()
+              << ", " << 
GlobalMemoryArbitrator::sys_mem_available_details_str()
               << ", load mem: " << 
PrettyPrinter::print_bytes(_mem_tracker->consumption())
               << ", memtable writers num: " << _writers.size()
               << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
               << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
-              << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
+              << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) 
<< "\n"
+              << 
doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
 }
 
 void MemTableMemoryLimiter::_refresh_mem_tracker() {
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index de2fb802165..155a1dd424b 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -43,7 +43,7 @@ public:
     // If yes, it will flush memtable to try to reduce memory consumption.
     // Every write operation will call this API to check if need flush 
memtable OR hang
     // when memory is not available.
-    void handle_memtable_flush();
+    void handle_memtable_flush(WorkloadGroupPtr wg);
 
     int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t 
need_flush_bytes);
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 42d2640441e..f86fb491d71 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1821,8 +1821,9 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const
             if (task->is_running() || task->is_revoking()) {
                 LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
                                       << " is running, task: " << 
(void*)task.get()
-                                      << ", task->is_revoking(): " << 
task->is_revoking() << ", "
-                                      << task->is_running();
+                                      << ", is_revoking: " << 
task->is_revoking()
+                                      << ", is_running: " << task->is_running()
+                                      << ", task info: " << 
task->debug_string();
                 *has_running_task = true;
                 return 0;
             }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 0ddc329da3b..3fef0fc4c93 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -437,11 +437,8 @@ Status PipelineTask::execute(bool* eos) {
                     }
                     LOG(INFO) << debug_msg;
 
-                    _state->get_query_ctx()->update_paused_reason(st);
-                    _state->get_query_ctx()->set_low_memory_mode();
-                    _state->get_query_ctx()->set_memory_sufficient(false);
                     
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                            _state->get_query_ctx()->shared_from_this(), 
reserve_size);
+                            _state->get_query_ctx()->shared_from_this(), 
reserve_size, st);
                     continue;
                 }
             }
@@ -484,11 +481,8 @@ Status PipelineTask::execute(bool* eos) {
                     DCHECK_EQ(_pending_block.get(), nullptr);
                     _pending_block = std::move(_block);
                     _block = 
vectorized::Block::create_unique(_pending_block->clone_empty());
-                    _state->get_query_ctx()->update_paused_reason(status);
-                    _state->get_query_ctx()->set_low_memory_mode();
-                    _state->get_query_ctx()->set_memory_sufficient(false);
                     
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                            _state->get_query_ctx()->shared_from_this(), 
sink_reserve_size);
+                            _state->get_query_ctx()->shared_from_this(), 
sink_reserve_size, status);
                     _pending_eos = *eos;
                     *eos = false;
                     continue;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 99decc05a9d..19315bef89d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -251,6 +251,8 @@ public:
         return _memory_sufficient_dependency.get();
     }
 
+    void inc_memory_reserve_failed_times() { 
COUNTER_UPDATE(_memory_reserve_failed_times, 1); }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h
index 05963132cb1..790301b68f9 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -91,12 +91,13 @@ public:
     static inline std::string sys_mem_available_details_str() {
         auto msg = fmt::format(
                 "sys available memory {}(= {}[proc/available] - {}[reserved] - 
"
-                "{}B[waiting_refresh])",
+                "{}B[waiting_refresh] + {}[tc/jemalloc_cache])",
                 PrettyPrinter::print(sys_mem_available(), TUnit::BYTES),
                 
PrettyPrinter::print(MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed),
                                      TUnit::BYTES),
                 PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES),
-                refresh_interval_memory_growth);
+                refresh_interval_memory_growth,
+                
PrettyPrinter::print_bytes(static_cast<uint64_t>(MemInfo::allocator_cache_mem())));
 #ifdef ADDRESS_SANITIZER
         msg = "[ASAN]" + msg;
 #endif
@@ -165,15 +166,16 @@ public:
 
     static std::string process_limit_exceeded_errmsg_str() {
         return fmt::format(
-                "{} exceed limit {} or {} less than low water mark {}", 
process_memory_used_str(),
-                MemInfo::mem_limit_str(), sys_mem_available_str(),
+                "{} exceed limit {} or {} less than low water mark {}",
+                process_memory_used_details_str(), MemInfo::mem_limit_str(),
+                sys_mem_available_details_str(),
                 
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), 
TUnit::BYTES));
     }
 
     static std::string process_soft_limit_exceeded_errmsg_str() {
         return fmt::format("{} exceed soft limit {} or {} less than warning 
water mark {}.",
-                           process_memory_used_str(), 
MemInfo::soft_mem_limit_str(),
-                           sys_mem_available_str(),
+                           process_memory_used_details_str(), 
MemInfo::soft_mem_limit_str(),
+                           sys_mem_available_details_str(),
                            
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
                                                 TUnit::BYTES));
     }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 068c3427b84..bd9aa705a33 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -358,7 +358,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
             "{}, peak used {}, current used {}. backend {}, {}.",
             label(), type_string(_type), MemCounter::print_bytes(limit()),
             MemCounter::print_bytes(peak_consumption()), 
MemCounter::print_bytes(consumption()),
-            BackendOptions::get_localhost(), 
GlobalMemoryArbitrator::process_memory_used_str());
+            BackendOptions::get_localhost(),
+            GlobalMemoryArbitrator::process_memory_used_details_str());
     if (_type == Type::QUERY || _type == Type::LOAD) {
         err_msg += fmt::format(
                 " exec node:<{}>, can `set exec_mem_limit=8G` to change limit, 
details see "
diff --git a/be/src/runtime/memory/memory_profile.cpp b/be/src/runtime/memory/memory_profile.cpp
index c7421236c42..fa8b45abfb4 100644
--- a/be/src/runtime/memory/memory_profile.cpp
+++ b/be/src/runtime/memory/memory_profile.cpp
@@ -340,14 +340,17 @@ int64_t MemoryProfile::other_current_usage() {
     return memory_other_trackers_sum_bytes.get_value();
 }
 
+std::string MemoryProfile::process_memory_detail_str() const {
+    return fmt::format("Process Memory Summary: {}\n, {}\n, {}\n, {}",
+                       GlobalMemoryArbitrator::process_mem_log_str(),
+                       print_memory_overview_profile(), 
print_global_memory_profile(),
+                       print_top_memory_tasks_profile());
+}
+
 void MemoryProfile::print_log_process_usage() {
     if (_enable_print_log_process_usage) {
         _enable_print_log_process_usage = false;
-        LOG(WARNING) << "Process Memory Summary: " + 
GlobalMemoryArbitrator::process_mem_log_str()
-                     << "\n"
-                     << print_memory_overview_profile() << "\n"
-                     << print_global_memory_profile() << "\n"
-                     << print_top_memory_tasks_profile();
+        LOG(WARNING) << process_memory_detail_str();
     }
 }
 
diff --git a/be/src/runtime/memory/memory_profile.h b/be/src/runtime/memory/memory_profile.h
index 9f1bab0c02a..bf3a6aa9f39 100644
--- a/be/src/runtime/memory/memory_profile.h
+++ b/be/src/runtime/memory/memory_profile.h
@@ -69,6 +69,7 @@ public:
     // process memory changes more than 256M, or the GC ends
     void enable_print_log_process_usage() { _enable_print_log_process_usage = 
true; }
     void print_log_process_usage();
+    std::string process_memory_detail_str() const;
 
 private:
     MultiVersion<RuntimeProfile> _memory_overview_profile;
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 83caf753aed..8193d89394a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -83,7 +83,7 @@ public:
     void consume(int64_t size, int skip_large_memory_check = 0);
     void flush_untracked_mem();
 
-    doris::Status try_reserve(int64_t size);
+    doris::Status try_reserve(int64_t size, bool only_check_process_memory);
 
     void release_reserved();
 
@@ -278,7 +278,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     _stop_consume = false;
 }
 
-inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
+inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size,
+                                                      bool 
only_check_process_memory) {
     DCHECK(_limiter_tracker);
     DCHECK(size >= 0);
     CHECK(init());
@@ -286,26 +287,35 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
     // _untracked_mem store bytes that not synchronized to process reserved 
memory.
     flush_untracked_mem();
     auto wg_ptr = _wg_wptr.lock();
-    if (!_limiter_tracker->try_reserve(size)) {
-        auto err_msg = fmt::format(
-                "reserve memory failed, size: {}, because query memory 
exceeded, memory tracker "
-                "consumption: {}, limit: {}",
-                PrettyPrinter::print(size, TUnit::BYTES),
-                PrettyPrinter::print(_limiter_tracker->consumption(), 
TUnit::BYTES),
-                PrettyPrinter::print(_limiter_tracker->limit(), TUnit::BYTES));
-        return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
-    }
-    if (wg_ptr) {
-        if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
+    if (only_check_process_memory) {
+        _limiter_tracker->reserve(size);
+        if (wg_ptr) {
+            wg_ptr->add_wg_refresh_interval_memory_growth(size);
+        }
+    } else {
+        if (!_limiter_tracker->try_reserve(size)) {
             auto err_msg = fmt::format(
-                    "reserve memory failed, size: {}, because workload group 
memory exceeded, "
-                    "workload group: {}",
-                    PrettyPrinter::print(size, TUnit::BYTES), 
wg_ptr->memory_debug_string());
-            _limiter_tracker->release(size);          // rollback
-            _limiter_tracker->release_reserved(size); // rollback
-            return 
doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
+                    "reserve memory failed, size: {}, because query memory 
exceeded, memory "
+                    "tracker "
+                    "consumption: {}, limit: {}",
+                    PrettyPrinter::print(size, TUnit::BYTES),
+                    PrettyPrinter::print(_limiter_tracker->consumption(), 
TUnit::BYTES),
+                    PrettyPrinter::print(_limiter_tracker->limit(), 
TUnit::BYTES));
+            return 
doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
+        }
+        if (wg_ptr) {
+            if (!wg_ptr->try_add_wg_refresh_interval_memory_growth(size)) {
+                auto err_msg = fmt::format(
+                        "reserve memory failed, size: {}, because workload 
group memory exceeded, "
+                        "workload group: {}",
+                        PrettyPrinter::print(size, TUnit::BYTES), 
wg_ptr->memory_debug_string());
+                _limiter_tracker->release(size);          // rollback
+                _limiter_tracker->release_reserved(size); // rollback
+                return 
doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
+            }
         }
     }
+
     if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
         auto err_msg =
                 fmt::format("reserve memory failed, size: {}, because proccess 
memory exceeded, {}",
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 16f500b2fcc..879bd647d96 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -410,20 +410,6 @@ public:
 
     bool enable_page_cache() const;
 
-    int partitioned_hash_join_rows_threshold() const {
-        if (!_query_options.__isset.partitioned_hash_join_rows_threshold) {
-            return 0;
-        }
-        return _query_options.partitioned_hash_join_rows_threshold;
-    }
-
-    int partitioned_hash_agg_rows_threshold() const {
-        if (!_query_options.__isset.partitioned_hash_agg_rows_threshold) {
-            return 0;
-        }
-        return _query_options.partitioned_hash_agg_rows_threshold;
-    }
-
     const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
         return _tablet_commit_infos;
     }
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index a48bc680925..a9aede24487 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -249,13 +249,22 @@ public:
         thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
     }
 
+    doris::Status try_reserve_process_memory(const int64_t size) const {
+#ifdef USE_MEM_TRACKER
+        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
+               thread_mem_tracker()->label() != "Orphan")
+                << doris::memory_orphan_check_msg;
+#endif
+        return thread_mem_tracker_mgr->try_reserve(size, true);
+    }
+
     doris::Status try_reserve_memory(const int64_t size) const {
 #ifdef USE_MEM_TRACKER
         DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
                thread_mem_tracker()->label() != "Orphan")
                 << doris::memory_orphan_check_msg;
 #endif
-        return thread_mem_tracker_mgr->try_reserve(size);
+        return thread_mem_tracker_mgr->try_reserve(size, false);
     }
 
     void release_reserved_memory() const {
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 7e9a9812956..c87a927a455 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -118,7 +118,7 @@ std::string WorkloadGroup::debug_string() const {
             _remote_scan_bytes_per_second);
 }
 
-bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
+bool WorkloadGroup::try_add_wg_refresh_interval_memory_growth(int64_t size) {
     auto realtime_total_mem_used =
             _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
     if ((realtime_total_mem_used >
@@ -137,15 +137,19 @@ std::string WorkloadGroup::memory_debug_string() const {
     auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
     auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 
1);
     return fmt::format(
-            "WorkloadGroup[id = {}, name = {}, memory_limit = {}, 
enable_memory_overcommit = {}, "
-            "total_mem_used = {}, wg_refresh_interval_memory_growth = {},  
mem_used_ratio = {}, "
-            "memory_low_watermark = {}, memory_high_watermark = {}, version = 
{}, is_shutdown = "
-            "{}, query_num = {}]",
-            _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
-            _enable_memory_overcommit ? "true" : "false",
+            "WorkloadGroup[id = {}, name = {}, version = {},"
+            "total_query_slot_count = {}, "
+            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= 
{}%, "
+            "enable_memory_overcommit = {}, total_mem_used = {} 
(write_buffer_size={}),"
+            "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, "
+            "memory_low_watermark={}, memory_high_watermark={}, 
is_shutdown={}, query_num={}]",
+            _id, _name, _version, _total_query_slot_count,
+            PrettyPrinter::print(_memory_limit, TUnit::BYTES), 
to_string(_slot_mem_policy),
+            _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
             PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
+            PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
             PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
-            mem_used_ratio, _memory_low_watermark, _memory_high_watermark, 
_version, _is_shutdown,
+            mem_used_ratio, _memory_low_watermark, _memory_high_watermark, 
_is_shutdown,
             _query_ctxs.size());
 }
 
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 1d617b22bfe..73fc4c965b8 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -113,7 +113,11 @@ public:
         return _total_query_slot_count.load(std::memory_order_relaxed);
     }
 
-    bool add_wg_refresh_interval_memory_growth(int64_t size);
+    void add_wg_refresh_interval_memory_growth(int64_t size) {
+        _wg_refresh_interval_memory_growth.fetch_add(size);
+    }
+
+    bool try_add_wg_refresh_interval_memory_growth(int64_t size);
 
     void sub_wg_refresh_interval_memory_growth(int64_t size) {
         _wg_refresh_interval_memory_growth.fetch_sub(size);
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index ade4f228850..dd3d7d970dd 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -271,9 +271,12 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
 }
 
 void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
-                                        int64_t reserve_size) {
-    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+                                        int64_t reserve_size, const Status& 
status) {
     DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
     auto wg = query_ctx->workload_group();
     auto&& [it, inserted] = _paused_queries_list[wg].emplace(
             query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
@@ -281,7 +284,6 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
     // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
     // if hard limit is enabled, then not need enable other queries hard limit.
     if (inserted) {
-        query_ctx->set_memory_sufficient(false);
         LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
                   << ", workload group: " << wg->debug_string();
     }
@@ -399,8 +401,13 @@ void WorkloadGroupMgr::handle_paused_queries() {
                               << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
                               << ") failed due to workload group memory 
exceed, "
                                  "should set the workload group work in memory 
insufficent mode, "
-                                 "so that other query will reduce their 
memory. wg: "
-                              << wg->debug_string();
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
                 }
                 if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
                     // If not enable slot memory policy, then should spill 
directly
@@ -657,11 +664,12 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
         return false;
     }
 
+    const auto wg = query_ctx->workload_group();
     auto revocable_tasks = query_ctx->get_revocable_tasks();
     if (revocable_tasks.empty()) {
+        const auto limit = query_ctx->get_mem_limit();
+        const auto reserved_size = 
query_ctx->query_mem_tracker->reserved_consumption();
         if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
-            const auto limit = query_ctx->get_mem_limit();
-            const auto reserved_size = 
query_ctx->query_mem_tracker->reserved_consumption();
             // During waiting time, another operator in the query may finished 
and release
             // many memory and we could run.
             if ((memory_usage + size_to_reserve) < limit) {
@@ -674,43 +682,44 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
             } else if (time_in_queue >= 
config::spill_in_paused_queue_timeout_ms) {
                 // Use MEM_LIMIT_EXCEEDED so that FE could parse the error 
code and do try logic
                 auto msg1 = fmt::format(
-                        "Query {} reserve memory failed, but could not find 
memory that could "
-                        "release or spill to disk. Query memory usage: {}, 
reserved size: {}, try "
-                        "to reserve: {} "
-                        ", limit: {} ,process memory info: {}, wg info: {}.",
+                        "Query {} failed beause query limit is exceeded, but 
could "
+                        "not find memory that could release or spill to disk. 
Query memory usage: "
+                        "{}, limit: {}, reserved "
+                        "size: {}, try to reserve: {}, wg info: {}.",
                         query_id, PrettyPrinter::print_bytes(memory_usage),
+                        PrettyPrinter::print_bytes(limit),
                         PrettyPrinter::print_bytes(reserved_size),
-                        PrettyPrinter::print_bytes(size_to_reserve),
-                        PrettyPrinter::print_bytes(query_ctx->get_mem_limit()),
-                        
GlobalMemoryArbitrator::process_memory_used_details_str(),
-                        query_ctx->workload_group()->memory_debug_string());
-                auto msg2 = msg1 + fmt::format(
-                                           " Query Memory Tracker Summary: {}."
-                                           " Load Memory Tracker Summary: {}",
-                                           
MemTrackerLimiter::make_type_trackers_profile_str(
-                                                   
MemTrackerLimiter::Type::QUERY),
-                                           
MemTrackerLimiter::make_type_trackers_profile_str(
-                                                   
MemTrackerLimiter::Type::LOAD));
-                LOG(INFO) << msg2;
+                        PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string());
+                LOG(INFO) << fmt::format("{}.\n{}", msg1,
+                                         doris::ProcessProfile::instance()
+                                                 ->memory_profile()
+                                                 
->process_memory_detail_str());
                 
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
             } else {
                 return false;
             }
         } else if 
(paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
-            if (!query_ctx->workload_group()->exceed_limit()) {
+            if (!wg->exceed_limit()) {
                 LOG(INFO) << "Query: " << query_id
                           << " paused caused by 
WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it.";
                 query_ctx->set_memory_sufficient(true);
                 return true;
             } else if (time_in_queue > 
config::spill_in_paused_queue_timeout_ms) {
-                LOG(INFO) << "Query: " << query_id << ", workload group 
exceeded, info: "
-                          << 
GlobalMemoryArbitrator::process_memory_used_details_str()
-                          << ", wg info: " << 
query_ctx->workload_group()->memory_debug_string();
-                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                        "The query({}) reserved memory failed because workload 
group limit "
-                        "exceeded, and there is no cache now. And could not 
find task to spill. "
-                        "Maybe you should set the workload group's limit to a 
lower value.",
-                        query_id));
+                auto msg1 = fmt::format(
+                        "Query {} failed because workload group memory is 
exceeded"
+                        ", and there is no cache now. And could not find task 
to spill. "
+                        "Query memory usage: {}, limit: {}, reserved "
+                        "size: {}, try to reserve: {}, wg info: {}."
+                        " Maybe you should set the workload group's limit to a 
lower value.",
+                        query_id, PrettyPrinter::print_bytes(memory_usage),
+                        PrettyPrinter::print_bytes(limit),
+                        PrettyPrinter::print_bytes(reserved_size),
+                        PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string());
+                LOG(INFO) << fmt::format("{}.\n{}", msg1,
+                                         doris::ProcessProfile::instance()
+                                                 ->memory_profile()
+                                                 
->process_memory_detail_str());
+                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
             } else {
                 return false;
             }
@@ -724,21 +733,25 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                           << ", process limit not exceeded now, resume this 
query"
                           << ", process memory info: "
                           << 
GlobalMemoryArbitrator::process_memory_used_details_str()
-                          << ", wg info: " << 
query_ctx->workload_group()->memory_debug_string();
+                          << ", wg info: " << wg->debug_string();
                 query_ctx->set_memory_sufficient(true);
                 return true;
             } else if (time_in_queue > 
config::spill_in_paused_queue_timeout_ms) {
-                LOG(INFO) << "Query: " << query_id << ", process limit 
exceeded, info: "
-                          << 
GlobalMemoryArbitrator::process_memory_used_details_str()
-                          << ", wg info: " << 
query_ctx->workload_group()->memory_debug_string();
-                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                        "The query({}) reserved memory failed because process 
limit exceeded, "
-                        "and "
-                        "there is no cache now. And could not find task to 
spill. Maybe you "
-                        "should "
-                        "set "
-                        "the workload group's limit to a lower value.",
-                        query_id));
+                auto msg1 = fmt::format(
+                        "Query {} failed because process memory is exceeded"
+                        ", and there is no cache now. And could not find task 
to spill. "
+                        "Query memory usage: {}, limit: {}, reserved "
+                        "size: {}, try to reserve: {}, wg info: {}."
+                        " Maybe you should set the workload group's limit to a 
lower value.",
+                        query_id, PrettyPrinter::print_bytes(memory_usage),
+                        PrettyPrinter::print_bytes(limit),
+                        PrettyPrinter::print_bytes(reserved_size),
+                        PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string());
+                LOG(INFO) << fmt::format("{}.\n{}", msg1,
+                                         doris::ProcessProfile::instance()
+                                                 ->memory_profile()
+                                                 
->process_memory_detail_str());
+                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
             } else {
                 return false;
             }
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index fc53bfea858..47ea5540d3f 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -105,7 +105,8 @@ public:
         return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
     }
 
-    void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, 
int64_t reserve_size);
+    void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, 
int64_t reserve_size,
+                          const Status& status);
 
     void handle_paused_queries();
 
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 2e1a4e9a343..ed5224289bd 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -170,6 +170,8 @@ public:
 
     int32_t low_memory_mode_scanners() const { return 4; }
 
+    pipeline::ScanLocalStateBase* local_state() const { return _local_state; }
+
     // the unique id of this context
     std::string ctx_id;
     TUniqueId _query_id;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index a7bf8600663..d8858dd5aba 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -33,10 +33,12 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "olap/tablet.h"
+#include "pipeline/pipeline_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
+#include "runtime/workload_group/workload_group_manager.h"
 #include "util/async_io.h" // IWYU pragma: keep
 #include "util/blocking_queue.hpp"
 #include "util/cpu_info.h"
@@ -209,6 +211,32 @@ std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
     return _limited_scan_thread_pool->new_token(mode, max_concurrency);
 }
 
+void handle_reserve_memory_failure(RuntimeState* state, 
std::shared_ptr<ScannerContext> ctx,
+                                   const Status& st, size_t reserve_size) {
+    ctx->clear_free_blocks();
+    auto* pipeline_task = state->get_task();
+    auto* local_state = ctx->local_state();
+
+    pipeline_task->inc_memory_reserve_failed_times();
+    auto debug_msg = fmt::format(
+            "Query: {} , scanner try to reserve: {}, operator name {}, "
+            "operator "
+            "id: {}, "
+            "task id: "
+            "{}, revocable mem size: {}, failed: {}",
+            print_id(state->query_id()), 
PrettyPrinter::print_bytes(reserve_size),
+            local_state->get_name(), local_state->parent()->node_id(), 
state->task_id(),
+            PrettyPrinter::print_bytes(pipeline_task->get_revocable_size()), 
st.to_string());
+    // PROCESS_MEMORY_EXCEEDED error msg already contains process_mem_log_str
+    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+        debug_msg += fmt::format(", debug info: {}", 
GlobalMemoryArbitrator::process_mem_log_str());
+    }
+    LOG(INFO) << debug_msg;
+
+    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+            state->get_query_ctx()->shared_from_this(), reserve_size, st);
+}
+
 void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                                      std::shared_ptr<ScanTask> scan_task) {
     auto task_lock = ctx->task_exec_ctx();
@@ -246,6 +274,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
     bool eos = false;
     ASSIGN_STATUS_IF_CATCH_EXCEPTION(
             RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
+            // scanner->open may allocate a large amount of memory (reading blocks of data),
+            // so it is better to also check low memory mode and clear free blocks here.
+            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
+
             if (!scanner->is_init()) {
                 status = scanner->init();
                 if (!status.ok()) {
@@ -268,16 +300,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             }
 
             size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
-            if (ctx->low_memory_mode() &&
-                raw_bytes_threshold > 
ctx->low_memory_mode_scan_bytes_per_scanner()) {
-                raw_bytes_threshold = 
ctx->low_memory_mode_scan_bytes_per_scanner();
+            if (ctx->low_memory_mode()) {
+                ctx->clear_free_blocks();
+                if (raw_bytes_threshold > 
ctx->low_memory_mode_scan_bytes_per_scanner()) {
+                    raw_bytes_threshold = 
ctx->low_memory_mode_scan_bytes_per_scanner();
+                }
             }
 
             size_t raw_bytes_read = 0;
             bool first_read = true;
             // If the first block is full, then it is true. Or the first block 
+ second block > batch_size
             bool has_first_full_block = false;
-            size_t block_avg_bytes = ctx->batch_size();
 
             // During low memory mode, every scan task will return at most 2 
block to reduce memory usage.
             while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -298,15 +331,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     free_block = ctx->get_free_block(first_read);
                 } else {
                     if (state->enable_reserve_memory()) {
-                        auto reserve_status = 
thread_context()->try_reserve_memory(block_avg_bytes);
-                        if (!reserve_status.ok()) {
-                            LOG(INFO) << "query: " << 
print_id(state->query_id())
-                                      << ", scanner try to reserve: "
-                                      << PrettyPrinter::print(block_avg_bytes, 
TUnit::BYTES)
-                                      << ", failed: " << 
reserve_status.to_string()
-                                      << ", process info: "
-                                      << 
GlobalMemoryArbitrator::process_mem_log_str();
-                            ctx->clear_free_blocks();
+                        size_t block_avg_bytes = 
scanner->get_block_avg_bytes();
+                        auto st = 
thread_context()->try_reserve_memory(block_avg_bytes);
+                        if (!st.ok()) {
+                            handle_reserve_memory_failure(state, ctx, st, 
block_avg_bytes);
                             break;
                         }
                     }
@@ -353,10 +381,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }
                 if (scan_task->cached_blocks.back().first->rows() > 0) {
-                    block_avg_bytes = 
(scan_task->cached_blocks.back().first->allocated_bytes() +
-                                       
scan_task->cached_blocks.back().first->rows() - 1) /
-                                      
scan_task->cached_blocks.back().first->rows() *
-                                      ctx->batch_size();
+                    auto block_avg_bytes =
+                            
(scan_task->cached_blocks.back().first->allocated_bytes() +
+                             scan_task->cached_blocks.back().first->rows() - 
1) /
+                            scan_task->cached_blocks.back().first->rows() * 
ctx->batch_size();
+                    scanner->update_block_avg_bytes(block_avg_bytes);
+                }
+                if (ctx->low_memory_mode()) {
+                    ctx->clear_free_blocks();
+                    if (raw_bytes_threshold > 
ctx->low_memory_mode_scan_bytes_per_scanner()) {
+                        raw_bytes_threshold = 
ctx->low_memory_mode_scan_bytes_per_scanner();
+                    }
                 }
             } // end for while
 
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 6c4f3294ce1..dab49b757c8 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -72,7 +72,10 @@ public:
     virtual Status init() { return Status::OK(); }
     // Not virtual, all child will call this method explictly
     virtual Status prepare(RuntimeState* state, const VExprContextSPtrs& 
conjuncts);
-    virtual Status open(RuntimeState* state) { return Status::OK(); }
+    virtual Status open(RuntimeState* state) {
+        _block_avg_bytes = state->batch_size() * 8;
+        return Status::OK();
+    }
 
     Status get_block(RuntimeState* state, Block* block, bool* eos);
     Status get_block_after_projects(RuntimeState* state, vectorized::Block* 
block, bool* eos);
@@ -156,6 +159,10 @@ public:
         _query_statistics = query_statistics;
     }
 
+    auto get_block_avg_bytes() const { return _block_avg_bytes; }
+
+    void update_block_avg_bytes(size_t block_avg_bytes) { _block_avg_bytes = 
block_avg_bytes; }
+
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
@@ -211,6 +218,8 @@ protected:
     // num of rows return from scanner, after filter block
     int64_t _num_rows_return = 0;
 
+    size_t _block_avg_bytes = 0;
+
     // Set true after counter is updated finally
     bool _has_updated_counter = false;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index f0380efd7b6..8de6f379ecd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -397,7 +397,6 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String INTERNAL_SESSION = "internal_session";
 
-    public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD = 
"partitioned_hash_join_rows_threshold";
     public static final String PARTITIONED_HASH_AGG_ROWS_THRESHOLD = 
"partitioned_hash_agg_rows_threshold";
 
     public static final String PARTITION_PRUNING_EXPAND_THRESHOLD = 
"partition_pruning_expand_threshold";
@@ -1587,10 +1586,6 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = INTERNAL_SESSION)
     public boolean internalSession = false;
 
-    // Use partitioned hash join if build side row count >= the threshold . 0 
- the threshold is not set.
-    @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD, fuzzy = 
true)
-    public int partitionedHashJoinRowsThreshold = 0;
-
     // Use partitioned hash join if build side row count >= the threshold . 0 
- the threshold is not set.
     @VariableMgr.VarAttr(name = PARTITIONED_HASH_AGG_ROWS_THRESHOLD, fuzzy = 
true)
     public int partitionedHashAggRowsThreshold = 0;
@@ -2377,7 +2372,6 @@ public class SessionVariable implements Serializable, Writable {
         // this.disableJoinReorder = random.nextBoolean();
         this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
         this.disableStreamPreaggregations = random.nextBoolean();
-        this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 : 
1048576;
         this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 : 
1048576;
         this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
         // this.enableHashJoinEarlyStartProbe = random.nextBoolean();
@@ -3075,14 +3069,6 @@ public class SessionVariable implements Serializable, Writable {
         this.queryCacheEntryMaxRows = queryCacheEntryMaxRows;
     }
 
-    public int getPartitionedHashJoinRowsThreshold() {
-        return partitionedHashJoinRowsThreshold;
-    }
-
-    public void setPartitionedHashJoinRowsThreshold(int threshold) {
-        this.partitionedHashJoinRowsThreshold = threshold;
-    }
-
     // Serialize to thrift object
     public boolean getForwardToMaster() {
         return forwardToMaster;
@@ -3910,7 +3896,6 @@ public class SessionVariable implements Serializable, Writable {
 
         tResult.setSkipDeleteBitmap(skipDeleteBitmap);
 
-        
tResult.setPartitionedHashJoinRowsThreshold(partitionedHashJoinRowsThreshold);
         
tResult.setPartitionedHashAggRowsThreshold(partitionedHashAggRowsThreshold);
 
         tResult.setExternalSortBytesThreshold(externalSortBytesThreshold);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index b44196d3df2..8cf33a6218b 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -182,6 +182,7 @@ struct TQueryOptions {
 
   52: optional i32 be_exec_version = 0
 
+  // not used any more
   53: optional i32 partitioned_hash_join_rows_threshold = 0
 
   54: optional bool enable_share_hash_table_for_broadcast_join


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to