This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 51abaa8 [fix](vec) Fix some bugs about vec engine (#7884) 51abaa8 is described below commit 51abaa89f3d828dfdb8e6dfeef0d1424e28cdf05 Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Feb 3 19:21:17 2022 +0800 [fix](vec) Fix some bugs about vec engine (#7884) 1. mem leak in vcollector iter 2. query slow in agg table limit 10 3. query slow in SSB q4,q5,q6 --- be/src/exec/olap_scan_node.h | 3 ++- be/src/exec/olap_scanner.cpp | 3 +++ be/src/olap/reader.cpp | 1 + be/src/olap/reader.h | 5 ++++ be/src/olap/rowset/beta_rowset_reader.cpp | 33 ++++++++++++----------- be/src/olap/rowset/rowset_reader_context.h | 2 ++ be/src/olap/storage_engine.cpp | 10 +++---- be/src/olap/tablet_schema.cpp | 3 ++- be/src/vec/columns/column_string.cpp | 1 + be/src/vec/exec/volap_scan_node.cpp | 42 ++++++++++++++++++------------ be/src/vec/exec/volap_scanner.cpp | 6 +++++ be/src/vec/exec/volap_scanner.h | 10 +++++-- be/src/vec/olap/block_reader.cpp | 9 ++++--- be/src/vec/olap/block_reader.h | 3 --- be/src/vec/olap/vcollect_iterator.cpp | 1 + 15 files changed, 85 insertions(+), 47 deletions(-) diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index d57a92d..82e98d5 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -160,7 +160,7 @@ protected: RuntimeProfile* profile); friend class OlapScanner; - friend class doris::vectorized::VOlapScanner; + friend class vectorized::VOlapScanner; // Tuple id resolved in prepare() to set _tuple_desc; TupleId _tuple_id; @@ -239,6 +239,7 @@ protected: SpinLock _status_mutex; Status _status; RuntimeState* _runtime_state; + RuntimeProfile::Counter* _scan_timer; RuntimeProfile::Counter* _scan_cpu_timer = nullptr; RuntimeProfile::Counter* _tablet_counter; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index a1efc1d..d7dc839 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -59,6 +59,9 @@ Status OlapScanner::prepare( const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters) { set_tablet_reader(); + // set limit to reduce end of rowset and segment mem use + _tablet_reader->set_batch_size(_parent->limit() == -1 ? _parent->_runtime_state->batch_size() : std::min( + static_cast<int64_t>(_parent->_runtime_state->batch_size()), _parent->limit())); // Get olap table TTabletId tablet_id = scan_range.tablet_id; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 13e50b8..4deda90 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -222,6 +222,7 @@ OLAPStatus TabletReader::_capture_rs_readers(const ReaderParams& read_params, _reader_context.runtime_state = read_params.runtime_state; _reader_context.use_page_cache = read_params.use_page_cache; _reader_context.sequence_id_idx = _sequence_col_idx; + _reader_context.batch_size = _batch_size; *valid_rs_readers = *rs_readers; diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 82cd7ff..3137e06 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -133,6 +133,10 @@ public: _stats.rows_vec_del_cond_filtered; } + void set_batch_size(int batch_size) { + _batch_size = batch_size; + } + const OlapReaderStatistics& stats() const { return _stats; } OlapReaderStatistics* mutable_stats() { return &_stats; } @@ -210,6 +214,7 @@ protected: bool _filter_delete = false; int32_t _sequence_col_idx = -1; bool _direct_mode = false; + int _batch_size = 1024; CollectIterator _collect_iter; std::vector<uint32_t> _key_cids; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 4d35f2f..263a4cc 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -131,20 +131,23 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { _iterator.reset(final_iterator); // init input block - _input_block.reset(new RowBlockV2(schema, 1024, _parent_tracker)); - - // init input/output block and row - _output_block.reset(new RowBlock(read_context->tablet_schema, _parent_tracker)); - - RowBlockInfo output_block_info; - output_block_info.row_num = 1024; - output_block_info.null_supported = true; - // the output block's schema should be seek_columns to conform to v1 - // TODO(hkp): this should be optimized to use return_columns - output_block_info.column_ids = *(_context->seek_columns); - _output_block->init(output_block_info); - _row.reset(new RowCursor()); - RETURN_NOT_OK(_row->init(*(read_context->tablet_schema), *(_context->seek_columns))); + _input_block.reset(new RowBlockV2(schema, + std::min(1024, read_context->batch_size), _parent_tracker)); + + if (!read_context->is_vec) { + // init input/output block and row + _output_block.reset(new RowBlock(read_context->tablet_schema, _parent_tracker)); + + RowBlockInfo output_block_info; + output_block_info.row_num = std::min(1024, read_context->batch_size); + output_block_info.null_supported = true; + // the output block's schema should be seek_columns to conform to v1 + // TODO(hkp): this should be optimized to use return_columns + output_block_info.column_ids = *(_context->seek_columns); + _output_block->init(output_block_info); + _row.reset(new RowCursor()); + RETURN_NOT_OK(_row->init(*(read_context->tablet_schema), *(_context->seek_columns))); + } return OLAP_SUCCESS; } @@ -211,7 +214,7 @@ OLAPStatus BetaRowsetReader::next_block(vectorized::Block* block) { } } is_first = false; - } while (block->rows() < _context->runtime_state->batch_size()); // here we should keep block.rows() < batch_size + } while (block->rows() < _context->batch_size); // here we should keep block.rows() < batch_size return OLAP_SUCCESS; } diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index cc98419..07d9340 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -61,6 +61,8 @@ struct RowsetReaderContext { RuntimeState* runtime_state = nullptr; bool use_page_cache = false; int sequence_id_idx = -1; + int batch_size = 1024; + bool is_vec = false; }; } // namespace doris diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index aa1af9c..aeac350 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -570,11 +570,11 @@ void StorageEngine::stop() { THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread); #undef THREAD_JOIN -#define THREADS_JOIN(threads) \ - for (const auto& thread : threads) { \ - if (thread) { \ - thread->join(); \ - } \ +#define THREADS_JOIN(threads) \ + for (const auto& thread : threads) {\ + if (thread) { \ + thread->join(); \ + } \ } THREADS_JOIN(_path_gc_threads); diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 5b407d5..9ad710d 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -497,7 +497,8 @@ vectorized::Block TabletSchema::create_block(const std::vector<uint32_t>& return for (int i = 0; i < return_columns.size(); ++i) { const auto& col = _cols[return_columns[i]]; auto data_type = vectorized::IDataType::from_olap_engine(col.type(), col.is_nullable()); - block.insert({data_type->create_column(), data_type, col.name()}); + auto column = data_type->create_column(); + block.insert({std::move(column), data_type, col.name()}); } return block; } diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index afd3f23..9ebf879 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -323,6 +323,7 @@ void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn void ColumnString::reserve(size_t n) { offsets.reserve(n); + chars.reserve(n); } void ColumnString::resize(size_t n) { diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index b365c1d..77f0213 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -62,12 +62,19 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { _total_assign_num = 0; _nice = 18 + std::max(0, 2 - (int)_volap_scanners.size() / 5); - auto block_per_scanner = (config::doris_scanner_row_num + (state->batch_size() - 1)) / state->batch_size(); - for (int i = 0; i < _volap_scanners.size() * block_per_scanner; ++i) { + auto doris_scanner_row_num = _limit == -1 ? config::doris_scanner_row_num : + std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit); + auto block_size = _limit == -1 ? state->batch_size() : + std::min(static_cast<int64_t>(state->batch_size()), _limit); + auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) / block_size; + auto pre_block_count = + std::min(_volap_scanners.size(), static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) * block_per_scanner; + + for (int i = 0; i < pre_block_count; ++i) { auto block = new Block; for (const auto slot_desc : _tuple_desc->slots()) { auto column_ptr = slot_desc->get_empty_mutable_column(); - column_ptr->reserve(state->batch_size()); + column_ptr->reserve(block_size); block->insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); @@ -240,16 +247,11 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { _scan_blocks.insert(_scan_blocks.end(), blocks.begin(), blocks.end()); } // If eos is true, we will process out of this lock block. - if (!eos) { - std::lock_guard<std::mutex> l(_volap_scanners_lock); - _volap_scanners.push_front(scanner); - } + if (eos) { scanner->mark_to_need_to_close(); } + std::lock_guard<std::mutex> l(_volap_scanners_lock); + _volap_scanners.push_front(scanner); } if (eos) { - // close out of blocks lock. we do this before _progress update - // that can assure this object can keep live before we finish. - scanner->close(_runtime_state); - std::lock_guard<std::mutex> l(_scan_blocks_lock); _progress.update(1); if (_progress.done()) { @@ -520,18 +522,26 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per size_t thread_slot_num = 0; { std::lock_guard<std::mutex> l(_free_blocks_lock); - thread_slot_num = (_free_blocks.size() - (assigned_thread_num * block_per_scanner)) / block_per_scanner; + thread_slot_num = _free_blocks.size() / block_per_scanner; + thread_slot_num += (_free_blocks.size() % block_per_scanner != 0); if (thread_slot_num == 0) thread_slot_num++; } { std::lock_guard<std::mutex> l(_volap_scanners_lock); thread_slot_num = std::min(thread_slot_num, _volap_scanners.size()); - for (int i = 0; i < thread_slot_num; ++i) { - olap_scanners.push_back(_volap_scanners.front()); + for (int i = 0; i < thread_slot_num && !_volap_scanners.empty();) { + auto scanner = _volap_scanners.front(); _volap_scanners.pop_front(); - _running_thread++; - assigned_thread_num++; + + if (scanner->need_to_close()) + scanner->close(state); + else { + olap_scanners.push_back(scanner); + _running_thread++; + assigned_thread_num++; + i++; + } } } } diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 1b4bb02..7b5b31e 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -19,6 +19,8 @@ #include <memory> +#include "runtime/runtime_state.h" + #include "vec/columns/column_complex.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" @@ -69,6 +71,10 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo return Status::OK(); } +void VOlapScanner::set_tablet_reader() { + _tablet_reader = std::make_unique<BlockReader>(); +} + void VOlapScanner::_convert_row_to_block(std::vector<vectorized::MutableColumnPtr>* columns) { size_t slots_size = _query_slots.size(); for (int i = 0; i < slots_size; ++i) { diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index 5efaf9d..0c1c4ad 100644 --- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -36,19 +36,25 @@ public: bool need_agg_finalize, const TPaloScanRange& scan_range); Status get_block(RuntimeState* state, vectorized::Block* block, bool* eof); - Status get_batch(RuntimeState* state, RowBatch* row_batch, bool* eos) { + + Status get_batch(RuntimeState* state, RowBatch* row_batch, bool* eos) override { return Status::NotSupported("Not Implemented VOlapScanNode Node::get_next scalar"); } VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; } + void mark_to_need_to_close() { _need_to_close = true; } + + bool need_to_close() { return _need_to_close; } + protected: - virtual void set_tablet_reader() { _tablet_reader = std::make_unique<BlockReader>(); } + virtual void set_tablet_reader() override; private: // TODO: Remove this function after we finish reader vec void _convert_row_to_block(std::vector<vectorized::MutableColumnPtr>* columns); VExprContext* _vconjunct_ctx = nullptr; + bool _need_to_close = false; }; } // namespace vectorized diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 8fda9d6..37f3f7b 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -50,6 +50,8 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params, return res; } + _reader_context.batch_size = _batch_size; + _reader_context.is_vec = true; for (auto& rs_reader : rs_readers) { RETURN_NOT_OK(rs_reader->init(&_reader_context)); OLAPStatus res = _vcollect_iter.add_child(rs_reader); @@ -76,8 +78,8 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) { return; } - _stored_data_block = _next_row.block->create_same_struct_block(_batch_size); - _stored_data_columns = _stored_data_block->mutate_columns(); + _stored_data_columns = + _next_row.block->create_same_struct_block(_batch_size)->mutate_columns(); _stored_has_null_tag.resize(_stored_data_columns.size()); _stored_has_string_tag.resize(_stored_data_columns.size()); @@ -102,7 +104,6 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) { _next_row.block->get_data_type(idx)->is_nullable()); DCHECK(function != nullptr); _agg_functions.push_back(function); - // create aggregate data AggregateDataPtr place = new char[function->size_of_data()]; function->create(place); @@ -120,7 +121,6 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) { OLAPStatus BlockReader::init(const ReaderParams& read_params) { TabletReader::init(read_params); - _batch_size = read_params.runtime_state->batch_size(); auto return_column_size = read_params.origin_return_columns->size() - (_sequence_col_idx != -1 ? 1 : 0); @@ -231,6 +231,7 @@ OLAPStatus BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, Obj _merged_rows += target_block_row; return OLAP_SUCCESS; } + OLAPStatus BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) { if (UNLIKELY(_eof)) { diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index b1bc7e8..e03706f 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -95,12 +95,9 @@ private: std::vector<int> _agg_columns_idx; std::vector<int> _return_columns_loc; - int _batch_size = 0; - std::vector<int> _agg_data_counters; int _last_agg_data_counter = 0; - std::unique_ptr<Block> _stored_data_block; MutableColumns _stored_data_columns; std::vector<IteratorRowRef> _stored_row_ref; diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 682a9ab..7efd200 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -54,6 +54,7 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers for (auto [c_iter, r_iter] = std::pair {_children.begin(), rs_readers.begin()}; c_iter != _children.end();) { if ((*c_iter)->init() != OLAP_SUCCESS) { + delete (*c_iter); c_iter = _children.erase(c_iter); r_iter = rs_readers.erase(r_iter); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org