github-actions[bot] commented on code in PR #44469:
URL: https://github.com/apache/doris/pull/44469#discussion_r1853423297


##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -29,56 +46,281 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, 
int un_finish_copy, siz
     block->clear();
 }
 
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* 
block, bool* eos) {
-    int* un_finish_copy = nullptr;
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,

Review Comment:
   warning: function 'pull' has cognitive complexity of 54 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:54:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (!_cached_blocks[sender_idx].empty()) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:69:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (!_spill_readers[sender_idx].empty()) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:71:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (!reader_item->stream->ready_for_reading()) {
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:76:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(reader->open());
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:76:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(reader->open());
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:77:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (reader_item->block_offset != 0) {
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:82:** nesting level 
increased to 2
   ```cpp
               auto spill_func = [this, reader_item, sender_idx]() {
                                 ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:86:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   while (!spill_eos) {
                   ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:87:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(reader_item->reader->read(&block, 
&spill_eos));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:87:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(reader_item->reader->read(&block, 
&spill_eos));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:88:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       if (!block.empty()) {
                       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:92:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           if (_cached_blocks[sender_idx].size() >= 32 ||
                           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:99:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   if (spill_eos || !_cached_blocks[sender_idx].empty()) {
                   ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:106:** nesting level 
increased to 2
   ```cpp
               auto catch_exception_func = [spill_func = 
std::move(spill_func)]() {
                                           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:107:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:107:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:107:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:116:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable)));
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:116:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable)));
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:122:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (pos_to_pull == end) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:137:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (pos_to_pull == end) {
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp:
##########
@@ -228,91 +257,100 @@ Status 
PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
     return sink_local_state->open(state);
 }
 
-Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
-    VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
-               << Base::_parent->node_id() << " revoke_memory"
+size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, 
bool eos) {
+    auto& local_state = get_local_state(state);
+    auto* runtime_state = local_state._runtime_state.get();
+    auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state, eos);
+    COUNTER_SET(local_state._memory_usage_reserved, int64_t(size));
+    return size;
+}
+
+Status PartitionedAggSinkLocalState::revoke_memory(

Review Comment:
   warning: function 'revoke_memory' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status PartitionedAggSinkLocalState::revoke_memory(
                                        ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp:267:** 89 lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedAggSinkLocalState::revoke_memory(
                                        ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.h:
##########
@@ -32,30 +43,53 @@ struct MultiCastBlock {
     size_t _mem_size;
 };
 
+struct SpillingReader {
+    vectorized::SpillReaderUPtr reader;
+    vectorized::SpillStreamSPtr stream;
+    int64_t block_offset {0};
+    bool all_data_read {false};
+};
+
 // TDOD: MultiCastDataStreamer same as the data queue, maybe rethink union and 
refactor the
 // code
 class MultiCastDataStreamer {
 public:
-    MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int 
cast_sender_count,
+    MultiCastDataStreamer(const RowDescriptor& row_desc, MultiCastSharedState* 
shared_state,
+                          ObjectPool* pool, int cast_sender_count, int32_t 
node_id,
                           bool with_dependencies = false)
             : _row_desc(row_desc),
+              _shared_state(shared_state),
               _profile(pool->add(new 
RuntimeProfile("MultiCastDataStreamSink"))),
-              _cast_sender_count(cast_sender_count) {
+              _cached_blocks(cast_sender_count),
+              _cast_sender_count(cast_sender_count),
+              _node_id(node_id),
+              _spill_readers(cast_sender_count),
+              _source_profiles(cast_sender_count) {
         _sender_pos_to_read.resize(cast_sender_count, 
_multi_cast_blocks.end());
         if (with_dependencies) {
             _dependencies.resize(cast_sender_count, nullptr);
         }
 
+        _spill_dependency = Dependency::create_shared(_node_id, _node_id,
+                                                      
"MultiCastDataStreamerDependency", true);
+
+        for (int i = 0; i != cast_sender_count; ++i) {
+            _spill_read_dependencies.emplace_back(Dependency::create_shared(
+                    node_id, node_id, "MultiCastReadSpillDependency", true));
+        }
         _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
         _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
     };
 
-    ~MultiCastDataStreamer() = default;
+    ~MultiCastDataStreamer() {

Review Comment:
   warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
   ```cpp
       ~MultiCastDataStreamer() {
       ^
   ```
   



##########
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:
##########
@@ -156,25 +172,32 @@
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state) {
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto query_id = state->query_id();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    auto spill_func = [query_id, state, submit_timer, this] {
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+    auto spill_func = [query_id, state, this] {

Review Comment:
   warning: lambda has cognitive complexity of 56 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
       auto spill_func = [query_id, state, this] {
                         ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:184:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
           for (uint32_t partition_index = 0; partition_index != 
p._partition_count;
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:188:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (partitioned_block) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:190:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   if (size >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:193:** +1, 
nesting level increased to 3
   ```cpp
                   } else {
                     ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:198:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (blocks.empty()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:203:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (!spilling_stream) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:204:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:204:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:211:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
               while (!blocks.empty() && !state->is_cancelled()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:211:** +1
   ```cpp
               while (!blocks.empty() && !state->is_cancelled()) {
                                      ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:215:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(merged_block->merge(std::move(block)));
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:215:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(merged_block->merge(std::move(block)));
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:216:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks",
 {
                   ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:216:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks",
 {
                   ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:221:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   if (merged_block->allocated_bytes() >=
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:224:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:224:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:230:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (!merged_block->empty()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:232:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:232:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -99,29 +115,86 @@
     return mem_size;
 }
 
-Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* 
state) {
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+    if (!_shared_state->need_to_spill) {
+        if (_shared_state->inner_shared_state) {
+            auto* inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
+            if (inner_sink_state_) {
+                auto* inner_sink_state =
+                        
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+                COUNTER_SET(_memory_used_counter, 
inner_sink_state->_memory_used_counter->value());
+                COUNTER_SET(_peak_memory_usage_counter,
+                            inner_sink_state->_memory_used_counter->value());
+            }
+        }
+        return;
+    }
+
+    int64_t mem_size = 0;
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    for (auto& block : partitioned_blocks) {
+        if (block) {
+            mem_size += block->allocated_bytes();
+        }
+    }
+    COUNTER_SET(_memory_used_counter, mem_size);
+    COUNTER_SET(_peak_memory_usage_counter, mem_size);
+}
+
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
+    size_t size_to_reserve = 0;
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    if (_shared_state->need_to_spill) {
+        size_to_reserve = p._partition_count * 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+    } else {
+        if (_shared_state->inner_runtime_state) {
+            size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
+                    _shared_state->inner_runtime_state.get(), eos);
+        }
+    }
+
+    COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
+    return size_to_reserve;
+}
+
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(

Review Comment:
   warning: function '_revoke_unpartitioned_block' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:159:** 128 lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:
##########
@@ -654,32 +681,51 @@
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_hash_join_probe sink failed");
     });
+
     
RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), 
&block, true));
-    VLOG_DEBUG << "query: " << print_id(state->query_id())
+    VLOG_DEBUG << "Query: " << print_id(state->query_id())
                << ", internal build operator finished, node id: " << node_id()
                << ", task id: " << state->task_id()
-               << ", partition: " << local_state._partition_cursor;
+               << ", partition: " << local_state._partition_cursor << "rows: " 
<< block.rows()
+               << ", usage: "
+               << 
_inner_sink_operator->get_memory_usage(local_state._runtime_state.get());
+
+    COUNTER_SET(local_state._hash_table_memory_usage,
+                
sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value());
     return Status::OK();
 }
 
 Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,

Review Comment:
   warning: function 'pull' has cognitive complexity of 73 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
                                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:700:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!local_state._shared_state->_spill_status.ok()) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:707:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (local_state._recovered_build_block && 
!local_state._recovered_build_block->empty()) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:707:** +1
   ```cpp
       if (local_state._recovered_build_block && 
!local_state._recovered_build_block->empty()) {
                                              ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:710:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (!mutable_block) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:712:** +1, 
nesting level increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:713:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(mutable_block->merge(local_state._recovered_build_block->to_block()));
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:713:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               
RETURN_IF_ERROR(mutable_block->merge(local_state._recovered_build_block->to_block()));
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:718:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (local_state._need_to_setup_internal_operators) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:720:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(local_state.recover_build_blocks_from_disk(
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:720:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(local_state.recover_build_blocks_from_disk(
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:722:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (has_data) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:727:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(local_state.finish_spilling(partition_index));
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:727:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(local_state.finish_spilling(partition_index));
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:728:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_setup_internal_operators(local_state, state));
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:728:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_setup_internal_operators(local_state, state));
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:731:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (mutable_block && !mutable_block->empty()) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:731:** +1
   ```cpp
           if (mutable_block && !mutable_block->empty()) {
                             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:737:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       while (_inner_probe_operator->need_more_input_data(runtime_state)) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:738:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (probe_blocks.empty()) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:741:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:741:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:743:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!has_data) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:745:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, 
&block, true));
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:745:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, 
&block, true));
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:750:** +1, 
nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:757:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (!block.empty()) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:758:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, 
&block, false));
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:758:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, 
&block, false));
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:762:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       
RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), 
output_block,
       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:762:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       
RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), 
output_block,
       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:766:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (in_mem_eos) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:771:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (local_state._partition_cursor == _partition_count) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:773:** +1, 
nesting level increased to 2
   ```cpp
           } else {
             ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -235,37 +288,69 @@
     return thread_pool->submit(std::move(spill_runnable));
 }
 
-Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
-    LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory"
-              << ", eos: " << _child_eos;
-    DCHECK_EQ(_spilling_streams_count, 0);
+Status PartitionedHashJoinSinkLocalState::revoke_memory(

Review Comment:
   warning: function 'revoke_memory' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status PartitionedHashJoinSinkLocalState::revoke_memory(
                                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:290:** 131 lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedHashJoinSinkLocalState::revoke_memory(
                                             ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/spill_utils.h:
##########
@@ -17,39 +17,131 @@
 
 #pragma once
 
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
+
+#include <atomic>
+#include <functional>
+#include <utility>
+
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "runtime/task_execution_context.h"
 #include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
 #include "util/threadpool.h"
 #include "vec/runtime/partitioner.h"
 
 namespace doris::pipeline {
 using SpillPartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
 
+struct SpillContext {
+    std::atomic_int running_tasks_count;
+    TUniqueId query_id;
+    std::function<void(SpillContext*)> all_tasks_finished_callback;
+
+    SpillContext(int running_tasks_count_, TUniqueId query_id_,
+                 std::function<void(SpillContext*)> 
all_tasks_finished_callback_)
+            : running_tasks_count(running_tasks_count_),
+              query_id(std::move(query_id_)),
+              
all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
+
+    ~SpillContext() {

Review Comment:
   warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
   ```cpp
       ~SpillContext() {
       ^
   ```
   



##########
be/src/pipeline/exec/spill_sort_sink_operator.cpp:
##########
@@ -180,73 +194,70 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     return Status::OK();
 }
 
-Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
+    auto& parent = Base::_parent->template cast<Parent>();
+    return 
parent._sort_sink_operator->get_reserve_mem_size_for_next_sink(_runtime_state.get(),
+                                                                          eos);
+}
+
+Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,

Review Comment:
   warning: function 'revoke_memory' has cognitive complexity of 52 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                   ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:204:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!_shared_state->is_spilled) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:212:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!_shared_state->_spill_status.ok()) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:220:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(status);
       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:220:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(status);
       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:227:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!_eos) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:232:** nesting level 
increased to 1
   ```cpp
       auto spill_func = [this, state, query_id, &parent] {
                         ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:259:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(status);
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:259:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(status);
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:266:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (!eos && !state->is_cancelled()) {
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:266:** +1
   ```cpp
           while (!eos && !state->is_cancelled()) {
                       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:273:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:273:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:275:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:275:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:283:** nesting level 
increased to 1
   ```cpp
       auto exception_catch_func = [query_id, spill_func]() {
                                   ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:284:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", {
           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:284:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", {
           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:292:** nesting level 
increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                         ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:292:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:292:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:292:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:297:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:297:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:302:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (status.ok()) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:310:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!status.ok()) {
       ^
   ```
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:311:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (!_eos) {
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -29,56 +46,281 @@
     block->clear();
 }
 
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* 
block, bool* eos) {
-    int* un_finish_copy = nullptr;
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,

Review Comment:
   warning: function 'pull' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:48:** 95 lines including whitespace and comments (threshold 80)
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, 
vectorized::Block* block,
                                 ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -20,6 +20,7 @@
 #include <fmt/format.h>

Review Comment:
   warning: 'fmt/format.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <fmt/format.h>
            ^
   ```
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -117,4 +360,38 @@
     dep->block();
 }
 
+std::string MultiCastDataStreamer::debug_string() {

Review Comment:
   warning: method 'debug_string' can be made const [readability-make-member-function-const]
   
   ```suggestion
   std::string MultiCastDataStreamer::debug_string() const {
   ```
   
   be/src/pipeline/exec/multi_cast_data_streamer.h:115:
   ```diff
   -     std::string debug_string();
   +     std::string debug_string() const;
   ```
   



##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -108,6 +112,87 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, 
bool eos) {

Review Comment:
   warning: method 'get_reserve_mem_size' can be made const [readability-make-member-function-const]
   
   be/src/pipeline/exec/hashjoin_build_sink.h:56:
   ```diff
   -     [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
   +     [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos) const;
   ```
   
   ```suggestion
   size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) const {
   ```
   



##########
be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:
##########
@@ -190,109 +234,94 @@ Status 
PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
     return source_local_state->open(state);
 }
 
-Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(RuntimeState* 
state) {
-    DCHECK(!_is_merging);
-    
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
-    if 
(Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator !=
-                
Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() ||
-        _shared_state->spill_partitions.empty()) {
+Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, 
bool& has_data) {

Review Comment:
   warning: function 'recover_blocks_from_disk' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* 
state, bool& has_data) {
                                    ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp:236:** 89 lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* 
state, bool& has_data) {
                                    ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:
##########
@@ -156,25 +172,32 @@
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state) {
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(

Review Comment:
   warning: function 'spill_probe_blocks' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                                              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:174:** 96 lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                                              ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:
##########
@@ -156,25 +172,32 @@ Status 
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state) {
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(

Review Comment:
   warning: function 'spill_probe_blocks' has cognitive complexity of 95 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                                              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:179:** 
nesting level increased to 1
   ```cpp
       auto spill_func = [query_id, state, this] {
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:184:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           for (uint32_t partition_index = 0; partition_index != 
p._partition_count;
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:188:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (partitioned_block) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:190:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (size >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:193:** +1, 
nesting level increased to 4
   ```cpp
                   } else {
                     ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:198:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (blocks.empty()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:203:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!spilling_stream) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:204:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:204:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:211:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               while (!blocks.empty() && !state->is_cancelled()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:211:** +1
   ```cpp
               while (!blocks.empty() && !state->is_cancelled()) {
                                      ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:215:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(merged_block->merge(std::move(block)));
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:215:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(merged_block->merge(std::move(block)));
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:216:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks",
 {
                   ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:216:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks",
 {
                   ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:221:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (merged_block->allocated_bytes() >=
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:224:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:224:** +6, 
including nesting penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:230:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!merged_block->empty()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:232:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:232:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:245:** 
nesting level increased to 1
   ```cpp
       auto exception_catch_func = [query_id, spill_func]() {
                                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:246:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:246:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:254:** 
nesting level increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:254:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:254:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:254:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:258:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (spill_context) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:262:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
 {
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:262:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
 {
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:
##########
@@ -183,133 +206,115 @@
                         state, spilling_stream, print_id(state->query_id()), 
"hash_probe",
                         _parent->node_id(), 
std::numeric_limits<int32_t>::max(),
                         std::numeric_limits<size_t>::max(), 
_runtime_profile.get()));
-                RETURN_IF_ERROR(spilling_stream->prepare_spill());
-                spilling_stream->set_write_counters(
-                        _spill_serialize_block_timer, _spill_block_count, 
_spill_data_size,
-                        _spill_write_disk_timer, _spill_write_wait_io_timer);
             }
 
-            COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
+            auto merged_block = 
vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
             while (!blocks.empty() && !state->is_cancelled()) {
                 auto block = std::move(blocks.back());
                 blocks.pop_back();
+
+                RETURN_IF_ERROR(merged_block->merge(std::move(block)));
                 
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks",
 {
                     return Status::Error<INTERNAL_ERROR>(
                             "fault_inject partitioned_hash_join_probe 
spill_probe_blocks failed");
                 });
-                RETURN_IF_ERROR(spilling_stream->spill_block(state, block, 
false));
-                COUNTER_UPDATE(_spill_probe_rows, block.rows());
+
+                if (merged_block->allocated_bytes() >=
+                    vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+                    COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
+                    RETURN_IF_ERROR(
+                            spilling_stream->spill_block(state, 
merged_block->to_block(), false));
+                    COUNTER_UPDATE(_spill_probe_blocks, 1);
+                }
+            }
+
+            if (!merged_block->empty()) {
+                COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
+                RETURN_IF_ERROR(
+                        spilling_stream->spill_block(state, 
merged_block->to_block(), false));
+                COUNTER_UPDATE(_spill_probe_blocks, 1);
             }
         }
-        VLOG_DEBUG << "query: " << print_id(query_id)
+
+        COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
+        VLOG_DEBUG << "Query: " << print_id(query_id)
                    << " hash probe revoke done, node: " << p.node_id()
                    << ", task: " << state->task_id();
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, spill_func, this]() {
+    auto exception_catch_func = [query_id, spill_func]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                    query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
-                                                    "spill_probe_blocks 
canceled"));
-            return;
+            auto status = Status::InternalError(
+                    "fault_inject partitioned_hash_join_probe "
+                    "spill_probe_blocks canceled");
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            return status;
         });
 
         auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
-
-        if (!status.ok()) {
-            _spill_status_ok = false;
-            _spill_status = std::move(status);
-        }
-        _dependency->set_ready();
+        return status;
     };
 
-    _dependency->block();
+    if (spill_context) {
+        spill_context->on_non_sink_task_started();
+    }
+    _spill_dependency->block();
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
 {
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_hash_join_probe spill_probe_blocks 
submit_func failed");
     });
 
-    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
-                                                          
exception_catch_func);
+    auto spill_runnable = std::make_shared<SpillNonSinkRunnable>(
+            state, spill_context, _spill_dependency, _runtime_profile.get(),
+            _shared_state->shared_from_this(), exception_catch_func);
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
 Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t 
partition_index) {
-    auto& build_spilling_stream = 
_shared_state->spilled_streams[partition_index];
-    if (build_spilling_stream) {
-        RETURN_IF_ERROR(build_spilling_stream->spill_eof());
-        build_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
-                                                 _spill_read_bytes, 
_spill_read_wait_io_timer);
-    }
-
     auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
 
     if (probe_spilling_stream) {
         RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
-        probe_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
-                                                 _spill_read_bytes, 
_spill_read_wait_io_timer);
+        probe_spilling_stream->set_read_counters(profile());
     }
 
     return Status::OK();
 }
 
-Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState*
 state,
-                                                                           
uint32_t partition_index,
-                                                                           
bool& has_data) {
-    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
+Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState*
 state,

Review Comment:
   warning: function 'recover_build_blocks_from_disk' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState*
 state,
                                              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:284:** 116 lines including whitespace and comments (threshold 80)
   ```cpp
   Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState*
 state,
                                              ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/spill_sort_sink_operator.cpp:
##########
@@ -180,73 +194,70 @@
     return Status::OK();
 }
 
-Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
+    auto& parent = Base::_parent->template cast<Parent>();
+    return 
parent._sort_sink_operator->get_reserve_mem_size_for_next_sink(_runtime_state.get(),
+                                                                          eos);
+}
+
+Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,

Review Comment:
   warning: function 'revoke_memory' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                   ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/spill_sort_sink_operator.cpp:202:** 113 lines including whitespace and comments (threshold 80)
   ```cpp
   Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                   ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -99,29 +115,86 @@ size_t 
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
-Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* 
state) {
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+    if (!_shared_state->need_to_spill) {
+        if (_shared_state->inner_shared_state) {
+            auto* inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
+            if (inner_sink_state_) {
+                auto* inner_sink_state =
+                        
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+                COUNTER_SET(_memory_used_counter, 
inner_sink_state->_memory_used_counter->value());
+                COUNTER_SET(_peak_memory_usage_counter,
+                            inner_sink_state->_memory_used_counter->value());
+            }
+        }
+        return;
+    }
+
+    int64_t mem_size = 0;
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    for (auto& block : partitioned_blocks) {
+        if (block) {
+            mem_size += block->allocated_bytes();
+        }
+    }
+    COUNTER_SET(_memory_used_counter, mem_size);
+    COUNTER_SET(_peak_memory_usage_counter, mem_size);
+}
+
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
+    size_t size_to_reserve = 0;
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    if (_shared_state->need_to_spill) {
+        size_to_reserve = p._partition_count * 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+    } else {
+        if (_shared_state->inner_runtime_state) {
+            size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
+                    _shared_state->inner_runtime_state.get(), eos);
+        }
+    }
+
+    COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
+    return size_to_reserve;
+}
+
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(

Review Comment:
   warning: function '_revoke_unpartitioned_block' has cognitive complexity of 63 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:163:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (auto* tmp_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state()) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:167:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (inner_sink_state) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:176:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (inner_sink_state) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:181:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (build_block.rows() <= 1) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:184:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (spill_context) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:190:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (build_block.columns() > num_slots) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:195:** 
nesting level increased to 1
   ```cpp
       auto spill_func = [build_block = std::move(build_block), state, this]() 
mutable {
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:203:** 
nesting level increased to 2
   ```cpp
                         [](std::vector<uint32_t>& indices) { 
indices.reserve(reserved_size); });
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:207:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (offset < total_rows) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:211:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (size_t i = 0; i != build_block.columns(); ++i) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:230:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (size_t i = 0; i != sub_block.rows(); ++i) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:234:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (uint32_t partition_idx = 0; partition_idx != 
p._partition_count; ++partition_idx) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:240:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (UNLIKELY(!partition_block)) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:248:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(partition_block->add_rows(&sub_block, 
begin, end));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:248:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(partition_block->add_rows(&sub_block, 
begin, end));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:253:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (partition_block->rows() >= reserved_size || 
is_last_block) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:253:** +1
   ```cpp
                   if (partition_block->rows() >= reserved_size || 
is_last_block) {
                                                                ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:255:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(spilling_stream->spill_block(state, 
block, false));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:255:** +6, 
including nesting penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(spilling_stream->spill_block(state, 
block, false));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:259:** +1, 
nesting level increased to 4
   ```cpp
                   } else {
                     ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:269:** 
nesting level increased to 1
   ```cpp
       auto exception_catch_func = [spill_func]() mutable {
                                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:270:** 
nesting level increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return 
spill_func()); }();
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:270:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return 
spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:270:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return 
spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:270:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return 
spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:281:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       DBUG_EXECUTE_IF(
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:281:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       DBUG_EXECUTE_IF(
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:
##########
@@ -183,133 +206,115 @@
                         state, spilling_stream, print_id(state->query_id()), 
"hash_probe",
                         _parent->node_id(), 
std::numeric_limits<int32_t>::max(),
                         std::numeric_limits<size_t>::max(), 
_runtime_profile.get()));
-                RETURN_IF_ERROR(spilling_stream->prepare_spill());
-                spilling_stream->set_write_counters(
-                        _spill_serialize_block_timer, _spill_block_count, 
_spill_data_size,
-                        _spill_write_disk_timer, _spill_write_wait_io_timer);
             }
 
-            COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
+            auto merged_block = 
vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
             while (!blocks.empty() && !state->is_cancelled()) {
                 auto block = std::move(blocks.back());
                 blocks.pop_back();
+
+                RETURN_IF_ERROR(merged_block->merge(std::move(block)));
                 
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks",
 {
                     return Status::Error<INTERNAL_ERROR>(
                             "fault_inject partitioned_hash_join_probe 
spill_probe_blocks failed");
                 });
-                RETURN_IF_ERROR(spilling_stream->spill_block(state, block, 
false));
-                COUNTER_UPDATE(_spill_probe_rows, block.rows());
+
+                if (merged_block->allocated_bytes() >=
+                    vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+                    COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
+                    RETURN_IF_ERROR(
+                            spilling_stream->spill_block(state, 
merged_block->to_block(), false));
+                    COUNTER_UPDATE(_spill_probe_blocks, 1);
+                }
+            }
+
+            if (!merged_block->empty()) {
+                COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
+                RETURN_IF_ERROR(
+                        spilling_stream->spill_block(state, 
merged_block->to_block(), false));
+                COUNTER_UPDATE(_spill_probe_blocks, 1);
             }
         }
-        VLOG_DEBUG << "query: " << print_id(query_id)
+
+        COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
+        VLOG_DEBUG << "Query: " << print_id(query_id)
                    << " hash probe revoke done, node: " << p.node_id()
                    << ", task: " << state->task_id();
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, spill_func, this]() {
+    auto exception_catch_func = [query_id, spill_func]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                    query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
-                                                    "spill_probe_blocks 
canceled"));
-            return;
+            auto status = Status::InternalError(
+                    "fault_inject partitioned_hash_join_probe "
+                    "spill_probe_blocks canceled");
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            return status;
         });
 
         auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); 
}); }();
-
-        if (!status.ok()) {
-            _spill_status_ok = false;
-            _spill_status = std::move(status);
-        }
-        _dependency->set_ready();
+        return status;
     };
 
-    _dependency->block();
+    if (spill_context) {
+        spill_context->on_non_sink_task_started();
+    }
+    _spill_dependency->block();
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
 {
         return Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_hash_join_probe spill_probe_blocks 
submit_func failed");
     });
 
-    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
-                                                          
exception_catch_func);
+    auto spill_runnable = std::make_shared<SpillNonSinkRunnable>(
+            state, spill_context, _spill_dependency, _runtime_profile.get(),
+            _shared_state->shared_from_this(), exception_catch_func);
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
 Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t 
partition_index) {
-    auto& build_spilling_stream = 
_shared_state->spilled_streams[partition_index];
-    if (build_spilling_stream) {
-        RETURN_IF_ERROR(build_spilling_stream->spill_eof());
-        build_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
-                                                 _spill_read_bytes, 
_spill_read_wait_io_timer);
-    }
-
     auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
 
     if (probe_spilling_stream) {
         RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
-        probe_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
-                                                 _spill_read_bytes, 
_spill_read_wait_io_timer);
+        probe_spilling_stream->set_read_counters(profile());
     }
 
     return Status::OK();
 }
 
-Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState*
 state,
-                                                                           
uint32_t partition_index,
-                                                                           
bool& has_data) {
-    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
+Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState*
 state,

Review Comment:
   warning: function 'recover_build_blocks_from_disk' has cognitive complexity of 60 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState*
 state,
                                              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:292:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!spilled_stream) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:299:** 
nesting level increased to 1
   ```cpp
       auto read_func = [this, query_id, state, spilled_stream = 
spilled_stream, partition_index] {
                        ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:307:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (!eos) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:309:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks",
 {
               ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:309:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks",
 {
               ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:313:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (status.ok()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:316:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!status.ok()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:322:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (block.empty()) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:326:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (UNLIKELY(state->is_cancelled())) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:331:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!_recovered_build_block) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:333:** +1, 
nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:336:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (!status.ok()) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:341:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (_recovered_build_block->allocated_bytes() >=
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:347:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (eos) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:357:** 
nesting level increased to 1
   ```cpp
       auto exception_catch_func = [read_func, query_id]() {
                                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:358:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
 {
           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:358:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
 {
           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:366:** 
nesting level increased to 2
   ```cpp
           auto status = [&]() {
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:367:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func());
               ^
   ```
   **be/src/common/exception.h:95:** expanded from macro 
'RETURN_IF_ERROR_OR_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:367:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func());
               ^
   ```
   **be/src/common/exception.h:101:** expanded from macro 
'RETURN_IF_ERROR_OR_CATCH_EXCEPTION'
   ```cpp
                   if (UNLIKELY(!_status_.ok())) {                              
                    \
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:367:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func());
               ^
   ```
   **be/src/common/exception.h:105:** expanded from macro 
'RETURN_IF_ERROR_OR_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:367:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
               RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func());
               ^
   ```
   **be/src/common/exception.h:106:** expanded from macro 
'RETURN_IF_ERROR_OR_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:379:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (pipeline_task) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:388:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func",
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:388:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func",
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/spill_sort_source_operator.cpp:
##########
@@ -75,94 +73,85 @@
 }
 Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {
     auto& parent = Base::_parent->template cast<Parent>();
-    VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << 
_parent->node_id()
+    VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " << 
_parent->node_id()
                << " merge spill data";
-    _dependency->Dependency::block();
+    _spill_dependency->Dependency::block();
 
     auto query_id = state->query_id();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto spill_func = [this, state, query_id, &parent, submit_timer] {
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+    auto spill_func = [this, state, query_id, &parent] {

Review Comment:
   warning: lambda has cognitive complexity of 52 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
       auto spill_func = [this, state, query_id, &parent] {
                         ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:102:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
           while (!state->is_cancelled()) {
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:109:** nesting level 
increased to 2
   ```cpp
                   SCOPED_TIMER(Base::_spill_recover_time);
                   ^
   ```
   **be/src/util/runtime_profile.h:67:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> 
MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:114:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:114:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:117:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (_shared_state->sorted_streams.empty()) {
               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:126:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
                   RETURN_IF_ERROR(status);
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:126:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(status);
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:131:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                   ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:131:** +1
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:134:** nesting level 
increased to 3
   ```cpp
                           SCOPED_TIMER(Base::_spill_recover_time);
                           ^
   ```
   **be/src/util/runtime_profile.h:67:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> 
MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:135:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:135:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:140:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                           if (status.ok()) {
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:144:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:144:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:146:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                       if (status.ok()) {
                       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:147:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:147:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:153:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:153:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/spill_utils.h:
##########
@@ -17,39 +17,131 @@
 
 #pragma once
 
+#include <gen_cpp/Types_types.h>

Review Comment:
   warning: 'gen_cpp/Types_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/Types_types.h>
            ^
   ```
   



##########
be/src/pipeline/exec/spill_sort_source_operator.cpp:
##########
@@ -75,94 +73,85 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() 
const {
 }
 Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {

Review Comment:
   warning: function 'initiate_merge_sort_spill_streams' has cognitive complexity of 84 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) {
                               ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:81:** nesting level 
increased to 1
   ```cpp
       auto spill_func = [this, state, query_id, &parent] {
                         ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:102:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (!state->is_cancelled()) {
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:109:** nesting level 
increased to 3
   ```cpp
                   SCOPED_TIMER(Base::_spill_recover_time);
                   ^
   ```
   **be/src/util/runtime_profile.h:67:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:114:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:114:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(status);
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:117:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (_shared_state->sorted_streams.empty()) {
               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:126:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(status);
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:126:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(status);
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:131:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                   ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:131:** +1
   ```cpp
                   while (!eos && !state->is_cancelled()) {
                               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:134:** nesting level 
increased to 4
   ```cpp
                           SCOPED_TIMER(Base::_spill_recover_time);
                           ^
   ```
   **be/src/util/runtime_profile.h:67:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:135:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:135:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:140:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                           if (status.ok()) {
                           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:144:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:144:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:146:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       if (status.ok()) {
                       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:147:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
                           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:147:** +6, including 
nesting penalty of 5, nesting level increased to 6
   ```cpp
                           
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
                           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:153:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:153:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(status);
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:164:** nesting level 
increased to 1
   ```cpp
       auto exception_catch_func = [spill_func]() {
                                   ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:165:** nesting level 
increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                         ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:165:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:165:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:165:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
spill_func(); }); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:169:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func",
 {
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/spill_sort_source_operator.cpp:169:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func",
 {
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   
   </details>
   



##########
be/src/runtime/workload_group/workload_group.cpp:
##########
@@ -87,31 +94,58 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& 
tg_info, bool need_create_
 
 std::string WorkloadGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
+    auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
+    auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 
1);
     return fmt::format(
-            "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, 
enable_memory_overcommit = "
-            "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
+            "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
+            "total_query_slot_count = {}, "
+            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= 
{}%, "
+            "enable_memory_overcommit = {}, total_mem_used = {} 
(write_buffer_size={}),"
+            "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
cpu_hard_limit = {}, "
+            "scan_thread_num = "
             "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = 
{}, "
             "memory_low_watermark={}, memory_high_watermark={}, 
is_shutdown={}, query_num={}, "
             "read_bytes_per_second={}, remote_read_bytes_per_second={}]",
-            _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, 
TUnit::BYTES),
-            _enable_memory_overcommit ? "true" : "false", _version, 
cpu_hard_limit(),
-            _scan_thread_num, _max_remote_scan_thread_num, 
_min_remote_scan_thread_num,
-            _memory_low_watermark, _memory_high_watermark, _is_shutdown, 
_query_ctxs.size(),
-            _scan_bytes_per_second, _remote_scan_bytes_per_second);
+            _id, _name, _version, cpu_share(), _total_query_slot_count,
+            PrettyPrinter::print(_memory_limit, TUnit::BYTES), 
to_string(_slot_mem_policy),
+            _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
+            PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
+            PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
+            PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
+            mem_used_ratio, cpu_hard_limit(), _scan_thread_num, 
_max_remote_scan_thread_num,
+            _min_remote_scan_thread_num, _memory_low_watermark, 
_memory_high_watermark,
+            _is_shutdown, _query_ctxs.size(), _scan_bytes_per_second,
+            _remote_scan_bytes_per_second);
+}
+
+bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {

Review Comment:
   warning: method 'add_wg_refresh_interval_memory_growth' can be made const [readability-make-member-function-const]
   
   be/src/runtime/workload_group/workload_group.h:118:
   ```diff
   -     bool add_wg_refresh_interval_memory_growth(int64_t size);
   +     bool add_wg_refresh_interval_memory_growth(int64_t size) const;
   ```
   
   ```suggestion
   bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) const {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to