This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 51abaa8  [fix](vec) Fix some bugs about vec engine (#7884)
51abaa8 is described below

commit 51abaa89f3d828dfdb8e6dfeef0d1424e28cdf05
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Feb 3 19:21:17 2022 +0800

    [fix](vec) Fix some bugs about vec engine (#7884)
    
    1. Memory leak in VCollectIterator
    2. Slow queries with LIMIT 10 on aggregate-key tables
    3. Slow SSB queries q4, q5, and q6
---
 be/src/exec/olap_scan_node.h               |  3 ++-
 be/src/exec/olap_scanner.cpp               |  3 +++
 be/src/olap/reader.cpp                     |  1 +
 be/src/olap/reader.h                       |  5 ++++
 be/src/olap/rowset/beta_rowset_reader.cpp  | 33 ++++++++++++-----------
 be/src/olap/rowset/rowset_reader_context.h |  2 ++
 be/src/olap/storage_engine.cpp             | 10 +++----
 be/src/olap/tablet_schema.cpp              |  3 ++-
 be/src/vec/columns/column_string.cpp       |  1 +
 be/src/vec/exec/volap_scan_node.cpp        | 42 ++++++++++++++++++------------
 be/src/vec/exec/volap_scanner.cpp          |  6 +++++
 be/src/vec/exec/volap_scanner.h            | 10 +++++--
 be/src/vec/olap/block_reader.cpp           |  9 ++++---
 be/src/vec/olap/block_reader.h             |  3 ---
 be/src/vec/olap/vcollect_iterator.cpp      |  1 +
 15 files changed, 85 insertions(+), 47 deletions(-)

diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index d57a92d..82e98d5 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -160,7 +160,7 @@ protected:
                             RuntimeProfile* profile);
 
     friend class OlapScanner;
-    friend class doris::vectorized::VOlapScanner;
+    friend class vectorized::VOlapScanner;
 
     // Tuple id resolved in prepare() to set _tuple_desc;
     TupleId _tuple_id;
@@ -239,6 +239,7 @@ protected:
     SpinLock _status_mutex;
     Status _status;
     RuntimeState* _runtime_state;
+
     RuntimeProfile::Counter* _scan_timer;
     RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
     RuntimeProfile::Counter* _tablet_counter;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index a1efc1d..d7dc839 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -59,6 +59,9 @@ Status OlapScanner::prepare(
         const std::vector<std::pair<string, 
std::shared_ptr<IBloomFilterFuncBase>>>&
                 bloom_filters) {
     set_tablet_reader();
+    // set limit to reduce end of rowset and segment mem use
+    _tablet_reader->set_batch_size(_parent->limit() == -1 ? 
_parent->_runtime_state->batch_size() : std::min(
+            static_cast<int64_t>(_parent->_runtime_state->batch_size()), 
_parent->limit()));
 
     // Get olap table
     TTabletId tablet_id = scan_range.tablet_id;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 13e50b8..4deda90 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -222,6 +222,7 @@ OLAPStatus TabletReader::_capture_rs_readers(const 
ReaderParams& read_params,
     _reader_context.runtime_state = read_params.runtime_state;
     _reader_context.use_page_cache = read_params.use_page_cache;
     _reader_context.sequence_id_idx = _sequence_col_idx;
+    _reader_context.batch_size = _batch_size;
 
     *valid_rs_readers = *rs_readers;
 
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 82cd7ff..3137e06 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -133,6 +133,10 @@ public:
                _stats.rows_vec_del_cond_filtered;
     }
 
+    void set_batch_size(int batch_size) {
+        _batch_size = batch_size;
+    }
+
     const OlapReaderStatistics& stats() const { return _stats; }
     OlapReaderStatistics* mutable_stats() { return &_stats; }
 
@@ -210,6 +214,7 @@ protected:
     bool _filter_delete = false;
     int32_t _sequence_col_idx = -1;
     bool _direct_mode = false;
+    int _batch_size = 1024;
 
     CollectIterator _collect_iter;
     std::vector<uint32_t> _key_cids;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 4d35f2f..263a4cc 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -131,20 +131,23 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
     _iterator.reset(final_iterator);
 
     // init input block
-    _input_block.reset(new RowBlockV2(schema, 1024, _parent_tracker));
-
-    // init input/output block and row
-    _output_block.reset(new RowBlock(read_context->tablet_schema, 
_parent_tracker));
-
-    RowBlockInfo output_block_info;
-    output_block_info.row_num = 1024;
-    output_block_info.null_supported = true;
-    // the output block's schema should be seek_columns to conform to v1
-    // TODO(hkp): this should be optimized to use return_columns
-    output_block_info.column_ids = *(_context->seek_columns);
-    _output_block->init(output_block_info);
-    _row.reset(new RowCursor());
-    RETURN_NOT_OK(_row->init(*(read_context->tablet_schema), 
*(_context->seek_columns)));
+    _input_block.reset(new RowBlockV2(schema,
+            std::min(1024, read_context->batch_size), _parent_tracker));
+
+    if (!read_context->is_vec) {
+        // init input/output block and row
+        _output_block.reset(new RowBlock(read_context->tablet_schema, 
_parent_tracker));
+
+        RowBlockInfo output_block_info;
+        output_block_info.row_num = std::min(1024, read_context->batch_size);
+        output_block_info.null_supported = true;
+        // the output block's schema should be seek_columns to conform to v1
+        // TODO(hkp): this should be optimized to use return_columns
+        output_block_info.column_ids = *(_context->seek_columns);
+        _output_block->init(output_block_info);
+        _row.reset(new RowCursor());
+        RETURN_NOT_OK(_row->init(*(read_context->tablet_schema), 
*(_context->seek_columns)));
+    }
 
     return OLAP_SUCCESS;
 }
@@ -211,7 +214,7 @@ OLAPStatus BetaRowsetReader::next_block(vectorized::Block* 
block) {
             }
         }
         is_first = false;
-    } while (block->rows() < _context->runtime_state->batch_size()); // here 
we should keep block.rows() < batch_size
+    } while (block->rows() < _context->batch_size); // here we should keep 
block.rows() < batch_size
 
     return OLAP_SUCCESS;
 }
diff --git a/be/src/olap/rowset/rowset_reader_context.h 
b/be/src/olap/rowset/rowset_reader_context.h
index cc98419..07d9340 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -61,6 +61,8 @@ struct RowsetReaderContext {
     RuntimeState* runtime_state = nullptr;
     bool use_page_cache = false;
     int sequence_id_idx = -1;
+    int batch_size = 1024;
+    bool is_vec = false;
 };
 
 } // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index aa1af9c..aeac350 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -570,11 +570,11 @@ void StorageEngine::stop() {
     THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
 #undef THREAD_JOIN
 
-#define THREADS_JOIN(threads)            \
-    for (const auto& thread : threads) { \
-        if (thread) {                    \
-            thread->join();              \
-        }                                \
+#define THREADS_JOIN(threads)           \
+    for (const auto& thread : threads) {\
+        if (thread) {                   \
+            thread->join();             \
+        }                               \
     }
 
     THREADS_JOIN(_path_gc_threads);
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 5b407d5..9ad710d 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -497,7 +497,8 @@ vectorized::Block TabletSchema::create_block(const 
std::vector<uint32_t>& return
     for (int i = 0; i < return_columns.size(); ++i) {
         const auto& col = _cols[return_columns[i]];
         auto data_type = vectorized::IDataType::from_olap_engine(col.type(), 
col.is_nullable());
-        block.insert({data_type->create_column(), data_type, col.name()});
+        auto column = data_type->create_column();
+        block.insert({std::move(column), data_type, col.name()});
     }
     return block;
 }
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index afd3f23..9ebf879 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -323,6 +323,7 @@ void ColumnString::replicate(const uint32_t* counts, size_t 
target_size, IColumn
 
 void ColumnString::reserve(size_t n) {
     offsets.reserve(n);
+    chars.reserve(n);
 }
 
 void ColumnString::resize(size_t n) {
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index b365c1d..77f0213 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -62,12 +62,19 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
     _total_assign_num = 0;
     _nice = 18 + std::max(0, 2 - (int)_volap_scanners.size() / 5);
 
-    auto block_per_scanner = (config::doris_scanner_row_num + 
(state->batch_size() - 1)) / state->batch_size();
-    for (int i = 0; i < _volap_scanners.size() * block_per_scanner; ++i) {
+    auto doris_scanner_row_num = _limit == -1 ? config::doris_scanner_row_num :
+            std::min(static_cast<int64_t>(config::doris_scanner_row_num), 
_limit);
+    auto block_size = _limit == -1 ? state->batch_size() :
+            std::min(static_cast<int64_t>(state->batch_size()), _limit);
+    auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) / 
block_size;
+    auto pre_block_count =
+            std::min(_volap_scanners.size(), 
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) * 
block_per_scanner;
+
+    for (int i = 0; i < pre_block_count; ++i) {
         auto block = new Block;
         for (const auto slot_desc : _tuple_desc->slots()) {
             auto column_ptr = slot_desc->get_empty_mutable_column();
-            column_ptr->reserve(state->batch_size());
+            column_ptr->reserve(block_size);
             block->insert(ColumnWithTypeAndName(std::move(column_ptr),
                                                     
slot_desc->get_data_type_ptr(),
                                                     slot_desc->col_name()));
@@ -240,16 +247,11 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) 
{
             _scan_blocks.insert(_scan_blocks.end(), blocks.begin(), 
blocks.end());
         }
         // If eos is true, we will process out of this lock block.
-        if (!eos) {
-            std::lock_guard<std::mutex> l(_volap_scanners_lock);
-            _volap_scanners.push_front(scanner);
-        }
+        if (eos) { scanner->mark_to_need_to_close(); }
+        std::lock_guard<std::mutex> l(_volap_scanners_lock);
+        _volap_scanners.push_front(scanner);
     }
     if (eos) {
-        // close out of blocks lock. we do this before _progress update
-        // that can assure this object can keep live before we finish.
-        scanner->close(_runtime_state);
-
         std::lock_guard<std::mutex> l(_scan_blocks_lock);
         _progress.update(1);
         if (_progress.done()) {
@@ -520,18 +522,26 @@ int 
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
         size_t thread_slot_num = 0;
         {
             std::lock_guard<std::mutex> l(_free_blocks_lock);
-            thread_slot_num = (_free_blocks.size() - (assigned_thread_num * 
block_per_scanner)) / block_per_scanner;
+            thread_slot_num = _free_blocks.size() / block_per_scanner;
+            thread_slot_num += (_free_blocks.size() % block_per_scanner != 0);
             if (thread_slot_num == 0) thread_slot_num++;
         }
 
         {
             std::lock_guard<std::mutex> l(_volap_scanners_lock);
             thread_slot_num = std::min(thread_slot_num, 
_volap_scanners.size());
-            for (int i = 0; i < thread_slot_num; ++i) {
-                olap_scanners.push_back(_volap_scanners.front());
+            for (int i = 0; i < thread_slot_num && !_volap_scanners.empty();) {
+                auto scanner = _volap_scanners.front();
                 _volap_scanners.pop_front();
-                _running_thread++;
-                assigned_thread_num++;
+
+                if (scanner->need_to_close())
+                    scanner->close(state);
+                else {
+                    olap_scanners.push_back(scanner);
+                    _running_thread++;
+                    assigned_thread_num++;
+                    i++;
+                }
             }
         }
     }
diff --git a/be/src/vec/exec/volap_scanner.cpp 
b/be/src/vec/exec/volap_scanner.cpp
index 1b4bb02..7b5b31e 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -19,6 +19,8 @@
 
 #include <memory>
 
+#include "runtime/runtime_state.h"
+
 #include "vec/columns/column_complex.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
@@ -69,6 +71,10 @@ Status VOlapScanner::get_block(RuntimeState* state, 
vectorized::Block* block, bo
     return Status::OK();
 }
 
+void VOlapScanner::set_tablet_reader() {
+    _tablet_reader = std::make_unique<BlockReader>();
+}
+
 void 
VOlapScanner::_convert_row_to_block(std::vector<vectorized::MutableColumnPtr>* 
columns) {
     size_t slots_size = _query_slots.size();
     for (int i = 0; i < slots_size; ++i) {
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index 5efaf9d..0c1c4ad 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -36,19 +36,25 @@ public:
                  bool need_agg_finalize, const TPaloScanRange& scan_range);
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eof);
-    Status get_batch(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+
+    Status get_batch(RuntimeState* state, RowBatch* row_batch, bool* eos) 
override {
         return Status::NotSupported("Not Implemented VOlapScanNode 
Node::get_next scalar");
     }
 
     VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; }
 
+    void mark_to_need_to_close() { _need_to_close = true; }
+
+    bool need_to_close() { return _need_to_close; }
+
 protected:
-    virtual void set_tablet_reader() { _tablet_reader = 
std::make_unique<BlockReader>(); }
+    virtual void set_tablet_reader() override;
 
 private:
     // TODO: Remove this function after we finish reader vec
     void _convert_row_to_block(std::vector<vectorized::MutableColumnPtr>* 
columns);
     VExprContext* _vconjunct_ctx = nullptr;
+    bool _need_to_close = false;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 8fda9d6..37f3f7b 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -50,6 +50,8 @@ OLAPStatus BlockReader::_init_collect_iter(const 
ReaderParams& read_params,
         return res;
     }
 
+    _reader_context.batch_size = _batch_size;
+    _reader_context.is_vec = true;
     for (auto& rs_reader : rs_readers) {
         RETURN_NOT_OK(rs_reader->init(&_reader_context));
         OLAPStatus res = _vcollect_iter.add_child(rs_reader);
@@ -76,8 +78,8 @@ void BlockReader::_init_agg_state(const ReaderParams& 
read_params) {
         return;
     }
 
-    _stored_data_block = 
_next_row.block->create_same_struct_block(_batch_size);
-    _stored_data_columns = _stored_data_block->mutate_columns();
+    _stored_data_columns =
+            
_next_row.block->create_same_struct_block(_batch_size)->mutate_columns();
 
     _stored_has_null_tag.resize(_stored_data_columns.size());
     _stored_has_string_tag.resize(_stored_data_columns.size());
@@ -102,7 +104,6 @@ void BlockReader::_init_agg_state(const ReaderParams& 
read_params) {
                 _next_row.block->get_data_type(idx)->is_nullable());
         DCHECK(function != nullptr);
         _agg_functions.push_back(function);
-
         // create aggregate data
         AggregateDataPtr place = new char[function->size_of_data()];
         function->create(place);
@@ -120,7 +121,6 @@ void BlockReader::_init_agg_state(const ReaderParams& 
read_params) {
 
 OLAPStatus BlockReader::init(const ReaderParams& read_params) {
     TabletReader::init(read_params);
-    _batch_size = read_params.runtime_state->batch_size();
 
     auto return_column_size =
             read_params.origin_return_columns->size() - (_sequence_col_idx != 
-1 ? 1 : 0);
@@ -231,6 +231,7 @@ OLAPStatus BlockReader::_agg_key_next_block(Block* block, 
MemPool* mem_pool, Obj
     _merged_rows += target_block_row;
     return OLAP_SUCCESS;
 }
+
 OLAPStatus BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool,
                                                ObjectPool* agg_pool, bool* 
eof) {
     if (UNLIKELY(_eof)) {
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index b1bc7e8..e03706f 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -95,12 +95,9 @@ private:
     std::vector<int> _agg_columns_idx;
     std::vector<int> _return_columns_loc;
 
-    int _batch_size = 0;
-
     std::vector<int> _agg_data_counters;
     int _last_agg_data_counter = 0;
 
-    std::unique_ptr<Block> _stored_data_block;
     MutableColumns _stored_data_columns;
     std::vector<IteratorRowRef> _stored_row_ref;
 
diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index 682a9ab..7efd200 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -54,6 +54,7 @@ void 
VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers
         for (auto [c_iter, r_iter] = std::pair {_children.begin(), 
rs_readers.begin()};
              c_iter != _children.end();) {
             if ((*c_iter)->init() != OLAP_SUCCESS) {
+                delete (*c_iter);
                 c_iter = _children.erase(c_iter);
                 r_iter = rs_readers.erase(r_iter);
             } else {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to