This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.0 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit c177197c388415d39f83e6fa10044555c49cd1c8 Author: HappenLee <happen...@hotmail.com> AuthorDate: Sat Mar 5 15:27:36 2022 +0800 [fix](vectorized) Fix core dump of mutable block different of block (#8280) --- be/src/vec/core/block.cpp | 9 ++++++++ be/src/vec/core/block.h | 7 ++---- be/src/vec/exec/join/vhash_join_node.cpp | 39 +++++++++++++++++++------------- be/src/vec/exec/vset_operation_node.cpp | 6 ++--- 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 1dda373..23022d3 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -835,6 +835,15 @@ doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc, MemPool return dst; } +MutableBlock::MutableBlock(const std::vector<TupleDescriptor *>& tuple_descs) { + for (auto tuple_desc : tuple_descs) { + for (auto slot_desc : tuple_desc->slots()) { + _data_types.emplace_back(slot_desc->get_data_type_ptr()); + _columns.emplace_back(_data_types.back()->create_column()); + } + } +} + size_t MutableBlock::rows() const { for (const auto& column : _columns) { if (column) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 816ed29..c660bdb 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -306,11 +306,7 @@ public: MutableBlock() = default; ~MutableBlock() = default; - MutableBlock(DataTypes data_types) : _columns(data_types.size()), _data_types(std::move(data_types)) { - for (int i = 0; i < _data_types.size(); ++i) { - _columns[i] = _data_types[i]->create_column(); - } - } + MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs); MutableBlock(Block* block) : _columns(block->mutate_columns()), _data_types(block->get_data_types()) {} @@ -348,6 +344,7 @@ public: } } } else { + DCHECK_EQ(_columns.size(), block.columns()); for (int i = 0; i < _columns.size(); ++i) { if (!_data_types[i]->equals(*block.get_by_position(i).type)) { DCHECK(_data_types[i]->is_nullable()); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index f6a5422..9f93f96 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -187,11 +187,14 @@ struct ProcessHashTableProbe { int current_offset = 0; _items_counts.resize(_probe_rows); + _build_block_offsets.resize(_batch_size); + _build_block_rows.resize(_batch_size); memset(_items_counts.data(), 0, sizeof(uint32_t) * _probe_rows); - constexpr auto is_right_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || + constexpr auto need_to_set_visited = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN; + JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN; @@ -230,7 +233,7 @@ struct ProcessHashTableProbe { // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. // We should rethink whether to use this iterator mode in the future. Now just opt the one row case if (mapped.get_row_count() == 1) { - if constexpr (is_right_join) + if constexpr (need_to_set_visited) mapped.visited = true; if constexpr (!is_right_semi_anti_join) { @@ -254,7 +257,7 @@ struct ProcessHashTableProbe { } ++current_offset; } - if constexpr (is_right_join) + if constexpr (need_to_set_visited) it->visited = true; } } @@ -294,7 +297,7 @@ struct ProcessHashTableProbe { mcol[i + right_col_idx]->insert_from(column, _build_block_rows[j]); } } else { - auto &column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; + auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; mcol[i + right_col_idx]->insert_from(column, _build_block_rows[j]); } } @@ -369,14 +372,14 @@ struct ProcessHashTableProbe { for (int i = 0; i < current_offset - origin_offset - 1; ++i) { same_to_prev.emplace_back(true); } - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { ++current_offset; same_to_prev.emplace_back(false); visited_map.emplace_back(nullptr); // only full outer / left outer need insert the data of right table - if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { for (size_t j = 0; j < right_col_len; ++j) { DCHECK(mcol[j + right_col_idx]->is_nullable()); @@ -414,7 +417,7 @@ struct ProcessHashTableProbe { (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id); auto column = output_block->get_by_position(result_column_id).column; - if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { auto new_filter_column = ColumnVector<UInt8>::create(); auto& filter_map = new_filter_column->get_data(); @@ -492,7 +495,7 @@ struct ProcessHashTableProbe { output_block->get_by_position(result_column_id).column = std::move(new_filter_column); - } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || + } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) { for (int i = 0; i < column->size(); ++i) { DCHECK(visited_map[i]); @@ -502,7 +505,7 @@ struct ProcessHashTableProbe { // inner join do nothing } - if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || + if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) { output_block->clear(); } else { @@ -526,6 +529,7 @@ struct ProcessHashTableProbe { auto& iter = hash_table_ctx.iter; auto block_size = 0; + auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) { block_size++; for (size_t j = 0; j < right_col_len; ++j) { @@ -548,9 +552,9 @@ struct ProcessHashTableProbe { } // right outer join / full join need insert data of left table - if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || - JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || + if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || + JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { for (int i = 0; i < right_col_idx; ++i) { for (int j = 0; j < block_size; ++j) { @@ -880,7 +884,7 @@ Status HashJoinNode::open(RuntimeState* state) { Status HashJoinNode::_hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(1)->open(state)); SCOPED_TIMER(_build_timer); - MutableBlock mutable_block(_right_table_data_types); + MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors()); uint8_t index = 0; int64_t last_mem_used = 0; @@ -907,7 +911,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index)); RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table."); - mutable_block = MutableBlock(_right_table_data_types); + mutable_block = MutableBlock(); ++index; last_mem_used = _mem_used; } @@ -942,7 +946,10 @@ Status HashJoinNode::extract_build_join_column(Block& block, NullMap& null_map, RETURN_IF_ERROR(_build_expr_ctxs[i]->execute(&block, &result_col_id)); } - // MutableBlock assume no const column in build block + // TODO: opt the column is const + block.get_by_position(result_col_id).column = + block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); + if (_is_null_safe_eq_join[i]) { raw_ptrs[i] = block.get_by_position(result_col_id).column.get(); } else { diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 9031e0a..63a1ccb 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -228,13 +228,13 @@ void VSetOperationNode::hash_table_init() { Status VSetOperationNode::hash_table_build(RuntimeState* state) { RETURN_IF_ERROR(child(0)->open(state)); Block block; - MutableBlock mutable_block(_left_table_data_types); + MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors()); uint8_t index = 0; int64_t last_mem_used = 0; bool eos = false; while (!eos) { - block.clear(); + block.clear_column_data(); SCOPED_TIMER(_build_timer); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(child(0)->get_next(state, &block, &eos)); @@ -254,7 +254,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { // which is better. RETURN_IF_ERROR(process_build_block(_build_blocks[index], index)); RETURN_IF_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table."); - mutable_block = MutableBlock(_left_table_data_types); + mutable_block = MutableBlock(); ++index; last_mem_used = _mem_used; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org