This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e4af923f62a [refactor](sort merger) Refine sort merger (#48075)
e4af923f62a is described below

commit e4af923f62abfb65ef4a5da7e8a608fb23e76d19
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Feb 20 10:10:15 2025 +0800

    [refactor](sort merger) Refine sort merger (#48075)
---
 be/src/pipeline/local_exchange/local_exchanger.cpp |   4 +-
 be/src/vec/core/sort_cursor.h                      |  23 +--
 be/src/vec/runtime/vsorted_run_merger.cpp          |  82 +++++----
 be/src/vec/runtime/vsorted_run_merger.h            |   6 +-
 be/test/vec/runtime/sort_merger_test.cpp           | 192 ++++++++++++++++++++-
 5 files changed, 241 insertions(+), 66 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 76a8a8e1274..4f4dff8b037 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -427,9 +427,9 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
                 // If this block is the last block, we should block this 
pipeline task to wait for
                 // the next block.
                 // TODO: LocalMergeSortExchanger should be refactored.
-                if (_data_queue[id].data_queue.size_approx() == 0) {
+                if (_data_queue[id].data_queue.size_approx() == 0 && !*eos) {
                     std::unique_lock l(*_m[id]);
-                    if (_data_queue[id].data_queue.size_approx() == 0) {
+                    if (_data_queue[id].data_queue.size_approx() == 0 && 
!*eos) {
                         local_state->get_dependency(id)->block();
                     }
                 }
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 72aa4cbb4dc..3c833ec9c24 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -171,8 +171,9 @@ struct MergeSortCursorImpl {
     void next(size_t size = 1) { pos += size; }
     size_t get_size() const { return rows; }
 
-    virtual bool has_next_block() { return false; }
+    virtual void process_next() {}
     virtual Block* block_ptr() { return nullptr; }
+    virtual bool eof() const { return false; }
 };
 
 using BlockSupplier = std::function<Status(Block*, bool* eos)>;
@@ -192,38 +193,32 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
             desc[i].direction = is_asc_order[i] ? 1 : -1;
             desc[i].nulls_direction = nulls_first[i] ? -desc[i].direction : 
desc[i].direction;
         }
-        has_next_block();
+        process_next();
     }
 
     BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const 
SortDescription& desc_)
             : MergeSortCursorImpl(desc_), 
_block_supplier(std::move(block_supplier)) {
-        has_next_block();
+        process_next();
     }
 
-    bool has_next_block() override {
+    void process_next() override {
         if (_is_eof) {
-            return false;
+            return;
         }
         block->clear();
         THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
-        DCHECK(!block->empty() xor _is_eof);
+        DCHECK(!block->empty() or _is_eof);
         if (!block->empty()) {
             DCHECK_EQ(_ordering_expr.size(), desc.size());
             for (int i = 0; i < desc.size(); ++i) {
                 THROW_IF_ERROR(_ordering_expr[i]->execute(block.get(), 
&desc[i].column_number));
             }
             MergeSortCursorImpl::reset();
-            return true;
         }
-        return false;
     }
 
-    Block* block_ptr() override {
-        if (_is_eof) {
-            return nullptr;
-        }
-        return block.get();
-    }
+    Block* block_ptr() override { return block.get(); }
+    bool eof() const override { return is_last() && _is_eof; }
 
     VExprContextSPtrs _ordering_expr;
     BlockSupplier _block_supplier {};
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp
index a145c216bbb..16150cab63f 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -76,9 +76,9 @@ Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs) {
         return Status::Cancelled(e.what());
     }
 
-    for (auto& _cursor : _cursors) {
-        if (!_cursor->_is_eof) {
-            _priority_queue.push(MergeSortCursor(_cursor));
+    for (auto& cursor : _cursors) {
+        if (!cursor->eof()) {
+            _priority_queue.push(MergeSortCursor(cursor));
         }
     }
 
@@ -93,7 +93,11 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
 
     if (_pending_cursor != nullptr) {
         MergeSortCursor cursor(_pending_cursor);
-        if (has_next_block(cursor)) {
+        {
+            ScopedTimer<MonotonicStopWatch> timer1(_get_next_block_timer);
+            cursor->process_next();
+        }
+        if (!cursor->eof()) {
             _priority_queue.push(cursor);
         }
         _pending_cursor = nullptr;
@@ -112,42 +116,39 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
         return Status::OK();
     } else if (_priority_queue.size() == 1) {
         auto current = _priority_queue.top();
-        while (_offset != 0 && current->block_ptr() != nullptr) {
-            if (_offset >= current->rows - current->pos) {
-                _offset -= (current->rows - current->pos);
-                _pending_cursor = current.impl;
+        DCHECK(!current->eof());
+        DCHECK(current->block_ptr() != nullptr);
+        while (_offset != 0) {
+            auto process_rows = std::min(current->rows - current->pos, 
_offset);
+            current->next(process_rows);
+            _offset -= process_rows;
+            if (current->is_last(0)) {
                 _priority_queue.pop();
+                if (current->eof()) {
+                    *eos = true;
+                } else {
+                    _pending_cursor = current.impl;
+                }
                 return Status::OK();
-            } else {
-                current->pos += _offset;
-                _offset = 0;
             }
         }
 
-        if (current->is_first()) {
-            if (current->block_ptr() != nullptr) {
-                current->block_ptr()->swap(*output_block);
-                _pending_cursor = current.impl;
-                _priority_queue.pop();
-                return Status::OK();
-            } else {
-                *eos = true;
+        if (!current->is_first()) {
+            for (int i = 0; i < current->block->columns(); i++) {
+                auto& column_with_type = 
current->block_ptr()->get_by_position(i);
+                column_with_type.column =
+                        column_with_type.column->cut(current->pos, 
current->rows - current->pos);
             }
+        }
+        current->block_ptr()->swap(*output_block);
+        current->next(current->rows - current->pos);
+        if (current->eof()) {
+            *eos = true;
         } else {
-            if (current->block_ptr() != nullptr) {
-                for (int i = 0; i < current->block->columns(); i++) {
-                    auto& column_with_type = 
current->block_ptr()->get_by_position(i);
-                    column_with_type.column = column_with_type.column->cut(
-                            current->pos, current->rows - current->pos);
-                }
-                current->block_ptr()->swap(*output_block);
-                _pending_cursor = current.impl;
-                _priority_queue.pop();
-                return Status::OK();
-            } else {
-                *eos = true;
-            }
+            _pending_cursor = current.impl;
         }
+        _priority_queue.pop();
+        return Status::OK();
     } else {
         size_t num_columns = _priority_queue.top().impl->block->columns();
         MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
@@ -191,7 +192,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
                 ++merged_rows;
             }
 
-            if (!next_heap(current)) {
+            if (_need_more_data(current)) {
                 do_insert();
                 return Status::OK();
             }
@@ -208,20 +209,17 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
     return Status::OK();
 }
 
-bool VSortedRunMerger::next_heap(MergeSortCursor& current) {
+bool VSortedRunMerger::_need_more_data(MergeSortCursor& current) {
     if (!current->is_last()) {
         current->next();
         _priority_queue.push(current);
+        return false;
+    } else if (current->eof()) {
+        return false;
+    } else {
+        _pending_cursor = current.impl;
         return true;
     }
-
-    _pending_cursor = current.impl;
-    return false;
-}
-
-inline bool 
VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) {
-    ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer);
-    return current->has_next_block();
 }
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h
index 898b52a8601..d44a5b59dbd 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -92,10 +92,8 @@ protected:
 
 private:
     void init_timers(RuntimeProfile* profile);
-
-    /// In pipeline engine, return false if need to read one more block from 
sender.
-    bool next_heap(MergeSortCursor& current);
-    bool has_next_block(MergeSortCursor& current);
+    // If current stream is exhausted and not eof, we should break this loop 
and read more blocks.
+    bool _need_more_data(MergeSortCursor& current);
 };
 
 } // namespace doris::vectorized
diff --git a/be/test/vec/runtime/sort_merger_test.cpp b/be/test/vec/runtime/sort_merger_test.cpp
index a36b436deec..dece7b31074 100644
--- a/be/test/vec/runtime/sort_merger_test.cpp
+++ b/be/test/vec/runtime/sort_merger_test.cpp
@@ -35,7 +35,12 @@ public:
 
 TEST(SortMergerTest, NULL_FIRST_ASC) {
     /**
-     * in: [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 
1, 2, 3, 4], [NULL, 1, 2, 3, 4]
+     * in: [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     offset = 0, limit = -1, NULL_FIRST, ASC
      * out: [NULL, NULL, NULL, NULL, NULL], [1, 1, 1, 1, 1], [2, 2, 2, 2, 2], 
[3, 3, 3, 3, 3], [4], [4], [4], [4], [4]
      */
     const int num_children = 5;
@@ -117,7 +122,12 @@ TEST(SortMergerTest, NULL_FIRST_ASC) {
 
 TEST(SortMergerTest, NULL_LAST_DESC) {
     /**
-     * in: [4, 3, 2, 1, NULL], [4, 3, 2, 1, NULL], [4, 3, 2, 1, NULL], [4, 3, 
2, 1, NULL], [4, 3, 2, 1, NULL]
+     * in: [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+     *     [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+     *     [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+     *     [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+     *     [([4, 3, 2, 1, NULL], eos = false), ([], eos = true)]
+     *     offset = 0, limit = -1, NULL_LAST, DESC
      * out: [4, 4, 4, 4, 4], [3, 3, 3, 3, 3], [2, 2, 2, 2, 2], [1, 1, 1, 1, 
1], [NULL], [NULL], [NULL], [NULL], [NULL]
      */
     const int num_children = 5;
@@ -195,8 +205,12 @@ TEST(SortMergerTest, NULL_LAST_DESC) {
 
 TEST(SortMergerTest, TEST_LIMIT) {
     /**
-     * in: [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 1, 2, 3, 4], [NULL, 
1, 2, 3, 4], [NULL, 1, 2, 3, 4]
-     * offset = 20, limit = 1
+     * in: [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     [([NULL, 1, 2, 3, 4], eos = false), ([], eos = true)]
+     *     offset = 20, limit = 1, NULL_FIRST, ASC
      * out: [4]
      */
     const int num_children = 5;
@@ -249,4 +263,174 @@ TEST(SortMergerTest, TEST_LIMIT) {
     }
 }
 
+TEST(SortMergerTest, LAST_BLOCK_WITH_EOS) {
+    /**
+     * in: [([NULL, 0, 1, 2, 3], eos = true)]
+     *     [([NULL, 0, 1, 2, 3], eos = true)]
+     *     [([NULL, 0, 1, 2, 3], eos = true)]
+     *     [([NULL, 0, 1, 2, 3], eos = true)]
+     *     [([NULL, 0, 1, 2, 3], eos = true)]
+     *     offset = 0, limit = -1, NULL_FIRST, ASC
+     * out: [NULL, NULL, NULL, NULL, NULL], [0, 0, 0, 0, 0], [1, 1, 1, 1, 1], 
[2, 2, 2, 2, 2], [3, 3, 3, 3, 3]
+     */
+    const int num_children = 5;
+    const int batch_size = 5;
+    std::vector<int> round;
+    round.resize(num_children, 0);
+    const int num_round = 1;
+
+    std::unique_ptr<VSortedRunMerger> merger;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto ordering_expr = MockSlotRef::create_mock_contexts(
+            
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
+    {
+        std::vector<bool> is_asc_order = {true};
+        std::vector<bool> nulls_first = {true};
+        const int limit = -1;
+        const int offset = 0;
+        merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, 
nulls_first, batch_size,
+                                          limit, offset, profile.get()));
+    }
+    {
+        std::vector<vectorized::BlockSupplier> child_block_suppliers;
+        for (int child_idx = 0; child_idx < num_children; child_idx++) {
+            vectorized::BlockSupplier block_supplier =
+                    [&, round_vec = &round, num_round = num_round, id = 
child_idx](
+                            vectorized::Block* block, bool* eos) {
+                        *block = 
ColumnHelper::create_nullable_block<DataTypeInt64>(
+                                {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1,
+                                 (*round_vec)[id] + 2, (*round_vec)[id] + 3},
+                                {1, 0, 0, 0, 0});
+                        *eos = ++((*round_vec)[id]) == num_round;
+                        return Status::OK();
+                    };
+            child_block_suppliers.push_back(block_supplier);
+        }
+        EXPECT_TRUE(merger->prepare(child_block_suppliers).ok());
+    }
+    {
+        for (int block_idx = 0; block_idx < num_children * num_round; 
block_idx++) {
+            vectorized::Block block;
+            bool eos = false;
+            EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+            auto expect_block = block_idx == 0
+                                        ? 
ColumnHelper::create_nullable_column<DataTypeInt64>(
+                                                  {0, 0, 0, 0, 0}, {1, 1, 1, 
1, 1})
+                                        : 
ColumnHelper::create_nullable_column<DataTypeInt64>(
+                                                  {block_idx - 1, block_idx - 
1, block_idx - 1,
+                                                   block_idx - 1, block_idx - 
1},
+                                                  {0, 0, 0, 0, 0});
+            
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, 
expect_block));
+            EXPECT_EQ(block.rows(), batch_size);
+            EXPECT_FALSE(eos);
+        }
+        vectorized::Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_TRUE(eos);
+    }
+}
+
+TEST(SortMergerTest, TEST_BIG_OFFSET_SINGLE_STREAM) {
+    /**
+     * in: [([NULL, 0, 1, 2, 3], eos = true)]
+     *     offset = 20, limit = 1, NULL_FIRST, ASC
+     * out: []
+     */
+    const int num_children = 1;
+    const int batch_size = 5;
+    std::vector<int> round;
+    round.resize(num_children, 0);
+    const int num_round = 1;
+
+    std::unique_ptr<VSortedRunMerger> merger;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto ordering_expr = MockSlotRef::create_mock_contexts(
+            
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
+    {
+        std::vector<bool> is_asc_order = {true};
+        std::vector<bool> nulls_first = {true};
+        const int limit = 1;
+        const int offset = 20;
+        merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, 
nulls_first, batch_size,
+                                          limit, offset, profile.get()));
+    }
+    {
+        std::vector<vectorized::BlockSupplier> child_block_suppliers;
+        for (int child_idx = 0; child_idx < num_children; child_idx++) {
+            vectorized::BlockSupplier block_supplier =
+                    [&, round_vec = &round, num_round = num_round, id = 
child_idx](
+                            vectorized::Block* block, bool* eos) {
+                        *block = 
ColumnHelper::create_nullable_block<DataTypeInt64>(
+                                {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1,
+                                 (*round_vec)[id] + 2, (*round_vec)[id] + 3},
+                                {1, 0, 0, 0, 0});
+                        *eos = ++((*round_vec)[id]) == num_round;
+                        return Status::OK();
+                    };
+            child_block_suppliers.push_back(block_supplier);
+        }
+        EXPECT_TRUE(merger->prepare(child_block_suppliers).ok());
+    }
+    {
+        vectorized::Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_TRUE(eos);
+    }
+}
+
+TEST(SortMergerTest, TEST_SMALL_OFFSET_SINGLE_STREAM) {
+    /**
+     * in: [([NULL, 0, 1, 2, 3], eos = true)]
+     *     offset = 4, limit = 1, NULL_FIRST, ASC
+     * out: [3]
+     */
+    const int num_children = 1;
+    const int batch_size = 5;
+    std::vector<int> round;
+    round.resize(num_children, 0);
+    const int num_round = 1;
+
+    std::unique_ptr<VSortedRunMerger> merger;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto ordering_expr = MockSlotRef::create_mock_contexts(
+            
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()));
+    {
+        std::vector<bool> is_asc_order = {true};
+        std::vector<bool> nulls_first = {true};
+        const int limit = 1;
+        const int offset = 4;
+        merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, 
nulls_first, batch_size,
+                                          limit, offset, profile.get()));
+    }
+    {
+        std::vector<vectorized::BlockSupplier> child_block_suppliers;
+        for (int child_idx = 0; child_idx < num_children; child_idx++) {
+            vectorized::BlockSupplier block_supplier =
+                    [&, round_vec = &round, num_round = num_round, id = 
child_idx](
+                            vectorized::Block* block, bool* eos) {
+                        *block = 
ColumnHelper::create_nullable_block<DataTypeInt64>(
+                                {0, (*round_vec)[id] + 0, (*round_vec)[id] + 1,
+                                 (*round_vec)[id] + 2, (*round_vec)[id] + 3},
+                                {1, 0, 0, 0, 0});
+                        *eos = ++((*round_vec)[id]) == num_round;
+                        return Status::OK();
+                    };
+            child_block_suppliers.push_back(block_supplier);
+        }
+        EXPECT_TRUE(merger->prepare(child_block_suppliers).ok());
+    }
+    {
+        vectorized::Block block;
+        bool eos = false;
+        EXPECT_TRUE(merger->get_next(&block, &eos).ok());
+        auto expect_block = 
ColumnHelper::create_nullable_column<DataTypeInt64>({3}, {0});
+        
EXPECT_TRUE(ColumnHelper::column_equal(block.get_by_position(0).column, 
expect_block));
+        EXPECT_TRUE(eos);
+    }
+}
+
 } // namespace doris::vectorized
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to