This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d51643fa0d3 branch-3.0: [fix](load) Convert RowInBlock* to shared_ptr 
to fix potential memory leaks in MemTable (#52902) (#52965)
d51643fa0d3 is described below

commit d51643fa0d3571324071f6abe955f7f7ba7b7da8
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jul 9 10:28:49 2025 +0800

    branch-3.0: [fix](load) Convert RowInBlock* to shared_ptr to fix potential 
memory leaks in MemTable (#52902) (#52965)
    
    pick from: #52902
---
 be/src/olap/memtable.cpp | 62 ++++++++++++++++++++++--------------------------
 be/src/olap/memtable.h   |  6 ++---
 2 files changed, 31 insertions(+), 37 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index b0d930e6d09..1bc3960dd6d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -156,8 +156,6 @@ MemTable::~MemTable() {
                 }
             }
         }
-        std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(),
-                      std::default_delete<RowInBlock>());
         // Arena has to be destroyed after agg state, because some agg state's 
memory may be
         // allocated in arena.
         _arena.reset();
@@ -216,7 +214,7 @@ Status MemTable::insert(const vectorized::Block* 
input_block,
     RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(),
                                                   row_idxs.data() + num_rows, 
&_column_offset));
     for (int i = 0; i < num_rows; i++) {
-        _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + 
i});
+        
_row_in_blocks.emplace_back(std::make_shared<RowInBlock>(cursor_in_mutableblock 
+ i));
     }
 
     _stat.raw_rows += num_rows;
@@ -264,7 +262,7 @@ size_t MemTable::_sort() {
     // sort new rows
     Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
     for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
-        auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
+        auto cmp = [&](RowInBlock* lhs, RowInBlock* rhs) -> int {
             return _input_mutable_block.compare_one_column(lhs->_row_pos, 
rhs->_row_pos, i, -1);
         };
         _sort_one_column(_row_in_blocks, tie, cmp);
@@ -275,16 +273,17 @@ size_t MemTable::_sort() {
     while (iter.next()) {
         pdqsort(std::next(_row_in_blocks.begin(), iter.left()),
                 std::next(_row_in_blocks.begin(), iter.right()),
-                [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> 
bool {
+                [&is_dup](const std::shared_ptr<RowInBlock>& lhs,
+                          const std::shared_ptr<RowInBlock>& rhs) -> bool {
                     return is_dup ? lhs->_row_pos > rhs->_row_pos : 
lhs->_row_pos < rhs->_row_pos;
                 });
         same_keys_num += iter.right() - iter.left();
     }
     // merge new rows and old rows
     _vec_row_comparator->set_block(&_input_mutable_block);
-    auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l,
-                                                   const RowInBlock* r) -> 
bool {
-        auto value = (*(this->_vec_row_comparator))(l, r);
+    auto cmp_func = [this, is_dup, &same_keys_num](const 
std::shared_ptr<RowInBlock>& l,
+                                                   const 
std::shared_ptr<RowInBlock>& r) -> bool {
+        auto value = (*(this->_vec_row_comparator))(l.get(), r.get());
         if (value == 0) {
             same_keys_num++;
             return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < 
r->_row_pos;
@@ -308,14 +307,10 @@ Status MemTable::_sort_by_cluster_keys() {
     auto clone_block = in_block.clone_without_columns();
     _output_mutable_block = 
vectorized::MutableBlock::build_mutable_block(&clone_block);
 
-    std::vector<RowInBlock*> row_in_blocks;
-    std::unique_ptr<int, std::function<void(int*)>> 
row_in_blocks_deleter((int*)0x01, [&](int*) {
-        std::for_each(row_in_blocks.begin(), row_in_blocks.end(),
-                      std::default_delete<RowInBlock>());
-    });
+    std::vector<std::shared_ptr<RowInBlock>> row_in_blocks;
     row_in_blocks.reserve(mutable_block.rows());
     for (size_t i = 0; i < mutable_block.rows(); i++) {
-        row_in_blocks.emplace_back(new RowInBlock {i});
+        row_in_blocks.emplace_back(std::make_shared<RowInBlock>(i));
     }
     Tie tie = Tie(0, mutable_block.rows());
 
@@ -336,9 +331,8 @@ Status MemTable::_sort_by_cluster_keys() {
     while (iter.next()) {
         pdqsort(std::next(row_in_blocks.begin(), iter.left()),
                 std::next(row_in_blocks.begin(), iter.right()),
-                [](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
-                    return lhs->_row_pos < rhs->_row_pos;
-                });
+                [](const std::shared_ptr<RowInBlock>& lhs, const 
std::shared_ptr<RowInBlock>& rhs)
+                        -> bool { return lhs->_row_pos < rhs->_row_pos; });
     }
 
     in_block = mutable_block.to_block();
@@ -353,16 +347,16 @@ Status MemTable::_sort_by_cluster_keys() {
                                           row_pos_vec.data() + 
in_block.rows(), &_column_offset);
 }
 
-void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& 
tie,
-                                std::function<int(const RowInBlock*, const 
RowInBlock*)> cmp) {
+void MemTable::_sort_one_column(std::vector<std::shared_ptr<RowInBlock>>& 
row_in_blocks, Tie& tie,
+                                std::function<int(RowInBlock*, RowInBlock*)> 
cmp) {
     auto iter = tie.iter();
     while (iter.next()) {
-        pdqsort(std::next(row_in_blocks.begin(), iter.left()),
-                std::next(row_in_blocks.begin(), iter.right()),
-                [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; 
});
+        pdqsort(std::next(row_in_blocks.begin(), 
static_cast<int>(iter.left())),
+                std::next(row_in_blocks.begin(), 
static_cast<int>(iter.right())),
+                [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs.get(), 
rhs.get()) < 0; });
         tie[iter.left()] = 0;
-        for (int i = iter.left() + 1; i < iter.right(); i++) {
-            tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0);
+        for (auto i = iter.left() + 1; i < iter.right(); i++) {
+            tie[i] = (cmp(row_in_blocks[i - 1].get(), row_in_blocks[i].get()) 
== 0);
         }
     }
 }
@@ -423,14 +417,14 @@ void MemTable::_aggregate() {
             vectorized::MutableBlock::build_mutable_block(&in_block);
     _vec_row_comparator->set_block(&mutable_block);
     auto& block_data = in_block.get_columns_with_type_and_name();
-    std::vector<RowInBlock*> temp_row_in_blocks;
+    std::vector<std::shared_ptr<RowInBlock>> temp_row_in_blocks;
     temp_row_in_blocks.reserve(_last_sorted_pos);
     RowInBlock* prev_row = nullptr;
     int row_pos = -1;
     //only init agg if needed
-    for (int i = 0; i < _row_in_blocks.size(); i++) {
-        if (!temp_row_in_blocks.empty() &&
-            (*_vec_row_comparator)(prev_row, _row_in_blocks[i]) == 0) {
+    for (const auto& row_ptr : _row_in_blocks) {
+        RowInBlock* current_row = row_ptr.get();
+        if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, 
current_row) == 0) {
             if (!prev_row->has_init_agg()) {
                 prev_row->init_agg_places(
                         _arena->aligned_alloc(_total_size_of_aggregate_states, 
16),
@@ -445,20 +439,20 @@ void MemTable::_aggregate() {
                 }
             }
             _stat.merged_rows++;
-            _aggregate_two_row_in_block(mutable_block, _row_in_blocks[i], 
prev_row);
+            _aggregate_two_row_in_block(mutable_block, current_row, prev_row);
         } else {
-            prev_row = _row_in_blocks[i];
+            prev_row = current_row;
             if (!temp_row_in_blocks.empty()) {
                 // no more rows to merge for prev row, finalize it
-                _finalize_one_row<is_final>(temp_row_in_blocks.back(), 
block_data, row_pos);
+                _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), 
block_data, row_pos);
             }
-            temp_row_in_blocks.push_back(prev_row);
+            temp_row_in_blocks.push_back(row_ptr);
             row_pos++;
         }
     }
     if (!temp_row_in_blocks.empty()) {
-        // finalize the last low
-        _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, 
row_pos);
+        // finalize the last row
+        _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), 
block_data, row_pos);
     }
     if constexpr (!is_final) {
         // if is not final, we collect the agg results to input_block and then 
continue to insert
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 4ae92c2d2d8..1e08891f3f8 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -250,8 +250,8 @@ private:
     //return number of same keys
     size_t _sort();
     Status _sort_by_cluster_keys();
-    void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
-                          std::function<int(const RowInBlock*, const 
RowInBlock*)> cmp);
+    void _sort_one_column(std::vector<std::shared_ptr<RowInBlock>>& 
row_in_blocks, Tie& tie,
+                          std::function<int(RowInBlock*, RowInBlock*)> cmp);
     template <bool is_final>
     void _finalize_one_row(RowInBlock* row, const 
vectorized::ColumnsWithTypeAndName& block_data,
                            int row_pos);
@@ -264,7 +264,7 @@ private:
     std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
     std::vector<size_t> _offsets_of_aggregate_states;
     size_t _total_size_of_aggregate_states;
-    std::vector<RowInBlock*> _row_in_blocks;
+    std::vector<std::shared_ptr<RowInBlock>> _row_in_blocks;
 
     size_t _num_columns;
     int32_t _seq_col_idx_in_block = -1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to