This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ff86ce0d9 [Refactor](join) some refactor of join process (#44346)
f3ff86ce0d9 is described below

commit f3ff86ce0d904a5ece8624da26621c608a89d5b6
Author: Pxl <x...@selectdb.com>
AuthorDate: Thu Jan 2 17:54:53 2025 +0800

    [Refactor](join) some refactor of join process (#44346)
    
    ### What problem does this PR solve?
    remove need_judge_null variable
---
 be/src/pipeline/dependency.h                       |   1 -
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |   7 -
 be/src/pipeline/exec/hashjoin_build_sink.h         |   4 +-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  27 +--
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   2 -
 .../pipeline/exec/join/process_hash_table_probe.h  |  12 +-
 .../exec/join/process_hash_table_probe_impl.h      | 167 ++++++-------
 be/src/vec/common/hash_table/join_hash_table.h     | 265 +++++++++++----------
 8 files changed, 227 insertions(+), 258 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ecbd49a5647..fd179cdfd0a 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -612,7 +612,6 @@ struct HashJoinSharedState : public JoinSharedState {
     size_t build_exprs_size = 0;
     std::shared_ptr<vectorized::Block> build_block;
     std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
-    bool probe_ignore_null = false;
 };
 
 struct PartitionedHashJoinSharedState
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 6aca4897367..016ea494062 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -613,13 +613,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     if (eos) {
         local_state._eos = true;
         local_state.init_short_circuit_for_probe();
-        // Since the comparison of null values is meaningless, null aware left 
anti/semi join should not output null
-        // when the build side is not empty.
-        if (local_state._shared_state->build_block &&
-            (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-             _join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN)) {
-            local_state._shared_state->probe_ignore_null = true;
-        }
         local_state._dependency->set_ready_to_read();
     }
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 906fc5f9bd4..91465380d70 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -201,10 +201,12 @@ struct ProcessHashTableBuild {
         SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, 
_batch_size,
                                                                       
*has_null_key);
+
         // In order to make the null keys equal when using single null eq, all 
null keys need to be set to default value.
         if (_build_raw_ptrs.size() == 1 && null_map) {
             
_build_raw_ptrs[0]->assume_mutable()->replace_column_null_data(null_map->data());
         }
+
         hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
                                             null_map ? null_map->data() : 
nullptr, true, true,
                                             
hash_table_ctx.hash_table->get_bucket_size());
@@ -213,7 +215,7 @@ struct ProcessHashTableBuild {
         if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
              JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
             with_other_conjuncts) {
-            //null aware join with other conjuncts
+            // null aware join with other conjuncts
             keep_null_key = true;
         } else if (_parent->_shared_state->is_null_safe_eq_join.size() == 1 &&
                    _parent->_shared_state->is_null_safe_eq_join[0]) {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 37ccd6206f3..1f30a6183a2 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -39,7 +39,6 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, 
LocalStateInfo& info)
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<HashJoinProbeOperatorX>();
-    _shared_state->probe_ignore_null = p._probe_ignore_null;
     _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
     for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
@@ -287,12 +286,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
     if (local_state._probe_index < local_state._probe_block.rows()) {
         DCHECK(local_state._has_set_need_null_map_for_probe);
         std::visit(
-                [&](auto&& arg, auto&& process_hashtable_ctx, auto 
need_judge_null) {
+                [&](auto&& arg, auto&& process_hashtable_ctx) {
                     using HashTableProbeType = 
std::decay_t<decltype(process_hashtable_ctx)>;
                     if constexpr (!std::is_same_v<HashTableProbeType, 
std::monostate>) {
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
                         if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                            st = process_hashtable_ctx.template 
process<need_judge_null>(
+                            st = process_hashtable_ctx.template process(
                                     arg,
                                     local_state._null_map_column
                                             ? 
&local_state._null_map_column->get_data()
@@ -308,9 +307,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                     }
                 },
                 local_state._shared_state->hash_table_variants->method_variant,
-                *local_state._process_hashtable_ctx_variants,
-                
vectorized::make_bool_variant(local_state._need_null_map_for_probe &&
-                                              
local_state._shared_state->probe_ignore_null));
+                *local_state._process_hashtable_ctx_variants);
     } else if (local_state._probe_eos) {
         if (_is_right_semi_anti || (_is_outer_join && _join_op != 
TJoinOp::LEFT_OUTER_JOIN)) {
             std::visit(
@@ -493,29 +490,13 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, 
vectorized::Block* inpu
 Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::init(tnode, 
state));
     DCHECK(tnode.__isset.hash_join_node);
-    const bool probe_dispose_null =
-            _match_all_probe || _build_unique || _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-            _join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN || _join_op == 
TJoinOp::LEFT_ANTI_JOIN ||
-            _join_op == TJoinOp::LEFT_SEMI_JOIN;
     const std::vector<TEqJoinCondition>& eq_join_conjuncts = 
tnode.hash_join_node.eq_join_conjuncts;
-    std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size());
-    size_t conjuncts_index = 0;
     for (const auto& eq_join_conjunct : eq_join_conjuncts) {
         vectorized::VExprContextSPtr ctx;
         
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, 
ctx));
         _probe_expr_ctxs.push_back(ctx);
-        bool null_aware = eq_join_conjunct.__isset.opcode &&
-                          eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL 
&&
-                          (eq_join_conjunct.right.nodes[0].is_nullable ||
-                           eq_join_conjunct.left.nodes[0].is_nullable);
-        probe_not_ignore_null[conjuncts_index] =
-                null_aware ||
-                (_probe_expr_ctxs.back()->root()->is_nullable() && 
probe_dispose_null);
-        conjuncts_index++;
-    }
-    for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
-        _probe_ignore_null |= !probe_not_ignore_null[i];
     }
+
     if (tnode.hash_join_node.__isset.other_join_conjuncts &&
         !tnode.hash_join_node.other_join_conjuncts.empty()) {
         RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 1bdb9d13347..55a8835f55b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -92,7 +92,6 @@ private:
     bool _ready_probe = false;
     bool _probe_eos = false;
     int _last_probe_match;
-
     // For mark join, last probe index of null mark
     int _last_probe_null_mark;
 
@@ -174,7 +173,6 @@ private:
 
     // probe expr
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
-    bool _probe_ignore_null = false;
 
     vectorized::DataTypes _right_table_data_types;
     vectorized::DataTypes _left_table_data_types;
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 91fd82f0644..052ed5875ae 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -22,6 +22,7 @@
 #include "vec/columns/column.h"
 #include "vec/columns/columns_number.h"
 #include "vec/common/arena.h"
+#include "vec/common/hash_table/join_hash_table.h"
 
 namespace doris {
 namespace vectorized {
@@ -54,7 +55,7 @@ struct ProcessHashTableProbe {
                                   int last_probe_index, bool all_match_one,
                                   bool have_other_join_conjunct);
 
-    template <bool need_judge_null, typename HashTableType>
+    template <typename HashTableType>
     Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
                    vectorized::MutableBlock& mutable_block, vectorized::Block* 
output_block,
                    uint32_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);
@@ -63,9 +64,8 @@ struct ProcessHashTableProbe {
     // the output block struct is same with mutable block. we can do more opt 
on it and simplify
     // the logic of probe
     // TODO: opt the visited here to reduce the size of hash table
-    template <bool need_judge_null, typename HashTableType, bool 
with_other_conjuncts,
-              bool is_mark_join>
-    Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
+    template <typename HashTableType, bool with_other_conjuncts, bool 
is_mark_join>
+    Status do_process(HashTableType& hash_table_ctx, const uint8_t* null_map,
                       vectorized::MutableBlock& mutable_block, 
vectorized::Block* output_block,
                       uint32_t probe_rows);
     // In the presence of other join conjunct, the process of join become more 
complicated.
@@ -76,12 +76,12 @@ struct ProcessHashTableProbe {
                                    bool has_null_in_build_side);
 
     template <bool with_other_conjuncts>
-    Status do_mark_join_conjuncts(vectorized::Block* output_block, size_t 
hash_table_bucket_size);
+    Status do_mark_join_conjuncts(vectorized::Block* output_block, const 
uint8_t* null_map);
 
     template <typename HashTableType>
     typename HashTableType::State _init_probe_side(HashTableType& 
hash_table_ctx, size_t probe_rows,
                                                    bool 
with_other_join_conjuncts,
-                                                   const uint8_t* null_map, 
bool need_judge_null);
+                                                   const uint8_t* null_map);
 
     // Process full outer join/ right join / right semi/anti join to output 
the join result
     // in hash table
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 24a9a7f6743..f97a4513816 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -27,6 +27,7 @@
 #include "util/simd/bits.h"
 #include "vec/columns/column_filter_helper.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/common/hash_table/join_hash_table.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::pipeline {
@@ -160,14 +161,14 @@ template <int JoinOpType>
 template <typename HashTableType>
 typename HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_side(
         HashTableType& hash_table_ctx, size_t probe_rows, bool 
with_other_join_conjuncts,
-        const uint8_t* null_map, bool need_judge_null) {
+        const uint8_t* null_map) {
     // may over batch size 1 for some outer join case
     _probe_indexs.resize(_batch_size + 1);
     _build_indexs.resize(_batch_size + 1);
-    if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                  JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
+    if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+         JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
+        with_other_join_conjuncts) {
         _null_flags.resize(_batch_size + 1);
-        memset(_null_flags.data(), 0, _batch_size + 1);
     }
 
     if (!_parent->_ready_probe) {
@@ -177,10 +178,10 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
         if (_parent->_probe_columns.size() == 1 && null_map) {
             
_parent->_probe_columns[0]->assume_mutable()->replace_column_null_data(null_map);
         }
+
         hash_table_ctx.init_serialized_keys(_parent->_probe_columns, 
probe_rows, null_map, true,
                                             false, 
hash_table_ctx.hash_table->get_bucket_size());
-        hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums,
-                                                  need_judge_null ? null_map : 
nullptr);
+        hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums);
         int64_t arena_memory_usage = 
hash_table_ctx.serialized_keys_size(false);
         COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage);
         COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage);
@@ -190,10 +191,9 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
 }
 
 template <int JoinOpType>
-template <bool need_judge_null, typename HashTableType, bool 
with_other_conjuncts,
-          bool is_mark_join>
+template <typename HashTableType, bool with_other_conjuncts, bool is_mark_join>
 Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& 
hash_table_ctx,
-                                                     
vectorized::ConstNullMapPtr null_map,
+                                                     const uint8_t* null_map,
                                                      vectorized::MutableBlock& 
mutable_block,
                                                      vectorized::Block* 
output_block,
                                                      uint32_t probe_rows) {
@@ -204,31 +204,22 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
     auto& probe_index = _parent->_probe_index;
     auto& build_index = _parent->_build_index;
     auto last_probe_index = probe_index;
-
     {
         SCOPED_TIMER(_init_probe_side_timer);
-        _init_probe_side<HashTableType>(
-                hash_table_ctx, probe_rows, with_other_conjuncts,
-                null_map ? null_map->data() : nullptr,
-                need_judge_null &&
-                        (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
-                         JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ||
-                         JoinOpType == 
doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                         (is_mark_join && JoinOpType != 
doris::TJoinOp::RIGHT_SEMI_JOIN)));
+        _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, 
with_other_conjuncts, null_map);
     }
 
     auto& mcol = mutable_block.mutable_columns();
-    const bool has_mark_join_conjunct = !_parent->_mark_join_conjuncts.empty();
 
     uint32_t current_offset = 0;
-    if constexpr ((JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                   JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
-                  with_other_conjuncts) {
+    if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+         JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
+        with_other_conjuncts) {
         SCOPED_TIMER(_search_hashtable_timer);
 
         /// If `_build_index_for_null_probe_key` is not zero, it means we are 
in progress of handling probe null key.
         if (_build_index_for_null_probe_key) {
-            DCHECK_EQ(build_index, 
hash_table_ctx.hash_table->get_bucket_size());
+            DCHECK(null_map && null_map[probe_index]);
             current_offset = _process_probe_null_key(probe_index);
             if (!_build_index_for_null_probe_key) {
                 probe_index++;
@@ -239,13 +230,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
                     
hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts(
                             hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(), probe_index,
                             build_index, probe_rows, _probe_indexs.data(), 
_build_indexs.data(),
-                            _null_flags.data(), _picking_null_keys);
+                            _null_flags.data(), _picking_null_keys, null_map);
             probe_index = new_probe_idx;
             build_index = new_build_idx;
             current_offset = new_current_offset;
             _picking_null_keys = picking_null_keys;
 
-            if (build_index == hash_table_ctx.hash_table->get_bucket_size()) {
+            if (null_map && null_map[probe_index]) {
                 _build_index_for_null_probe_key = 1;
                 if (current_offset == 0) {
                     current_offset = _process_probe_null_key(probe_index);
@@ -259,11 +250,11 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
     } else {
         SCOPED_TIMER(_search_hashtable_timer);
         auto [new_probe_idx, new_build_idx, new_current_offset] =
-                hash_table_ctx.hash_table->template find_batch<JoinOpType, 
with_other_conjuncts,
-                                                               is_mark_join, 
need_judge_null>(
+                hash_table_ctx.hash_table->template find_batch<JoinOpType>(
                         hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(), probe_index,
                         build_index, cast_set<int32_t>(probe_rows), 
_probe_indexs.data(),
-                        _probe_visited, _build_indexs.data(), 
has_mark_join_conjunct);
+                        _probe_visited, _build_indexs.data(), null_map, 
with_other_conjuncts,
+                        is_mark_join, !_parent->_mark_join_conjuncts.empty());
         probe_index = new_probe_idx;
         build_index = new_build_idx;
         current_offset = new_current_offset;
@@ -296,8 +287,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
     output_block->swap(mutable_block.to_block());
 
     if constexpr (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
-        return do_mark_join_conjuncts<with_other_conjuncts>(
-                output_block, hash_table_ctx.hash_table->get_bucket_size());
+        bool ignore_null_map =
+                (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+                 JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
+                hash_table_ctx.hash_table
+                        ->empty_build_side(); // empty build side will return 
false to instead null
+        return do_mark_join_conjuncts<with_other_conjuncts>(output_block,
+                                                            ignore_null_map ? 
nullptr : null_map);
     } else if constexpr (with_other_conjuncts) {
         return do_other_join_conjuncts(output_block, 
hash_table_ctx.hash_table->get_visited(),
                                        
hash_table_ctx.hash_table->has_null_key());
@@ -366,7 +362,7 @@ uint32_t 
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro
 template <int JoinOpType>
 template <bool with_other_conjuncts>
 Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block* 
output_block,
-                                                                 size_t 
hash_table_bucket_size) {
+                                                                 const 
uint8_t* null_map) {
     DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
            JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
            JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
@@ -376,7 +372,6 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
                                   JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
     constexpr bool is_null_aware_join = JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN ||
                                         JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-
     const auto row_count = output_block->rows();
     if (!row_count) {
         return Status::OK();
@@ -387,49 +382,45 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
     auto& mark_column = 
assert_cast<vectorized::ColumnNullable&>(*mark_column_mutable);
     vectorized::IColumn::Filter& filter =
             
assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column()).get_data();
+    RETURN_IF_ERROR(
+            
vectorized::VExprContext::execute_conjuncts(_parent->_mark_join_conjuncts, 
output_block,
+                                                        
mark_column.get_null_map_column(), filter));
+    uint8_t* mark_filter_data = filter.data();
+    uint8_t* mark_null_map = mark_column.get_null_map_data().data();
 
-    if (_parent->_mark_join_conjuncts.empty()) {
+    if (is_null_aware_join) {
         // For null aware anti/semi join, if the equal conjuncts was not 
matched and the build side has null value,
         // the result should be null. Like:
         // select 4 not in (2, 3, null) => null, select 4 not in (2, 3) => true
         // select 4 in (2, 3, null) => null, select 4 in (2, 3) => false
-        const bool should_be_null_if_build_side_has_null = 
*_has_null_in_build_side;
-
-        mark_column.resize(row_count);
-        auto* filter_data = 
assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column())
-                                    .get_data()
-                                    .data();
-        auto* mark_null_map = mark_column.get_null_map_data().data();
-        int last_probe_matched = -1;
         for (size_t i = 0; i != row_count; ++i) {
-            filter_data[i] = _build_indexs[i] != 0 && _build_indexs[i] != 
hash_table_bucket_size;
-            if constexpr (is_null_aware_join) {
-                if constexpr (with_other_conjuncts) {
-                    mark_null_map[i] = _null_flags[i];
-                } else {
-                    if (filter_data[i]) {
-                        last_probe_matched = _probe_indexs[i];
-                        mark_null_map[i] = false;
-                    } else if (_build_indexs[i] == 0) {
-                        mark_null_map[i] = 
should_be_null_if_build_side_has_null &&
-                                           last_probe_matched != 
_probe_indexs[i];
-                    } else if (_build_indexs[i] == hash_table_bucket_size) {
-                        mark_null_map[i] = true;
-                    }
+            mark_filter_data[i] = _build_indexs[i] != 0;
+        }
+
+        if constexpr (with_other_conjuncts) {
+            // _null_flags is true means build or probe side of the row is null
+            memcpy(mark_null_map, _null_flags.data(), row_count);
+        } else {
+            if (null_map) {
+                // probe side of the row is null, so the mark sign should also 
be null.
+                for (size_t i = 0; i != row_count; ++i) {
+                    mark_null_map[i] |= null_map[_probe_indexs[i]];
+                }
+            }
+            if (!with_other_conjuncts && *_has_null_in_build_side) {
+                // _has_null_in_build_side will change false to null when row 
not matched
+                for (size_t i = 0; i != row_count; ++i) {
+                    mark_null_map[i] |= _build_indexs[i] == 0;
                 }
             }
-        }
-        if constexpr (!is_null_aware_join) {
-            memset(mark_null_map, 0, row_count);
         }
     } else {
-        RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts(
-                _parent->_mark_join_conjuncts, output_block, 
mark_column.get_null_map_column(),
-                filter));
+        // for non null aware join, build_indexs is 0 which means there is no 
match
+        // sometimes null will be returned in conjunct, but it should not 
actually be null.
+        for (size_t i = 0; i != row_count; ++i) {
+            mark_null_map[i] &= _build_indexs[i] != 0;
+        }
     }
-    auto* mark_null_map = mark_column.get_null_map_data().data();
-
-    auto* mark_filter_data = filter.data();
 
     if constexpr (with_other_conjuncts) {
         vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
@@ -453,32 +444,20 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
 
     auto filter_column = vectorized::ColumnUInt8::create(row_count, 0);
     auto* __restrict filter_map = filter_column->get_data().data();
-
-    /**
-     * Here need `!with_other_conjuncts` be true,
-     * because null aware join with other join conjuncts will process the 
`mark_null_map` after the
-     * other join conjuncts are executed.
-     */
-    const bool should_be_null_if_build_side_has_null =
-            *_has_null_in_build_side && is_null_aware_join && 
!with_other_conjuncts;
     for (size_t i = 0; i != row_count; ++i) {
-        bool not_matched_before = _parent->_last_probe_match != 
_probe_indexs[i];
+        if (_parent->_last_probe_match == _probe_indexs[i]) {
+            continue;
+        }
         if (_build_indexs[i] == 0) {
             bool has_null_mark_value = _parent->_last_probe_null_mark == 
_probe_indexs[i];
-            if (not_matched_before) {
-                filter_map[i] = true;
-                mark_null_map[i] = has_null_mark_value || 
should_be_null_if_build_side_has_null;
-                mark_filter_data[i] = false;
-            }
-        } else {
-            if (mark_null_map[i]) { // is null
-                _parent->_last_probe_null_mark = _probe_indexs[i];
-            } else {
-                if (mark_filter_data[i] && not_matched_before) {
-                    _parent->_last_probe_match = _probe_indexs[i];
-                    filter_map[i] = true;
-                }
-            }
+            filter_map[i] = true;
+            mark_filter_data[i] = false;
+            mark_null_map[i] |= has_null_mark_value;
+        } else if (mark_null_map[i]) {
+            _parent->_last_probe_null_mark = _probe_indexs[i];
+        } else if (mark_filter_data[i]) {
+            filter_map[i] = true;
+            _parent->_last_probe_match = _probe_indexs[i];
         }
     }
 
@@ -677,7 +656,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab
 }
 
 template <int JoinOpType>
-template <bool need_judge_null, typename HashTableType>
+template <typename HashTableType>
 Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& 
hash_table_ctx,
                                                   vectorized::ConstNullMapPtr 
null_map,
                                                   vectorized::MutableBlock& 
mutable_block,
@@ -687,9 +666,9 @@ Status 
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
     Status res;
     std::visit(
             [&](auto is_mark_join, auto have_other_join_conjunct) {
-                res = do_process<need_judge_null, HashTableType, 
have_other_join_conjunct,
-                                 is_mark_join>(hash_table_ctx, null_map, 
mutable_block,
-                                               output_block, probe_rows);
+                res = do_process<HashTableType, have_other_join_conjunct, 
is_mark_join>(
+                        hash_table_ctx, null_map ? null_map->data() : nullptr, 
mutable_block,
+                        output_block, probe_rows);
             },
             vectorized::make_bool_variant(is_mark_join),
             vectorized::make_bool_variant(have_other_join_conjunct));
@@ -705,11 +684,7 @@ struct ExtractType<T(U)> {
 };
 
 #define INSTANTIATION(JoinOpType, T)                                           
                    \
-    template Status ProcessHashTableProbe<JoinOpType>::process<false, 
ExtractType<void(T)>::Type>( \
-            ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::ConstNullMapPtr null_map,     \
-            vectorized::MutableBlock & mutable_block, vectorized::Block * 
output_block,            \
-            uint32_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);                \
-    template Status ProcessHashTableProbe<JoinOpType>::process<true, 
ExtractType<void(T)>::Type>(  \
+    template Status 
ProcessHashTableProbe<JoinOpType>::process<ExtractType<void(T)>::Type>(        \
             ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::ConstNullMapPtr null_map,     \
             vectorized::MutableBlock & mutable_block, vectorized::Block * 
output_block,            \
             uint32_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);                \
diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h
index 59463f0e3f4..ab501d67698 100644
--- a/be/src/vec/common/hash_table/join_hash_table.h
+++ b/be/src/vec/common/hash_table/join_hash_table.h
@@ -21,6 +21,8 @@
 
 #include <limits>
 
+#include "common/exception.h"
+#include "common/status.h"
 #include "vec/columns/column_filter_helper.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
@@ -70,6 +72,8 @@ public:
 
     std::vector<uint8_t>& get_visited() { return visited; }
 
+    bool empty_build_side() const { return _empty_build_side; }
+
     void build(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums, size_t num_elem,
                bool keep_null_key) {
         build_keys = keys;
@@ -81,63 +85,67 @@ public:
         if (!keep_null_key) {
             first[bucket_size] = 0; // index = bucket_size means null
         }
+        _keep_null_key = keep_null_key;
     }
 
-    template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join, 
bool need_judge_null>
+    template <int JoinOpType>
     auto find_batch(const Key* __restrict keys, const uint32_t* __restrict 
build_idx_map,
                     int probe_idx, uint32_t build_idx, int probe_rows,
                     uint32_t* __restrict probe_idxs, bool& probe_visited,
-                    uint32_t* __restrict build_idxs, bool 
has_mark_join_conjunct = false) {
-        if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                      JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
-            if (_empty_build_side) {
-                return 
_process_null_aware_left_half_join_for_empty_build_side<JoinOpType>(
-                        probe_idx, probe_rows, probe_idxs, build_idxs);
-            }
+                    uint32_t* __restrict build_idxs, const uint8_t* null_map,
+                    bool with_other_conjuncts, bool is_mark_join, bool 
has_mark_join_conjunct) {
+        if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+             JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
+            _empty_build_side) {
+            return 
_process_null_aware_left_half_join_for_empty_build_side<JoinOpType>(
+                    probe_idx, probe_rows, probe_idxs, build_idxs);
         }
 
-        if constexpr (with_other_conjuncts ||
-                      (is_mark_join && JoinOpType != 
TJoinOp::RIGHT_SEMI_JOIN)) {
-            if constexpr (!with_other_conjuncts) {
-                constexpr bool is_null_aware_join =
-                        JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                        JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
-                constexpr bool is_left_half_join = JoinOpType == 
TJoinOp::LEFT_SEMI_JOIN ||
-                                                   JoinOpType == 
TJoinOp::LEFT_ANTI_JOIN;
-
-                /// For null aware join or left half(semi/anti) join without other conjuncts and without
-                /// mark join conjunct.
-                /// If one row on probe side has one match in build side, we should stop searching the
-                /// hash table for this row.
-                if (is_null_aware_join || (is_left_half_join && 
!has_mark_join_conjunct)) {
-                    return _find_batch_conjunct<JoinOpType, need_judge_null, 
true>(
-                            keys, build_idx_map, probe_idx, build_idx, 
probe_rows, probe_idxs,
-                            build_idxs);
-                }
+        if (with_other_conjuncts) {
+            return _find_batch_conjunct<JoinOpType, false>(
+                    keys, build_idx_map, probe_idx, build_idx, probe_rows, 
probe_idxs, build_idxs);
+        }
+
+        if (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
+            bool is_null_aware_join = JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+                                      JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
+            bool is_left_half_join =
+                    JoinOpType == TJoinOp::LEFT_SEMI_JOIN || JoinOpType == 
TJoinOp::LEFT_ANTI_JOIN;
+
+            /// For null aware join or left half(semi/anti) join without other conjuncts and without
+            /// mark join conjunct.
+            /// If one row on probe side has one match in build side, we should stop searching the
+            /// hash table for this row.
+            if (is_null_aware_join || (is_left_half_join && 
!has_mark_join_conjunct)) {
+                return _find_batch_conjunct<JoinOpType, true>(keys, 
build_idx_map, probe_idx,
+                                                              build_idx, 
probe_rows, probe_idxs,
+                                                              build_idxs);
             }
 
-            return _find_batch_conjunct<JoinOpType, need_judge_null, false>(
+            return _find_batch_conjunct<JoinOpType, false>(
                     keys, build_idx_map, probe_idx, build_idx, probe_rows, 
probe_idxs, build_idxs);
         }
 
-        if constexpr (JoinOpType == TJoinOp::INNER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN ||
-                      JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
-                      JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
+        if (JoinOpType == TJoinOp::INNER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN ||
+            JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::RIGHT_OUTER_JOIN) {
             return _find_batch_inner_outer_join<JoinOpType>(keys, 
build_idx_map, probe_idx,
                                                             build_idx, 
probe_rows, probe_idxs,
                                                             probe_visited, 
build_idxs);
         }
-        if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
-                      JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
-                      JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            return _find_batch_left_semi_anti<JoinOpType, need_judge_null>(
-                    keys, build_idx_map, probe_idx, probe_rows, probe_idxs);
+        if (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == 
TJoinOp::LEFT_SEMI_JOIN ||
+            JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+            if (null_map) {
+                return _find_batch_left_semi_anti<JoinOpType, true>(
+                        keys, build_idx_map, probe_idx, probe_rows, 
probe_idxs, null_map);
+            } else {
+                return _find_batch_left_semi_anti<JoinOpType, false>(
+                        keys, build_idx_map, probe_idx, probe_rows, 
probe_idxs, nullptr);
+            }
         }
-        if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN ||
-                      JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
+        if (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == 
TJoinOp::RIGHT_SEMI_JOIN) {
             return _find_batch_right_semi_anti(keys, build_idx_map, probe_idx, 
probe_rows);
         }
-        return std::tuple {0, 0U, 0U};
+        throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid hash join 
input");
     }
 
     /**
@@ -157,67 +165,16 @@ public:
                                               uint32_t* __restrict probe_idxs,
                                               uint32_t* __restrict build_idxs,
                                               uint8_t* __restrict null_flags,
-                                              bool picking_null_keys) {
-        uint32_t matched_cnt = 0;
-        const auto batch_size = max_batch_size;
-
-        auto do_the_probe = [&]() {
-            /// If no any rows match the probe key, here start to handle null keys in build side.
-            /// The result of "Any = null" is null.
-            if (build_idx == 0 && !picking_null_keys) {
-                build_idx = first[bucket_size];
-                picking_null_keys = true; // now pick null from build side
-            }
-
-            while (build_idx && matched_cnt < batch_size) {
-                if (picking_null_keys || keys[probe_idx] == 
build_keys[build_idx]) {
-                    build_idxs[matched_cnt] = build_idx;
-                    probe_idxs[matched_cnt] = probe_idx;
-                    null_flags[matched_cnt] = picking_null_keys;
-                    matched_cnt++;
-                }
-
-                build_idx = next[build_idx];
-
-                // If `build_idx` is 0, all matched keys are handled,
-                // now need to handle null keys in build side.
-                if (!build_idx && !picking_null_keys) {
-                    build_idx = first[bucket_size];
-                    picking_null_keys = true; // now pick null keys from build 
side
-                }
-            }
-
-            // may over batch_size when emplace 0 into build_idxs
-            if (!build_idx) {
-                probe_idxs[matched_cnt] = probe_idx;
-                build_idxs[matched_cnt] = 0;
-                picking_null_keys = false;
-                matched_cnt++;
-            }
-
-            probe_idx++;
-        };
-
-        if (build_idx) {
-            do_the_probe();
-        }
-
-        while (probe_idx < probe_rows && matched_cnt < batch_size) {
-            build_idx = build_idx_map[probe_idx];
-
-            /// If the probe key is null
-            if (build_idx == bucket_size) {
-                probe_idx++;
-                break;
-            }
-            do_the_probe();
-            if (picking_null_keys) {
-                break;
-            }
+                                              bool picking_null_keys, const 
uint8_t* null_map) {
+        if (null_map) {
+            return _find_null_aware_with_other_conjuncts_impl<true>(
+                    keys, build_idx_map, probe_idx, build_idx, probe_rows, 
probe_idxs, build_idxs,
+                    null_flags, picking_null_keys, null_map);
+        } else {
+            return _find_null_aware_with_other_conjuncts_impl<false>(
+                    keys, build_idx_map, probe_idx, build_idx, probe_rows, 
probe_idxs, build_idxs,
+                    null_flags, picking_null_keys, nullptr);
         }
-
-        probe_idx -= (build_idx != 0);
-        return std::tuple {probe_idx, build_idx, matched_cnt, 
picking_null_keys};
     }
 
     template <int JoinOpType, bool is_mark_join>
@@ -250,15 +207,11 @@ public:
 
     bool has_null_key() { return _has_null_key; }
 
-    void pre_build_idxs(std::vector<uint32>& buckets, const uint8_t* null_map) 
const {
-        if (null_map) {
-            for (unsigned int& bucket : buckets) {
-                bucket = bucket == bucket_size ? bucket_size : first[bucket];
-            }
-        } else {
-            for (unsigned int& bucket : buckets) {
-                bucket = first[bucket];
-            }
+    bool keep_null_key() { return _keep_null_key; }
+
+    void pre_build_idxs(std::vector<uint32>& buckets) const {
+        for (unsigned int& bucket : buckets) {
+            bucket = first[bucket];
         }
     }
 
@@ -267,8 +220,12 @@ private:
     auto _process_null_aware_left_half_join_for_empty_build_side(int 
probe_idx, int probe_rows,
                                                                  uint32_t* 
__restrict probe_idxs,
                                                                  uint32_t* 
__restrict build_idxs) {
-        static_assert(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                      JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN);
+        if (JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
+            JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            
"process_null_aware_left_half_join_for_empty_build_side meet invalid "
+                            "hash join input");
+        }
         uint32_t matched_cnt = 0;
         const auto batch_size = max_batch_size;
 
@@ -298,16 +255,17 @@ private:
         return std::tuple {probe_idx, 0U, 0U};
     }
 
-    template <int JoinOpType, bool need_judge_null>
+    template <int JoinOpType, bool has_null_map>
     auto _find_batch_left_semi_anti(const Key* __restrict keys,
                                     const uint32_t* __restrict build_idx_map, 
int probe_idx,
-                                    int probe_rows, uint32_t* __restrict 
probe_idxs) {
+                                    int probe_rows, uint32_t* __restrict 
probe_idxs,
+                                    const uint8_t* null_map) {
         uint32_t matched_cnt = 0;
         const auto batch_size = max_batch_size;
 
         while (probe_idx < probe_rows && matched_cnt < batch_size) {
-            if constexpr (need_judge_null) {
-                if (build_idx_map[probe_idx] == bucket_size) {
+            if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
has_null_map) {
+                if (null_map[probe_idx]) {
                     probe_idx++;
                     continue;
                 }
@@ -325,7 +283,7 @@ private:
         return std::tuple {probe_idx, 0U, matched_cnt};
     }
 
-    template <int JoinOpType, bool need_judge_null, bool 
only_need_to_match_one>
+    template <int JoinOpType, bool only_need_to_match_one>
     auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* 
__restrict build_idx_map,
                               int probe_idx, uint32_t build_idx, int 
probe_rows,
                               uint32_t* __restrict probe_idxs, uint32_t* 
__restrict build_idxs) {
@@ -341,14 +299,6 @@ private:
                         build_idxs[matched_cnt] = build_idx;
                         matched_cnt++;
                     }
-                } else if constexpr (need_judge_null) {
-                    if (build_idx == bucket_size) {
-                        build_idxs[matched_cnt] = build_idx;
-                        probe_idxs[matched_cnt] = probe_idx;
-                        build_idx = 0;
-                        matched_cnt++;
-                        break;
-                    }
                 }
 
                 if (keys[probe_idx] == build_keys[build_idx]) {
@@ -448,6 +398,76 @@ private:
         return std::tuple {probe_idx, build_idx, matched_cnt};
     }
 
+    template <bool has_null_map>
+    auto _find_null_aware_with_other_conjuncts_impl(
+            const Key* __restrict keys, const uint32_t* __restrict 
build_idx_map, int probe_idx,
+            uint32_t build_idx, int probe_rows, uint32_t* __restrict 
probe_idxs,
+            uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags, 
bool picking_null_keys,
+            const uint8_t* null_map) {
+        uint32_t matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        auto do_the_probe = [&]() {
+            /// If no any rows match the probe key, here start to handle null keys in build side.
+            /// The result of "Any = null" is null.
+            if (build_idx == 0 && !picking_null_keys) {
+                build_idx = first[bucket_size];
+                picking_null_keys = true; // now pick null from build side
+            }
+
+            while (build_idx && matched_cnt < batch_size) {
+                if (picking_null_keys || keys[probe_idx] == 
build_keys[build_idx]) {
+                    build_idxs[matched_cnt] = build_idx;
+                    probe_idxs[matched_cnt] = probe_idx;
+                    null_flags[matched_cnt] = picking_null_keys;
+                    matched_cnt++;
+                }
+
+                build_idx = next[build_idx];
+
+                // If `build_idx` is 0, all matched keys are handled,
+                // now need to handle null keys in build side.
+                if (!build_idx && !picking_null_keys) {
+                    build_idx = first[bucket_size];
+                    picking_null_keys = true; // now pick null keys from build 
side
+                }
+            }
+
+            // may over batch_size when emplace 0 into build_idxs
+            if (!build_idx) {
+                probe_idxs[matched_cnt] = probe_idx;
+                build_idxs[matched_cnt] = 0;
+                picking_null_keys = false;
+                matched_cnt++;
+            }
+
+            probe_idx++;
+        };
+
+        if (build_idx) {
+            do_the_probe();
+        }
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            build_idx = build_idx_map[probe_idx];
+
+            /// If the probe key is null
+            if constexpr (has_null_map) {
+                if (null_map[probe_idx]) {
+                    probe_idx++;
+                    break;
+                }
+            }
+            do_the_probe();
+            if (picking_null_keys) {
+                break;
+            }
+        }
+
+        probe_idx -= (build_idx != 0);
+        return std::tuple {probe_idx, build_idx, matched_cnt, 
picking_null_keys};
+    }
+
     const Key* __restrict build_keys;
     std::vector<uint8_t> visited;
 
@@ -461,6 +481,7 @@ private:
     mutable uint32_t iter_idx = 1;
     vectorized::Arena* pool;
     bool _has_null_key = false;
+    bool _keep_null_key = false;
     bool _empty_build_side = true;
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to