This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 23854d8a904225c3270f2422d405dc24b7443df2
Author: yiguolei <[email protected]>
AuthorDate: Thu Mar 5 11:56:32 2026 +0800

    add force spill logic in join sink operator
---
 .../exec/partitioned_hash_join_sink_operator.cpp   | 32 +++++++++++++---------
 .../exec/partitioned_hash_join_sink_operator.h     | 10 ++++++-
 2 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index cac453c8b69..778196518a1 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -225,7 +225,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         }
         offset += this_run;
         RETURN_IF_ERROR(_partition_block(state, &sub_block, 0, 
sub_block.rows()));
-        RETURN_IF_ERROR(_execute_spill_partitioned_blocks(state));
+        RETURN_IF_ERROR(_execute_spill_partitioned_blocks(state, true 
/*force_spill*/));
     }
     RETURN_IF_ERROR(_force_flush_partitions(state));
     return Status::OK();
@@ -286,7 +286,11 @@ Status 
PartitionedHashJoinSinkLocalState::_finish_spilling(RuntimeState* state)
     return Status::OK();
 }
 
-Status 
PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState*
 state) {
+/// If the revoke memory API calls this method, we have to flush all memory to avoid 
a dead loop. For example, maybe the
+/// revocable memory size calculated from memory usage is not enough using a limit of 
100K, but we can't spill all memory to disk
+/// because we use a limit of 1MB here. So we need to force spill all memory to 
disk to make sure we can make progress.
+Status 
PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState*
 state,
+                                                                            
bool force_spill) {
     
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
 {
         auto status = Status::InternalError(
                 "fault_inject partitioned_hash_join_sink revoke_memory 
canceled");
@@ -297,12 +301,12 @@ Status 
PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(Runt
 
     for (size_t i = 0; i != _shared_state->_partitioned_build_blocks.size(); 
++i) {
         auto& mutable_block = _shared_state->_partitioned_build_blocks[i];
-        // Avoid spilling empty blocks or very small blocks.
-        if (!mutable_block || mutable_block->allocated_bytes() < 
state->spill_buffer_size_bytes()) {
+        if (!mutable_block) {
             continue;
         }
-
-        RETURN_IF_ERROR(_spill_to_disk(static_cast<uint32_t>(i)));
+        if (force_spill || mutable_block->allocated_bytes() >= 
state->spill_buffer_size_bytes()) {
+            RETURN_IF_ERROR(_spill_to_disk(static_cast<uint32_t>(i)));
+        }
     }
     return Status::OK();
 }
@@ -319,13 +323,11 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         DCHECK(revocable_mem_size(state) == 0);
         return st;
     }
-    return run_spill_task(state, [this, state] {
-        RETURN_IF_ERROR(_execute_spill_partitioned_blocks(state));
-        // force flush all partitions to make sure data is written to disk
-        RETURN_IF_ERROR(_force_flush_partitions(state));
-        DCHECK(revocable_mem_size(state) == 0);
-        return Status::OK();
-    });
+    RETURN_IF_ERROR(_execute_spill_partitioned_blocks(state, true 
/*force_spill*/));
+    // force flush all partitions to make sure data is written to disk
+    RETURN_IF_ERROR(_force_flush_partitions(state));
+    DCHECK(revocable_mem_size(state) == 0);
+    return Status::OK();
 }
 
 Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
@@ -514,7 +516,11 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         // ---- Spilled path: data is partitioned and spilled to disk ----
         if (rows > 0) {
             RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, 
rows));
+            // If any partition block reaches or exceeds the spill buffer size, 
immediately spill that partition to disk to avoid large block accumulation.
+            RETURN_IF_ERROR(
+                    local_state._execute_spill_partitioned_blocks(state, false 
/*force_spill*/));
         }
+
         // Flush partitioned blocks when eos or when accumulated data is large 
enough.
         if (revocable_mem_size(state) > 
state->spill_join_build_sink_mem_limit_bytes()) {
             RETURN_IF_ERROR(revoke_memory(state));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 12d4810bbdd..f83df97766e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -76,7 +76,15 @@ protected:
     // Called after revoke operations to guarantee memory is cleared.
     Status _force_flush_partitions(RuntimeState* state);
 
-    Status _execute_spill_partitioned_blocks(RuntimeState* state);
+    /**
+     * @brief Spill partitioned build blocks to disk if needed.
+     *
+     * @param state Runtime state for the operator.
+     * @param force_spill If true, spill every non-null partition block 
regardless of size;
+     *                    if false, only spill blocks whose allocated size reaches or exceeds the 
spill buffer size.
+     *                    Label the argument with an inline force_spill comment at call sites for clarity.
+     */
+    Status _execute_spill_partitioned_blocks(RuntimeState* state, bool 
force_spill);
 
     Status _setup_internal_operator(RuntimeState* state);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to