This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 5d5928e4744cd65bb95dd4fd44a9f8e74953f43e Author: Pxl <952130...@qq.com> AuthorDate: Thu Apr 21 11:02:04 2022 +0800 [Bug][Storage-vectorized] fix code dump on outer join with not nullable column (#9112) --- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 60 ++++++---------------- be/src/olap/rowset/segment_v2/segment_iterator.h | 11 ++-- be/src/vec/core/block.cpp | 9 ++++ be/src/vec/core/block.h | 33 ++++++------ be/src/vec/exec/volap_scan_node.cpp | 51 +++++++++--------- be/src/vec/exec/volap_scan_node.h | 3 ++ 6 files changed, 76 insertions(+), 91 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 58774e2352..52cc584fdb 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -668,8 +668,10 @@ void SegmentIterator::_vec_init_lazy_materialization() { // todo(wb) make a cost-based lazy-materialization framework // check non-pred column type to decide whether using lazy-materialization FieldType type = _schema.column(cid)->type(); - if (_is_all_column_basic_type && (type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT - || type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_STRING)) { + if (_is_all_column_basic_type && + (type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT || + type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR || + type == OLAP_FIELD_TYPE_STRING)) { _is_all_column_basic_type = false; } } @@ -753,23 +755,7 @@ Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids, void SegmentIterator::_init_current_block( vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& current_columns) { - bool is_block_mem_reuse = block->mem_reuse(); - if (is_block_mem_reuse) { - block->clear_column_data(_schema.num_column_ids()); - } else { // pre fill output block here - for (size_t i = 0; i < _schema.num_column_ids(); i++) { - auto cid = _schema.column_id(i); - auto column_desc = _schema.column(cid); - auto data_type = Schema::get_data_type_ptr(column_desc->type()); - if (column_desc->is_nullable()) { - block->insert({nullptr, - std::make_shared<vectorized::DataTypeNullable>(std::move(data_type)), - column_desc->name()}); - } else { - block->insert({nullptr, std::move(data_type), column_desc->name()}); - } - } - } + block->clear_column_data(_schema.num_column_ids()); for (size_t i = 0; i < _schema.num_column_ids(); i++) { auto cid = _schema.column_id(i); @@ -778,17 +764,8 @@ void SegmentIterator::_init_current_block( if (_is_pred_column[cid]) { //todo(wb) maybe we can release it after output block current_columns[cid]->clear(); } else { // non-predicate column - if (is_block_mem_reuse) { - current_columns[cid] = std::move(*block->get_by_position(i).column).mutate(); - } else { - auto data_type = Schema::get_data_type_ptr(column_desc->type()); - if (column_desc->is_nullable()) { - current_columns[cid] = doris::vectorized::ColumnNullable::create( - data_type->create_column(), doris::vectorized::ColumnUInt8::create()); - } else { - current_columns[cid] = data_type->create_column(); - } - } + current_columns[cid] = std::move(*block->get_by_position(i).column).mutate(); + if (column_desc->type() == OLAP_FIELD_TYPE_DATE) { current_columns[cid]->set_date_type(); } else if (column_desc->type() == OLAP_FIELD_TYPE_DATETIME) { @@ -799,7 +776,7 @@ void SegmentIterator::_init_current_block( } } -void SegmentIterator::_output_non_pred_columns(vectorized::Block* block, bool is_block_mem_reuse) { +void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) { for (auto cid : _non_predicate_columns) { block->replace_by_position(_schema_block_id_map[cid], std::move(_current_return_columns[cid])); @@ -919,6 +896,8 @@ void SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column Status SegmentIterator::next_batch(vectorized::Block* block) { bool is_mem_reuse = block->mem_reuse(); + DCHECK(is_mem_reuse); + SCOPED_RAW_TIMER(&_opts.stats->block_load_ns); if (UNLIKELY(!_inited)) { RETURN_IF_ERROR(_init(true)); @@ -953,24 +932,15 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // todo(wb) abstract make column where if (!_is_pred_column[cid]) { // non-predicate block->replace_by_position(i, std::move(_current_return_columns[cid])); - } else { // predicate - if (!is_mem_reuse) { - auto column_desc = _schema.column(cid); - auto data_type = Schema::get_data_type_ptr(column_desc->type()); - block->replace_by_position(i, data_type->create_column()); - } } } - // not sure whether block is clear before enter segmentIter, so clear it here. - if (is_mem_reuse) { - block->clear_column_data(); - } + block->clear_column_data(); return Status::EndOfFile("no more data in segment"); } // when no predicate(include delete condition) is provided, output column directly if (_vec_pred_column_ids.empty() && _short_cir_pred_column_ids.empty()) { - _output_non_pred_columns(block, is_mem_reuse); + _output_non_pred_columns(block); } else { // need predicate evaluation uint16_t selected_size = nrows_read; uint16_t sel_rowid_idx[selected_size]; @@ -982,7 +952,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // So output block directly after vectorization evaluation if (_is_all_column_basic_type) { RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx, - selected_size, is_mem_reuse)); + selected_size)); } else { // step 2: evaluate short ciruit predicate // todo(wb) research whether need to read short predicate after vectorization evaluation @@ -998,7 +968,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // step4: output columns // 4.1 output non-predicate column - _output_non_pred_columns(block, is_mem_reuse); + _output_non_pred_columns(block); // 4.2 get union of short_cir_pred and vec_pred std::set<ColumnId> pred_column_ids; @@ -1008,7 +978,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // 4.3 output short circuit and predicate column RETURN_IF_ERROR(_output_column_by_sel_idx(block, pred_column_ids, sel_rowid_idx, - selected_size, is_mem_reuse)); + selected_size)); } } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 7f2d11e0b6..fb461fe54d 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -102,20 +102,19 @@ private: std::vector<vectorized::MutableColumnPtr>& non_pred_vector); void _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t& selected_size); void _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t* selected_size); - void _output_non_pred_columns(vectorized::Block* block, bool is_block_mem_reuse); + void _output_non_pred_columns(vectorized::Block* block); void _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids, std::vector<rowid_t>& rowid_vector, uint16_t* sel_rowid_idx, size_t select_size, vectorized::MutableColumns* mutable_columns); template <class Container> Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids, - uint16_t* sel_rowid_idx, uint16_t select_size, - bool is_block_mem_reuse) { + uint16_t* sel_rowid_idx, uint16_t select_size) { for (auto cid : column_ids) { int block_cid = _schema_block_id_map[cid]; - RETURN_IF_ERROR(block->copy_column_data_to_block( - is_block_mem_reuse, _current_return_columns[cid].get(), sel_rowid_idx, - select_size, block_cid, _opts.block_row_max)); + RETURN_IF_ERROR(block->copy_column_data_to_block(_current_return_columns[cid].get(), + sel_rowid_idx, select_size, block_cid, + _opts.block_row_max)); } return Status::OK(); } diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 33c73205a5..0d275f6604 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -137,6 +137,15 @@ Block::Block(const ColumnsWithTypeAndName& data_) : data {data_} { initialize_index_by_name(); } +Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) { + for (const auto slot_desc : slots) { + auto column_ptr = slot_desc->get_empty_mutable_column(); + column_ptr->reserve(block_size); + insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } +} + Block::Block(const PBlock& pblock) { const char* buf = nullptr; std::string compression_scratch; diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 6ef105cf3b..5b543dd8ff 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -28,6 +28,8 @@ #include <parallel_hashmap/phmap.h> #include "gen_cpp/data.pb.h" +#include "runtime/descriptors.h" +#include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/core/block_info.h" #include "vec/core/column_with_type_and_name.h" @@ -67,6 +69,7 @@ public: Block(std::initializer_list<ColumnWithTypeAndName> il); Block(const ColumnsWithTypeAndName& data_); Block(const PBlock& pblock); + Block(const std::vector<SlotDescriptor*>& slots, size_t block_size); /// insert the column at the specified position void insert(size_t position, const ColumnWithTypeAndName& elem); @@ -97,8 +100,7 @@ public: ColumnWithTypeAndName& get_by_position(size_t position) { return data[position]; } const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; } - Status copy_column_data_to_block(bool is_block_mem_reuse, - doris::vectorized::IColumn* input_col_ptr, + Status copy_column_data_to_block(doris::vectorized::IColumn* input_col_ptr, uint16_t* sel_rowid_idx, uint16_t select_size, int block_cid, size_t batch_size) { // Only the additional deleted filter condition need to materialize column be at the end of the block @@ -108,21 +110,22 @@ public: // `select b from table;` // a column only effective in segment iterator, the block from query engine only contain the b column. // so the `block_cid >= data.size()` is true - if (block_cid >= data.size()) + if (block_cid >= data.size()) { return Status::OK(); + } - if (is_block_mem_reuse) { - auto* raw_res_ptr = this->get_by_position(block_cid).column.get(); - const_cast<doris::vectorized::IColumn*>(raw_res_ptr)->reserve(batch_size); - return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, const_cast<doris::vectorized::IColumn*>(raw_res_ptr)); - } else { - MutableColumnPtr res_col_ptr = data[block_cid].type->create_column(); - res_col_ptr->reserve(batch_size); - auto* raw_res_ptr = res_col_ptr.get(); - RETURN_IF_ERROR(input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, const_cast<doris::vectorized::IColumn*>(raw_res_ptr))); - this->replace_by_position(block_cid, std::move(res_col_ptr)); - return Status::OK(); + MutableColumnPtr raw_res_ptr = this->get_by_position(block_cid).column->assume_mutable(); + raw_res_ptr->reserve(batch_size); + + // adapt for outer join change column to nullable + if (raw_res_ptr->is_nullable()) { + auto col_ptr_nullable = + reinterpret_cast<vectorized::ColumnNullable*>(raw_res_ptr.get()); + col_ptr_nullable->get_null_map_column().insert_many_defaults(select_size); + raw_res_ptr = col_ptr_nullable->get_nested_column_ptr(); } + + return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, raw_res_ptr); } void replace_by_position(size_t position, ColumnPtr&& res) { @@ -335,7 +338,7 @@ public: size_t rows() const; size_t columns() const { return _columns.size(); } - bool empty() { return rows() == 0; } + bool empty() const { return rows() == 0; } MutableColumns& mutable_columns() { return _columns; } diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 59387fbf10..f6101692ce 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -65,22 +65,16 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { auto doris_scanner_row_num = _limit == -1 ? config::doris_scanner_row_num : std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit); - auto block_size = _limit == -1 ? state->batch_size() - : std::min(static_cast<int64_t>(state->batch_size()), _limit); - auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) / block_size; + _block_size = _limit == -1 ? state->batch_size() + : std::min(static_cast<int64_t>(state->batch_size()), _limit); + auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / _block_size; auto pre_block_count = std::min(_volap_scanners.size(), static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) * block_per_scanner; for (int i = 0; i < pre_block_count; ++i) { - auto block = new Block; - for (const auto slot_desc : _tuple_desc->slots()) { - auto column_ptr = slot_desc->get_empty_mutable_column(); - column_ptr->reserve(block_size); - block->insert(ColumnWithTypeAndName( - std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); - } + auto block = new Block(_tuple_desc->slots(), _block_size); _free_blocks.emplace_back(block); _buffered_bytes += block->allocated_bytes(); } @@ -152,7 +146,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { Status status = Status::OK(); bool eos = false; RuntimeState* state = scanner->runtime_state(); - DCHECK(NULL != state); + DCHECK(nullptr != state); if (!scanner->is_open()) { status = scanner->open(); if (!status.ok()) { @@ -203,8 +197,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; bool get_free_block = true; - while (!eos && raw_rows_read < raw_rows_threshold && - raw_bytes_read < raw_bytes_threshold && get_free_block) { + while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold && + get_free_block) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); @@ -230,7 +224,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { std::lock_guard<std::mutex> l(_free_blocks_lock); _free_blocks.emplace_back(block); } else { - if (!blocks.empty() && blocks.back()->rows() + block->rows() <= _runtime_state->batch_size()) { + if (!blocks.empty() && + blocks.back()->rows() + block->rows() <= _runtime_state->batch_size()) { MutableBlock(blocks.back()).merge(*block); block->clear_column_data(); std::lock_guard<std::mutex> l(_free_blocks_lock); @@ -408,7 +403,9 @@ Status VOlapScanNode::close(RuntimeState* state) { _scan_block_added_cv.notify_all(); // join transfer thread - if (_transfer_thread) _transfer_thread->join(); + if (_transfer_thread) { + _transfer_thread->join(); + } // clear some block in queue // TODO: The presence of transfer_thread here may cause Block's memory alloc and be released not in a thread, @@ -475,7 +472,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { } // wait for block from queue - Block* materialized_block = NULL; + Block* materialized_block = nullptr; { std::unique_lock<std::mutex> l(_blocks_lock); SCOPED_TIMER(_olap_wait_batch_queue_timer); @@ -490,14 +487,14 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { if (!_materialized_blocks.empty()) { materialized_block = _materialized_blocks.back(); - DCHECK(materialized_block != NULL); + DCHECK(materialized_block != nullptr); _materialized_blocks.pop_back(); _materialized_row_batches_bytes -= materialized_block->allocated_bytes(); } } // return block - if (NULL != materialized_block) { + if (nullptr != materialized_block) { // notify scanner _block_consumed_cv.notify_one(); // get scanner's block memory @@ -533,8 +530,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { return _status; } -// TODO: we should register the mem cost of new Block in -// alloc block Block* VOlapScanNode::_alloc_block(bool& get_free_block) { { std::lock_guard<std::mutex> l(_free_blocks_lock); @@ -544,15 +539,19 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) { return block; } } + get_free_block = false; - return new Block(); + + auto block = new Block(_tuple_desc->slots(), _block_size); + _buffered_bytes += block->allocated_bytes(); + return block; } int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) { std::list<VOlapScanner*> olap_scanners; int assigned_thread_num = _running_thread; size_t max_thread = std::min(_volap_scanners.size(), - static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)); + static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)); // copy to local { // How many thread can apply to this query @@ -563,7 +562,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per thread_slot_num = _free_blocks.size() / block_per_scanner; thread_slot_num += (_free_blocks.size() % block_per_scanner != 0); thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num); - if (thread_slot_num <= 0) thread_slot_num = 1; + if (thread_slot_num <= 0) { + thread_slot_num = 1; + } } else { std::lock_guard<std::mutex> l(_scan_blocks_lock); if (_scan_blocks.empty()) { @@ -583,9 +584,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per auto scanner = _volap_scanners.front(); _volap_scanners.pop_front(); - if (scanner->need_to_close()) + if (scanner->need_to_close()) { scanner->close(state); - else { + } else { olap_scanners.push_back(scanner); _running_thread++; assigned_thread_num++; diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h index 921399ee6b..831b0963b6 100644 --- a/be/src/vec/exec/volap_scan_node.h +++ b/be/src/vec/exec/volap_scan_node.h @@ -39,6 +39,7 @@ public: } Status get_next(RuntimeState* state, Block* block, bool* eos) override; Status close(RuntimeState* state) override; + private: void transfer_thread(RuntimeState* state); void scanner_thread(VOlapScanner* scanner); @@ -64,6 +65,8 @@ private: std::mutex _volap_scanners_lock; int _max_materialized_blocks; + + size_t _block_size = 0; }; } // namespace vectorized } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org