This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2e0778a3d87f7a0af33dd11ed3e914ddfd922ed6
Author: BiteTheDDDDt <pxl...@qq.com>
AuthorDate: Mon Oct 16 18:43:26 2023 +0800

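    Merge build blocks into a single block on join/set nodes
    
    Hash join and set operation build sides now accumulate all input into one
    build block instead of a vector of blocks, so RowRef no longer needs a
    block_offset. Join hash tables move from PartitionedHashMap to the new
    JoinFixedHashMap (JoinHashMapTable), which chains build rows through
    first/next index arrays and treats row index 0 as the chain terminator.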
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 101 ++------
 be/src/pipeline/exec/hashjoin_build_sink.h         |   5 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   4 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  13 +-
 be/src/pipeline/exec/set_sink_operator.h           |   2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |   6 +-
 be/src/pipeline/pipeline_x/dependency.h            |   7 +-
 be/src/vec/common/hash_table/hash_map.h            |  67 ++++++
 be/src/vec/common/hash_table/hash_map_context.h    |   6 +-
 .../vec/common/hash_table/hash_table_set_build.h   |   9 +-
 be/src/vec/exec/join/join_op.h                     |  72 +++---
 be/src/vec/exec/join/process_hash_table_probe.h    |   9 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  | 257 +++------------------
 be/src/vec/exec/join/vhash_join_node.cpp           |  72 ++----
 be/src/vec/exec/join/vhash_join_node.h             | 100 ++------
 be/src/vec/exec/vset_operation_node.cpp            |  69 +++---
 be/src/vec/exec/vset_operation_node.h              |   5 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |  11 +-
 18 files changed, 256 insertions(+), 559 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 153882075b6..e7da32b0340 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -39,10 +39,7 @@ Overload(Callables&&... callables) -> Overload<Callables...>;
 
 
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* 
parent,
                                                          RuntimeState* state)
-        : JoinBuildSinkLocalState(parent, state),
-          _build_block_idx(0),
-          _build_side_mem_used(0),
-          _build_side_last_mem_used(0) {}
+        : JoinBuildSinkLocalState(parent, state) {}
 
 Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
@@ -52,13 +49,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
     if (p._is_broadcast_join && 
state->enable_share_hash_table_for_broadcast_join()) {
-        _shared_state->build_blocks = p._shared_hash_table_context->blocks;
-    } else {
-        _shared_state->build_blocks.reset(new 
std::vector<vectorized::Block>());
-        // avoid vector expand change block address.
-        // one block can store 4g data, _build_blocks can store 128*4g data.
-        // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-        
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+        _shared_state->build_block = p._shared_hash_table_context->block;
     }
     _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
     _shared_state->store_null_in_hash_table = p._store_null_in_hash_table;
@@ -82,11 +73,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     if (!_should_build_hash_table) {
         _shared_hash_table_dependency->block_writing();
         p._shared_hashtable_controller->append_dependency(p.id(), 
_shared_hash_table_dependency);
-    } else if (p._is_broadcast_join) {
-        // avoid vector expand change block address.
-        // one block can store 4g data, _build_blocks can store 128*4g data.
-        // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-        
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
     }
 
     _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
@@ -156,25 +142,24 @@ void 
HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
     _shared_state->short_circuit_for_probe =
             (_shared_state->_has_null_in_build_side &&
              p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::INNER_JOIN &&
+            (!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN 
&&
              !p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
              !p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
 
     //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
     //we could get the result is probe table + null-column(if need output)
     _shared_state->empty_right_table_need_probe_dispose =
-            (_shared_state->build_blocks->empty() && 
!p._have_other_join_conjunct &&
-             !p._is_mark_join) &&
+            (!_shared_state->build_block && !p._have_other_join_conjunct && 
!p._is_mark_join) &&
             (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == 
TJoinOp::FULL_OUTER_JOIN ||
              p._join_op == TJoinOp::LEFT_ANTI_JOIN);
 }
 
 Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
-                                                        vectorized::Block& 
block, uint8_t offset) {
+                                                        vectorized::Block& 
block) {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
@@ -220,7 +205,7 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                         vectorized::ProcessHashTableBuild<HashTableCtxType,
                                                           
HashJoinBuildSinkLocalState>
                                 hash_table_build_process(rows, block, 
raw_ptrs, this,
-                                                         state->batch_size(), 
offset, state);
+                                                         state->batch_size(), 
state);
                         return hash_table_build_process
                                 .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
                                         arg,
@@ -321,7 +306,7 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
                     }
                     return;
                 }
-                if (!try_get_hash_map_context_fixed<PartitionedHashMap, 
HashCRC32, RowRefListType>(
+                if (!try_get_hash_map_context_fixed<JoinFixedHashMap, 
HashCRC32, RowRefListType>(
                             *_shared_state->hash_table_variants, 
_build_expr_ctxs)) {
                     _shared_state->hash_table_variants
                             
->emplace<vectorized::SerializedHashTableContext<RowRefListType>>();
@@ -331,16 +316,6 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
             vectorized::make_bool_variant(p._have_other_join_conjunct));
 
     
DCHECK(!std::holds_alternative<std::monostate>(*_shared_state->hash_table_variants));
-
-    std::visit(vectorized::Overload {[&](std::monostate& arg) {
-                                         LOG(FATAL) << "FATAL: uninited hash 
table";
-                                         __builtin_unreachable();
-                                     },
-                                     [&](auto&& arg) {
-                                         
arg.hash_table->set_partitioned_threshold(
-                                                 
state->partitioned_hash_join_rows_threshold());
-                                     }},
-               *_shared_state->hash_table_variants);
 }
 
 HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, const 
TPlanNode& tnode,
@@ -402,9 +377,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
-    // make one block for each 4 gigabytes
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
     if (local_state._shared_state->_has_null_in_build_side) {
         // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
@@ -417,52 +389,29 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
         if (in_block->rows() != 0) {
             SCOPED_TIMER(local_state._build_side_merge_block_timer);
+            if (local_state._build_side_mutable_block.empty()) {
+                RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(
+                        *(in_block->create_same_struct_block(1, false))));
+            }
             
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block));
-        }
-
-        if (UNLIKELY(local_state._build_side_mem_used - 
local_state._build_side_last_mem_used >
-                     BUILD_BLOCK_MAX_SIZE)) {
-            if (local_state._shared_state->build_blocks->size() ==
-                vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
+            if (local_state._build_side_mutable_block.rows() >
+                std::numeric_limits<uint32_t>::max()) {
+                return Status::NotSupported(
+                        "Hash join do not support build table rows"
+                        " over:" +
+                        std::to_string(std::numeric_limits<uint32_t>::max()));
             }
-            local_state._shared_state->build_blocks->emplace_back(
-                    local_state._build_side_mutable_block.to_block());
-
-            COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-                           
(*local_state._shared_state->build_blocks)[local_state._build_block_idx]
-                                   .bytes());
-
-            // TODO:: Rethink may we should do the process after we receive 
all build blocks ?
-            // which is better.
-            RETURN_IF_ERROR(local_state.process_build_block(
-                    state, 
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
-                    local_state._build_block_idx));
-
-            local_state._build_side_mutable_block = vectorized::MutableBlock();
-            ++local_state._build_block_idx;
-            local_state._build_side_last_mem_used = 
local_state._build_side_mem_used;
         }
     }
 
     if (local_state._should_build_hash_table && source_state == 
SourceState::FINISHED) {
         if (!local_state._build_side_mutable_block.empty()) {
-            if (local_state._shared_state->build_blocks->size() ==
-                vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
-            }
-            local_state._shared_state->build_blocks->emplace_back(
+            local_state._shared_state->build_block = 
std::make_shared<vectorized::Block>(
                     local_state._build_side_mutable_block.to_block());
             COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-                           
(*local_state._shared_state->build_blocks)[local_state._build_block_idx]
-                                   .bytes());
+                           (*local_state._shared_state->build_block).bytes());
             RETURN_IF_ERROR(local_state.process_build_block(
-                    state, 
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
-                    local_state._build_block_idx));
+                    state, (*local_state._shared_state->build_block)));
         }
         auto ret = std::visit(
                 Overload {[&](std::monostate&) -> Status {
@@ -552,7 +501,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     if (source_state == SourceState::FINISHED) {
         // Since the comparison of null values is meaningless, null aware left anti join should not output null
         // when the build side is not empty.
-        if (!local_state._shared_state->build_blocks->empty() &&
+        if (local_state._shared_state->build_block != nullptr &&
             _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             local_state._shared_state->probe_ignore_null = true;
         }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 9b43f95cd3b..9cf559588cc 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -61,11 +61,11 @@ public:
     ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
     using Parent = HashJoinBuildSinkOperatorX;
     HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
-    ~HashJoinBuildSinkLocalState() = default;
+    ~HashJoinBuildSinkLocalState() override = default;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
-    Status process_build_block(RuntimeState* state, vectorized::Block& block, 
uint8_t offset);
+    Status process_build_block(RuntimeState* state, vectorized::Block& block);
 
     void init_short_circuit_for_probe();
     HashJoinBuildSinkOperatorX* join_build() { return 
(HashJoinBuildSinkOperatorX*)_parent; }
@@ -94,7 +94,6 @@ protected:
 
     std::vector<IRuntimeFilter*> _runtime_filters;
     bool _should_build_hash_table = true;
-    uint8_t _build_block_idx = 0;
     int64_t _build_side_mem_used = 0;
     int64_t _build_side_last_mem_used = 0;
     vectorized::MutableBlock _build_side_mutable_block;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 082f45199c1..14503a90357 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -89,9 +89,7 @@ public:
     vectorized::DataTypes right_table_data_types();
     vectorized::DataTypes left_table_data_types();
     bool* has_null_in_build_side() { return 
&_shared_state->_has_null_in_build_side; }
-    std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
-        return _shared_state->build_blocks;
-    }
+    std::shared_ptr<vectorized::Block> build_block() const { return 
_shared_state->build_block; }
 
 private:
     void _prepare_probe_block();
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 6725deffa14..a982b079763 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -60,8 +60,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
     auto& mem_used = local_state._shared_state->mem_used;
-    auto& build_blocks = local_state._shared_state->build_blocks;
-    auto& build_block_index = local_state._shared_state->build_block_index;
+    auto& build_block = local_state._shared_state->build_block;
     auto& valid_element_in_hash_tbl = 
local_state._shared_state->valid_element_in_hash_tbl;
 
     if (in_block->rows() != 0) {
@@ -71,11 +70,11 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
 
-    if (source_state == SourceState::FINISHED ||
-        local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
-        build_blocks.emplace_back(local_state._mutable_block.to_block());
-        RETURN_IF_ERROR(_process_build_block(local_state, build_blocks[build_block_index],
-                                             build_block_index, state));
+    // The hash table stores row numbers into the single build block, so the block is only
+    // materialized once all input has arrived: replacing it mid-stream would orphan the
+    // entries already inserted for the previous block.
+    if (source_state == SourceState::FINISHED) {
+        build_block = local_state._mutable_block.to_block();
+        RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
         local_state._mutable_block.clear();
-        ++build_block_index;
 
         if (source_state == SourceState::FINISHED) {
             if constexpr (is_intersect) {
@@ -101,7 +100,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
 
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::_process_build_block(
-        SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block, uint8_t offset,
+        SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
         RuntimeState* state) {
     size_t rows = block.rows();
     if (rows == 0) {
@@ -117,7 +116,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                     vectorized::HashTableBuild<HashTableCtxType, is_intersect>
-                            hash_table_build_process(&local_state, rows, raw_ptrs, offset, state);
+                            hash_table_build_process(&local_state, rows, raw_ptrs, state);
                     static_cast<void>(hash_table_build_process(arg, local_state._arena));
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 5383b1b3a55..1250433dacc 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -119,7 +119,7 @@ private:
     friend struct HashTableBuild;
 
     Status _process_build_block(SetSinkLocalState<is_intersect>& local_state,
-                                vectorized::Block& block, uint8_t offset, 
RuntimeState* state);
+                                vectorized::Block& block, RuntimeState* state);
     Status _extract_build_column(SetSinkLocalState<is_intersect>& local_state,
                                  vectorized::Block& block, 
vectorized::ColumnRawPtrs& raw_ptrs);
 
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index 6baf084e94a..b9a3433c159 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -176,12 +176,12 @@ void 
SetSourceOperatorX<is_intersect>::_add_result_columns(
         SetSourceLocalState<is_intersect>& local_state, 
vectorized::RowRefListWithFlags& value,
         int& block_size) {
     auto& build_col_idx = local_state._shared_state->build_col_idx;
-    auto& build_blocks = local_state._shared_state->build_blocks;
+    auto& build_block = local_state._shared_state->build_block;
 
     auto it = value.begin();
     for (auto idx = build_col_idx.begin(); idx != build_col_idx.end(); ++idx) {
-        auto& column = 
*build_blocks[it->block_offset].get_by_position(idx->first).column;
-        if (local_state._mutable_cols[idx->second]->is_nullable() xor 
column.is_nullable()) {
+        auto& column = *build_block.get_by_position(idx->first).column;
+        if (local_state._mutable_cols[idx->second]->is_nullable() ^ 
column.is_nullable()) {
             DCHECK(local_state._mutable_cols[idx->second]->is_nullable());
             
((vectorized::ColumnNullable*)(local_state._mutable_cols[idx->second].get()))
                     ->insert_from_not_nullable(column, it->row_num);
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 79089c48ed9..d44692909d0 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -577,7 +577,7 @@ struct HashJoinSharedState : public JoinSharedState {
             std::make_shared<vectorized::HashTableVariants>();
     const std::vector<TupleDescriptor*> build_side_child_desc;
     size_t build_exprs_size = 0;
-    std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr;
+    std::shared_ptr<vectorized::Block> build_block;
     bool probe_ignore_null = false;
 };
 
@@ -670,8 +670,7 @@ struct SetSharedState {
     /// default init
     //record memory during running
     int64_t mem_used = 0;
-    std::vector<vectorized::Block> build_blocks; // build to source
-    int build_block_index = 0;                   // build to source
+    vectorized::Block build_block; // build to source
     //record element size in hashtable
     int64_t valid_element_in_hash_tbl = 0;
     //first:column_id, could point to origin column or cast column
@@ -742,7 +741,7 @@ public:
             return;
         }
 
-        if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32,
+        if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32,
                                             vectorized::RowRefListWithFlags>(
                     *hash_table_variants, child_exprs_lists[0])) {
             hash_table_variants->emplace<
diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 5b7cd6f4642..85110deba62 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -20,6 +20,8 @@
 
 #pragma once
 
+#include <span>
+
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
@@ -193,10 +195,117 @@ public:
     bool has_null_key_data() const { return false; }
 };
 
+/// Chained hash table used by hash join and set operations over a single build block.
+///
+/// Build rows are linked per bucket: `first[bucket]` holds the most recently inserted build
+/// row of a bucket and `next[row]` points at the previously inserted row of the same bucket.
+/// Row index 0 is reserved as the chain terminator and is never inserted by build(), so
+/// callers are expected to keep a placeholder row at index 0 of the build block.
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+          typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator>
+class JoinHashMapTable : public HashMapTable<Key, Cell, Hash, Grower, Allocator> {
+public:
+    using Self = JoinHashMapTable;
+    using Base = HashMapTable<Key, Cell, Hash, Grower, Allocator>;
+
+    using key_type = Key;
+    using value_type = typename Cell::value_type;
+    using mapped_type = typename Cell::Mapped;
+
+    using LookupResult = typename Base::LookupResult;
+
+    using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable;
+
+    /// find_batch() stops at the first probe-row boundary once this many matches are collected.
+    static constexpr int MAX_MATCHED_ROWS_PER_BATCH = 4096;
+
+    /// Returns the bucket count for `num_elem` keys with roughly 1/7 headroom, rounded up to a
+    /// power of two (NormalizeCapacity() yields 2^k - 1) so `& (bucket_size - 1)` is a valid mask.
+    static uint32_t calc_bucket_size(size_t num_elem) {
+        // Guard num_elem == 0: `num_elem - 1` would wrap around for the unsigned size_t.
+        size_t expect_bucket_size = num_elem + (num_elem == 0 ? 0 : (num_elem - 1) / 7);
+        return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
+    }
+
+    /// Indexes build rows [1, num_elem) of `keys` by their precomputed hash values.
+    /// Only the pointer is kept: `keys` must outlive every later find_batch() call.
+    void build(const Key* __restrict keys, const size_t* __restrict hash_values, int num_elem) {
+        bucket_size = calc_bucket_size(num_elem + 1);
+        // assign() rather than resize(): rebuilding the table must not keep stale chain links.
+        first.assign(bucket_size, 0);
+        next.assign(num_elem, 0);
+
+        build_keys = keys;
+        for (uint32_t i = 1; i < static_cast<uint32_t>(num_elem); i++) {
+            uint32_t bucket_num = hash_values[i] & (bucket_size - 1);
+            next[i] = first[bucket_num];
+            first[bucket_num] = i;
+        }
+    }
+
+    /// Probes rows [probe_idx, probe_rows) of `keys` and writes every matching
+    /// (probe row, build row) pair to `probe_idxs` / `build_idxs`.
+    ///
+    /// Stops at the first probe-row boundary once MAX_MATCHED_ROWS_PER_BATCH pairs have been
+    /// collected. All matches of one probe row are emitted by the same call, so when a row with
+    /// many duplicate build keys overflows the output vectors they are grown rather than
+    /// written past their end.
+    ///
+    /// Returns {first probe row not yet processed, number of pairs written}.
+    auto find_batch(const Key* __restrict keys, const size_t* __restrict hash_values, int probe_idx,
+                    int probe_rows, std::vector<uint32_t>& probe_idxs,
+                    std::vector<int>& build_idxs) {
+        int matched_cnt = 0;
+        while (probe_idx < probe_rows && matched_cnt < MAX_MATCHED_ROWS_PER_BATCH) {
+            uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1);
+            for (uint32_t build_idx = first[bucket_num]; build_idx != 0;
+                 build_idx = next[build_idx]) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    _ensure_output_capacity(probe_idxs, build_idxs, matched_cnt);
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = build_idx;
+                    matched_cnt++;
+                }
+            }
+            probe_idx++;
+        }
+        return std::pair {probe_idx, matched_cnt};
+    }
+
+private:
+    /// Makes index `matched_cnt` writable in both output vectors; never shrinks them.
+    static void _ensure_output_capacity(std::vector<uint32_t>& probe_idxs,
+                                        std::vector<int>& build_idxs, int matched_cnt) {
+        const auto needed = static_cast<size_t>(matched_cnt) + 1;
+        if (probe_idxs.size() >= needed && build_idxs.size() >= needed) {
+            return;
+        }
+        // Grow geometrically so a long duplicate chain stays amortized O(1) per match.
+        size_t new_size = 2 * static_cast<size_t>(matched_cnt);
+        if (new_size < static_cast<size_t>(MAX_MATCHED_ROWS_PER_BATCH)) {
+            new_size = MAX_MATCHED_ROWS_PER_BATCH;
+        }
+        if (probe_idxs.size() < new_size) {
+            probe_idxs.resize(new_size);
+        }
+        if (build_idxs.size() < new_size) {
+            build_idxs.resize(new_size);
+        }
+    }
+
+    const Key* __restrict build_keys = nullptr;
+    uint32_t bucket_size = 0;
+    std::vector<uint32_t> first;
+    std::vector<uint32_t> next;
+};
+
 template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
           typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator>
 using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, Allocator>;
 
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
+using JoinFixedHashMap = JoinHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash>;
+
 template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
           typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator>
 using HashMapWithSavedHash =
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index f40a351f9d8..0b1748a9723 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -478,14 +478,14 @@ struct MethodSingleNullableColumn : public 
SingleColumnMethod {
 };
 
 template <typename RowRefListType>
-using SerializedHashTableContext = 
MethodSerialized<PartitionedHashMap<StringRef, RowRefListType>>;
+using SerializedHashTableContext = 
MethodSerialized<JoinFixedHashMap<StringRef, RowRefListType>>;
 
 template <class T, typename RowRefListType>
 using PrimaryTypeHashTableContext =
-        MethodOneNumber<T, PartitionedHashMap<T, RowRefListType, 
HashCRC32<T>>>;
+        MethodOneNumber<T, JoinFixedHashMap<T, RowRefListType, HashCRC32<T>>>;
 
 template <class Key, bool has_null, typename Value>
 using FixedKeyHashTableContext =
-        MethodKeysFixed<PartitionedHashMap<Key, Value, HashCRC32<Key>>, 
has_null>;
+        MethodKeysFixed<JoinFixedHashMap<Key, Value, HashCRC32<Key>>, 
has_null>;
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h 
b/be/src/vec/common/hash_table/hash_table_set_build.h
index e3c1ed27b1f..48c9e2d1673 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -24,11 +24,9 @@ namespace doris::vectorized {
 template <class HashTableContext, bool is_intersect>
 struct HashTableBuild {
     template <typename Parent>
-    HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, 
uint8_t offset,
-                   RuntimeState* state)
+    HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, 
RuntimeState* state)
             : _mem_used(parent->mem_used()),
               _rows(rows),
-              _offset(offset),
               _build_raw_ptrs(build_raw_ptrs),
               _state(state) {}
 
@@ -48,9 +46,9 @@ struct HashTableBuild {
         size_t k = 0;
         auto creator = [&](const auto& ctor, auto& key, auto& origin) {
             HashTableContext::try_presis_key(key, origin, arena);
-            ctor(key, Mapped {k, _offset});
+            ctor(key, Mapped {k});
         };
-        auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; 
};
+        auto creator_for_null_key = [&](auto& mapped) { mapped = {k}; };
 
         for (; k < _rows; ++k) {
             if (k % CHECK_FRECUENCY == 0) {
@@ -64,7 +62,6 @@ struct HashTableBuild {
 private:
     int64_t* _mem_used;
     const int _rows;
-    const uint8_t _offset;
     ColumnRawPtrs& _build_raw_ptrs;
     RuntimeState* _state;
 };
diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h
index 1b8b8f2c695..858f5197b03 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/vec/exec/join/join_op.h
@@ -18,7 +18,6 @@
 #pragma once
 #include "vec/common/arena.h"
 #include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map.h"
 #include "vec/core/block.h"
 
 namespace doris::vectorized {
@@ -45,19 +44,19 @@ namespace doris::vectorized {
  */
 struct RowRef {
     uint32_t row_num = 0;
-    uint8_t block_offset;
 
     RowRef() = default;
-    RowRef(size_t row_num_count, uint8_t block_offset_)
-            : row_num(row_num_count), block_offset(block_offset_) {}
+    RowRef(size_t row_num_count) : row_num(row_num_count) {}
+    void clear() {} // no-op, mirrors RowRefList::clear()
 };
 
 struct RowRefWithFlag : public RowRef {
     bool visited;
 
     RowRefWithFlag() = default;
-    RowRefWithFlag(size_t row_num_count, uint8_t block_offset_, bool 
is_visited = false)
-            : RowRef(row_num_count, block_offset_), visited(is_visited) {}
+    RowRefWithFlag(size_t row_num_count, bool is_visited = false)
+            : RowRef(row_num_count), visited(is_visited) {}
+    void clear() {} // no-op, mirrors RowRefList::clear()
 };
 
 /// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes sized.
@@ -93,14 +92,15 @@ public:
     ForwardIterator() : root(nullptr), first(false), batch(nullptr), 
position(0) {}
 
     ForwardIterator(RowRefListType* begin)
-            : root(begin), first(true), batch(root->next), position(0) {}
+            : root(begin), first(true), batch(&root->next), position(0) {}
 
     RowRefType& operator*() {
         if (first) {
             return *root;
         }
-        return batch->row_refs[position];
+        return (*batch)[position];
     }
+
     RowRefType* operator->() { return &(**this); }
 
     void operator++() {
@@ -109,21 +109,17 @@ public:
             return;
         }
 
-        if (batch) {
+        if (batch && position < batch->size()) {
             ++position;
-            if (position >= batch->size) {
-                batch = batch->next;
-                position = 0;
-            }
         }
     }
 
-    bool ok() const { return first || batch; }
+    bool ok() const { return first || (batch && position < batch->size()); }
 
 private:
     RowRefListType* root;
     bool first;
-    Batch<RowRefType>* batch;
+    std::vector<RowRefType>* batch;
     size_t position;
 };
 
@@ -131,76 +127,60 @@ struct RowRefList : RowRef {
     using RowRefType = RowRef;
 
     RowRefList() = default;
-    RowRefList(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, 
block_offset_) {}
+    RowRefList(size_t row_num_) : RowRef(row_num_) {}
 
     ForwardIterator<RowRefList> begin() { return 
ForwardIterator<RowRefList>(this); }
 
     /// insert element after current one
-    void insert(RowRefType&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(std::move(row_ref)); }
+
+    void clear() { next.clear(); }
 
 private:
     friend class ForwardIterator<RowRefList>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next; // NOTE(review): not Arena-backed; confirm cell dtors free it
 };
 
 struct RowRefListWithFlag : RowRef {
     using RowRefType = RowRef;
 
     RowRefListWithFlag() = default;
-    RowRefListWithFlag(size_t row_num_, uint8_t block_offset_) : 
RowRef(row_num_, block_offset_) {}
+    RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {}
 
     ForwardIterator<RowRefListWithFlag> const begin() {
         return ForwardIterator<RowRefListWithFlag>(this);
     }
 
     /// insert element after current one
-    void insert(RowRef&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(std::move(row_ref)); }
+
+    void clear() { next.clear(); }
 
     bool visited = false;
 
 private:
     friend class ForwardIterator<RowRefListWithFlag>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next;
 };
 
 struct RowRefListWithFlags : RowRefWithFlag {
     using RowRefType = RowRefWithFlag;
 
     RowRefListWithFlags() = default;
-    RowRefListWithFlags(size_t row_num_, uint8_t block_offset_)
-            : RowRefWithFlag(row_num_, block_offset_) {}
+    RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {}
 
     ForwardIterator<RowRefListWithFlags> const begin() {
         return ForwardIterator<RowRefListWithFlags>(this);
     }
 
     /// insert element after current one
-    void insert(RowRefWithFlag&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(std::move(row_ref)); }
+
+    void clear() { next.clear(); }
 
 private:
     friend class ForwardIterator<RowRefListWithFlags>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h 
b/be/src/vec/exec/join/process_hash_table_probe.h
index 435cea84186..8aea6492e77 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -75,7 +75,7 @@ struct ProcessHashTableProbe {
                                                UInt8* __restrict null_map_data,
                                                UInt8* __restrict filter_map, 
Block* output_block);
 
-    void _emplace_element(int8_t block_offset, int32_t block_row, int& 
current_offset);
+    void _emplace_element(int32_t block_row, int& current_offset);
 
     template <typename HashTableType>
     typename HashTableType::State _init_probe_side(HashTableType& 
hash_table_ctx, size_t probe_rows,
@@ -94,14 +94,13 @@ struct ProcessHashTableProbe {
 
     Parent* _parent;
     const int _batch_size;
-    std::shared_ptr<std::vector<Block>> _build_blocks;
+    std::shared_ptr<Block> _build_block; // the single merged build-side block
     std::unique_ptr<Arena> _arena;
     std::vector<StringRef> _probe_keys;
 
     std::vector<uint32_t> _probe_indexs;
-    PaddedPODArray<int8_t> _build_block_offsets;
-    PaddedPODArray<int32_t> _build_block_rows;
-    std::vector<std::pair<int8_t, int>> _build_blocks_locs;
+    std::vector<int> _build_block_rows; // rows of _build_block; int, yet RowRef::row_num is uint32_t
+    std::vector<int> _build_blocks_locs; // build rows gathered by process_data_in_hashtable
     // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
     ColumnUInt8::Container* _tuple_is_null_left_flags;
     // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index f4a3010c49f..dd6f79b9eb4 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -32,7 +32,7 @@ template <int JoinOpType, typename Parent>
 ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent* 
parent, int batch_size)
         : _parent(parent),
           _batch_size(batch_size),
-          _build_blocks(parent->build_blocks()),
+          _build_block(parent->build_block()),
           _tuple_is_null_left_flags(parent->is_outer_join()
                                             ? &(reinterpret_cast<ColumnUInt8&>(
                                                         
*parent->_tuple_is_null_left_flag_column)
@@ -69,51 +69,13 @@ void ProcessHashTableProbe<JoinOpType, 
Parent>::build_side_output_column(
             JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
 
     if (!is_semi_anti_join || have_other_join_conjunct) {
-        if (_build_blocks->size() == 1) {
-            for (int i = 0; i < _right_col_len; i++) {
-                auto& column = *(*_build_blocks)[0].get_by_position(i).column;
-                if (output_slot_flags[i]) {
-                    mcol[i + _right_col_idx]->insert_indices_from(column, 
_build_block_rows.data(),
                                                                  
_build_block_rows.data() + size);
-                } else {
-                    mcol[i + _right_col_idx]->insert_many_defaults(size);
-                }
-            }
-        } else {
-            for (int i = 0; i < _right_col_len; i++) {
-                if (output_slot_flags[i]) {
-                    for (int j = 0; j < size; j++) {
-                        if constexpr (probe_all) {
-                            if (_build_block_offsets[j] == -1) {
-                                DCHECK(mcol[i + 
_right_col_idx]->is_nullable());
-                                assert_cast<ColumnNullable*>(mcol[i + 
_right_col_idx].get())
-                                        ->insert_default();
-                            } else {
-                                auto& column = 
*(*_build_blocks)[_build_block_offsets[j]]
-                                                        .get_by_position(i)
-                                                        .column;
-                                mcol[i + _right_col_idx]->insert_from(column, 
_build_block_rows[j]);
-                            }
-                        } else {
-                            if (_build_block_offsets[j] == -1) {
-                                // the only case to reach here:
-                                // 1. left anti join with other conjuncts, and
-                                // 2. equal conjuncts does not match
-                                // since nullptr is emplaced back to 
visited_map,
-                                // the output value of the build side does not 
matter,
-                                // just insert default value
-                                mcol[i + _right_col_idx]->insert_default();
-                            } else {
-                                auto& column = 
*(*_build_blocks)[_build_block_offsets[j]]
-                                                        .get_by_position(i)
-                                                        .column;
-                                mcol[i + _right_col_idx]->insert_from(column, 
_build_block_rows[j]);
-                            }
-                        }
-                    }
-                } else {
-                    mcol[i + _right_col_idx]->insert_many_defaults(size);
-                }
+        for (int i = 0; i < _right_col_len; i++) {
+            const auto& column = *_build_block->get_by_position(i).column;
+            if (output_slot_flags[i]) {
+                mcol[i + _right_col_idx]->insert_indices_from(column, 
_build_block_rows.data(),
                                                              
_build_block_rows.data() + size);
+            } else {
+                mcol[i + _right_col_idx]->insert_many_defaults(size);
             }
         }
     }
@@ -167,7 +129,6 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
     _row_count_from_last_probe = 0;
 
     _build_block_rows.clear();
-    _build_block_offsets.clear();
     _probe_indexs.clear();
     if (with_other_join_conjuncts) {
         // use in right join to change visited state after exec the vother 
join conjunct
@@ -178,7 +139,6 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
     }
     _probe_indexs.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
     _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
-    _build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
 
     if (!_parent->_ready_probe) {
         _parent->_ready_probe = true;
@@ -199,8 +159,7 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, 
Parent>::_probe_row_m
 
     SCOPED_TIMER(_search_hashtable_timer);
     for (; probe_row_match_iter.ok() && current_offset < _batch_size; 
++probe_row_match_iter) {
-        _emplace_element(probe_row_match_iter->block_offset, 
probe_row_match_iter->row_num,
-                         current_offset);
+        _emplace_element(probe_row_match_iter->row_num, current_offset);
         _probe_indexs.emplace_back(probe_index);
         if constexpr (with_other_join_conjuncts) {
             _visited_map.emplace_back(&probe_row_match_iter->visited);
@@ -218,10 +177,8 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, 
Parent>::_probe_row_m
 }
 
 template <int JoinOpType, typename Parent>
-void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int8_t 
block_offset,
-                                                                 int32_t 
block_row,
+void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int32_t 
block_row,
                                                                  int& 
current_offset) {
-    _build_block_offsets.emplace_back(block_offset);
     _build_block_rows.emplace_back(block_row);
     current_offset++;
 }
@@ -236,21 +193,13 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
                                                              size_t 
probe_rows) {
     auto& probe_index = _parent->_probe_index;
 
-    using KeyGetter = typename HashTableType::State;
     using Mapped = typename HashTableType::Mapped;
 
-    KeyGetter key_getter =
-            _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, 
with_other_conjuncts,
-                                            need_null_map_for_probe ? 
null_map->data() : nullptr);
+    _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, 
with_other_conjuncts,
+                                    need_null_map_for_probe ? null_map->data() 
: nullptr);
 
     auto& mcol = mutable_block.mutable_columns();
 
-    constexpr auto is_right_semi_anti_join =
-            JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == 
TJoinOp::RIGHT_SEMI_JOIN;
-
-    constexpr auto probe_all =
-            JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
-
     int last_probe_index = probe_index;
 
     int current_offset = 0;
@@ -283,121 +232,12 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
 
     {
         SCOPED_TIMER(_search_hashtable_timer);
-        using FindResult = KeyGetter::FindResult;
-        FindResult empty = {nullptr, false};
-        while (current_offset < _batch_size && probe_index < probe_rows) {
-            if constexpr (ignore_null && need_null_map_for_probe) {
-                if ((*null_map)[probe_index]) {
-                    if constexpr (probe_all) {
-                        // only full outer / left outer need insert the data 
of right table
-                        _emplace_element(-1, -1, current_offset);
-                        _probe_indexs.emplace_back(probe_index);
-
-                        if constexpr (with_other_conjuncts) {
-                            _same_to_prev.emplace_back(false);
-                            _visited_map.emplace_back(nullptr);
-                        }
-                    } else {
-                        all_match_one = false;
-                    }
-                    probe_index++;
-                    continue;
-                }
-            }
-
-            const auto& find_result = need_null_map_for_probe && 
(*null_map)[probe_index]
-                                              ? empty
-                                              : 
hash_table_ctx.find(key_getter, probe_index);
-
-            auto current_probe_index = probe_index;
-            if constexpr (!with_other_conjuncts &&
-                          (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
-                           JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                           JoinOpType == TJoinOp::LEFT_SEMI_JOIN)) {
-                bool need_go_ahead =
-                        (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ 
find_result.is_found();
-                if constexpr (is_mark_join) {
-                    ++current_offset;
-                    bool null_result = (need_null_map_for_probe && 
(*null_map)[probe_index]) ||
-                                       (!need_go_ahead && 
*_has_null_in_build_side);
-                    if (null_result) {
-                        mark_column->insert_null();
-                    } else {
-                        mark_column->insert_value(need_go_ahead);
-                    }
-                } else {
-                    current_offset += need_go_ahead;
-                }
-                ++probe_index;
-            } else {
-                if (find_result.is_found()) {
-                    auto& mapped = find_result.get_mapped();
-                    auto origin_offset = current_offset;
-
-                    // For mark join, if euqual-matched tuple count for one 
probe row
-                    // excceeds batch size, it's difficult to implement the 
logic to
-                    // split them into multiple sub blocks and handle them, 
keep the original
-                    // logic for now.
-                    if constexpr (is_mark_join && with_other_conjuncts) {
-                        for (auto it = mapped.begin(); it.ok(); ++it) {
-                            _emplace_element(it->block_offset, it->row_num, 
current_offset);
-                            _visited_map.emplace_back(&it->visited);
-                        }
-                        ++probe_index;
-                    } else if constexpr (with_other_conjuncts || 
!is_right_semi_anti_join) {
-                        auto multi_match_last_offset = current_offset;
-                        auto it = mapped.begin();
-                        for (; it.ok() && current_offset < _batch_size; ++it) {
-                            _emplace_element(it->block_offset, it->row_num, 
current_offset);
-
-                            if constexpr (with_other_conjuncts) {
-                                _visited_map.emplace_back(&it->visited);
-                            }
-                        }
-                        probe_row_match_iter = it;
-                        if (!it.ok()) {
-                            // If all matched rows for the current probe row 
are handled,
-                            // advance to next probe row.
-                            // If not(which means it excceed batch size), 
probe_index is not increased and
-                            // remaining matched rows for the current probe 
row will be
-                            // handled in the next call of this function
-                            ++probe_index;
-                        } else if constexpr (with_other_conjuncts) {
-                            multi_matched_output_row_count =
-                                    current_offset - multi_match_last_offset;
-                        }
-                    } else {
-                        ++probe_index;
-                    }
-                    if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
-                        mapped.visited = true;
-                    }
-
-                    if constexpr (with_other_conjuncts) {
-                        _same_to_prev.emplace_back(false);
-                        for (int i = 0; i < current_offset - origin_offset - 
1; ++i) {
-                            _same_to_prev.emplace_back(true);
-                        }
-                    }
-                } else if constexpr (probe_all || JoinOpType == 
TJoinOp::LEFT_ANTI_JOIN ||
-                                     JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                                     (JoinOpType == TJoinOp::LEFT_SEMI_JOIN && 
is_mark_join)) {
-                    // only full outer / left outer need insert the data of 
right table
-                    _emplace_element(-1, -1, current_offset);
-
-                    if constexpr (with_other_conjuncts) {
-                        _same_to_prev.emplace_back(false);
-                        _visited_map.emplace_back(nullptr);
-                    }
-                    ++probe_index;
-                } else {
-                    ++probe_index;
-                }
-            }
-            all_match_one &= (current_offset == _probe_indexs.size() + 1);
-            _probe_indexs.resize(current_offset, current_probe_index);
-        }
-        probe_size = probe_index - last_probe_index + 
probe_row_match_iter.ok();
+        auto [new_probe_idx, new_current_offset] = 
hash_table_ctx.hash_table->find_batch(
+                hash_table_ctx.keys, hash_table_ctx.hash_values.data(), 
probe_index, probe_rows,
+                _probe_indexs, _build_block_rows); // NOTE(review): find_batch is given no
+        probe_index = new_probe_idx; // null_map, mark_column, join type or _batch_size;
+        current_offset = new_current_offset; // confirm it keeps the removed loop's semantics.
+        probe_size = probe_index - last_probe_index; // old code: + probe_row_match_iter.ok()
     }
 
     build_side_output_column(mcol, *_right_output_slot_flags, current_offset, 
with_other_conjuncts);
@@ -793,24 +633,20 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
         auto& visited_iter =
                 
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
         _build_blocks_locs.resize(_batch_size);
-        auto register_build_loc = [&](int8_t offset, int32_t row_nums) {
-            _build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset, 
row_nums);
-        };
-
         if (visited_iter.ok()) {
             if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
                 for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                    register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                    _build_blocks_locs[block_size++] = visited_iter->row_num;
                 }
             } else {
                 for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
                     if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
                         if (visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                     } else {
                         if (!visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                     }
                 }
@@ -827,7 +663,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
                     if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
                         visited_iter = mapped.begin();
                         for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                         if (visited_iter.ok()) {
                             // block_size >= _batch_size, quit for loop
@@ -838,7 +674,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
                     if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
                         visited_iter = mapped.begin();
                         for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                         if (visited_iter.ok()) {
                             // block_size >= _batch_size, quit for loop
@@ -851,11 +687,11 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
                 for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
                     if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
                         if (visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                     } else {
                         if (!visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                     }
                 }
@@ -867,38 +703,17 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
         }
         _build_blocks_locs.resize(block_size);
 
-        auto insert_build_rows = [&](int8_t offset) {
-            for (size_t j = 0; j < right_col_len; ++j) {
-                auto& column = 
*(*_build_blocks)[offset].get_by_position(j).column;
-                mcol[j + right_col_idx]->insert_indices_from(
-                        column, _build_block_rows.data(),
-                        _build_block_rows.data() + _build_block_rows.size());
-            }
-        };
-        if (_build_blocks->size() > 1) {
-            std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(),
-                      [](const auto a, const auto b) { return a.first > 
b.first; });
-            auto start = 0, end = 0;
-            while (start < _build_blocks_locs.size()) {
-                while (end < _build_blocks_locs.size() &&
-                       _build_blocks_locs[start].first == 
_build_blocks_locs[end].first) {
-                    end++;
-                }
-                auto offset = _build_blocks_locs[start].first;
-                _build_block_rows.resize(end - start);
-                for (int i = 0; start + i < end; i++) {
-                    _build_block_rows[i] = _build_blocks_locs[start + 
i].second;
-                }
-                start = end;
-                insert_build_rows(offset);
-            }
-        } else if (_build_blocks->size() == 1) {
-            const auto size = _build_blocks_locs.size();
-            _build_block_rows.resize(_build_blocks_locs.size());
-            for (int i = 0; i < size; i++) {
-                _build_block_rows[i] = _build_blocks_locs[i].second;
-            }
-            insert_build_rows(0);
+        // _build_blocks_locs already holds plain row indices into the single
+        // merged build block (there is no per-block offset any more), so it is
+        // copied verbatim and used as the gather index list for every
+        // build-side column below.
+        _build_block_rows.assign(_build_blocks_locs.begin(), _build_blocks_locs.end());
+
+        for (size_t j = 0; j < right_col_len; ++j) {
+            const auto& column = *_build_block->get_by_position(j).column;
+            mcol[j + right_col_idx]->insert_indices_from(
+                    column, _build_block_rows.data(),
+                    _build_block_rows.data() + _build_block_rows.size());
         }
 
         // just resize the left table column in case with other conjunct to 
make block size is not zero
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index f0427a16bd9..10aadbe771d 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -98,12 +98,7 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const 
TPlanNode& tnode, const Descr
     _arena = std::make_shared<Arena>();
     _hash_table_variants = std::make_shared<HashTableVariants>();
     _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
-    _build_blocks.reset(new std::vector<Block>());
-
-    // avoid vector expand change block address.
-    // one block can store 4g data, _build_blocks can store 128*4g data.
-    // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-    _build_blocks->reserve(HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+    _build_block = std::make_shared<Block>();
 }
 
 Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -726,9 +721,6 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* 
state) {
 Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* 
in_block, bool eos) {
     SCOPED_TIMER(_build_timer);
 
-    // make one block for each 4 gigabytes
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
     if (_has_null_in_build_side) {
         // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
@@ -741,41 +733,25 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
 
         if (in_block->rows() != 0) {
             SCOPED_TIMER(_build_side_merge_block_timer);
+            if (_build_side_mutable_block.empty()) { // seed one extra row as row 0
+                RETURN_IF_ERROR(_build_side_mutable_block.merge(
+                        *(in_block->create_same_struct_block(1, false))));
+            } // NOTE(review): presumably the "no match" slot; confirm probe side
             RETURN_IF_ERROR(_build_side_mutable_block.merge(*in_block));
-        }
-
-        if (UNLIKELY(_build_side_mem_used - _build_side_last_mem_used > 
BUILD_BLOCK_MAX_SIZE)) {
-            if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
+            if (_build_side_mutable_block.rows() > 
std::numeric_limits<int32_t>::max()) {
+                return Status::NotSupported(
+                        "Hash join do not support build table rows"
+                        " over:" +
+                        std::to_string(std::numeric_limits<int32_t>::max()));
             }
-            _build_blocks->emplace_back(_build_side_mutable_block.to_block());
-
-            COUNTER_UPDATE(_build_blocks_memory_usage, 
(*_build_blocks)[_build_block_idx].bytes());
-
-            // TODO:: Rethink may we should do the process after we receive 
all build blocks ?
-            // which is better.
-            RETURN_IF_ERROR(_process_build_block(state, 
(*_build_blocks)[_build_block_idx],
-                                                 _build_block_idx));
-
-            _build_side_mutable_block = MutableBlock();
-            ++_build_block_idx;
-            _build_side_last_mem_used = _build_side_mem_used;
         }
     }
 
     if (_should_build_hash_table && eos) {
         if (!_build_side_mutable_block.empty()) {
-            if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
-            }
-            _build_blocks->emplace_back(_build_side_mutable_block.to_block());
-            COUNTER_UPDATE(_build_blocks_memory_usage, 
(*_build_blocks)[_build_block_idx].bytes());
-            RETURN_IF_ERROR(_process_build_block(state, 
(*_build_blocks)[_build_block_idx],
-                                                 _build_block_idx));
+            _build_block = 
std::make_shared<Block>(_build_side_mutable_block.to_block());
+            COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
+            RETURN_IF_ERROR(_process_build_block(state, *_build_block));
         }
         auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
                                             LOG(FATAL) << "FATAL: uninited 
hash table";
@@ -797,7 +773,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
             _shared_hash_table_context->status = Status::OK();
             // arena will be shared with other instances.
             _shared_hash_table_context->arena = _arena;
-            _shared_hash_table_context->blocks = _build_blocks;
+            _shared_hash_table_context->block = _build_block;
             _shared_hash_table_context->hash_table_variants = 
_hash_table_variants;
             _shared_hash_table_context->short_circuit_for_null_in_probe_side =
                     _has_null_in_build_side;
@@ -829,7 +805,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
                 *_hash_table_variants,
                 *std::static_pointer_cast<HashTableVariants>(
                         _shared_hash_table_context->hash_table_variants));
-        _build_blocks = _shared_hash_table_context->blocks;
+        _build_block = _shared_hash_table_context->block;
 
         if (!_shared_hash_table_context->runtime_filters.empty()) {
             auto ret = std::visit(
@@ -862,7 +838,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
 
     // Since the comparison of null values is meaningless, null aware left 
anti join should not output null
     // when the build side is not empty.
-    if (!_build_blocks->empty() && _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    if (_build_block && _build_block->rows() && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
         _probe_ignore_null = true;
     }
     _init_short_circuit_for_probe();
@@ -961,7 +937,7 @@ void HashJoinNode::_set_build_ignore_flag(Block& block, 
const std::vector<int>&
     }
 }
 
-Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, 
uint8_t offset) {
+Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
     if (UNLIKELY(rows == 0)) {
@@ -1004,7 +980,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block, uin
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
                         ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
                                 hash_table_build_process(rows, block, 
raw_ptrs, this,
-                                                         state->batch_size(), 
offset, state);
+                                                         state->batch_size(), 
state);
                         return hash_table_build_process
                                 .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
                                         arg,
@@ -1082,7 +1058,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
                     return;
                 }
 
-                if (!try_get_hash_map_context_fixed<PartitionedHashMap, 
HashCRC32, RowRefListType>(
+                if (!try_get_hash_map_context_fixed<JoinFixedHashMap, 
HashCRC32, RowRefListType>(
                             *_hash_table_variants, _build_expr_ctxs)) {
                     
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>();
                 }
@@ -1090,16 +1066,6 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) 
{
             _join_op_variants, make_bool_variant(_have_other_join_conjunct));
 
     DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants));
-
-    std::visit(Overload {[&](std::monostate& arg) {
-                             LOG(FATAL) << "FATAL: uninited hash table";
-                             __builtin_unreachable();
-                         },
-                         [&](auto&& arg) {
-                             arg.hash_table->set_partitioned_threshold(
-                                     
state->partitioned_hash_join_rows_threshold());
-                         }},
-               *_hash_table_variants);
 }
 
 void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index 50e1567f4e2..ef5a61eae17 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -106,21 +106,18 @@ using ProfileCounter = RuntimeProfile::Counter;
 template <class HashTableContext, typename Parent>
 struct ProcessHashTableBuild {
     ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& 
build_raw_ptrs,
-                          Parent* parent, int batch_size, uint8_t offset, 
RuntimeState* state)
+                          Parent* parent, int batch_size, RuntimeState* state)
             : _rows(rows),
-              _skip_rows(0),
               _acquired_block(acquired_block),
               _build_raw_ptrs(build_raw_ptrs),
               _parent(parent),
               _batch_size(batch_size),
-              _offset(offset),
               _state(state),
               
_build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {}
 
     template <bool ignore_null, bool short_circuit_for_null>
     Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, 
bool* has_null_key) {
         using KeyGetter = typename HashTableContext::State;
-        using Mapped = typename HashTableContext::Mapped;
 
         Defer defer {[&]() {
             int64_t bucket_size = 
hash_table_ctx.hash_table->get_buffer_size_in_cells();
@@ -132,18 +129,14 @@ struct ProcessHashTableBuild {
                         hash_table_ctx.hash_table->get_collisions());
             COUNTER_SET(_parent->_build_buckets_fill_counter, 
filled_bucket_size);
 
-            auto hash_table_buckets = 
hash_table_ctx.hash_table->get_buffer_sizes_in_cells();
             std::string hash_table_buckets_info;
-            for (auto bucket_count : hash_table_buckets) {
-                hash_table_buckets_info += std::to_string(bucket_count) + ", ";
-            }
+
+            hash_table_buckets_info +=
+                    
std::to_string(hash_table_ctx.hash_table->get_buffer_size_in_cells()) + ", ";
             _parent->add_hash_buckets_info(hash_table_buckets_info);
 
-            auto hash_table_sizes = hash_table_ctx.hash_table->sizes();
             hash_table_buckets_info.clear();
-            for (auto table_size : hash_table_sizes) {
-                hash_table_buckets_info += std::to_string(table_size) + ", ";
-            }
+            hash_table_buckets_info += 
std::to_string(hash_table_ctx.hash_table->size()) + ", ";
             _parent->add_hash_buckets_filled_info(hash_table_buckets_info);
         }};
 
@@ -152,17 +145,6 @@ struct ProcessHashTableBuild {
         SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->reset_resize_timer();
 
-        // only not build_unique, we need expanse hash table before insert data
-        // 1. There are fewer duplicate keys, reducing the number of resize 
hash tables
-        // can improve performance to a certain extent, about 2%-5%
-        // 2. There are many duplicate keys, and the hash table filled bucket 
is far less than
-        // the hash table build bucket, which may waste a lot of memory.
-        // TODO, use the NDV expansion of the key column in the optimizer 
statistics
-        if (!_parent->build_unique()) {
-            
RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table->expanse_for_add_elem(
-                    std::min<int>(_rows, 
config::hash_table_pre_expanse_max_rows)));
-        }
-
         vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block];
         bool has_runtime_filter = !_parent->runtime_filter_descs().empty();
         if (has_runtime_filter) {
@@ -174,72 +156,24 @@ struct ProcessHashTableBuild {
 
         auto& arena = *_parent->arena();
         auto old_build_arena_memory = arena.size();
-
-        size_t k = 0;
-        bool inserted = false;
-        auto creator = [&](const auto& ctor, auto& key, auto& origin) {
-            HashTableContext::try_presis_key(key, origin, arena);
-            inserted = true;
-            ctor(key, Mapped {k, _offset});
-        };
-
-        bool build_unique = _parent->build_unique();
-#define EMPLACE_IMPL(stmt)                                                    \
-    for (; k < _rows; ++k) {                                                  \
-        if (k % CHECK_FRECUENCY == 0) {                                       \
-            RETURN_IF_CANCELLED(_state);                                      \
-        }                                                                     \
-        if constexpr (short_circuit_for_null) {                               \
-            if ((*null_map)[k]) {                                             \
-                *has_null_key = true;                                         \
-                return Status::OK();                                          \
-            }                                                                 \
-        } else if constexpr (ignore_null) {                                   \
-            if ((*null_map)[k]) {                                             \
-                *has_null_key = true;                                         \
-                continue;                                                     \
-            }                                                                 \
-        }                                                                     \
-        inserted = false;                                                     \
-        [[maybe_unused]] auto& mapped =                                       \
-                hash_table_ctx.lazy_emplace(key_getter, k, creator, nullptr); \
-        stmt;                                                                 \
-    }
-
-        if (has_runtime_filter && build_unique) {
-            EMPLACE_IMPL(
-                    if (inserted) { inserted_rows.push_back(k); } else { 
_skip_rows++; });
-        } else if (has_runtime_filter && !build_unique) {
-            EMPLACE_IMPL(
-                    if (inserted) { inserted_rows.push_back(k); } else {
-                        mapped.insert({k, _offset}, *_parent->arena());
-                        inserted_rows.push_back(k);
-                    });
-        } else if (!has_runtime_filter && build_unique) {
-            EMPLACE_IMPL(if (!inserted) { _skip_rows++; });
-        } else {
-            EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, 
*_parent->arena()); });
-        }
+        hash_table_ctx.hash_table->build(hash_table_ctx.keys, 
hash_table_ctx.hash_values.data(),
+                                         _rows);
         _parent->_build_rf_cardinality += inserted_rows.size();
 
         _parent->_build_arena_memory_usage->add(arena.size() - 
old_build_arena_memory);
 
         COUNTER_UPDATE(_parent->_build_table_expanse_timer,
                        hash_table_ctx.hash_table->get_resize_timer_value());
-        COUNTER_UPDATE(_parent->_build_table_convert_timer,
-                       hash_table_ctx.hash_table->get_convert_timer_value());
 
         return Status::OK();
     }
 
 private:
     const int _rows;
-    int _skip_rows;
     Block& _acquired_block;
     ColumnRawPtrs& _build_raw_ptrs;
     Parent* _parent;
     int _batch_size;
-    uint8_t _offset;
     RuntimeState* _state;
 
     ProfileCounter* _build_side_compute_hash_timer;
@@ -325,8 +259,6 @@ using HashTableIteratorVariants =
         std::variant<std::monostate, ForwardIterator<RowRefList>,
                      ForwardIterator<RowRefListWithFlag>, 
ForwardIterator<RowRefListWithFlags>>;
 
-static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128;
-
 class HashJoinNode final : public VJoinNodeBase {
 public:
     HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
@@ -369,7 +301,7 @@ public:
     bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
     bool is_right_semi_anti() const { return _is_right_semi_anti; }
     bool is_outer_join() const { return _is_outer_join; }
-    std::shared_ptr<std::vector<Block>> build_blocks() const { return 
_build_blocks; }
+    std::shared_ptr<Block> build_block() const { return _build_block; }
     std::vector<bool>* left_output_slot_flags() { return 
&_left_output_slot_flags; }
     std::vector<bool>* right_output_slot_flags() { return 
&_right_output_slot_flags; }
     bool* has_null_in_build_side() { return &_has_null_in_build_side; }
@@ -390,16 +322,16 @@ private:
         _short_circuit_for_probe =
                 (_has_null_in_build_side && _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
                  !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && 
!_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN 
&& !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
+                (!_build_block && _join_op == TJoinOp::INNER_JOIN && 
!_is_mark_join) ||
+                (!_build_block && _join_op == TJoinOp::LEFT_SEMI_JOIN && 
!_is_mark_join) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
 
         //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
         //we could get the result is probe table + null-column(if need output)
         _empty_right_table_need_probe_dispose =
-                (_build_blocks->empty() && !_have_other_join_conjunct && 
!_is_mark_join) &&
+                (!_build_block && !_have_other_join_conjunct && 
!_is_mark_join) &&
                 (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN ||
                  _join_op == TJoinOp::LEFT_ANTI_JOIN);
     }
@@ -467,7 +399,7 @@ private:
     HashTableIteratorVariants _outer_join_pull_visited_iter;
     HashTableIteratorVariants _probe_row_match_iter;
 
-    std::shared_ptr<std::vector<Block>> _build_blocks;
+    std::shared_ptr<Block> _build_block;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     ColumnUInt8::MutablePtr _null_map_column;
@@ -501,7 +433,7 @@ private:
 
     Status _materialize_build_side(RuntimeState* state) override;
 
-    Status _process_build_block(RuntimeState* state, Block& block, uint8_t 
offset);
+    Status _process_build_block(RuntimeState* state, Block& block);
 
     Status _do_evaluate(Block& block, VExprContextSPtrs& exprs,
                         RuntimeProfile::Counter& expr_call_timer, 
std::vector<int>& res_col_ids);
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index d284385b8ed..bb866025df4 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -60,7 +60,6 @@ 
VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const TPlan
         : ExecNode(pool, tnode, descs),
           _valid_element_in_hash_tbl(0),
           _mem_used(0),
-          _build_block_index(0),
           _build_finished(false) {
     _hash_table_variants = std::make_unique<HashTableVariants>();
 }
@@ -215,7 +214,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
         }
         return;
     }
-    if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, 
RowRefListWithFlags>(
+    if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32, 
RowRefListWithFlags>(
                 *_hash_table_variants, _child_expr_lists[0])) {
         
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
     }
@@ -223,36 +222,45 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
 
 template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block* 
block, bool eos) {
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
     if (block->rows() != 0) {
         _mem_used += block->allocated_bytes();
         RETURN_IF_ERROR(_mutable_block.merge(*block));
     }
 
-    if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
-        _build_blocks.emplace_back(_mutable_block.to_block());
-        RETURN_IF_ERROR(
-                process_build_block(_build_blocks[_build_block_index], 
_build_block_index, state));
+    if (block->rows() != 0) {
+        if (_build_block.empty()) {
+            
RETURN_IF_ERROR(_mutable_block.merge(*(block->create_same_struct_block(0, 
false))));
+        }
+        RETURN_IF_ERROR(_mutable_block.merge(*block));
+        if (_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
+            return Status::NotSupported(
+                    "Hash join do not support build table rows"
+                    " over:" +
+                    std::to_string(std::numeric_limits<uint32_t>::max()));
+        }
+    }
+
+    if (eos) {
+        if (!_mutable_block.empty()) {
+            _build_block = _mutable_block.to_block();
+        }
+        RETURN_IF_ERROR(process_build_block(_build_block, state));
         _mutable_block.clear();
-        ++_build_block_index;
 
-        if (eos) {
-            if constexpr (is_intersect) {
-                _valid_element_in_hash_tbl = 0;
-            } else {
-                std::visit(
-                        [&](auto&& arg) {
-                            using HashTableCtxType = 
std::decay_t<decltype(arg)>;
-                            if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                                _valid_element_in_hash_tbl = 
arg.hash_table->size();
-                            }
-                        },
-                        *_hash_table_variants);
-            }
-            _build_finished = true;
-            _can_read = _children.size() == 1;
+        if constexpr (is_intersect) {
+            _valid_element_in_hash_tbl = 0;
+        } else {
+            std::visit(
+                    [&](auto&& arg) {
+                        using HashTableCtxType = std::decay_t<decltype(arg)>;
+                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                            _valid_element_in_hash_tbl = 
arg.hash_table->size();
+                        }
+                    },
+                    *_hash_table_variants);
         }
+        _build_finished = true;
+        _can_read = _children.size() == 1;
     }
     return Status::OK();
 }
@@ -304,8 +312,7 @@ Status 
VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
 }
 
 template <bool is_intersect>
-Status VSetOperationNode<is_intersect>::process_build_block(Block& block, 
uint8_t offset,
-                                                            RuntimeState* 
state) {
+Status VSetOperationNode<is_intersect>::process_build_block(Block& block, 
RuntimeState* state) {
     size_t rows = block.rows();
     if (rows == 0) {
         return Status::OK();
@@ -320,7 +327,7 @@ Status 
VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
                     HashTableBuild<HashTableCtxType, is_intersect> 
hash_table_build_process(
-                            this, rows, raw_ptrs, offset, state);
+                            this, rows, raw_ptrs, state);
                     st = hash_table_build_process(arg, _arena);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
@@ -336,8 +343,8 @@ void 
VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
                                                          int& block_size) {
     auto it = value.begin();
     for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); 
++idx) {
-        auto& column = 
*_build_blocks[it->block_offset].get_by_position(idx->first).column;
-        if (_mutable_cols[idx->second]->is_nullable() xor 
column.is_nullable()) {
+        const auto& column = *_build_block.get_by_position(idx->first).column;
+        if (_mutable_cols[idx->second]->is_nullable() ^ column.is_nullable()) {
             DCHECK(_mutable_cols[idx->second]->is_nullable());
             ((ColumnNullable*)(_mutable_cols[idx->second].get()))
                     ->insert_from_not_nullable(column, it->row_num);
@@ -508,10 +515,6 @@ void VSetOperationNode<is_intersect>::debug_string(int 
indentation_level,
 template <bool is_intersect>
 void VSetOperationNode<is_intersect>::release_mem() {
     _hash_table_variants = nullptr;
-
-    std::vector<Block> tmp_build_blocks;
-    _build_blocks.swap(tmp_build_blocks);
-
     _probe_block.clear();
 }
 
diff --git a/be/src/vec/exec/vset_operation_node.h 
b/be/src/vec/exec/vset_operation_node.h
index ff016469f49..8ca04f2f71f 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -82,7 +82,7 @@ private:
     //It's time to abstract out the same methods and provide them directly to 
others;
     void hash_table_init();
     Status hash_table_build(RuntimeState* state);
-    Status process_build_block(Block& block, uint8_t offset, RuntimeState* 
state);
+    Status process_build_block(Block& block, RuntimeState* state);
     Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
     Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int 
child_id);
     void refresh_hash_table();
@@ -115,11 +115,10 @@ private:
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
 
-    std::vector<Block> _build_blocks;
+    Block _build_block;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     std::vector<MutableColumnPtr> _mutable_cols;
-    int _build_block_index;
     bool _build_finished;
     std::vector<bool> _probe_finished_children_index;
     MutableBlock _mutable_block;
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index 6b31cf07ec9..e1c01709042 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -53,18 +53,15 @@ struct SharedRuntimeFilterContext {
 
 struct SharedHashTableContext {
     SharedHashTableContext()
-            : hash_table_variants(nullptr),
-              blocks(new std::vector<vectorized::Block>()),
-              signaled(false),
-              short_circuit_for_null_in_probe_side(false) {}
+            : hash_table_variants(nullptr), 
block(std::make_shared<vectorized::Block>()) {}
 
     Status status;
     std::shared_ptr<Arena> arena;
     std::shared_ptr<void> hash_table_variants;
-    std::shared_ptr<std::vector<Block>> blocks;
+    std::shared_ptr<Block> block;
     std::map<int, SharedRuntimeFilterContext> runtime_filters;
-    bool signaled;
-    bool short_circuit_for_null_in_probe_side;
+    bool signaled {};
+    bool short_circuit_for_null_in_probe_side {};
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to