This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch vectorized
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 8b8210433eeeb0fbfc20b39520188d2e23892767
Author: thinker <zchw...@qq.com>
AuthorDate: Mon Jan 10 20:28:21 2022 +0800

    [Vectorized] (olap) Optimize BlockReader's performance (#7642)
    
    Co-authored-by: zuochunwei <zuochun...@meituan.com>
---
 be/src/vec/olap/block_reader.cpp | 51 +++++++++++++++++-----------------------
 be/src/vec/olap/block_reader.h   |  9 +++----
 2 files changed, 24 insertions(+), 36 deletions(-)

diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 8e2d4b2..ef3ba3a 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -25,15 +25,8 @@
 #include "runtime/mem_tracker.h"
 #include "vec/olap/vcollect_iterator.h"
 
-using std::nothrow;
-using std::set;
-using std::vector;
-
 namespace doris::vectorized {
 
-BlockReader::BlockReader()
-        : _collect_iter(new VCollectIterator()), _next_row {nullptr, -1, false} {}
-
 BlockReader::~BlockReader() {
     for (int i = 0; i < _agg_functions.size(); ++i) {
         AggregateFunctionPtr function = _agg_functions[i];
@@ -45,7 +38,7 @@ BlockReader::~BlockReader() {
 
 OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params,
                                            std::vector<RowsetReaderSharedPtr>* valid_rs_readers) {
-    _collect_iter->init(this);
+    _vcollect_iter.init(this);
     std::vector<RowsetReaderSharedPtr> rs_readers;
     auto res = _capture_rs_readers(read_params, &rs_readers);
     if (res != OLAP_SUCCESS) {
@@ -59,7 +52,7 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params,
 
     for (auto& rs_reader : rs_readers) {
         RETURN_NOT_OK(rs_reader->init(&_reader_context));
-        OLAPStatus res = _collect_iter->add_child(rs_reader);
+        OLAPStatus res = _vcollect_iter.add_child(rs_reader);
         if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) {
             LOG(WARNING) << "failed to add child to iterator, err=" << res;
             return res;
@@ -69,9 +62,9 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params,
         }
     }
 
-    _collect_iter->build_heap(*valid_rs_readers);
-    if (_collect_iter->is_merge()) {
-        auto status = _collect_iter->current_row(&_next_row);
+    _vcollect_iter.build_heap(*valid_rs_readers);
+    if (_vcollect_iter.is_merge()) {
+        auto status = _vcollect_iter.current_row(&_next_row);
         _eof = status == OLAP_ERR_DATA_EOF;
     }
 
@@ -85,8 +78,9 @@ void BlockReader::_init_agg_state() {
     _stored_has_null_tag.resize(_stored_data_columns.size());
     _stored_has_string_tag.resize(_stored_data_columns.size());
 
+    auto& tablet_schema = tablet()->tablet_schema();
     for (auto idx : _agg_columns_idx) {
-        FieldAggregationMethod agg_method = tablet()->tablet_schema().column(idx).aggregation();
+        FieldAggregationMethod agg_method = tablet_schema.column(idx).aggregation();
         std::string agg_name =
                 TabletColumn::get_string_by_aggregation_type(agg_method) + agg_reader_suffix;
         std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
@@ -159,6 +153,7 @@ OLAPStatus BlockReader::init(const ReaderParams& read_params) {
         break;
     case KeysType::AGG_KEYS:
         _next_block_func = &BlockReader::_agg_key_next_block;
+        _init_agg_state();
         break;
     default:
         DCHECK(false) << "No next row function for type:" << tablet()->keys_type();
@@ -170,7 +165,7 @@ OLAPStatus BlockReader::init(const ReaderParams& read_params) {
 
 OLAPStatus BlockReader::_direct_next_block(Block* block, MemPool* mem_pool, ObjectPool* agg_pool,
                                            bool* eof) {
-    auto res = _collect_iter->next(block);
+    auto res = _vcollect_iter.next(block);
     if (UNLIKELY(res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF)) {
         return res;
     }
@@ -190,11 +185,6 @@ OLAPStatus BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, Obj
         return OLAP_SUCCESS;
     }
 
-    if (!_agg_inited) {
-        _init_agg_state();
-        _agg_inited = true;
-    }
-
     auto target_block_row = 0;
     auto target_columns = block->mutate_columns();
 
@@ -203,7 +193,7 @@ OLAPStatus BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, Obj
     _append_agg_data(target_columns);
 
     while (true) {
-        auto res = _collect_iter->next(&_next_row);
+        auto res = _vcollect_iter.next(&_next_row);
         if (UNLIKELY(res == OLAP_ERR_DATA_EOF)) {
             *eof = true;
             break;
@@ -251,7 +241,7 @@ OLAPStatus BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool,
         // the version is in reverse order, the first row is the highest version,
         // in UNIQUE_KEY highest version is the final result, there is no need to
         // merge the lower versions
-        auto res = _collect_iter->next(&_next_row);
+        auto res = _vcollect_iter.next(&_next_row);
         if (UNLIKELY(res == OLAP_ERR_DATA_EOF)) {
             *eof = true;
             break;
@@ -268,9 +258,9 @@ OLAPStatus BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool,
 }
 
 void BlockReader::_insert_data_normal(MutableColumns& columns) {
+    auto block = _next_row.block;
     for (auto idx : _normal_columns_idx) {
-        columns[_return_columns_loc[idx]]->insert_from(
-                *_next_row.block->get_by_position(idx).column, _next_row.row_pos);
+        columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column, _next_row.row_pos);
     }
 }
 
@@ -279,7 +269,7 @@ void BlockReader::_append_agg_data(MutableColumns& columns) {
     _last_agg_data_counter++;
 
     // execute aggregate when have `batch_size` column or some ref invalid soon
-    bool is_last = (_stored_row_ref.back().block->rows() == _stored_row_ref.back().row_pos + 1);
+    bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1);
     if (_stored_row_ref.size() == _batch_size || is_last) {
         _update_agg_data(columns);
     }
@@ -314,23 +304,24 @@ void BlockReader::_copy_agg_data() {
     phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t, int16_t>>> temp_ref_map;
 
     for (int i = 0; i < _stored_row_ref.size(); i++) {
-        auto ref = _stored_row_ref[i];
-        temp_ref_map[ref.block].push_back({ref.row_pos, i});
+        auto& ref = _stored_row_ref[i];
+        temp_ref_map[ref.block].emplace_back(ref.row_pos, i);
     }
 
     for (auto idx : _agg_columns_idx) {
+        auto& dst_column = _stored_data_columns[idx];
         if (_stored_has_string_tag[idx]) {
             //string type should replace ordered
             for (int i = 0; i < _stored_row_ref.size(); i++) {
-                auto ref = _stored_row_ref[i];
-                _stored_data_columns[idx]->replace_column_data(
+                auto& ref = _stored_row_ref[i];
+                dst_column->replace_column_data(
                         *ref.block->get_by_position(idx).column, ref.row_pos, i);
             }
         } else {
             for (auto& it : temp_ref_map) {
+                auto& src_column = *it.first->get_by_position(idx).column;
                 for (auto& pos : it.second) {
-                    _stored_data_columns[idx]->replace_column_data(
-                            *it.first->get_by_position(idx).column, pos.first, pos.second);
+                    dst_column->replace_column_data(src_column, pos.first, pos.second);
                 }
             }
         }
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index c8c566f..9199072 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -34,8 +34,6 @@ namespace vectorized {
 
 class BlockReader final : public Reader {
 public:
-    BlockReader();
-
     ~BlockReader();
 
     // Initialize BlockReader with tablet, data version and fetch range.
@@ -87,8 +85,8 @@ private:
 
     void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true);
 
-    std::unique_ptr<VCollectIterator> _collect_iter;
-    IteratorRowRef _next_row;
+    VCollectIterator _vcollect_iter;
+    IteratorRowRef _next_row{nullptr, -1, false};
 
     std::vector<AggregateFunctionPtr> _agg_functions;
     std::vector<AggregateDataPtr> _agg_places;
@@ -97,7 +95,7 @@ private:
     std::vector<int> _agg_columns_idx;
     std::vector<int> _return_columns_loc;
 
-    int _batch_size;
+    int _batch_size = 0;
 
     std::vector<int> _agg_data_counters;
     int _last_agg_data_counter = 0;
@@ -110,7 +108,6 @@ private:
     std::vector<bool> _stored_has_string_tag;
 
     bool _eof = false;
-    bool _agg_inited = false;
 
     OLAPStatus (BlockReader::*_next_block_func)(Block* block, MemPool* mem_pool,
                                                 ObjectPool* agg_pool, bool* eof) = nullptr;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to