This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 1768169b9a0 Revert "[Improvement](sort) Free sort blocks if this block is exhausted (#39306)" (#40211)
1768169b9a0 is described below

commit 1768169b9a034204491cbc3368d101977d55ec7a
Author: yiguolei <676222...@qq.com>
AuthorDate: Sat Aug 31 15:58:55 2024 +0800

    Revert "[Improvement](sort) Free sort blocks if this block is exhausted (#39306)" (#40211)
    
    Reverts apache/doris#39956
---
 be/src/vec/common/sort/partition_sorter.cpp | 42 +++++++++--------
 be/src/vec/common/sort/partition_sorter.h   |  4 +-
 be/src/vec/common/sort/sorter.cpp           | 71 ++++++++++++++---------------
 be/src/vec/common/sort/sorter.h             | 12 +++--
 be/src/vec/common/sort/topn_sorter.cpp      | 17 +++----
 be/src/vec/core/sort_cursor.h               | 68 ++++++++++++++++-----------
 be/src/vec/runtime/vsorted_run_merger.cpp   | 34 ++++++++++----
 be/src/vec/runtime/vsorted_run_merger.h     | 13 ++++--
 8 files changed, 149 insertions(+), 112 deletions(-)

diff --git a/be/src/vec/common/sort/partition_sorter.cpp 
b/be/src/vec/common/sort/partition_sorter.cpp
index c363a41d1c7..1ea7c6de6a8 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -58,17 +58,20 @@ Status PartitionSorter::append_block(Block* input_block) {
     Block sorted_block = 
VectorizedUtils::create_empty_columnswithtypename(_row_desc);
     DCHECK(input_block->columns() == sorted_block.columns());
     RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
-    _state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
+    RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
     return Status::OK();
 }
 
 Status PartitionSorter::prepare_for_read() {
+    auto& cursors = _state->get_cursors();
     auto& blocks = _state->get_sorted_block();
     auto& priority_queue = _state->get_priority_queue();
     for (auto& block : blocks) {
-        priority_queue.push(MergeSortCursorImpl::create_shared(block, 
_sort_description));
+        cursors.emplace_back(block, _sort_description);
+    }
+    for (auto& cursor : cursors) {
+        priority_queue.push(MergeSortCursor(&cursor));
     }
-    blocks.clear();
     return Status::OK();
 }
 
@@ -81,30 +84,29 @@ void PartitionSorter::reset_sorter_state(RuntimeState* 
runtime_state) {
 }
 
 Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) 
{
-    if (_state->get_priority_queue().empty()) {
-        *eos = true;
-    } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
-        block->swap(*_state->get_priority_queue().top().impl->block);
-        block->set_num_rows(_partition_inner_limit);
+    if (_state->get_sorted_block().empty()) {
         *eos = true;
     } else {
-        RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
+        if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
+            auto& sorted_block = _state->get_sorted_block()[0];
+            block->swap(sorted_block);
+            block->set_num_rows(_partition_inner_limit);
+            *eos = true;
+        } else {
+            RETURN_IF_ERROR(partition_sort_read(block, eos, 
state->batch_size()));
+        }
     }
     return Status::OK();
 }
 
 Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, 
int batch_size) {
-    auto& priority_queue = _state->get_priority_queue();
-    if (priority_queue.empty()) {
-        *eos = true;
-        return Status::OK();
-    }
-    const auto& sorted_block = priority_queue.top().impl->block;
-    size_t num_columns = sorted_block->columns();
+    const auto& sorted_block = _state->get_sorted_block()[0];
+    size_t num_columns = sorted_block.columns();
     MutableBlock m_block =
-            VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
*sorted_block);
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
sorted_block);
     MutableColumns& merged_columns = m_block.mutable_columns();
     size_t current_output_rows = 0;
+    auto& priority_queue = _state->get_priority_queue();
 
     bool get_enough_data = false;
     while (!priority_queue.empty()) {
@@ -119,7 +121,7 @@ Status PartitionSorter::partition_sort_read(Block* 
output_block, bool* eos, int
             //1 row_number no need to check distinct, just output 
partition_inner_limit row
             if ((current_output_rows + _output_total_rows) < 
_partition_inner_limit) {
                 for (size_t i = 0; i < num_columns; ++i) {
-                    
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+                    merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
                 }
             } else {
                 //rows has get enough
@@ -153,7 +155,7 @@ Status PartitionSorter::partition_sort_read(Block* 
output_block, bool* eos, int
                 }
             }
             for (size_t i = 0; i < num_columns; ++i) {
-                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+                merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
             }
             break;
         }
@@ -178,7 +180,7 @@ Status PartitionSorter::partition_sort_read(Block* 
output_block, bool* eos, int
                 *_previous_row = current;
             }
             for (size_t i = 0; i < num_columns; ++i) {
-                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+                merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
             }
             current_output_rows++;
             break;
diff --git a/be/src/vec/common/sort/partition_sorter.h 
b/be/src/vec/common/sort/partition_sorter.h
index 01e009d200d..77dcb683711 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -50,7 +50,7 @@ public:
     SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), 
impl(cursor.impl) {}
 
     void reset() {
-        impl->reset();
+        impl = nullptr;
         row = 0;
     }
     bool compare_two_rows(const MergeSortCursor& rhs) const {
@@ -67,7 +67,7 @@ public:
         return true;
     }
     int row = 0;
-    std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
+    MergeSortCursorImpl* impl = nullptr;
 };
 
 class PartitionSorter final : public Sorter {
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 89f1c7d73f1..eca7e15626b 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -59,44 +59,48 @@ namespace doris::vectorized {
 void MergeSorterState::reset() {
     auto empty_queue = std::priority_queue<MergeSortCursor>();
     priority_queue_.swap(empty_queue);
-    std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
-    std::vector<std::shared_ptr<Block>> empty_blocks(0);
+    std::vector<MergeSortCursorImpl> empty_cursors(0);
+    cursors_.swap(empty_cursors);
+    std::vector<Block> empty_blocks(0);
     sorted_blocks_.swap(empty_blocks);
     unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
     in_mem_sorted_bocks_size_ = 0;
 }
 
-void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
-    auto rows = block->rows();
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
     if (0 == rows) {
-        return;
+        return Status::OK();
     }
-    in_mem_sorted_bocks_size_ += block->bytes();
-    sorted_blocks_.emplace_back(block);
+    in_mem_sorted_bocks_size_ += block.bytes();
+    sorted_blocks_.emplace_back(std::move(block));
     num_rows_ += rows;
+    return Status::OK();
 }
 
 Status MergeSorterState::build_merge_tree(const SortDescription& 
sort_description) {
     for (auto& block : sorted_blocks_) {
-        priority_queue_.emplace(
-                MergeSortCursorImpl::create_shared(std::move(block), 
sort_description));
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) {
+            priority_queue_.emplace(&cursor);
+        }
     }
 
-    sorted_blocks_.clear();
     return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int 
batch_size,
                                          bool* eos) {
-    DCHECK(sorted_blocks_.empty());
-    DCHECK(unsorted_block_->empty());
-    if (priority_queue_.empty()) {
+    if (sorted_blocks_.empty()) {
         *eos = true;
-    } else if (priority_queue_.size() == 1) {
+    } else if (sorted_blocks_.size() == 1) {
         if (offset_ != 0) {
-            priority_queue_.top().impl->block->skip_num_rows(offset_);
+            sorted_blocks_[0].skip_num_rows(offset_);
         }
-        block->swap(*priority_queue_.top().impl->block);
+        block->swap(sorted_blocks_[0]);
         *eos = true;
     } else {
         RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
@@ -106,14 +110,9 @@ Status 
MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
 
 Status MergeSorterState::_merge_sort_read_impl(int batch_size, 
doris::vectorized::Block* block,
                                                bool* eos) {
-    if (priority_queue_.empty()) {
-        *eos = true;
-        return Status::OK();
-    }
-    size_t num_columns = priority_queue_.top().impl->block->columns();
+    size_t num_columns = sorted_blocks_[0].columns();
 
-    MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
-            block, *priority_queue_.top().impl->block);
+    MutableBlock m_block = 
VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
     MutableColumns& merged_columns = m_block.mutable_columns();
 
     /// Take rows from queue in right order and push to 'merged'.
@@ -124,7 +123,7 @@ Status MergeSorterState::_merge_sort_read_impl(int 
batch_size, doris::vectorized
 
         if (offset_ == 0) {
             for (size_t i = 0; i < num_columns; ++i)
-                
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
+                merged_columns[i]->insert_from(*current->all_columns[i], 
current->pos);
             ++merged_rows;
         } else {
             offset_--;
@@ -135,9 +134,7 @@ Status MergeSorterState::_merge_sort_read_impl(int 
batch_size, doris::vectorized
             priority_queue_.push(current);
         }
 
-        if (merged_rows == batch_size) {
-            break;
-        }
+        if (merged_rows == batch_size) break;
     }
     block->set_columns(std::move(merged_columns));
 
@@ -264,22 +261,22 @@ Status FullSorter::_do_sort() {
         // if one block totally greater the heap top of _block_priority_queue
         // we can throw the block data directly.
         if (_state->num_rows() < _offset + _limit) {
-            
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
-            _block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
-                    _state->last_sorted_block(), _sort_description));
+            static_cast<void>(_state->add_sorted_block(desc_block));
+            _block_priority_queue.emplace(_pool->add(
+                    new MergeSortCursorImpl(_state->last_sorted_block(), 
_sort_description)));
         } else {
-            auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
-                    Block::create_shared(std::move(desc_block)), 
_sort_description);
-            MergeSortBlockCursor block_cursor(tmp_cursor_impl);
+            auto tmp_cursor_impl =
+                    std::make_unique<MergeSortCursorImpl>(desc_block, 
_sort_description);
+            MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
             if (!block_cursor.totally_greater(_block_priority_queue.top())) {
-                _state->add_sorted_block(tmp_cursor_impl->block);
-                
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
-                        _state->last_sorted_block(), _sort_description));
+                static_cast<void>(_state->add_sorted_block(desc_block));
+                _block_priority_queue.emplace(_pool->add(
+                        new MergeSortCursorImpl(_state->last_sorted_block(), 
_sort_description)));
             }
         }
     } else {
         // dispose normal sort logic
-        _state->add_sorted_block(Block::create_shared(std::move(desc_block)));
+        static_cast<void>(_state->add_sorted_block(desc_block));
     }
     return Status::OK();
 }
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index daa871f5d48..2525ca8c0c1 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -59,7 +59,7 @@ public:
 
     ~MergeSorterState() = default;
 
-    void add_sorted_block(std::shared_ptr<Block> block);
+    Status add_sorted_block(Block& block);
 
     Status build_merge_tree(const SortDescription& sort_description);
 
@@ -72,19 +72,23 @@ public:
 
     uint64_t num_rows() const { return num_rows_; }
 
-    std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); 
}
+    Block& last_sorted_block() { return sorted_blocks_.back(); }
 
-    std::vector<std::shared_ptr<Block>>& get_sorted_block() { return 
sorted_blocks_; }
+    std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
     std::priority_queue<MergeSortCursor>& get_priority_queue() { return 
priority_queue_; }
+    std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
     void reset();
 
     std::unique_ptr<Block> unsorted_block_;
 
 private:
+    int _calc_spill_blocks_to_merge() const;
+
     Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* 
block, bool* eos);
 
     std::priority_queue<MergeSortCursor> priority_queue_;
-    std::vector<std::shared_ptr<Block>> sorted_blocks_;
+    std::vector<MergeSortCursorImpl> cursors_;
+    std::vector<Block> sorted_blocks_;
     size_t in_mem_sorted_bocks_size_ = 0;
     uint64_t num_rows_ = 0;
 
diff --git a/be/src/vec/common/sort/topn_sorter.cpp 
b/be/src/vec/common/sort/topn_sorter.cpp
index 1f24fb14c95..58c3cd2dd0c 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -72,16 +72,17 @@ Status TopNSorter::_do_sort(Block* block) {
         // if one block totally greater the heap top of _block_priority_queue
         // we can throw the block data directly.
         if (_state->num_rows() < _offset + _limit) {
-            
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
-            _block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
-                    _state->last_sorted_block(), _sort_description));
+            RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
+            _block_priority_queue.emplace(_pool->add(
+                    new MergeSortCursorImpl(_state->last_sorted_block(), 
_sort_description)));
         } else {
-            auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
-                    Block::create_shared(std::move(sorted_block)), 
_sort_description);
-            MergeSortBlockCursor block_cursor(tmp_cursor_impl);
+            auto tmp_cursor_impl =
+                    std::make_unique<MergeSortCursorImpl>(sorted_block, 
_sort_description);
+            MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
             if (!block_cursor.totally_greater(_block_priority_queue.top())) {
-                _state->add_sorted_block(block_cursor.impl->block);
-                _block_priority_queue.emplace(tmp_cursor_impl);
+                RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
+                _block_priority_queue.emplace(_pool->add(
+                        new MergeSortCursorImpl(_state->last_sorted_block(), 
_sort_description)));
             }
         }
     } else {
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 8b627f50af7..e565819c9d6 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -120,8 +120,7 @@ private:
   * It is used in priority queue.
   */
 struct MergeSortCursorImpl {
-    ENABLE_FACTORY_CREATOR(MergeSortCursorImpl);
-    std::shared_ptr<Block> block;
+    ColumnRawPtrs all_columns;
     ColumnRawPtrs sort_columns;
     SortDescription desc;
     size_t sort_columns_size = 0;
@@ -131,30 +130,37 @@ struct MergeSortCursorImpl {
     MergeSortCursorImpl() = default;
     virtual ~MergeSortCursorImpl() = default;
 
-    MergeSortCursorImpl(std::shared_ptr<Block> block_, const SortDescription& 
desc_)
-            : block(block_), desc(desc_), sort_columns_size(desc.size()) {
-        reset();
+    MergeSortCursorImpl(Block& block, const SortDescription& desc_)
+            : desc(desc_), sort_columns_size(desc.size()) {
+        reset(block);
     }
 
     MergeSortCursorImpl(const SortDescription& desc_)
-            : block(Block::create_shared()), desc(desc_), 
sort_columns_size(desc.size()) {}
+            : desc(desc_), sort_columns_size(desc.size()) {}
     bool empty() const { return rows == 0; }
 
     /// Set the cursor to the beginning of the new block.
-    void reset() {
+    void reset(Block& block) {
+        all_columns.clear();
         sort_columns.clear();
 
-        auto columns = block->get_columns_and_convert();
+        auto columns = block.get_columns_and_convert();
+        size_t num_columns = columns.size();
+
+        for (size_t j = 0; j < num_columns; ++j) {
+            all_columns.push_back(columns[j].get());
+        }
+
         for (size_t j = 0, size = desc.size(); j < size; ++j) {
             auto& column_desc = desc[j];
             size_t column_number = !column_desc.column_name.empty()
-                                           ? 
block->get_position_by_name(column_desc.column_name)
+                                           ? 
block.get_position_by_name(column_desc.column_name)
                                            : column_desc.column_number;
             sort_columns.push_back(columns[column_number].get());
         }
 
         pos = 0;
-        rows = block->rows();
+        rows = all_columns[0]->size();
     }
 
     bool is_first() const { return pos == 0; }
@@ -168,13 +174,11 @@ struct MergeSortCursorImpl {
 using BlockSupplier = std::function<Status(Block*, bool* eos)>;
 
 struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
-    ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
     BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
                                 const VExprContextSPtrs& ordering_expr,
                                 const std::vector<bool>& is_asc_order,
                                 const std::vector<bool>& nulls_first)
             : _ordering_expr(ordering_expr), _block_supplier(block_supplier) {
-        block = Block::create_shared();
         sort_columns_size = ordering_expr.size();
 
         desc.resize(ordering_expr.size());
@@ -191,21 +195,21 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
     }
 
     bool has_next_block() override {
-        block->clear();
+        _block.clear();
         Status status;
         do {
-            status = _block_supplier(block.get(), &_is_eof);
-        } while (block->empty() && !_is_eof && status.ok());
+            status = _block_supplier(&_block, &_is_eof);
+        } while (_block.empty() && !_is_eof && status.ok());
         // If status not ok, upper callers could not detect whether it is eof 
or error.
         // So that fatal here, and should throw exception in the future.
-        if (status.ok() && !block->empty()) {
+        if (status.ok() && !_block.empty()) {
             if (_ordering_expr.size() > 0) {
                 for (int i = 0; status.ok() && i < desc.size(); ++i) {
                     // TODO yiguolei: throw exception if status not ok in the 
future
-                    status = _ordering_expr[i]->execute(block.get(), 
&desc[i].column_number);
+                    status = _ordering_expr[i]->execute(&_block, 
&desc[i].column_number);
                 }
             }
-            MergeSortCursorImpl::reset();
+            MergeSortCursorImpl::reset(_block);
             return status.ok();
         } else if (!status.ok()) {
             throw std::runtime_error(status.msg());
@@ -217,21 +221,32 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
         if (_is_eof) {
             return nullptr;
         }
-        return block.get();
+        return &_block;
+    }
+
+    size_t columns_num() const { return all_columns.size(); }
+
+    Block create_empty_blocks() const {
+        size_t num_columns = columns_num();
+        MutableColumns columns(num_columns);
+        for (size_t i = 0; i < num_columns; ++i) {
+            columns[i] = all_columns[i]->clone_empty();
+        }
+        return _block.clone_with_columns(std::move(columns));
     }
 
     VExprContextSPtrs _ordering_expr;
+    Block _block;
     BlockSupplier _block_supplier {};
     bool _is_eof = false;
 };
 
 /// For easy copying.
 struct MergeSortCursor {
-    ENABLE_FACTORY_CREATOR(MergeSortCursor);
-    std::shared_ptr<MergeSortCursorImpl> impl;
+    MergeSortCursorImpl* impl;
 
-    MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) 
{}
-    MergeSortCursorImpl* operator->() const { return impl.get(); }
+    MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
+    MergeSortCursorImpl* operator->() const { return impl; }
 
     /// The specified row of this cursor is greater than the specified row of 
another cursor.
     int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t 
rhs_pos) const {
@@ -271,11 +286,10 @@ struct MergeSortCursor {
 
 /// For easy copying.
 struct MergeSortBlockCursor {
-    ENABLE_FACTORY_CREATOR(MergeSortBlockCursor);
-    std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
+    MergeSortCursorImpl* impl = nullptr;
 
-    MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : 
impl(impl_) {}
-    MergeSortCursorImpl* operator->() const { return impl.get(); }
+    MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
+    MergeSortCursorImpl* operator->() const { return impl; }
 
     /// The specified row of this cursor is greater than the specified row of 
another cursor.
     int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const {
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp 
b/be/src/vec/runtime/vsorted_run_merger.cpp
index f321622012f..3b17f957deb 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -28,6 +28,14 @@
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/utils/util.hpp"
 
+namespace doris {
+namespace vectorized {
+class VExprContext;
+} // namespace vectorized
+} // namespace doris
+
+using std::vector;
+
 namespace doris::vectorized {
 
 VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr,
@@ -60,14 +68,13 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) 
{
     _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
 }
 
-Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs) 
{
+Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
     try {
         for (const auto& supplier : input_runs) {
             if (_use_sort_desc) {
-                
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier, 
_desc));
+                _cursors.emplace_back(supplier, _desc);
             } else {
-                
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(
-                        supplier, _ordering_expr, _is_asc_order, 
_nulls_first));
+                _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, 
_nulls_first);
             }
         }
     } catch (const std::exception& e) {
@@ -75,8 +82,15 @@ Status VSortedRunMerger::prepare(const 
std::vector<BlockSupplier>& input_runs) {
     }
 
     for (auto& _cursor : _cursors) {
-        if (!_cursor->_is_eof) {
-            _priority_queue.push(MergeSortCursor(_cursor));
+        if (!_cursor._is_eof) {
+            _priority_queue.push(MergeSortCursor(&_cursor));
+        }
+    }
+
+    for (const auto& cursor : _cursors) {
+        if (!cursor._is_eof) {
+            _empty_block = cursor.create_empty_blocks();
+            break;
         }
     }
 
@@ -131,7 +145,7 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
             }
         } else {
             if (current->block_ptr() != nullptr) {
-                for (int i = 0; i < current->block->columns(); i++) {
+                for (int i = 0; i < current->all_columns.size(); i++) {
                     auto& column_with_type = 
current->block_ptr()->get_by_position(i);
                     column_with_type.column = column_with_type.column->cut(
                             current->pos, current->rows - current->pos);
@@ -148,9 +162,9 @@ Status VSortedRunMerger::get_next(Block* output_block, 
bool* eos) {
             }
         }
     } else {
-        size_t num_columns = _priority_queue.top().impl->block->columns();
-        MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
-                output_block, *_priority_queue.top().impl->block);
+        size_t num_columns = _empty_block.columns();
+        MutableBlock m_block =
+                VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
_empty_block);
         MutableColumns& merged_columns = m_block.mutable_columns();
 
         if (num_columns != merged_columns.size()) {
diff --git a/be/src/vec/runtime/vsorted_run_merger.h 
b/be/src/vec/runtime/vsorted_run_merger.h
index 844704fd130..943956d8c38 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -30,7 +30,9 @@
 #include "vec/core/sort_description.h"
 #include "vec/exprs/vexpr_fwd.h"
 
-namespace doris::vectorized {
+namespace doris {
+
+namespace vectorized {
 
 // VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is 
a sorted
 // sequence of blocks, which are fetched from a BlockSupplier function object.
@@ -76,12 +78,14 @@ protected:
 
     bool _pipeline_engine_enabled = false;
 
-    std::vector<std::shared_ptr<BlockSupplierSortCursorImpl>> _cursors;
+    std::vector<BlockSupplierSortCursorImpl> _cursors;
     std::priority_queue<MergeSortCursor> _priority_queue;
 
     /// In pipeline engine, if a cursor needs to read one more block from 
supplier,
     /// we make it as a pending cursor until the supplier is readable.
-    std::shared_ptr<MergeSortCursorImpl> _pending_cursor = nullptr;
+    MergeSortCursorImpl* _pending_cursor = nullptr;
+
+    Block _empty_block;
 
     // Times calls to get_next().
     RuntimeProfile::Counter* _get_next_timer = nullptr;
@@ -101,4 +105,5 @@ private:
     bool has_next_block(MergeSortCursor& current);
 };
 
-} // namespace doris::vectorized
+} // namespace vectorized
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to