This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-c108335-hive-sql in repository https://gitbox.apache.org/repos/asf/doris.git
commit c0076c248a7c92ea66cf17042f4d8574a2c10c57 Author: zhangstar333 <zhangs...@selectdb.com> AuthorDate: Thu Mar 20 22:23:48 2025 +0800 [Bug](partition) fix partition sort lock scope cause error (#49243) Currently, the partition sort implementation follows a streaming approach. Assuming there are a total of 10 sorters, when the sink operator completes sorting for one sorter, it immediately notifies the source operator that it's ready to read data. After finishing reading from the current sorter, the source operator checks if the next sorter is available for reading. If not, it blocks itself and waits for the next notification from the sink operator. Now we have a case like: ``` // sink --->0-->ready(0)------>1--->finish-----sleep(10s)---------->ready(1) -------> 2 // source -------->0 get_next------->1 get_next--->2--->2 block------>2 get_next---> 3 ``` The sink operator signals ready for the idx[1] sorter, but the source mistakes that signal as being for the idx[2] sorter and starts reading idx[2] before it has been prepared. This happens because `_prepared_finish` was set inside `prepare_for_read()` before the sink took `sink_eos_lock` and called `set_ready_to_read()`. The source could therefore see sorter 1 as prepared and consume it before the notification for sorter 1 was sent; that late notification then woke the source while it was blocked waiting on sorter 2. The fix marks the sorter as prepared (`set_prepared_finish()`) and signals readiness together while holding a new `prepared_finish_lock`, which the source also holds while checking whether the next sorter is prepared. --- be/src/pipeline/dependency.h | 1 + be/src/pipeline/exec/partition_sort_sink_operator.cpp | 3 ++- be/src/pipeline/exec/partition_sort_source_operator.cpp | 3 +-- be/src/vec/common/sort/partition_sorter.cpp | 1 - be/src/vec/common/sort/partition_sorter.h | 3 ++- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index cd7486baadb..15a50185b2a 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -646,6 +646,7 @@ public: std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts; bool sink_eos = false; std::mutex sink_eos_lock; + std::mutex prepared_finish_lock; }; struct SetSharedState : public BasicSharedState { diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index b2444414dde..8dea1bd1aab 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ 
-151,8 +151,9 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } local_state._value_places[i]->_blocks.clear(); RETURN_IF_ERROR(sorter->prepare_for_read()); + std::unique_lock<std::mutex> lc(local_state._shared_state->prepared_finish_lock); + sorter->set_prepared_finish(); // iff one sorter have data, then could set source ready to read - std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock); local_state._dependency->set_ready_to_read(); } diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index ebf20160958..479ebde522c 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -96,9 +96,8 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, } if (current_eos) { // current sort have eos, so get next idx - sorters[local_state._sort_idx].reset(nullptr); local_state._sort_idx++; - std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock); + std::unique_lock<std::mutex> lc(local_state._shared_state->prepared_finish_lock); if (local_state._sort_idx < sorter_size && !sorters[local_state._sort_idx]->prepared_finish()) { local_state._dependency->block(); diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 4c7702bdcc9..09c45e83c6a 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -74,7 +74,6 @@ Status PartitionSorter::prepare_for_read() { } queue = MergeSorterQueue(cursors); blocks.clear(); - _prepared_finish = true; return Status::OK(); } diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h index 721f8e541c5..80a23b8be8d 100644 --- a/be/src/vec/common/sort/partition_sorter.h +++ b/be/src/vec/common/sort/partition_sorter.h @@ -96,6 +96,7 @@ public: int64 get_output_rows() 
const { return _output_total_rows; } void reset_sorter_state(RuntimeState* runtime_state); bool prepared_finish() { return _prepared_finish; } + void set_prepared_finish() { _prepared_finish = true; } private: Status _read_row_num(Block* block, bool* eos, int batch_size); @@ -121,7 +122,7 @@ private: int64 _partition_inner_limit = 0; TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::type::ROW_NUMBER; SortCursorCmp* _previous_row = nullptr; - std::atomic_bool _prepared_finish = false; + bool _prepared_finish = false; }; #include "common/compile_check_end.h" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org