This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_repartition in repository https://gitbox.apache.org/repos/asf/doris.git
commit 23854d8a904225c3270f2422d405dc24b7443df2 Author: yiguolei <[email protected]> AuthorDate: Thu Mar 5 11:56:32 2026 +0800 add force spill logic in join sink operator --- .../exec/partitioned_hash_join_sink_operator.cpp | 32 +++++++++++++--------- .../exec/partitioned_hash_join_sink_operator.h | 10 ++++++- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index cac453c8b69..778196518a1 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -225,7 +225,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta } offset += this_run; RETURN_IF_ERROR(_partition_block(state, &sub_block, 0, sub_block.rows())); - RETURN_IF_ERROR(_execute_spill_partitioned_blocks(state)); + RETURN_IF_ERROR(_execute_spill_partitioned_blocks(state, true /*force_spill*/)); } RETURN_IF_ERROR(_force_flush_partitions(state)); return Status::OK(); @@ -286,7 +286,11 @@ Status PartitionedHashJoinSinkLocalState::_finish_spilling(RuntimeState* state) return Status::OK(); } -Status PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState* state) { +/// When the revoke-memory path calls this method, it must flush all buffered memory, otherwise revocation can repeat forever without progress. +/// For example, the revocable memory size computed from memory usage may be large enough to trigger revocation (e.g. above a 100K limit) while +/// every partition block is still below the 1MB spill buffer size checked here, so nothing would be spilled. Forcing the spill guarantees progress.
+Status PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState* state, + bool force_spill) { DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { auto status = Status::InternalError( "fault_inject partitioned_hash_join_sink revoke_memory canceled"); @@ -297,12 +301,12 @@ Status PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(Runt for (size_t i = 0; i != _shared_state->_partitioned_build_blocks.size(); ++i) { auto& mutable_block = _shared_state->_partitioned_build_blocks[i]; - // Avoid spilling empty blocks or very small blocks. - if (!mutable_block || mutable_block->allocated_bytes() < state->spill_buffer_size_bytes()) { + if (!mutable_block) { continue; } - - RETURN_IF_ERROR(_spill_to_disk(static_cast<uint32_t>(i))); + if (force_spill || mutable_block->allocated_bytes() >= state->spill_buffer_size_bytes()) { + RETURN_IF_ERROR(_spill_to_disk(static_cast<uint32_t>(i))); + } } return Status::OK(); } @@ -319,13 +323,11 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { DCHECK(revocable_mem_size(state) == 0); return st; } - return run_spill_task(state, [this, state] { - RETURN_IF_ERROR(_execute_spill_partitioned_blocks(state)); - // force flush all partitions to make sure data is written to disk - RETURN_IF_ERROR(_force_flush_partitions(state)); - DCHECK(revocable_mem_size(state) == 0); - return Status::OK(); - }); + RETURN_IF_ERROR(_execute_spill_partitioned_blocks(state, true /*force_spill*/)); + // force flush all partitions to make sure data is written to disk + RETURN_IF_ERROR(_force_flush_partitions(state)); + DCHECK(revocable_mem_size(state) == 0); + return Status::OK(); } Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, @@ -514,7 +516,11 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B // ---- Spilled path: data is partitioned and spilled to disk ---- if (rows > 0) { 
+ RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows)); + // If any partition block reaches or exceeds the spill buffer size, spill that partition to disk immediately to avoid large block accumulation. + RETURN_IF_ERROR( + local_state._execute_spill_partitioned_blocks(state, false /*force_spill*/)); } + // Flush partitioned blocks when eos or when accumulated data is large enough. if (revocable_mem_size(state) > state->spill_join_build_sink_mem_limit_bytes()) { RETURN_IF_ERROR(revoke_memory(state)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 12d4810bbdd..f83df97766e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -76,7 +76,15 @@ protected: // Called after revoke operations to guarantee memory is cleared. Status _force_flush_partitions(RuntimeState* state); - Status _execute_spill_partitioned_blocks(RuntimeState* state); + /** + * @brief Spill partitioned build blocks to disk if needed. + * + * @param state Runtime state for the operator. + * @param force_spill If true, spill all non-empty partition blocks regardless of size; + * if false, only spill blocks whose allocated bytes reach or exceed the spill buffer size. + * Call sites annotate the boolean literal with an inline force_spill comment for clarity. + */ + Status _execute_spill_partitioned_blocks(RuntimeState* state, bool force_spill); Status _setup_internal_operator(RuntimeState* state); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
