This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d51643fa0d3 branch-3.0: [fix](load) Convert RowInBlock* to shared_ptr
to fix potential memory leaks in MemTable (#52902) (#52965)
d51643fa0d3 is described below
commit d51643fa0d3571324071f6abe955f7f7ba7b7da8
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jul 9 10:28:49 2025 +0800
branch-3.0: [fix](load) Convert RowInBlock* to shared_ptr to fix potential
memory leaks in MemTable (#52902) (#52965)
pick from: #52902
---
be/src/olap/memtable.cpp | 62 ++++++++++++++++++++++--------------------------
be/src/olap/memtable.h | 6 ++---
2 files changed, 31 insertions(+), 37 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index b0d930e6d09..1bc3960dd6d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -156,8 +156,6 @@ MemTable::~MemTable() {
}
}
}
- std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(),
- std::default_delete<RowInBlock>());
// Arena has to be destroyed after agg state, because some agg state's memory may be
// allocated in arena.
_arena.reset();
@@ -216,7 +214,7 @@ Status MemTable::insert(const vectorized::Block* input_block,
RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(),
row_idxs.data() + num_rows,
&_column_offset));
for (int i = 0; i < num_rows; i++) {
- _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
+ _row_in_blocks.emplace_back(std::make_shared<RowInBlock>(cursor_in_mutableblock + i));
}
_stat.raw_rows += num_rows;
@@ -264,7 +262,7 @@ size_t MemTable::_sort() {
// sort new rows
Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
- auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
+ auto cmp = [&](RowInBlock* lhs, RowInBlock* rhs) -> int {
return _input_mutable_block.compare_one_column(lhs->_row_pos,
rhs->_row_pos, i, -1);
};
_sort_one_column(_row_in_blocks, tie, cmp);
@@ -275,16 +273,17 @@ size_t MemTable::_sort() {
while (iter.next()) {
pdqsort(std::next(_row_in_blocks.begin(), iter.left()),
std::next(_row_in_blocks.begin(), iter.right()),
- [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) ->
bool {
+ [&is_dup](const std::shared_ptr<RowInBlock>& lhs,
+ const std::shared_ptr<RowInBlock>& rhs) -> bool {
return is_dup ? lhs->_row_pos > rhs->_row_pos :
lhs->_row_pos < rhs->_row_pos;
});
same_keys_num += iter.right() - iter.left();
}
// merge new rows and old rows
_vec_row_comparator->set_block(&_input_mutable_block);
- auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l,
- const RowInBlock* r) -> bool {
- auto value = (*(this->_vec_row_comparator))(l, r);
+ auto cmp_func = [this, is_dup, &same_keys_num](const std::shared_ptr<RowInBlock>& l,
+ const std::shared_ptr<RowInBlock>& r) -> bool {
+ auto value = (*(this->_vec_row_comparator))(l.get(), r.get());
if (value == 0) {
same_keys_num++;
return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos <
r->_row_pos;
@@ -308,14 +307,10 @@ Status MemTable::_sort_by_cluster_keys() {
auto clone_block = in_block.clone_without_columns();
_output_mutable_block =
vectorized::MutableBlock::build_mutable_block(&clone_block);
- std::vector<RowInBlock*> row_in_blocks;
- std::unique_ptr<int, std::function<void(int*)>>
row_in_blocks_deleter((int*)0x01, [&](int*) {
- std::for_each(row_in_blocks.begin(), row_in_blocks.end(),
- std::default_delete<RowInBlock>());
- });
+ std::vector<std::shared_ptr<RowInBlock>> row_in_blocks;
row_in_blocks.reserve(mutable_block.rows());
for (size_t i = 0; i < mutable_block.rows(); i++) {
- row_in_blocks.emplace_back(new RowInBlock {i});
+ row_in_blocks.emplace_back(std::make_shared<RowInBlock>(i));
}
Tie tie = Tie(0, mutable_block.rows());
@@ -336,9 +331,8 @@ Status MemTable::_sort_by_cluster_keys() {
while (iter.next()) {
pdqsort(std::next(row_in_blocks.begin(), iter.left()),
std::next(row_in_blocks.begin(), iter.right()),
- [](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
- return lhs->_row_pos < rhs->_row_pos;
- });
+ [](const std::shared_ptr<RowInBlock>& lhs, const
std::shared_ptr<RowInBlock>& rhs)
+ -> bool { return lhs->_row_pos < rhs->_row_pos; });
}
in_block = mutable_block.to_block();
@@ -353,16 +347,16 @@ Status MemTable::_sort_by_cluster_keys() {
row_pos_vec.data() +
in_block.rows(), &_column_offset);
}
-void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
- std::function<int(const RowInBlock*, const RowInBlock*)> cmp) {
+void MemTable::_sort_one_column(std::vector<std::shared_ptr<RowInBlock>>& row_in_blocks, Tie& tie,
+ std::function<int(RowInBlock*, RowInBlock*)> cmp) {
auto iter = tie.iter();
while (iter.next()) {
- pdqsort(std::next(row_in_blocks.begin(), iter.left()),
- std::next(row_in_blocks.begin(), iter.right()),
- [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; });
+ pdqsort(std::next(row_in_blocks.begin(), static_cast<int>(iter.left())),
+ std::next(row_in_blocks.begin(), static_cast<int>(iter.right())),
+ [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs.get(), rhs.get()) < 0; });
tie[iter.left()] = 0;
- for (int i = iter.left() + 1; i < iter.right(); i++) {
- tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0);
+ for (auto i = iter.left() + 1; i < iter.right(); i++) {
+ tie[i] = (cmp(row_in_blocks[i - 1].get(), row_in_blocks[i].get())
== 0);
}
}
}
@@ -423,14 +417,14 @@ void MemTable::_aggregate() {
vectorized::MutableBlock::build_mutable_block(&in_block);
_vec_row_comparator->set_block(&mutable_block);
auto& block_data = in_block.get_columns_with_type_and_name();
- std::vector<RowInBlock*> temp_row_in_blocks;
+ std::vector<std::shared_ptr<RowInBlock>> temp_row_in_blocks;
temp_row_in_blocks.reserve(_last_sorted_pos);
RowInBlock* prev_row = nullptr;
int row_pos = -1;
//only init agg if needed
- for (int i = 0; i < _row_in_blocks.size(); i++) {
- if (!temp_row_in_blocks.empty() &&
- (*_vec_row_comparator)(prev_row, _row_in_blocks[i]) == 0) {
+ for (const auto& row_ptr : _row_in_blocks) {
+ RowInBlock* current_row = row_ptr.get();
+ if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row,
current_row) == 0) {
if (!prev_row->has_init_agg()) {
prev_row->init_agg_places(
_arena->aligned_alloc(_total_size_of_aggregate_states,
16),
@@ -445,20 +439,20 @@ void MemTable::_aggregate() {
}
}
_stat.merged_rows++;
- _aggregate_two_row_in_block(mutable_block, _row_in_blocks[i],
prev_row);
+ _aggregate_two_row_in_block(mutable_block, current_row, prev_row);
} else {
- prev_row = _row_in_blocks[i];
+ prev_row = current_row;
if (!temp_row_in_blocks.empty()) {
// no more rows to merge for prev row, finalize it
- _finalize_one_row<is_final>(temp_row_in_blocks.back(),
block_data, row_pos);
+ _finalize_one_row<is_final>(temp_row_in_blocks.back().get(),
block_data, row_pos);
}
- temp_row_in_blocks.push_back(prev_row);
+ temp_row_in_blocks.push_back(row_ptr);
row_pos++;
}
}
if (!temp_row_in_blocks.empty()) {
- // finalize the last low
- _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data,
row_pos);
+ // finalize the last row
+ _finalize_one_row<is_final>(temp_row_in_blocks.back().get(),
block_data, row_pos);
}
if constexpr (!is_final) {
// if is not final, we collect the agg results to input_block and then
continue to insert
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 4ae92c2d2d8..1e08891f3f8 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -250,8 +250,8 @@ private:
//return number of same keys
size_t _sort();
Status _sort_by_cluster_keys();
- void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
- std::function<int(const RowInBlock*, const
RowInBlock*)> cmp);
+ void _sort_one_column(std::vector<std::shared_ptr<RowInBlock>>&
row_in_blocks, Tie& tie,
+ std::function<int(RowInBlock*, RowInBlock*)> cmp);
template <bool is_final>
void _finalize_one_row(RowInBlock* row, const
vectorized::ColumnsWithTypeAndName& block_data,
int row_pos);
@@ -264,7 +264,7 @@ private:
std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
std::vector<size_t> _offsets_of_aggregate_states;
size_t _total_size_of_aggregate_states;
- std::vector<RowInBlock*> _row_in_blocks;
+ std::vector<std::shared_ptr<RowInBlock>> _row_in_blocks;
size_t _num_columns;
int32_t _seq_col_idx_in_block = -1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]