This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new b16f2f94f6 [fix](planner) fix projection may return error results  (#12551)
b16f2f94f6 is described below

commit b16f2f94f6e28aab116aa07ab6d020aad518691b
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Sep 13 20:03:21 2022 +0800

    [fix](planner) fix projection may return error results  (#12551)
---
 be/src/vec/columns/column_vector.cpp     | 14 +++++++++++
 be/src/vec/columns/column_vector.h       |  2 ++
 be/src/vec/exec/join/vhash_join_node.cpp | 41 +++++++++++++++++++++++++++++---
 be/src/vec/exec/join/vhash_join_node.h   |  1 +
 4 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index dfe1bce3b3..5b371b64aa 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -239,6 +239,20 @@ void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* indices
     }
 }
 
+template <typename T>
+void ColumnVector<T>::insert_join_nullmap(const int* indices_begin, const int* indices_end) {
+    auto origin_size = size();
+    auto new_size = indices_end - indices_begin;
+    data.resize(origin_size + new_size);
+
+    for (int i = 0; i < new_size; ++i) {
+        int offset = indices_begin[i];
+        if constexpr (std::is_same_v<T, UInt8>) {
+            data[origin_size + i] = (offset == -1) ? T{JOIN_NULL_HINT} : 0;
+        }
+    }
+}
+
 template <typename T>
 ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t result_size_hint) const {
     size_t size = data.size();
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index e57ffe4a9c..4d4ceb14e2 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -275,6 +275,8 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
 
+    void insert_join_nullmap(const int* indices_begin, const int* indices_end);
+
     void fill(const value_type& element, size_t num) {
         auto old_size = data.size();
         auto new_size = old_size + num;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 88f8fa5d95..23036f150c 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -179,7 +179,8 @@ struct ProcessHashTableProbe {
     // output build side result column
     template <bool have_other_join_conjunct = false>
     void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length,
-                                  const std::vector<bool>& output_slot_flags, int size) {
+                                  const std::vector<bool>& output_slot_flags,
+                                  const std::vector<bool>& right_check_flags, int size) {
         constexpr auto is_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
                                            JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
                                            JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
@@ -195,6 +196,12 @@ struct ProcessHashTableProbe {
                         auto& column = _build_blocks[0].get_by_position(i).column;
                         mcol[i + column_offset]->insert_indices_from(
                                 *column, _build_block_rows.data(), _build_block_rows.data() + size);
+                    } else if (right_check_flags[i] &&
+                               mcol[i + column_offset]->is_nullable()) {
+                        reinterpret_cast<ColumnNullable*>(mcol[i + column_offset].get())
+                                ->get_null_map_column()
+                                .insert_join_nullmap(_build_block_rows.data(),
+                                                     _build_block_rows.data() + size);
                     } else {
                         mcol[i + column_offset]->resize(size);
                     }
@@ -261,6 +268,12 @@ struct ProcessHashTableProbe {
                                 }
                             }
                         }
+                    } else if (right_check_flags[i] &&
+                               mcol[i + column_offset]->is_nullable()) {
+                        reinterpret_cast<ColumnNullable*>(mcol[i + column_offset].get())
+                                ->get_null_map_column()
+                                .insert_join_nullmap(_build_block_rows.data(),
+                                                     _build_block_rows.data() + size);
                     } else {
                         mcol[i + column_offset]->resize(size);
                     }
@@ -396,7 +409,8 @@ struct ProcessHashTableProbe {
         {
             SCOPED_TIMER(_build_side_output_timer);
             build_side_output_column(mcol, right_col_idx, right_col_len,
-                                     _join_node->_right_output_slot_flags, current_offset);
+                                     _join_node->_right_output_slot_flags,
+                                     _join_node->_right_check_slot_flags, current_offset);
         }
 
         if constexpr (JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN &&
@@ -505,7 +519,8 @@ struct ProcessHashTableProbe {
         {
             SCOPED_TIMER(_build_side_output_timer);
             build_side_output_column<true>(mcol, right_col_idx, right_col_len,
-                                           _join_node->_right_output_slot_flags, current_offset);
+                                           _join_node->_right_output_slot_flags,
+                                           _join_node->_right_check_slot_flags, current_offset);
         }
         {
             SCOPED_TIMER(_probe_side_output_timer);
@@ -816,6 +831,26 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
     init_output_slots_flags(child(0)->row_desc().tuple_descriptors(), _left_output_slot_flags);
     init_output_slots_flags(child(1)->row_desc().tuple_descriptors(), _right_output_slot_flags);
 
+    _right_check_slot_flags.resize(_right_output_slot_flags.size(), false);
+    if (_hash_output_slot_ids.size() != 0) {
+        auto is_semi_anti_join =
+                _join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN ||
+                _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN;
+        if (!is_semi_anti_join || _have_other_join_conjunct) {
+            auto& right_tuple_ids = child(1)->get_tuple_ids();
+            uint32_t loc = 0;
+            for (auto& tuple_id : right_tuple_ids) {
+                for (auto& tuple_desc : child(1)->row_desc().tuple_descriptors()) {
+                    if (tuple_desc->id() == tuple_id) {
+                        _right_check_slot_flags[loc] = true;
+                        loc += tuple_desc->slots().size();
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 5b2025b336..81ccb680bf 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -239,6 +239,7 @@ private:
     std::vector<SlotId> _hash_output_slot_ids;
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
+    std::vector<bool> _right_check_slot_flags; // used for TupleIsNullPredicate to check
 
     RowDescriptor _output_row_desc;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to