This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b6b94cb576b712e74c28ee8e43fadf3dd062d12e
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Fri Dec 13 18:22:47 2024 +0800

    test
---
 be/src/common/config.cpp                           |  2 ++
 be/src/common/config.h                             |  2 ++
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  8 ++++++
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 ++
 .../exec/partitioned_hash_join_sink_operator.cpp   | 33 ++++++++++++++--------
 be/src/pipeline/pipeline_task.cpp                  | 32 ++++++++++++++++++---
 6 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1c66d88dfe5..89fcea3372d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1396,6 +1396,8 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
 DEFINE_Bool(enable_table_size_correctness_check, "false");
 DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
 
+DEFINE_mInt32(revocable_memory_bytes_high_watermark, "5");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 29e55e64063..8e308c1794d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1482,6 +1482,8 @@ DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
 // Enable validation to check the correctness of table size.
 DECLARE_Bool(enable_table_size_correctness_check);
 
+DECLARE_mInt32(revocable_memory_bytes_high_watermark);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 19ca5292c2f..a8122dd11ed 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -682,4 +682,12 @@ size_t HashJoinBuildSinkOperatorX::get_memory_usage(RuntimeState* state) const {
     return local_state._memory_used_counter->value();
 }
 
+std::string HashJoinBuildSinkOperatorX::get_memory_usage_debug_str(RuntimeState* state) const {
+    auto& local_state = get_local_state(state);
+    return fmt::format("build block: {}, hash table: {}, build key arena: {}",
+                       PrettyPrinter::print_bytes(local_state._build_blocks_memory_usage->value()),
+                       PrettyPrinter::print_bytes(local_state._hash_table_memory_usage->value()),
+                       PrettyPrinter::print_bytes(local_state._build_arena_memory_usage->value()));
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 1e1405e149c..074ff33abb2 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -126,6 +126,8 @@ public:
 
     [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;
 
+    std::string get_memory_usage_debug_str(RuntimeState* state) const;
+
     bool should_dry_run(RuntimeState* state) override {
         return _is_broadcast_join && !state->get_sink_local_state()
                                               
->cast<HashJoinBuildSinkLocalState>()
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 09e13eb465d..fe78489cb98 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -582,14 +582,20 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
 
     local_state._child_eos = eos;
 
+    Defer defer_dgb {[&]() {
+        if (local_state.revocable_mem_size(state) > 128 * 1024 * 1024) {
+            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id()
+                       << " sink " << node_id() << " _child_eos: " << local_state._child_eos
+                       << ", revocable memory: " << local_state.revocable_mem_size(state);
+        }
+    }};
     const auto rows = in_block->rows();
 
     const auto need_to_spill = local_state._shared_state->need_to_spill;
     if (rows == 0) {
         if (eos) {
             VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id()
-                       << " sink " << node_id() << " eos, set_ready_to_read"
-                       << ", need spill: " << need_to_spill;
+                       << " sink " << node_id() << " eos, need spill: " << need_to_spill;
 
             if (need_to_spill) {
                 return revoke_memory(state, nullptr);
@@ -605,11 +611,13 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 Defer defer {[&]() { local_state.update_memory_usage(); }};
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), in_block, eos));
-                VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " task "
-                           << state->task_id() << " sink " << node_id() << " eos, set_ready_to_read"
-                           << ", nonspill build usage: "
-                           << PrettyPrinter::print_bytes(_inner_sink_operator->get_memory_usage(
-                                      local_state._shared_state->inner_runtime_state.get()));
+
+                VLOG_DEBUG << fmt::format(
+                        "Query: {}, task {}, sink {} eos, set_ready_to_read, nonspill memory "
+                        "usage: {}",
+                        print_id(state->query_id()), state->task_id(), node_id(),
+                        _inner_sink_operator->get_memory_usage_debug_str(
+                                local_state._shared_state->inner_runtime_state.get()));
             }
 
             std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
@@ -646,11 +654,12 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 local_state._shared_state->inner_runtime_state.get(), in_block, eos));
 
         if (eos) {
-            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " task " << state->task_id()
-                       << " sink " << node_id() << " eos, set_ready_to_read"
-                       << ", nonspill build usage: "
-                       << _inner_sink_operator->get_memory_usage(
-                                  local_state._shared_state->inner_runtime_state.get());
+            VLOG_DEBUG << fmt::format(
+                    "Query: {}, task {}, sink {} eos, set_ready_to_read, nonspill memory "
+                    "usage: {}",
+                    print_id(state->query_id()), state->task_id(), node_id(),
+                    _inner_sink_operator->get_memory_usage_debug_str(
+                            local_state._shared_state->inner_runtime_state.get()));
             local_state._dependency->set_ready_to_read();
         }
     }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index a8b7dd06b27..6bf4cb39fd1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -42,6 +42,7 @@
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
 #include "util/mem_info.h"
+#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
@@ -492,10 +493,33 @@ Status PipelineTask::execute(bool* eos) {
                     // it will not be able to spill later even if memory is low, and will cause cancel of queries.
                     // So make a check here, if it's low watermark after reserve and if reserved memory is too many,
                     // then trigger revoke memory.
-                    if (is_low_watermark &&
-                        sink_reserve_size >= workload_group->memory_limit() * 0.05) {
-                        RETURN_IF_ERROR(_sink->revoke_memory(_state, nullptr));
-                        continue;
+
+                    // debug
+                    if (sink_reserve_size > 64 * 1024 * 1024) {
+                        LOG(INFO) << fmt::format(
+                                "Query: {}, sink name: {}, node id: {}, task id: {}, "
+                                "is_low_watermark: {}, sink_reserve_size: {}, wg mem limit: {}, "
+                                "reserve/wg_limit: {}",
+                                print_id(query_id), _sink->get_name(), _sink->node_id(),
+                                _state->task_id(), is_low_watermark,
+                                PrettyPrinter::print_bytes(sink_reserve_size),
+                                PrettyPrinter::print_bytes(workload_group->memory_limit()),
+                                ((double)sink_reserve_size) / workload_group->memory_limit());
+                    }
+                    if (is_low_watermark) {
+                        const auto revocable_size = _sink->revocable_mem_size(_state);
+                        if (revocable_size >= config::revocable_memory_bytes_high_watermark) {
+                            LOG(INFO) << fmt::format(
+                                    "Query: {}, sink name: {}, node id: {}, task id: {}, "
+                                    "sink_reserve_size: {}, revoke_memory "
+                                    "because revocable memory is high: {}",
+                                    print_id(query_id), _sink->get_name(), _sink->node_id(),
+                                    _state->task_id(),
+                                    PrettyPrinter::print_bytes(sink_reserve_size),
+                                    PrettyPrinter::print_bytes(revocable_size));
+                            RETURN_IF_ERROR(_sink->revoke_memory(_state, nullptr));
+                            continue;
+                        }
                     }
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to