This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit c177197c388415d39f83e6fa10044555c49cd1c8
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Sat Mar 5 15:27:36 2022 +0800

    [fix](vectorized) Fix core dump of mutable block different of block (#8280)
---
 be/src/vec/core/block.cpp                |  9 ++++++++
 be/src/vec/core/block.h                  |  7 ++----
 be/src/vec/exec/join/vhash_join_node.cpp | 39 +++++++++++++++++++-------------
 be/src/vec/exec/vset_operation_node.cpp  |  6 ++---
 4 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 1dda373..23022d3 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -835,6 +835,15 @@ doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc, MemPool
     return dst;
 }
 
+MutableBlock::MutableBlock(const std::vector<TupleDescriptor *>& tuple_descs) {
+    for (auto tuple_desc : tuple_descs) {
+        for (auto slot_desc : tuple_desc->slots()) {
+            _data_types.emplace_back(slot_desc->get_data_type_ptr());
+            _columns.emplace_back(_data_types.back()->create_column());
+        }
+    }
+}
+
 size_t MutableBlock::rows() const {
     for (const auto& column : _columns) {
         if (column) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 816ed29..c660bdb 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -306,11 +306,7 @@ public:
     MutableBlock() = default;
     ~MutableBlock() = default;
 
-    MutableBlock(DataTypes data_types) :  _columns(data_types.size()), _data_types(std::move(data_types)) {
-        for (int i = 0; i < _data_types.size(); ++i) {
-            _columns[i] = _data_types[i]->create_column();
-        }
-    }
+    MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs);
 
     MutableBlock(Block* block)
             : _columns(block->mutate_columns()), _data_types(block->get_data_types()) {}
@@ -348,6 +344,7 @@ public:
                 }
             }
         } else {
+            DCHECK_EQ(_columns.size(), block.columns());
             for (int i = 0; i < _columns.size(); ++i) {
                 if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
                     DCHECK(_data_types[i]->is_nullable());
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index f6a5422..9f93f96 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -187,11 +187,14 @@ struct ProcessHashTableProbe {
         int current_offset = 0;
 
         _items_counts.resize(_probe_rows);
+        _build_block_offsets.resize(_batch_size);
+        _build_block_rows.resize(_batch_size);
         memset(_items_counts.data(), 0, sizeof(uint32_t) * _probe_rows);
 
-        constexpr auto is_right_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
+        constexpr auto need_to_set_visited = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
                                        JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
-                                       JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN;
+                                       JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
+                                       JoinOpType::value == TJoinOp::FULL_OUTER_JOIN;
 
         constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
                                                  JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN;
@@ -230,7 +233,7 @@ struct ProcessHashTableProbe {
                     // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
                     // We should rethink whether to use this iterator mode in the future. Now just opt the one row case
                     if (mapped.get_row_count() == 1) {
-                        if constexpr (is_right_join)
+                        if constexpr (need_to_set_visited)
                             mapped.visited = true;
 
                         if constexpr (!is_right_semi_anti_join) {
@@ -254,7 +257,7 @@ struct ProcessHashTableProbe {
                                 }
                                 ++current_offset;
                             }
-                            if constexpr (is_right_join)
+                            if constexpr (need_to_set_visited)
                                 it->visited = true;
                         }
                     }
@@ -294,7 +297,7 @@ struct ProcessHashTableProbe {
                                 mcol[i + right_col_idx]->insert_from(column, _build_block_rows[j]);
                             }
                         } else {
-                            auto &column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
+                            auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
                             mcol[i + right_col_idx]->insert_from(column, _build_block_rows[j]);
                         }
                     }
@@ -369,14 +372,14 @@ struct ProcessHashTableProbe {
                 for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
                     same_to_prev.emplace_back(true);
                 }
-            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || 
+            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
                                  JoinOpType::value == TJoinOp::FULL_OUTER_JOIN ||
                                  JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
                 ++current_offset;
                 same_to_prev.emplace_back(false);
                 visited_map.emplace_back(nullptr);
                 // only full outer / left outer need insert the data of right table
-                if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || 
+                if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
                               JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
                     for (size_t j = 0; j < right_col_len; ++j) {
                         DCHECK(mcol[j + right_col_idx]->is_nullable());
@@ -414,7 +417,7 @@ struct ProcessHashTableProbe {
             (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id);
 
             auto column = output_block->get_by_position(result_column_id).column;
-            if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || 
+            if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
                           JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
                 auto new_filter_column = ColumnVector<UInt8>::create();
                 auto& filter_map = new_filter_column->get_data();
@@ -492,7 +495,7 @@ struct ProcessHashTableProbe {
 
                 output_block->get_by_position(result_column_id).column =
                         std::move(new_filter_column);
-            } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || 
+            } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
                                  JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) {
                 for (int i = 0; i < column->size(); ++i) {
                     DCHECK(visited_map[i]);
@@ -502,7 +505,7 @@ struct ProcessHashTableProbe {
                 // inner join do nothing
             }
 
-            if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || 
+            if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
                           JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) {
                 output_block->clear();
             } else {
@@ -526,6 +529,7 @@ struct ProcessHashTableProbe {
 
         auto& iter = hash_table_ctx.iter;
         auto block_size = 0;
+
         auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
             block_size++;
             for (size_t j = 0; j < right_col_len; ++j) {
@@ -548,9 +552,9 @@ struct ProcessHashTableProbe {
         }
 
         // right outer join / full join need insert data of left table
-        if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || 
-                      JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || 
-                      JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || 
+        if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
+                      JoinOpType::value == TJoinOp::FULL_OUTER_JOIN ||
+                      JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
                       JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
             for (int i = 0; i < right_col_idx; ++i) {
                 for (int j = 0; j < block_size; ++j) {
@@ -880,7 +884,7 @@ Status HashJoinNode::open(RuntimeState* state) {
 Status HashJoinNode::_hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(1)->open(state));
     SCOPED_TIMER(_build_timer);
-    MutableBlock mutable_block(_right_table_data_types);
+    MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
 
     uint8_t index = 0;
     int64_t last_mem_used = 0;
@@ -907,7 +911,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) {
             RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index));
             RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
 
-            mutable_block = MutableBlock(_right_table_data_types);
+            mutable_block = MutableBlock();
             ++index;
             last_mem_used = _mem_used;
         }
@@ -942,7 +946,10 @@ Status HashJoinNode::extract_build_join_column(Block& block, NullMap& null_map,
             RETURN_IF_ERROR(_build_expr_ctxs[i]->execute(&block, &result_col_id));
         }
 
-        // MutableBlock assume no const column in build block
+        // TODO: opt the column is const
+        block.get_by_position(result_col_id).column =
+                block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
+        
         if (_is_null_safe_eq_join[i]) {
             raw_ptrs[i] = block.get_by_position(result_col_id).column.get();
         } else {
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 9031e0a..63a1ccb 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -228,13 +228,13 @@ void VSetOperationNode::hash_table_init() {
 Status VSetOperationNode::hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(0)->open(state));
     Block block;
-    MutableBlock mutable_block(_left_table_data_types);
+    MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
 
     uint8_t index = 0;
     int64_t last_mem_used = 0;
     bool eos = false;
     while (!eos) {
-        block.clear();
+        block.clear_column_data();
         SCOPED_TIMER(_build_timer);
         RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR(child(0)->get_next(state, &block, &eos));
@@ -254,7 +254,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
             // which is better.
             RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
             RETURN_IF_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table.");
-            mutable_block = MutableBlock(_left_table_data_types);
+            mutable_block = MutableBlock();
             ++index;
             last_mem_used = _mem_used;
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to