This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e4af923f62a [refactor](sort merger) Refine sort merger (#48075) e4af923f62a is described below commit e4af923f62abfb65ef4a5da7e8a608fb23e76d19 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Thu Feb 20 10:10:15 2025 +0800 [refactor](sort merger) Refine sort merger (#48075) --- be/src/pipeline/local_exchange/local_exchanger.cpp | 4 +- be/src/vec/core/sort_cursor.h | 23 +-- be/src/vec/runtime/vsorted_run_merger.cpp | 82 +++++---- be/src/vec/runtime/vsorted_run_merger.h | 6 +- be/test/vec/runtime/sort_merger_test.cpp | 192 ++++++++++++++++++++- 5 files changed, 241 insertions(+), 66 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 76a8a8e1274..4f4dff8b037 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -427,9 +427,9 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState* state, // If this block is the last block, we should block this pipeline task to wait for // the next block. // TODO: LocalMergeSortExchanger should be refactored. - if (_data_queue[id].data_queue.size_approx() == 0) { + if (_data_queue[id].data_queue.size_approx() == 0 && !*eos) { std::unique_lock l(*_m[id]); - if (_data_queue[id].data_queue.size_approx() == 0) { + if (_data_queue[id].data_queue.size_approx() == 0 && !*eos) { local_state->get_dependency(id)->block(); } } diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 72aa4cbb4dc..3c833ec9c24 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -171,8 +171,9 @@ struct MergeSortCursorImpl { void next(size_t size = 1) { pos += size; } size_t get_size() const { return rows; } - virtual bool has_next_block() { return false; } + virtual void process_next() {} virtual Block* block_ptr() { return nullptr; } + virtual bool eof() const { return false; } }; using BlockSupplier = std::function<Status(Block*, bool* eos)>; @@ -192,38 +193,32 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { desc[i].direction = is_asc_order[i] ? 1 : -1; desc[i].nulls_direction = nulls_first[i] ? -desc[i].direction : desc[i].direction; } - has_next_block(); + process_next(); } BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const SortDescription& desc_) : MergeSortCursorImpl(desc_), _block_supplier(std::move(block_supplier)) { - has_next_block(); + process_next(); } - bool has_next_block() override { + void process_next() override { if (_is_eof) { - return false; + return; } block->clear(); THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof)); - DCHECK(!block->empty() xor _is_eof); + DCHECK(!block->empty() or _is_eof); if (!block->empty()) { DCHECK_EQ(_ordering_expr.size(), desc.size()); for (int i = 0; i < desc.size(); ++i) { THROW_IF_ERROR(_ordering_expr[i]->execute(block.get(), &desc[i].column_number)); } MergeSortCursorImpl::reset(); - return true; } - return false; } - Block* block_ptr() override { - if (_is_eof) { - return nullptr; - } - return block.get(); - } + Block* block_ptr() override { return block.get(); } + bool eof() const override { return is_last() && _is_eof; } VExprContextSPtrs _ordering_expr; BlockSupplier _block_supplier {}; diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index a145c216bbb..16150cab63f 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -76,9 +76,9 @@ Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs) { return Status::Cancelled(e.what()); } - for (auto& _cursor : _cursors) { - if (!_cursor->_is_eof) { - _priority_queue.push(MergeSortCursor(_cursor)); + for (auto& cursor : _cursors) { + if (!cursor->eof()) { + _priority_queue.push(MergeSortCursor(cursor)); } } @@ -93,7 +93,11 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { if (_pending_cursor != nullptr) { MergeSortCursor cursor(_pending_cursor); - if (has_next_block(cursor)) { + { + ScopedTimer<MonotonicStopWatch> timer1(_get_next_block_timer); + cursor->process_next(); + } + if (!cursor->eof()) { _priority_queue.push(cursor); } _pending_cursor = nullptr; @@ -112,42 +116,39 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { return Status::OK(); } else if (_priority_queue.size() == 1) { auto current = _priority_queue.top(); - while (_offset != 0 && current->block_ptr() != nullptr) { - if (_offset >= current->rows - current->pos) { - _offset -= (current->rows - current->pos); - _pending_cursor = current.impl; + DCHECK(!current->eof()); + DCHECK(current->block_ptr() != nullptr); + while (_offset != 0) { + auto process_rows = std::min(current->rows - current->pos, _offset); + current->next(process_rows); + _offset -= process_rows; + if (current->is_last(0)) { _priority_queue.pop(); + if (current->eof()) { + *eos = true; + } else { + _pending_cursor = current.impl; + } return Status::OK(); - } else { - current->pos += _offset; - _offset = 0; } } - if (current->is_first()) { - if (current->block_ptr() != nullptr) { - current->block_ptr()->swap(*output_block); - _pending_cursor = current.impl; - _priority_queue.pop(); - return Status::OK(); - } else { - *eos = true; + if (!current->is_first()) { + for (int i = 0; i < current->block->columns(); i++) { + auto& column_with_type = current->block_ptr()->get_by_position(i); + column_with_type.column = + column_with_type.column->cut(current->pos, current->rows - current->pos); } + } + current->block_ptr()->swap(*output_block); + current->next(current->rows - current->pos); + if (current->eof()) { + *eos = true; } else { - if (current->block_ptr() != nullptr) { - for (int i = 0; i < current->block->columns(); i++) { - auto& column_with_type = current->block_ptr()->get_by_position(i); - column_with_type.column = column_with_type.column->cut( - current->pos, current->rows - current->pos); - } - current->block_ptr()->swap(*output_block); - _pending_cursor = current.impl; - _priority_queue.pop(); - return Status::OK(); - } else { - *eos = true; - } + _pending_cursor = current.impl; } + _priority_queue.pop(); + return Status::OK(); } else { size_t num_columns = _priority_queue.top().impl->block->columns(); MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( @@ -191,7 +192,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { ++merged_rows; } - if (!next_heap(current)) { + if (_need_more_data(current)) { do_insert(); return Status::OK(); } @@ -208,20 +209,17 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { return Status::OK(); } -bool VSortedRunMerger::next_heap(MergeSortCursor& current) { +bool VSortedRunMerger::_need_more_data(MergeSortCursor& current) { if (!current->is_last()) { current->next(); _priority_queue.push(current); + return false; + } else if (current->eof()) { + return false; + } else { + _pending_cursor = current.impl; return true; } - - _pending_cursor = current.impl; - return false; -} - -inline bool VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) { - ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer); - return current->has_next_block(); } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 898b52a8601..d44a5b59dbd 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -92,10 +92,8 @@ protected: private: void init_timers(RuntimeProfile* profile); - - /// In pipeline engine, return false if need to read one more block from sender. - bool next_heap(MergeSortCursor& current); - bool has_next_block(MergeSortCursor& current); + // If current stream is exhausted and not eof, we should break this loop and read more blocks. + bool _need_more_data(MergeSortCursor& current); }; } // namespace doris::vectorized diff --git a/be/test/vec/runtime/sort_merger_test.cpp b/be/test/vec/runtime/sort_merger_test.cpp index a36b436deec..dece7b31074 100644 --- a/be/test/vec/runtime/sort_merger_test.cpp +++ b/be/test/vec/runtime/sort_merger_test.cpp @@ -35,7 +35,12 @@ public: TEST(SortMergerTest, NULL_FIRST_ASC) { /** - * in: [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4] + * in: [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * offset = 0, limit = -1, NULL_FIRST, ASC * out: [NULL, NULL, NULL, NULL, NULL], [1, 1, 1, 1, 1], [2, 2, 2, 2, 2], [3, 3, 3, 3, 3], [4], [4], [4], [4], [4] */ const int num_children = 5; @@ -117,7 +122,12 @@ TEST(SortMergerTest, NULL_FIRST_ASC) { TEST(SortMergerTest, NULL_LAST_DESC) { /** - * in: [4, 3, 2, 1, NULL], [4, 3, 2, 1, NULL], [4, 3, 2, 1, NULL], [4, 3, 2, 1, NULL], [4, 3, 2, 1, NULL] + * in: [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)] + * [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)] + * [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)] + * [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)] + * [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)] + * offset = 0, limit = -1, NULL_LAST, DESC * out: [4, 4, 4, 4, 4], [3, 3, 3, 3, 3], [2, 2, 2, 2, 2], [1, 1, 1, 1, 1], [NULL], [NULL], [NULL], [NULL], [NULL] */ const int num_children = 5; @@ -195,8 +205,12 @@ TEST(SortMergerTest, NULL_LAST_DESC) { TEST(SortMergerTest, TEST_LIMIT) { /** - * in: [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4] - * offset = 20, limit = 1 + * in: [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)] + * offset = 20, limit = 1, NULL_FIRST, ASC * out: [4] */ const int num_children = 5; @@ -249,4 +263,174 @@ TEST(SortMergerTest, TEST_LIMIT) { } } +TEST(SortMergerTest, LAST_BLOCK_WITH_EOS) { + /** + * in: [([NULL, 0, 1, 2, 3], eos = true)] + * [([NULL, 0, 1, 2, 3], eos = true)] + * [([NULL, 0, 1, 2, 3], eos = true)] + * [([NULL, 0, 1, 2, 3], eos = true)] + * [([NULL, 0, 1, 2, 3], eos = true)] + * offset = 0, limit = -1, NULL_FIRST, ASC + * out: [NULL, NULL, NULL, NULL, NULL], [0, 0, 0, 0, 0], [1, 1, 1, 1, 1], [2, 2, 2, 2, 2], [3, 3, 3, 3, 3] + */ + const int num_children = 5; + const int batch_size = 5; + std::vector<int> round; + round.resize(num_children, 0); + const int num_round = 1; + + std::unique_ptr<VSortedRunMerger> merger; + auto profile = std::make_shared<RuntimeProfile>(""); + auto ordering_expr = MockSlotRef::create_mock_contexts( + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())); + { + std::vector<bool> is_asc_order = {true}; + std::vector<bool> nulls_first = {true}; + const int limit = -1; + const int offset = 0; + merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, + limit, offset, profile.get())); + } + { + std::vector<vectorized::BlockSupplier> child_block_suppliers; + for (int child_idx = 0; child_idx < num_children; child_idx++) { + vectorized::BlockSupplier block_supplier = + [&, round_vec = &round, num_round = num_round, id = child_idx]( + vectorized::Block* block, bool* eos) { + *block = ColumnHelper::create_nullable_block<DataTypeInt64>( + {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1, + (*round_vec)[id] + 2, (*round_vec)[id] + 3}, + {1, 0, 0, 0, 0}); + *eos = ++((*round_vec)[id]) == num_round; + return Status::OK(); + }; + child_block_suppliers.push_back(block_supplier); + } + EXPECT_TRUE(merger->prepare(child_block_suppliers).ok()); + } + { + for (int block_idx = 0; block_idx < num_children * num_round; block_idx++) { + vectorized::Block block; + bool eos = false; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + auto expect_block = block_idx == 0 + ? ColumnHelper::create_nullable_column<DataTypeInt64>( + {0, 0, 0, 0, 0}, {1, 1, 1, 1, 1}) + : ColumnHelper::create_nullable_column<DataTypeInt64>( + {block_idx - 1, block_idx - 1, block_idx - 1, + block_idx - 1, block_idx - 1}, + {0, 0, 0, 0, 0}); + EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, expect_block)); + EXPECT_EQ(block.rows(), batch_size); + EXPECT_FALSE(eos); + } + vectorized::Block block; + bool eos = false; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + EXPECT_EQ(block.rows(), 0); + EXPECT_TRUE(eos); + } +} + +TEST(SortMergerTest, TEST_BIG_OFFSET_SINGLE_STREAM) { + /** + * in: [([NULL, 0, 1, 2, 3], eos = true)] + * offset = 20, limit = 1, NULL_FIRST, ASC + * out: [] + */ + const int num_children = 1; + const int batch_size = 5; + std::vector<int> round; + round.resize(num_children, 0); + const int num_round = 1; + + std::unique_ptr<VSortedRunMerger> merger; + auto profile = std::make_shared<RuntimeProfile>(""); + auto ordering_expr = MockSlotRef::create_mock_contexts( + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())); + { + std::vector<bool> is_asc_order = {true}; + std::vector<bool> nulls_first = {true}; + const int limit = 1; + const int offset = 20; + merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, + limit, offset, profile.get())); + } + { + std::vector<vectorized::BlockSupplier> child_block_suppliers; + for (int child_idx = 0; child_idx < num_children; child_idx++) { + vectorized::BlockSupplier block_supplier = + [&, round_vec = &round, num_round = num_round, id = child_idx]( + vectorized::Block* block, bool* eos) { + *block = ColumnHelper::create_nullable_block<DataTypeInt64>( + {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1, + (*round_vec)[id] + 2, (*round_vec)[id] + 3}, + {1, 0, 0, 0, 0}); + *eos = ++((*round_vec)[id]) == num_round; + return Status::OK(); + }; + child_block_suppliers.push_back(block_supplier); + } + EXPECT_TRUE(merger->prepare(child_block_suppliers).ok()); + } + { + vectorized::Block block; + bool eos = false; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + EXPECT_EQ(block.rows(), 0); + EXPECT_TRUE(eos); + } +} + +TEST(SortMergerTest, TEST_SMALL_OFFSET_SINGLE_STREAM) { + /** + * in: [([NULL, 0, 1, 2, 3], eos = true)] + * offset = 4, limit = 1, NULL_FIRST, ASC + * out: [3] + */ + const int num_children = 1; + const int batch_size = 5; + std::vector<int> round; + round.resize(num_children, 0); + const int num_round = 1; + + std::unique_ptr<VSortedRunMerger> merger; + auto profile = std::make_shared<RuntimeProfile>(""); + auto ordering_expr = MockSlotRef::create_mock_contexts( + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())); + { + std::vector<bool> is_asc_order = {true}; + std::vector<bool> nulls_first = {true}; + const int limit = 1; + const int offset = 4; + merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, + limit, offset, profile.get())); + } + { + std::vector<vectorized::BlockSupplier> child_block_suppliers; + for (int child_idx = 0; child_idx < num_children; child_idx++) { + vectorized::BlockSupplier block_supplier = + [&, round_vec = &round, num_round = num_round, id = child_idx]( + vectorized::Block* block, bool* eos) { + *block = ColumnHelper::create_nullable_block<DataTypeInt64>( + {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1, + (*round_vec)[id] + 2, (*round_vec)[id] + 3}, + {1, 0, 0, 0, 0}); + *eos = ++((*round_vec)[id]) == num_round; + return Status::OK(); + }; + child_block_suppliers.push_back(block_supplier); + } + EXPECT_TRUE(merger->prepare(child_block_suppliers).ok()); + } + { + vectorized::Block block; + bool eos = false; + EXPECT_TRUE(merger->get_next(&block, &eos).ok()); + auto expect_block = ColumnHelper::create_nullable_column<DataTypeInt64>({3}, {0}); + EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, expect_block)); + EXPECT_TRUE(eos); + } +} + } // namespace doris::vectorized \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org