This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new ed3ca41faa3 Opt the spill logic (#41415)
ed3ca41faa3 is described below

commit ed3ca41faa300562002729006845877f014fad5d
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Sep 27 19:37:16 2024 +0800

    Opt the spill logic (#41415)
    
    1. Add some counters in join/agg
    2. Optimize the log, add some debug infomation
    3. Revoke memory from non-sink operator(join probe)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 65 ++++++++++++----------
 be/src/pipeline/exec/aggregation_sink_operator.h   |  3 +
 .../pipeline/exec/aggregation_source_operator.cpp  | 13 ++++-
 be/src/pipeline/exec/aggregation_source_operator.h |  2 +
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  4 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  6 ++
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +
 .../partitioned_aggregation_source_operator.cpp    |  8 ++-
 .../exec/partitioned_aggregation_source_operator.h |  1 +
 .../exec/partitioned_hash_join_probe_operator.cpp  | 41 ++++++++++++--
 .../exec/partitioned_hash_join_probe_operator.h    |  7 ++-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  7 ++-
 be/src/pipeline/exec/spill_utils.h                 | 24 ++++++--
 .../exec/streaming_aggregation_operator.cpp        |  4 +-
 be/src/pipeline/pipeline_task.cpp                  | 15 ++---
 be/src/runtime/query_context.cpp                   | 15 +++--
 be/src/runtime/query_context.h                     |  6 +-
 .../workload_group/workload_group_manager.cpp      | 32 +++++++----
 .../workload_group/workload_group_manager.h        |  3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  1 +
 20 files changed, 186 insertions(+), 73 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 9597371057e..b8d38d32674 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -24,6 +24,7 @@
 #include "pipeline/exec/operator.h"
 #include "runtime/primitive_type.h"
 #include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 
@@ -58,8 +59,8 @@ Status AggSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     _agg_data = Base::_shared_state->agg_data.get();
     _agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
     _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", 
TUnit::UNIT);
-    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"HashTable",
-                                                            TUnit::BYTES, 
"MemoryUsage", 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage", 
TUnit::BYTES, 1);
     _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
             "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
 
@@ -76,6 +77,9 @@ Status AggSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", 
TUnit::UNIT);
 
+    _container_memory_usage = ADD_COUNTER(profile(), "ContainerMemoryUsage", 
TUnit::BYTES);
+    _arena_memory_usage = ADD_COUNTER(profile(), "ArenaMemoryUsage", 
TUnit::BYTES);
+
     return Status::OK();
 }
 
@@ -226,32 +230,36 @@ size_t AggSinkLocalState::_memory_usage() const {
 }
 
 void AggSinkLocalState::_update_memusage_with_serialized_key() {
-    std::visit(vectorized::Overload {
-                       [&](std::monostate& arg) -> void {
-                           throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
-                       },
-                       [&](auto& agg_method) -> void {
-                           auto& data = *agg_method.hash_table;
-                           auto arena_memory_usage =
-                                   _agg_arena_pool->size() +
-                                   
Base::_shared_state->aggregate_data_container->memory_usage() -
-                                   
Base::_shared_state->mem_usage_record.used_in_arena;
-                           Base::_mem_tracker->consume(arena_memory_usage);
-                           Base::_mem_tracker->consume(
-                                   data.get_buffer_size_in_bytes() -
-                                   
Base::_shared_state->mem_usage_record.used_in_state);
-                           
_serialize_key_arena_memory_usage->add(arena_memory_usage);
-                           COUNTER_UPDATE(
-                                   _hash_table_memory_usage,
-                                   data.get_buffer_size_in_bytes() -
-                                           
Base::_shared_state->mem_usage_record.used_in_state);
-                           Base::_shared_state->mem_usage_record.used_in_state 
=
-                                   data.get_buffer_size_in_bytes();
-                           Base::_shared_state->mem_usage_record.used_in_arena 
=
-                                   _agg_arena_pool->size() +
-                                   
Base::_shared_state->aggregate_data_container->memory_usage();
-                       }},
-               _agg_data->method_variant);
+    std::visit(
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> void {
+                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                    },
+                    [&](auto& agg_method) -> void {
+                        auto& data = *agg_method.hash_table;
+                        auto arena_memory_usage =
+                                _agg_arena_pool->size() +
+                                
Base::_shared_state->aggregate_data_container->memory_usage() -
+                                
Base::_shared_state->mem_usage_record.used_in_arena;
+                        Base::_mem_tracker->consume(arena_memory_usage);
+                        Base::_mem_tracker->consume(
+                                data.get_buffer_size_in_bytes() -
+                                
Base::_shared_state->mem_usage_record.used_in_state);
+                        
_serialize_key_arena_memory_usage->add(arena_memory_usage);
+                        COUNTER_SET(_container_memory_usage,
+                                    
Base::_shared_state->aggregate_data_container->memory_usage());
+                        COUNTER_SET(_arena_memory_usage,
+                                    
static_cast<int64_t>(_agg_arena_pool->size()));
+                        COUNTER_UPDATE(_hash_table_memory_usage,
+                                       data.get_buffer_size_in_bytes() -
+                                               
Base::_shared_state->mem_usage_record.used_in_state);
+                        Base::_shared_state->mem_usage_record.used_in_state =
+                                data.get_buffer_size_in_bytes();
+                        Base::_shared_state->mem_usage_record.used_in_arena =
+                                _agg_arena_pool->size() +
+                                
Base::_shared_state->aggregate_data_container->memory_usage();
+                    }},
+            _agg_data->method_variant);
 }
 
 Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr 
data) {
@@ -887,6 +895,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* 
state) {
     auto& ss = *local_state.Base::_shared_state;
     RETURN_IF_ERROR(ss.reset_hash_table());
     local_state._agg_arena_pool = ss.agg_arena_pool.get();
+    local_state._serialize_key_arena_memory_usage->set(0);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 39f11f6270f..2fa2d18a7e6 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -21,6 +21,7 @@
 
 #include "pipeline/exec/operator.h"
 #include "runtime/exec_env.h"
+#include "util/runtime_profile.h"
 
 namespace doris::pipeline {
 
@@ -111,6 +112,8 @@ protected:
     RuntimeProfile::Counter* _max_row_size_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
+    RuntimeProfile::Counter* _container_memory_usage = nullptr;
+    RuntimeProfile::Counter* _arena_memory_usage = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = 
nullptr;
 
     bool _should_limit_output = false;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 6df089bbb5b..6391be5d70a 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -23,6 +23,7 @@
 #include "common/exception.h"
 #include "pipeline/exec/operator.h"
 #include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vexpr_fwd.h"
 
@@ -44,7 +45,12 @@ Status AggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     _deserialize_data_timer = ADD_TIMER(Base::profile(), 
"DeserializeAndMergeTime");
     _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
     _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
-    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
+    _hash_table_input_counter =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableInputCount", 
TUnit::UNIT, 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage", 
TUnit::BYTES, 1);
+    _hash_table_size_counter =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableSize", 
TUnit::UNIT, 1);
 
     auto& p = _parent->template cast<AggSourceOperatorX>();
     if (p._without_key) {
@@ -626,6 +632,11 @@ void 
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
                            }
 
                            COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+                           COUNTER_SET(_hash_table_memory_usage,
+                                       static_cast<int64_t>(
+                                               
agg_method.hash_table->get_buffer_size_in_bytes()));
+                           COUNTER_SET(_hash_table_size_counter,
+                                       
static_cast<int64_t>(agg_method.hash_table->size()));
                        }},
                _shared_state->agg_data->method_variant);
 }
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index a3824a381eb..98ddd6a2142 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -82,6 +82,8 @@ protected:
     RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
+    RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
+    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
     RuntimeProfile::Counter* _merge_timer = nullptr;
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index ef706dd0dea..02e538c4ab3 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -74,7 +74,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     _build_blocks_memory_usage =
             ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", 
TUnit::BYTES, "MemoryUsage", 1);
     _hash_table_memory_usage =
-            ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, 
"MemoryUsage", 1);
+            ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", 
TUnit::BYTES, 1);
     _build_arena_memory_usage =
             profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, 
"MemoryUsage", 1);
 
@@ -337,6 +337,8 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                         _mem_tracker->consume(arg.hash_table->get_byte_size() -
                                               old_hash_table_size);
                         _mem_tracker->consume(arg.serialized_keys_size(true) - 
old_key_size);
+                        COUNTER_SET(_hash_table_memory_usage,
+                                    int64_t(arg.hash_table->get_byte_size()));
                         return st;
                     }},
             *_shared_state->hash_table_variants, 
_shared_state->join_op_variants,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 6d84b8e8bb5..0bed1d9e13d 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -107,6 +107,10 @@ void PartitionedAggSinkLocalState::_init_counters() {
     _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", 
TUnit::UNIT);
+    _container_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "ContainerMemoryUsage", 
TUnit::BYTES, 1);
+    _arena_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "ArenaMemoryUsage", 
TUnit::BYTES, 1);
     COUNTER_SET(_max_row_size_counter, (int64_t)0);
 
     _spill_serialize_hash_table_timer =
@@ -133,6 +137,8 @@ void 
PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
     UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime");
     UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount");
     UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes");
+    UPDATE_PROFILE(_container_memory_usage, "ContainerMemoryUsage");
+    UPDATE_PROFILE(_arena_memory_usage, "ArenaMemoryUsage");
 
     update_max_min_rows_counter();
 }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 0027754cde0..22001b752a2 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -303,6 +303,8 @@ public:
     RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
     RuntimeProfile::Counter* _max_row_size_counter = nullptr;
     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
+    RuntimeProfile::Counter* _container_memory_usage = nullptr;
+    RuntimeProfile::Counter* _arena_memory_usage = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = 
nullptr;
 
     RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index bf7ec22793f..8b281a88684 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -64,13 +64,16 @@ void PartitionedAggLocalState::_init_counters() {
     _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
     _insert_keys_to_column_timer = ADD_TIMER(profile(), 
"InsertKeysToColumnTime");
     _serialize_data_timer = ADD_TIMER(profile(), "SerializeDataTime");
-    _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", 
TUnit::UNIT);
+    _hash_table_size_counter = ADD_COUNTER_WITH_LEVEL(profile(), 
"HashTableSize", TUnit::UNIT, 1);
 
     _merge_timer = ADD_TIMER(profile(), "MergeTime");
     _deserialize_data_timer = ADD_TIMER(profile(), "DeserializeAndMergeTime");
     _hash_table_compute_timer = ADD_TIMER(profile(), "HashTableComputeTime");
     _hash_table_emplace_timer = ADD_TIMER(profile(), "HashTableEmplaceTime");
-    _hash_table_input_counter = ADD_COUNTER(profile(), "HashTableInputCount", 
TUnit::UNIT);
+    _hash_table_input_counter =
+            ADD_COUNTER_WITH_LEVEL(profile(), "HashTableInputCount", 
TUnit::UNIT, 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", 
TUnit::BYTES, 1);
 }
 
 #define UPDATE_PROFILE(counter, name)                           \
@@ -88,6 +91,7 @@ void PartitionedAggLocalState::update_profile(RuntimeProfile* 
child_profile) {
     UPDATE_PROFILE(_insert_keys_to_column_timer, "InsertKeysToColumnTime");
     UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime");
     UPDATE_PROFILE(_hash_table_size_counter, "HashTableSize");
+    UPDATE_PROFILE(_hash_table_memory_usage, "HashTableMemoryUsage");
 }
 
 Status PartitionedAggLocalState::close(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 3505cf7eed8..05f9ff6eff0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -74,6 +74,7 @@ protected:
     RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
+    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
 };
 class AggSourceOperatorX;
 class PartitionedAggSourceOperatorX : public 
OperatorX<PartitionedAggLocalState> {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index fa9e3ff23b7..4b769544b52 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -17,11 +17,13 @@
 
 #include "partitioned_hash_join_probe_operator.h"
 
+#include <gen_cpp/Metrics_types.h>
 #include <glog/logging.h>
 
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
+#include "util/runtime_profile.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -60,6 +62,8 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* 
state, LocalStateI
     _recovery_probe_blocks = ADD_COUNTER(profile(), 
"SpillRecoveryProbeBlocks", TUnit::UNIT);
     _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillRecoveryProbeTime", 1);
 
+    _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(), 
"ProbeBlocksBytes", TUnit::BYTES, 1);
+
     _spill_serialize_block_timer =
             ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 
1);
     _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteDiskTime", 1);
@@ -82,6 +86,10 @@ Status 
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
     _build_expr_call_timer = ADD_CHILD_TIMER(profile(), "BuildExprCallTime", 
"BuildPhase");
     _build_side_compute_hash_timer =
             ADD_CHILD_TIMER(profile(), "BuildSideHashComputingTime", 
"BuildPhase");
+
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", 
TUnit::BYTES, 1);
+
     _allocate_resource_timer = ADD_CHILD_TIMER(profile(), 
"AllocateResourceTime", "BuildPhase");
 
     // Probe phase
@@ -158,7 +166,8 @@ Status 
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state, bool force) {
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context, bool force) {
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto query_id = state->query_id();
 
@@ -212,7 +221,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, spill_func, this]() {
+    auto exception_catch_func = [query_id, spill_func, spill_context, this]() {
         SCOPED_TIMER(_spill_timer);
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
@@ -228,8 +237,14 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
             _spill_status = std::move(status);
         }
         _spill_dependency->set_ready();
+        if (spill_context) {
+            spill_context->on_non_sink_task_finished();
+        }
     };
 
+    if (spill_context) {
+        spill_context->on_non_sink_task_started();
+    }
     _spill_dependency->block();
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
 {
         return Status::Error<INTERNAL_ERROR>(
@@ -575,6 +590,7 @@ Status 
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
     }
 
     SCOPED_TIMER(local_state._partition_shuffle_timer);
+    int64_t bytes_of_blocks = 0;
     for (uint32_t i = 0; i != _partition_count; ++i) {
         const auto count = partition_indexes[i].size();
         if (UNLIKELY(count == 0)) {
@@ -592,9 +608,17 @@ Status 
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
             (eos && partitioned_blocks[i]->rows() > 0)) {
             
local_state._probe_blocks[i].emplace_back(partitioned_blocks[i]->to_block());
             partitioned_blocks[i].reset();
+        } else {
+            bytes_of_blocks += partitioned_blocks[i]->allocated_bytes();
+        }
+
+        for (auto& block : local_state._probe_blocks[i]) {
+            bytes_of_blocks += block.allocated_bytes();
         }
     }
 
+    COUNTER_SET(local_state._probe_blocks_bytes, bytes_of_blocks);
+
     return Status::OK();
 }
 
@@ -604,6 +628,12 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill
     local_state._runtime_state = 
std::move(local_state._shared_state->inner_runtime_state);
     local_state._in_mem_shared_state_sptr =
             std::move(local_state._shared_state->inner_shared_state);
+
+    auto* sink_state = local_state._runtime_state->get_sink_local_state();
+    if (sink_state != nullptr) {
+        COUNTER_SET(local_state._hash_table_memory_usage,
+                    
sink_state->profile()->get_counter("HashTableMemoryUsage")->value());
+    }
     return Status::OK();
 }
 
@@ -670,6 +700,9 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
                << ", partition: " << local_state._partition_cursor << "rows: " 
<< block.rows()
                << ", usage: "
                << 
_inner_sink_operator->get_memory_usage(local_state._runtime_state.get());
+
+    COUNTER_SET(local_state._hash_table_memory_usage,
+                
sink_local_state->profile()->get_counter("HashTableMemoryUsage")->value());
     return Status::OK();
 }
 
@@ -805,10 +838,10 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory(
         return Status::OK();
     }
 
-    RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
+    RETURN_IF_ERROR(local_state.spill_probe_blocks(state, spill_context, 
true));
 
     if (_child) {
-        return _child->revoke_memory(state, nullptr);
+        return _child->revoke_memory(state, spill_context);
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 621681ca4cf..3effcadfaa1 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -47,7 +47,9 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
-    Status spill_probe_blocks(RuntimeState* state, bool force = false);
+    Status spill_probe_blocks(RuntimeState* state,
+                              const std::shared_ptr<SpillContext>& 
spill_context = nullptr,
+                              bool force = false);
 
     Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t 
partition_index,
                                            bool& has_data);
@@ -123,6 +125,9 @@ private:
     RuntimeProfile::Counter* _build_side_compute_hash_timer = nullptr;
     RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr;
 
+    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
+    RuntimeProfile::Counter* _probe_blocks_bytes = nullptr;
+
     RuntimeProfile::Counter* _allocate_resource_timer = nullptr;
 
     RuntimeProfile::Counter* _probe_phase_label = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 5323d74341a..136466aa6b2 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -287,8 +287,9 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
 
 Status PartitionedHashJoinSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
-    LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory"
-              << ", eos: " << _child_eos;
+    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " << 
state->task_id()
+               << " hash join sink " << _parent->node_id() << " revoke_memory"
+               << ", eos: " << _child_eos;
     DCHECK_EQ(_spilling_streams_count, 0);
     CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
 
@@ -450,6 +451,8 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
         }
     }
 
+    VLOG_DEBUG << "query: " << print_id(_state->query_id()) << ", task: " << 
_state->task_id()
+               << ", join sink " << _parent->node_id() << " revoke done";
     auto num = _spilling_streams_count.fetch_sub(1);
     DCHECK_GE(_spilling_streams_count, 0);
 
diff --git a/be/src/pipeline/exec/spill_utils.h 
b/be/src/pipeline/exec/spill_utils.h
index 086a6881fcd..d2b157463ae 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -38,10 +38,10 @@ using SpillPartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::SpillP
 struct SpillContext {
     std::atomic_int running_tasks_count;
     TUniqueId query_id;
-    std::function<void()> all_tasks_finished_callback;
+    std::function<void(SpillContext*)> all_tasks_finished_callback;
 
     SpillContext(int running_tasks_count_, TUniqueId query_id_,
-                 std::function<void()> all_tasks_finished_callback_)
+                 std::function<void(SpillContext*)> 
all_tasks_finished_callback_)
             : running_tasks_count(running_tasks_count_),
               query_id(std::move(query_id_)),
               
all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
@@ -50,14 +50,30 @@ struct SpillContext {
         LOG_IF(WARNING, running_tasks_count.load() != 0)
                 << "query: " << print_id(query_id)
                 << " not all spill tasks finished, remaining tasks: " << 
running_tasks_count.load();
+
+        LOG_IF(WARNING, _running_non_sink_tasks_count.load() != 0)
+                << "query: " << print_id(query_id)
+                << " not all spill tasks(non sink tasks) finished, remaining 
tasks: "
+                << _running_non_sink_tasks_count.load();
     }
 
     void on_task_finished() {
         auto count = running_tasks_count.fetch_sub(1);
-        if (count == 1) {
-            all_tasks_finished_callback();
+        if (count == 1 && _running_non_sink_tasks_count.load() == 0) {
+            all_tasks_finished_callback(this);
         }
     }
+
+    void on_non_sink_task_started() { 
_running_non_sink_tasks_count.fetch_add(1); }
+    void on_non_sink_task_finished() {
+        const auto count = _running_non_sink_tasks_count.fetch_sub(1);
+        if (count == 1 && running_tasks_count.load() == 0) {
+            all_tasks_finished_callback(this);
+        }
+    }
+
+private:
+    std::atomic_int _running_non_sink_tasks_count {0};
 };
 
 class SpillRunnable : public Runnable {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 41424914edc..a959b8c1456 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -87,8 +87,8 @@ Status StreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_init_timer);
-    _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"HashTable",
-                                                            TUnit::BYTES, 
"MemoryUsage", 1);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage", 
TUnit::BYTES, 1);
     _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter(
             "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
 
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 0028614b22a..aa2b01b741a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -388,7 +388,7 @@ Status PipelineTask::execute(bool* eos) {
                 if (!st.ok()) {
                     LOG(INFO) << "query: " << print_id(query_id)
                               << ", try to reserve: " << reserve_size << 
"(sink reserve size:("
-                              << sink_reserve_size << " )"
+                              << sink_reserve_size << ")"
                               << ", sink name: " << _sink->get_name()
                               << ", node id: " << _sink->node_id() << " 
failed: " << st.to_string()
                               << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
@@ -547,19 +547,20 @@ size_t PipelineTask::get_revocable_size() const {
         return 0;
     }
 
-    auto revocable_size = _root->revocable_mem_size(_state);
-    revocable_size += _sink->revocable_mem_size(_state);
-
-    return revocable_size;
+    return _sink->revocable_mem_size(_state) + 
_root->revocable_mem_size(_state);
 }
 
 Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& 
spill_context) {
-    if (_sink->revocable_mem_size(_state) >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+    RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context));
+
+    const auto revocable_size = _sink->revocable_mem_size(_state);
+    if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
         RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context));
     } else if (spill_context) {
         spill_context->on_task_finished();
+        LOG(INFO) << "query: " << print_id(_state->query_id()) << ", task: " 
<< ((void*)this)
+                  << " has not enough data to revoke: " << revocable_size;
     }
-
     return Status::OK();
 }
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 51e01a071bb..af4cd3d417d 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -486,26 +486,25 @@ Status QueryContext::revoke_memory() {
     }
 
     std::weak_ptr<QueryContext> this_ctx = shared_from_this();
-    auto spill_context =
-            std::make_shared<pipeline::SpillContext>(chosen_tasks.size(), 
_query_id, [this_ctx] {
+    auto spill_context = std::make_shared<pipeline::SpillContext>(
+            chosen_tasks.size(), _query_id, [this_ctx](pipeline::SpillContext* 
context) {
                 auto query_context = this_ctx.lock();
                 if (!query_context) {
                     return;
                 }
 
                 LOG(INFO) << "query: " << print_id(query_context->_query_id)
-                          << " all revoking tasks done, resume it.";
+                          << ", context: " << ((void*)context)
+                          << " all revoking tasks done, resumt it.";
                 query_context->set_memory_sufficient(true);
             });
 
+    LOG(INFO) << "query: " << print_id(_query_id) << ", context: " << 
((void*)spill_context.get())
+              << " total revoked size: " << revoked_size << ", tasks count: " 
<< chosen_tasks.size()
+              << "/" << tasks.size();
     for (auto* task : chosen_tasks) {
         RETURN_IF_ERROR(task->revoke_memory(spill_context));
     }
-
-    LOG(INFO) << "query: " << print_id(_query_id) << " total revoked size: " 
<< revoked_size
-              << ", target_size: " << 
PrettyPrinter::print(target_revoking_size, TUnit::BYTES)
-              << ", tasks count: " << chosen_tasks.size() << "/" << 
tasks.size();
-
     return Status::OK();
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 4ad946562bb..299e4ced55c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -245,13 +245,15 @@ public:
         query_mem_tracker->set_limit(std::min<int64_t>(new_mem_limit, 
_user_set_mem_limit));
     }
 
+    int64_t get_mem_limit() const { return query_mem_tracker->limit(); }
+
     std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return 
query_mem_tracker; }
 
-    int32_t get_slot_count() {
+    int32_t get_slot_count() const {
         return _query_options.__isset.query_slot_count ? 
_query_options.query_slot_count : 1;
     }
 
-    bool enable_query_slot_hard_limit() {
+    bool enable_query_slot_hard_limit() const {
         return _query_options.__isset.enable_query_slot_hard_limit
                        ? _query_options.enable_query_slot_hard_limit
                        : false;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index df65124f635..287a6b45729 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -328,7 +328,8 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
             }
 
             if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
-                bool spill_res = handle_single_query(query_ctx, 
query_ctx->paused_reason());
+                bool spill_res = handle_single_query(query_ctx, 
query_it->reserve_size_,
+                                                     
query_ctx->paused_reason());
                 if (!spill_res) {
                     ++query_it;
                     continue;
@@ -412,7 +413,8 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                 }
                 if (query_it->cache_ratio_ < 0.001) {
                     // TODO: Find other exceed limit workload group and cancel 
query.
-                    bool spill_res = handle_single_query(query_ctx, 
query_ctx->paused_reason());
+                    bool spill_res = handle_single_query(query_ctx, 
query_it->reserve_size_,
+                                                         
query_ctx->paused_reason());
                     if (!spill_res) {
                         ++query_it;
                         continue;
@@ -473,11 +475,12 @@ void 
WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
 // If the query could not release memory, then cancel the query, the return 
value is true.
 // If the query is not ready to do these tasks, it means just wait.
 bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> 
query_ctx,
-                                           Status paused_reason) {
+                                           size_t size_to_reserve, Status 
paused_reason) {
     // TODO: If the query is an insert into select query, should consider 
memtable as revoke memory.
     size_t revocable_size = 0;
     size_t memory_usage = 0;
     bool has_running_task = false;
+    const auto query_id = print_id(query_ctx->query_id());
     query_ctx->get_revocable_info(&revocable_size, &memory_usage, 
&has_running_task);
     if (has_running_task) {
         LOG(INFO) << "query: " << print_id(query_ctx->query_id())
@@ -488,16 +491,25 @@ bool 
WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
     auto revocable_tasks = query_ctx->get_revocable_tasks();
     if (revocable_tasks.empty()) {
         if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
-            // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code 
and do try logic
-            
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                    "query reserve memory failed, but could not find  memory 
that "
-                    "could "
-                    "release or spill to disk"));
+            const auto limit = query_ctx->get_mem_limit();
+            if ((memory_usage + size_to_reserve) < limit) {
+                LOG(INFO) << "query: " << query_id << ", usage(" << 
memory_usage << " + "
+                          << size_to_reserve << ") less than limit(" << limit 
<< "), resume it.";
+                query_ctx->set_memory_sufficient(true);
+                return true;
+            } else {
+                // Use MEM_LIMIT_EXCEEDED so that FE could parse the error 
code and do try logic
+                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
+                        "query({}) reserve memory failed, but could not find  
memory that could "
+                        "release or spill to disk(usage:{}, limit: {})",
+                        query_id, memory_usage, query_ctx->get_mem_limit()));
+            }
         } else {
             
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                    "The query reserved memory failed because process limit 
exceeded, and "
+                    "The query({}) reserved memory failed because process 
limit exceeded, and "
                     "there is no cache now. And could not find task to spill. 
Maybe you should set "
-                    "the workload group's limit to a lower value."));
+                    "the workload group's limit to a lower value.",
+                    query_id));
         }
     } else {
         SCOPED_ATTACH_TASK(query_ctx.get());
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index 8f69d5653b4..03f134006f5 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -106,7 +106,8 @@ public:
     void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& 
wg_memtable_usages);
 
 private:
-    bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, Status 
paused_reason);
+    bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, size_t 
size_to_reserve,
+                             Status paused_reason);
     void handle_non_overcommit_wg_paused_queries();
     void handle_overcommit_wg_paused_queries();
     void change_query_to_hard_limit(WorkloadGroupPtr wg, bool 
enable_hard_limit);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 125cacd9353..a9c9ab2a2ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3793,6 +3793,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableSortSpill(enableSortSpill);
         tResult.setEnableAggSpill(enableAggSpill);
         tResult.setEnableForceSpill(enableForceSpill);
+        tResult.setExternalAggPartitionBits(externalAggPartitionBits);
         tResult.setMinRevocableMem(minRevocableMem);
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to