This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c43a34b1fdcdde0d0e4e47ab8ac2204f3db427be Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Mon Jul 31 22:28:13 2023 +0800 [fix](sort) VSortedRunMerger does not return any rows with a large offset value (#22191) --- be/src/vec/runtime/vdata_stream_recvr.cpp | 1 + be/src/vec/runtime/vsorted_run_merger.cpp | 47 +++++++++++++-- be/src/vec/runtime/vsorted_run_merger.h | 12 +++- .../array_functions/test_array_functions.out | 66 ---------------------- .../array_functions/test_array_functions.groovy | 5 ++ 5 files changed, 60 insertions(+), 71 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 9ea496fd81..45b910cf82 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -353,6 +353,7 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr, _sender_queues[i], std::placeholders::_1, std::placeholders::_2)); } + _merger->set_pipeline_engine_enabled(_enable_pipeline); RETURN_IF_ERROR(_merger->prepare(child_block_suppliers)); return Status::OK(); } diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index c45004081b..5f4b8206c7 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -96,11 +96,28 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { // Only have one receive data queue of data, no need to do merge and // copy the data of block. // return the data in receive data directly - if (_priority_queue.size() == 1) { + + if (_pending_cursor != nullptr) { + MergeSortCursor cursor(_pending_cursor); + if (has_next_block(cursor)) { + _priority_queue.push(cursor); + } + _pending_cursor = nullptr; + } + + if (_priority_queue.empty()) { + *eos = true; + return Status::OK(); + } else if (_priority_queue.size() == 1) { auto current = _priority_queue.top(); while (_offset != 0 && current->block_ptr() != nullptr) { if (_offset >= current->rows - current->pos) { _offset -= (current->rows - current->pos); + if (_pipeline_engine_enabled) { + _pending_cursor = current.impl; + _priority_queue.pop(); + return Status::OK(); + } has_next_block(current); } else { current->pos += _offset; @@ -111,6 +128,11 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { if (current->isFirst()) { if (current->block_ptr() != nullptr) { current->block_ptr()->swap(*output_block); + if (_pipeline_engine_enabled) { + _pending_cursor = current.impl; + _priority_queue.pop(); + return Status::OK(); + } *eos = !has_next_block(current); } else { *eos = true; @@ -123,6 +145,11 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { current->pos, current->rows - current->pos); } current->block_ptr()->swap(*output_block); + if (_pipeline_engine_enabled) { + _pending_cursor = current.impl; + _priority_queue.pop(); + return Status::OK(); + } *eos = !has_next_block(current); } else { *eos = true; @@ -147,8 +174,15 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { merged_columns[i]->insert_from(*current->all_columns[i], current->pos); ++merged_rows; } - next_heap(current); - if (merged_rows == _batch_size) break; + + // In pipeline engine, needs to check if the sender is readable before the next reading. + if (!next_heap(current)) { + return Status::OK(); + } + + if (merged_rows == _batch_size) { + break; + } } if (merged_rows == 0) { @@ -165,13 +199,18 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { return Status::OK(); } -void VSortedRunMerger::next_heap(MergeSortCursor& current) { +bool VSortedRunMerger::next_heap(MergeSortCursor& current) { if (!current->isLast()) { current->next(); _priority_queue.push(current); + } else if (_pipeline_engine_enabled) { + // need to check sender is readable again before the next reading. + _pending_cursor = current.impl; + return false; } else if (has_next_block(current)) { _priority_queue.push(current); } + return true; } inline bool VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) { diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 1f6f566526..2f9ebe04a6 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -62,6 +62,8 @@ public: // Return the next block of sorted rows from this merger. Status get_next(Block* output_block, bool* eos); + void set_pipeline_engine_enabled(bool value) { _pipeline_engine_enabled = value; } + protected: const VExprContextSPtrs _ordering_expr; SortDescription _desc; @@ -74,9 +76,15 @@ protected: int64_t _limit = -1; size_t _offset = 0; + bool _pipeline_engine_enabled = false; + std::vector<BlockSupplierSortCursorImpl> _cursors; std::priority_queue<MergeSortCursor> _priority_queue; + /// In pipeline engine, if a cursor needs to read one more block from supplier, + /// we make it as a pending cursor until the supplier is readable. + MergeSortCursorImpl* _pending_cursor = nullptr; + Block _empty_block; // Times calls to get_next(). @@ -87,7 +95,9 @@ protected: private: void init_timers(RuntimeProfile* profile); - void next_heap(MergeSortCursor& current); + + /// In pipeline engine, return false if need to read one more block from sender. + bool next_heap(MergeSortCursor& current); bool has_next_block(MergeSortCursor& current); }; diff --git a/regression-test/data/query_p0/sql_functions/array_functions/test_array_functions.out b/regression-test/data/query_p0/sql_functions/array_functions/test_array_functions.out index ca027aaf71..69ac30a140 100644 --- a/regression-test/data/query_p0/sql_functions/array_functions/test_array_functions.out +++ b/regression-test/data/query_p0/sql_functions/array_functions/test_array_functions.out @@ -703,72 +703,6 @@ 8 \N 9 \N --- !select_array_shuffle1 -- -1 [1, 2, 3] 6 6 [3, 2, 1] [3, 2, 1] -2 [4] 4 4 [4] [4] -3 [] \N \N [] [] -4 [1, 2, 3, 4, 5, 4, 3, 2, 1] 25 25 [3, 2, 4, 1, 2, 4, 3, 1, 5] [3, 2, 4, 1, 2, 4, 3, 1, 5] -5 [] \N \N [] [] -6 [1, 2, 3, 4, 5, 4, 3, 2, 1] 25 25 [3, 2, 4, 1, 1, 4, 5, 3, 2] [3, 2, 4, 1, 1, 4, 5, 3, 2] -7 [8, 9, NULL, 10, NULL] 27 27 [NULL, 9, NULL, 8, 10] [NULL, 9, NULL, 8, 10] -8 [1, 2, 3, 3, 4, 4, NULL] 17 17 [2, 4, 3, 4, 1, NULL, 3] [2, 4, 3, 4, 1, NULL, 3] -9 [1, 2, 3] 6 6 [1, 2, 3] [1, 2, 3] - --- !select_array_shuffle2 -- -1 ["hi"] 1 1 ["hi"] ["hi"] -2 ["hi2"] 1 1 ["hi2"] ["hi2"] -3 ["hi3"] 1 1 ["hi3"] ["hi3"] -4 \N \N \N \N \N -5 \N \N \N \N \N -6 \N \N \N \N \N -7 \N \N \N \N \N -8 ["hi", "hi", "hello"] 3 3 ["hello", "hi", "hi"] ["hello", "hi", "hi"] -9 ["hi"] 1 1 ["hi"] ["hi"] - --- !select_array_shuffle3 -- -1 [2015-03-13] 1 1 [2015-03-13] [2015-03-13] -2 \N \N \N \N \N -3 \N \N \N \N \N -4 \N \N \N \N \N -5 \N \N \N \N \N -6 \N \N \N \N \N -7 \N \N \N \N \N -8 [2015-03-13] 1 1 [2015-03-13] [2015-03-13] -9 [2015-03-13, 2015-03-13, 2015-03-14] 3 3 [2015-03-14, 2015-03-13, 2015-03-13] [2015-03-14, 2015-03-13, 2015-03-13] - --- !select_array_shuffle4 -- -1 [2015-03-13 12:36:38] 1 1 [2015-03-13 12:36:38] [2015-03-13 12:36:38] -2 \N \N \N \N \N -3 \N \N \N \N \N -4 \N \N \N \N \N -5 \N \N \N \N \N -6 \N \N \N \N \N -7 \N \N \N \N \N -8 [2015-03-13 12:36:38] 1 1 [2015-03-13 12:36:38] [2015-03-13 12:36:38] -9 [2015-03-13 12:36:38, 2015-03-13 12:36:38] 2 2 [2015-03-13 12:36:38, 2015-03-13 12:36:38] [2015-03-13 12:36:38, 2015-03-13 12:36:38] - --- !select_array_shuffle5 -- -1 [2022-10-15 10:30:00.999, 2022-08-31 12:00:00.999] 2 2 [2022-10-15 10:30:00.999, 2022-08-31 12:00:00.999] [2022-10-15 10:30:00.999, 2022-08-31 12:00:00.999] -2 [2022-11-15 10:30:00.999, 2022-01-31 12:00:00.999] 2 2 [2022-11-15 10:30:00.999, 2022-01-31 12:00:00.999] [2022-11-15 10:30:00.999, 2022-01-31 12:00:00.999] -3 \N \N \N \N \N -4 \N \N \N \N \N -5 \N \N \N \N \N -6 \N \N \N \N \N -7 \N \N \N \N \N -8 \N \N \N \N \N -9 \N \N \N \N \N - --- !select_array_shuffle6 -- -1 [111.111, 222.222] 333.333 333.333 [222.222, 111.111] [222.222, 111.111] -2 [333.333, 444.444] 777.777 777.777 [333.333, 444.444] [333.333, 444.444] -3 \N \N \N \N \N -4 \N \N \N \N \N -5 \N \N \N \N \N -6 \N \N \N \N \N -7 \N \N \N \N \N -8 \N \N \N \N \N -9 \N \N \N \N \N - -- !select -- 1 [1, 2] 2 [] diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy index 25f246b612..910a4d4f5e 100644 --- a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy +++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_functions.groovy @@ -116,12 +116,17 @@ suite("test_array_functions") { qt_select "SELECT k1, array_enumerate_uniq(k8) from ${tableName} ORDER BY k1" qt_select "SELECT k1, array_enumerate_uniq(k10) from ${tableName} ORDER BY k1" qt_select "SELECT k1, array_enumerate_uniq(k12) from ${tableName} ORDER BY k1" + + // Here disable the cases about `array_shuffle` since the result of `array_shuffle` is not stable. + // FYI: the result of `array_shuffle` depends on the row position in the column. + /* qt_select_array_shuffle1 "SELECT k1, k2, array_sum(k2), array_sum(array_shuffle(k2)), array_shuffle(k2, 0), shuffle(k2, 0) from ${tableName} ORDER BY k1" qt_select_array_shuffle2 "SELECT k1, k5, array_size(k5), array_size(array_shuffle(k5)), array_shuffle(k5, 0), shuffle(k5, 0) from ${tableName} ORDER BY k1" qt_select_array_shuffle3 "SELECT k1, k6, array_size(k6), array_size(array_shuffle(k6)), array_shuffle(k6, 0), shuffle(k6, 0) from ${tableName} ORDER BY k1" qt_select_array_shuffle4 "SELECT k1, k7, array_size(k7), array_size(array_shuffle(k7)), array_shuffle(k7, 0), shuffle(k7, 0) from ${tableName} ORDER BY k1" qt_select_array_shuffle5 "SELECT k1, k10, array_size(k10), array_size(array_shuffle(k10)), array_shuffle(k10, 0), shuffle(k10, 0) from ${tableName} ORDER BY k1" qt_select_array_shuffle6 "SELECT k1, k12, array_sum(k12), array_sum(array_shuffle(k12)), array_shuffle(k12, 1), shuffle(k12, 1) from ${tableName} ORDER BY k1" + */ qt_select "SELECT k1, array_popback(k2) from ${tableName} ORDER BY k1" qt_select "SELECT k1, array_popback(k5) from ${tableName} ORDER BY k1" qt_select "SELECT k1, array_popback(k6) from ${tableName} ORDER BY k1" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org