This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch vectorized in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 8b8210433eeeb0fbfc20b39520188d2e23892767 Author: thinker <zchw...@qq.com> AuthorDate: Mon Jan 10 20:28:21 2022 +0800 [Vectorized] (olap) Optimize BlockReader's performance (#7642) Co-authored-by: zuochunwei <zuochun...@meituan.com> --- be/src/vec/olap/block_reader.cpp | 51 +++++++++++++++++----------------------- be/src/vec/olap/block_reader.h | 9 +++---- 2 files changed, 24 insertions(+), 36 deletions(-) diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 8e2d4b2..ef3ba3a 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -25,15 +25,8 @@ #include "runtime/mem_tracker.h" #include "vec/olap/vcollect_iterator.h" -using std::nothrow; -using std::set; -using std::vector; - namespace doris::vectorized { -BlockReader::BlockReader() - : _collect_iter(new VCollectIterator()), _next_row {nullptr, -1, false} {} - BlockReader::~BlockReader() { for (int i = 0; i < _agg_functions.size(); ++i) { AggregateFunctionPtr function = _agg_functions[i]; @@ -45,7 +38,7 @@ BlockReader::~BlockReader() { OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params, std::vector<RowsetReaderSharedPtr>* valid_rs_readers) { - _collect_iter->init(this); + _vcollect_iter.init(this); std::vector<RowsetReaderSharedPtr> rs_readers; auto res = _capture_rs_readers(read_params, &rs_readers); if (res != OLAP_SUCCESS) { @@ -59,7 +52,7 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params, for (auto& rs_reader : rs_readers) { RETURN_NOT_OK(rs_reader->init(&_reader_context)); - OLAPStatus res = _collect_iter->add_child(rs_reader); + OLAPStatus res = _vcollect_iter.add_child(rs_reader); if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) { LOG(WARNING) << "failed to add child to iterator, err=" << res; return res; @@ -69,9 +62,9 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params, } } - _collect_iter->build_heap(*valid_rs_readers); - if (_collect_iter->is_merge()) { - auto status = _collect_iter->current_row(&_next_row); + _vcollect_iter.build_heap(*valid_rs_readers); + if (_vcollect_iter.is_merge()) { + auto status = _vcollect_iter.current_row(&_next_row); _eof = status == OLAP_ERR_DATA_EOF; } @@ -85,8 +78,9 @@ void BlockReader::_init_agg_state() { _stored_has_null_tag.resize(_stored_data_columns.size()); _stored_has_string_tag.resize(_stored_data_columns.size()); + auto& tablet_schema = tablet()->tablet_schema(); for (auto idx : _agg_columns_idx) { - FieldAggregationMethod agg_method = tablet()->tablet_schema().column(idx).aggregation(); + FieldAggregationMethod agg_method = tablet_schema.column(idx).aggregation(); std::string agg_name = TabletColumn::get_string_by_aggregation_type(agg_method) + agg_reader_suffix; std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), @@ -159,6 +153,7 @@ OLAPStatus BlockReader::init(const ReaderParams& read_params) { break; case KeysType::AGG_KEYS: _next_block_func = &BlockReader::_agg_key_next_block; + _init_agg_state(); break; default: DCHECK(false) << "No next row function for type:" << tablet()->keys_type(); @@ -170,7 +165,7 @@ OLAPStatus BlockReader::init(const ReaderParams& read_params) { OLAPStatus BlockReader::_direct_next_block(Block* block, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) { - auto res = _collect_iter->next(block); + auto res = _vcollect_iter.next(block); if (UNLIKELY(res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF)) { return res; } @@ -190,11 +185,6 @@ OLAPStatus BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, Obj return OLAP_SUCCESS; } - if (!_agg_inited) { - _init_agg_state(); - _agg_inited = true; - } - auto target_block_row = 0; auto target_columns = block->mutate_columns(); @@ -203,7 +193,7 @@ OLAPStatus BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, Obj _append_agg_data(target_columns); while (true) { - auto res = _collect_iter->next(&_next_row); + auto res = _vcollect_iter.next(&_next_row); if (UNLIKELY(res == OLAP_ERR_DATA_EOF)) { *eof = true; break; @@ -251,7 +241,7 @@ OLAPStatus BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, // the version is in reverse order, the first row is the highest version, // in UNIQUE_KEY highest version is the final result, there is no need to // merge the lower versions - auto res = _collect_iter->next(&_next_row); + auto res = _vcollect_iter.next(&_next_row); if (UNLIKELY(res == OLAP_ERR_DATA_EOF)) { *eof = true; break; @@ -268,9 +258,9 @@ OLAPStatus BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, } void BlockReader::_insert_data_normal(MutableColumns& columns) { + auto block = _next_row.block; for (auto idx : _normal_columns_idx) { - columns[_return_columns_loc[idx]]->insert_from( - *_next_row.block->get_by_position(idx).column, _next_row.row_pos); + columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column, _next_row.row_pos); } } @@ -279,7 +269,7 @@ void BlockReader::_append_agg_data(MutableColumns& columns) { _last_agg_data_counter++; // execute aggregate when have `batch_size` column or some ref invalid soon - bool is_last = (_stored_row_ref.back().block->rows() == _stored_row_ref.back().row_pos + 1); + bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1); if (_stored_row_ref.size() == _batch_size || is_last) { _update_agg_data(columns); } @@ -314,23 +304,24 @@ void BlockReader::_copy_agg_data() { phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t, int16_t>>> temp_ref_map; for (int i = 0; i < _stored_row_ref.size(); i++) { - auto ref = _stored_row_ref[i]; - temp_ref_map[ref.block].push_back({ref.row_pos, i}); + auto& ref = _stored_row_ref[i]; + temp_ref_map[ref.block].emplace_back(ref.row_pos, i); } for (auto idx : _agg_columns_idx) { + auto& dst_column = _stored_data_columns[idx]; if (_stored_has_string_tag[idx]) { //string type should replace ordered for (int i = 0; i < _stored_row_ref.size(); i++) { - auto ref = _stored_row_ref[i]; - _stored_data_columns[idx]->replace_column_data( + auto& ref = _stored_row_ref[i]; + dst_column->replace_column_data( *ref.block->get_by_position(idx).column, ref.row_pos, i); } } else { for (auto& it : temp_ref_map) { + auto& src_column = *it.first->get_by_position(idx).column; for (auto& pos : it.second) { - _stored_data_columns[idx]->replace_column_data( - *it.first->get_by_position(idx).column, pos.first, pos.second); + dst_column->replace_column_data(src_column, pos.first, pos.second); } } } diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index c8c566f..9199072 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -34,8 +34,6 @@ namespace vectorized { class BlockReader final : public Reader { public: - BlockReader(); - ~BlockReader(); // Initialize BlockReader with tablet, data version and fetch range. @@ -87,8 +85,8 @@ private: void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true); - std::unique_ptr<VCollectIterator> _collect_iter; - IteratorRowRef _next_row; + VCollectIterator _vcollect_iter; + IteratorRowRef _next_row{nullptr, -1, false}; std::vector<AggregateFunctionPtr> _agg_functions; std::vector<AggregateDataPtr> _agg_places; @@ -97,7 +95,7 @@ private: std::vector<int> _agg_columns_idx; std::vector<int> _return_columns_loc; - int _batch_size; + int _batch_size = 0; std::vector<int> _agg_data_counters; int _last_agg_data_counter = 0; @@ -110,7 +108,6 @@ private: std::vector<bool> _stored_has_string_tag; bool _eof = false; - bool _agg_inited = false; OLAPStatus (BlockReader::*_next_block_func)(Block* block, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org