This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a0d727cccac81ec4761e4de9d63913839979a7b2
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri Oct 20 10:33:31 2023 +0800

    support batch size (#25629)
    
    * support batch size
    
    * support left anti/semi/outer join
---
 be/src/olap/delta_writer.cpp                       |  2 +-
 be/src/olap/delta_writer.h                         |  2 +-
 be/src/olap/delta_writer_v2.cpp                    |  2 +-
 be/src/olap/delta_writer_v2.h                      |  2 +-
 be/src/olap/memtable.cpp                           |  4 +-
 be/src/olap/memtable.h                             |  2 +-
 be/src/olap/memtable_writer.cpp                    |  2 +-
 be/src/olap/memtable_writer.h                      |  2 +-
 be/src/olap/tablet.cpp                             |  2 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  2 +-
 .../exec/nested_loop_join_probe_operator.cpp       |  2 +-
 be/src/runtime/tablets_channel.cpp                 |  7 +-
 be/src/vec/columns/column.h                        |  6 +-
 be/src/vec/columns/column_array.cpp                | 10 +--
 be/src/vec/columns/column_array.h                  |  4 +-
 be/src/vec/columns/column_complex.h                | 10 +--
 be/src/vec/columns/column_const.h                  |  4 +-
 be/src/vec/columns/column_decimal.h                |  6 +-
 be/src/vec/columns/column_dictionary.h             |  4 +-
 be/src/vec/columns/column_fixed_length_object.h    | 12 +--
 be/src/vec/columns/column_map.cpp                  | 10 +--
 be/src/vec/columns/column_map.h                    |  4 +-
 be/src/vec/columns/column_nullable.cpp             |  5 +-
 be/src/vec/columns/column_nullable.h               |  4 +-
 be/src/vec/columns/column_object.cpp               | 15 ++--
 be/src/vec/columns/column_object.h                 |  8 +-
 be/src/vec/columns/column_string.cpp               | 28 +++----
 be/src/vec/columns/column_string.h                 |  4 +-
 be/src/vec/columns/column_struct.cpp               |  4 +-
 be/src/vec/columns/column_struct.h                 |  4 +-
 be/src/vec/columns/column_vector.cpp               | 13 ++--
 be/src/vec/columns/column_vector.h                 |  4 +-
 be/src/vec/columns/predicate_column.h              |  4 +-
 be/src/vec/common/hash_table/hash_map.h            | 91 ++++++++++++++++++++--
 be/src/vec/core/block.cpp                          |  3 +-
 be/src/vec/core/block.h                            |  2 +-
 be/src/vec/exec/join/process_hash_table_probe.h    |  2 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  |  9 ++-
 be/src/vec/exec/join/vhash_join_node.h             |  2 +-
 be/src/vec/exec/join/vnested_loop_join_node.cpp    |  2 +-
 be/src/vec/exec/scan/pip_scanner_context.h         |  6 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  6 +-
 be/src/vec/sink/vdata_stream_sender.h              |  8 +-
 be/src/vec/sink/vtablet_sink_v2.h                  |  2 +-
 44 files changed, 197 insertions(+), 130 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 23e1718cb7d..4098f512120 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -108,7 +108,7 @@ Status DeltaWriter::append(const vectorized::Block* block) {
     return write(block, {}, true);
 }
 
-Status DeltaWriter::write(const vectorized::Block* block, const 
std::vector<int>& row_idxs,
+Status DeltaWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs,
                           bool is_append) {
     if (UNLIKELY(row_idxs.empty() && !is_append)) {
         return Status::OK();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 303f17f14f8..4b2db0c9cc3 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -67,7 +67,7 @@ public:
 
     Status init();
 
-    Status write(const vectorized::Block* block, const std::vector<int>& 
row_idxs,
+    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs,
                  bool is_append = false);
 
     Status append(const vectorized::Block* block);
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index ef3ff23f9d8..47723f26d72 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -138,7 +138,7 @@ Status DeltaWriterV2::append(const vectorized::Block* 
block) {
     return write(block, {}, true);
 }
 
-Status DeltaWriterV2::write(const vectorized::Block* block, const 
std::vector<int>& row_idxs,
+Status DeltaWriterV2::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs,
                             bool is_append) {
     if (UNLIKELY(row_idxs.empty() && !is_append)) {
         return Status::OK();
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index b2b1f5f1c19..8f92ec74273 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -71,7 +71,7 @@ public:
 
     Status init();
 
-    Status write(const vectorized::Block* block, const std::vector<int>& 
row_idxs,
+    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs,
                  bool is_append = false);
 
     Status append(const vectorized::Block* block);
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index d163abd26a7..bf4175c541b 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -166,7 +166,7 @@ int RowInBlockComparator::operator()(const RowInBlock* 
left, const RowInBlock* r
                                *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* input_block, const 
std::vector<int>& row_idxs,
+void MemTable::insert(const vectorized::Block* input_block, const 
std::vector<uint32_t>& row_idxs,
                       bool is_append) {
     SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
     vectorized::Block target_block = *input_block;
@@ -239,7 +239,7 @@ void 
MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo
 }
 void MemTable::_put_into_output(vectorized::Block& in_block) {
     SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
-    std::vector<int> row_pos_vec;
+    std::vector<uint32_t> row_pos_vec;
     DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
     row_pos_vec.reserve(in_block.rows());
     for (int i = 0; i < _row_in_blocks.size(); i++) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index b98e7411e3b..2b36d1bf8fd 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -180,7 +180,7 @@ public:
                _flush_mem_tracker->consumption();
     }
     // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, const std::vector<int>& 
row_idxs,
+    void insert(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs,
                 bool is_append = false);
 
     void shrink_memtable_by_agg();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 2ef704f075a..75d2bdc8716 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -89,7 +89,7 @@ Status MemTableWriter::append(const vectorized::Block* block) 
{
     return write(block, {}, true);
 }
 
-Status MemTableWriter::write(const vectorized::Block* block, const 
std::vector<int>& row_idxs,
+Status MemTableWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs,
                              bool is_append) {
     if (UNLIKELY(row_idxs.empty() && !is_append)) {
         return Status::OK();
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index 3491f72abd5..fedf4f7c148 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -71,7 +71,7 @@ public:
                 std::shared_ptr<PartialUpdateInfo> partial_update_info,
                 bool unique_key_mow = false);
 
-    Status write(const vectorized::Block* block, const std::vector<int>& 
row_idxs,
+    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs,
                  bool is_append = false);
 
     Status append(const vectorized::Block* block);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5b7c07fbd1a..24db68d02a5 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2928,7 +2928,7 @@ void Tablet::sort_block(vectorized::Block& in_block, 
vectorized::Block& output_b
                                      << " r_pos: " << r->_row_pos;
                   return value < 0;
               });
-    std::vector<int> row_pos_vec;
+    std::vector<uint32_t> row_pos_vec;
     row_pos_vec.reserve(in_block.rows());
     for (int i = 0; i < row_in_blocks.size(); i++) {
         row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 89679ceff89..91c352989a0 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -445,7 +445,7 @@ Status 
ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
                                                int num_channels,
                                                const HashValueType* __restrict 
channel_ids,
                                                int rows, vectorized::Block* 
block, bool eos) {
-    std::vector<int> channel2rows[num_channels];
+    std::vector<uint32_t> channel2rows[num_channels];
 
     for (int i = 0; i < rows; i++) {
         channel2rows[channel_ids[i]].emplace_back(i);
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index e1454c8c3c0..ed7ad93726c 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -255,7 +255,7 @@ void 
NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB
                             .data();
             const auto num_rows = cur_block.rows();
 
-            std::vector<int> selector(num_rows);
+            std::vector<uint32_t> selector(num_rows);
             size_t selector_idx = 0;
             for (size_t j = 0; j < num_rows; j++) {
                 if constexpr (IsSemi) {
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 91294135a06..eebfeb69f5b 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -497,8 +497,9 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
         return Status::OK();
     }
 
-    std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index 
*/> tablet_to_rowidxs;
-    for (int i = 0; i < request.tablet_ids_size(); ++i) {
+    std::unordered_map<int64_t /* tablet_id */, std::vector<uint32_t> /* row 
index */>
+            tablet_to_rowidxs;
+    for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
         if (request.is_single_tablet_block()) {
             break;
         }
@@ -510,7 +511,7 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
         }
         auto it = tablet_to_rowidxs.find(tablet_id);
         if (it == tablet_to_rowidxs.end()) {
-            tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int> 
{i});
+            tablet_to_rowidxs.emplace(tablet_id, 
std::initializer_list<uint32_t> {i});
         } else {
             it->second.emplace_back(i);
         }
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 68afe3947d9..29f7ed109ea 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -238,9 +238,9 @@ public:
     /// Appends a batch elements from other column with the same type
     /// indices_begin + indices_end represent the row indices of column src
     /// Warning:
-    ///       if *indices == -1 means the row is null, only use in outer join, do not use in any other place
-    virtual void insert_indices_from(const IColumn& src, const int* indices_begin,
-                                     const int* indices_end) = 0;
+    ///       if *indices == 0, the row is null. Only used in outer join; do not use elsewhere.
+    virtual void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin,
+                                     const uint32_t* __restrict indices_end) = 0;

     /// Appends data located in specified memory chunk if it is possible (throws an exception if it cannot be implemented).
     /// Is used to optimize some computations (in aggregation, for example).
diff --git a/be/src/vec/columns/column_array.cpp 
b/be/src/vec/columns/column_array.cpp
index 47949580bea..cdfcfa22c7b 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -793,14 +793,10 @@ size_t ColumnArray::filter_nullable(const Filter& filter) 
{
     return result_size;
 }
 
-void ColumnArray::insert_indices_from(const IColumn& src, const int* 
indices_begin,
-                                      const int* indices_end) {
+void ColumnArray::insert_indices_from(const IColumn& src, const uint32_t* 
__restrict indices_begin,
+                                      const uint32_t* __restrict indices_end) {
     for (auto x = indices_begin; x != indices_end; ++x) {
-        if (*x == -1) {
-            ColumnArray::insert_default();
-        } else {
-            ColumnArray::insert_from(src, *x);
-        }
+        ColumnArray::insert_from(src, *x);
     }
 }
 
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
index 44391ae8c74..93ef3a3e324 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -220,8 +220,8 @@ public:
         callback(data);
     }
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override;
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override;
 
     void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 
0) override {
         DCHECK(size() > self_row);
diff --git a/be/src/vec/columns/column_complex.h 
b/be/src/vec/columns/column_complex.h
index b25ae9f1577..a8f8189bef7 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -184,18 +184,14 @@ public:
         data.insert(data.end(), st, ed);
     }
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override {
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override {
         const Self& src_vec = assert_cast<const Self&>(src);
         auto new_size = indices_end - indices_begin;
 
         for (int i = 0; i < new_size; ++i) {
             auto offset = *(indices_begin + i);
-            if (offset == -1) {
-                data.emplace_back(T {});
-            } else {
-                data.emplace_back(src_vec.get_element(offset));
-            }
+            data.emplace_back(src_vec.get_element(offset));
         }
     }
 
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 307066a7ae9..8d03087cc3d 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -111,8 +111,8 @@ public:
         s += length;
     }
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override {
+    void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+                             const uint32_t* indices_end) override {
         s += (indices_end - indices_begin);
     }
 
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index dcd135d46b9..ecec54a4b52 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -120,12 +120,12 @@ public:
         data.push_back(assert_cast<const Self&>(src).get_data()[n]);
     }
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override {
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override {
         auto origin_size = size();
         auto new_size = indices_end - indices_begin;
         data.resize(origin_size + new_size);
-        const T* src_data = reinterpret_cast<const 
T*>(src.get_raw_data().data);
+        const T* __restrict src_data = reinterpret_cast<const 
T*>(src.get_raw_data().data);
 
         for (int i = 0; i < new_size; ++i) {
             data[origin_size + i] = src_data[indices_begin[i]];
diff --git a/be/src/vec/columns/column_dictionary.h 
b/be/src/vec/columns/column_dictionary.h
index e00e5b425f8..6d3a2f46c71 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -77,8 +77,8 @@ public:
         LOG(FATAL) << "insert_range_from not supported in ColumnDictionary";
     }
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override {
+    void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+                             const uint32_t* indices_end) override {
         LOG(FATAL) << "insert_indices_from not supported in ColumnDictionary";
     }
 
diff --git a/be/src/vec/columns/column_fixed_length_object.h 
b/be/src/vec/columns/column_fixed_length_object.h
index dce6666f132..8c42ea2325a 100644
--- a/be/src/vec/columns/column_fixed_length_object.h
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -83,8 +83,8 @@ public:
         return res;
     }
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override {
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override {
         const Self& src_vec = assert_cast<const Self&>(src);
         auto origin_size = size();
         auto new_size = indices_end - indices_begin;
@@ -96,12 +96,8 @@ public:
 
         for (int i = 0; i < new_size; ++i) {
             int offset = indices_begin[i];
-            if (offset > -1) {
-                memcpy(&_data[(origin_size + i) * _item_size], 
&src_vec._data[offset * _item_size],
-                       _item_size);
-            } else {
-                memset(&_data[(origin_size + i) * _item_size], 0, _item_size);
-            }
+            memcpy(&_data[(origin_size + i) * _item_size], 
&src_vec._data[offset * _item_size],
+                   _item_size);
         }
     }
 
diff --git a/be/src/vec/columns/column_map.cpp 
b/be/src/vec/columns/column_map.cpp
index e25cfd52dd8..38d83d93ffa 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -185,14 +185,10 @@ void ColumnMap::insert_from(const IColumn& src_, size_t 
n) {
     get_offsets().push_back(get_offsets().back() + size);
 }
 
-void ColumnMap::insert_indices_from(const IColumn& src, const int* 
indices_begin,
-                                    const int* indices_end) {
+void ColumnMap::insert_indices_from(const IColumn& src, const uint32_t* 
__restrict indices_begin,
+                                    const uint32_t* __restrict indices_end) {
     for (auto x = indices_begin; x != indices_end; ++x) {
-        if (*x == -1) {
-            ColumnMap::insert_default();
-        } else {
-            ColumnMap::insert_from(src, *x);
-        }
+        ColumnMap::insert_from(src, *x);
     }
 }
 
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index 7da2200fe2d..d5d58095c41 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -128,8 +128,8 @@ public:
                          Permutation& res) const override {
         LOG(FATAL) << "get_permutation not implemented";
     }
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override;
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override;
 
     void append_data_by_selector(MutableColumnPtr& res,
                                  const IColumn::Selector& selector) const 
override {
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index 42b88ac7ae9..c0022a29a2a 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -298,8 +298,9 @@ void ColumnNullable::insert_range_from(const IColumn& src, 
size_t start, size_t
     _has_null |= simd::contain_byte(src_null_map_data.data() + start, length, 
1);
 }
 
-void ColumnNullable::insert_indices_from(const IColumn& src, const int* 
indices_begin,
-                                         const int* indices_end) {
+void ColumnNullable::insert_indices_from(const IColumn& src,
+                                         const uint32_t* __restrict 
indices_begin,
+                                         const uint32_t* __restrict 
indices_end) {
     const auto& src_concrete = assert_cast<const ColumnNullable&>(src);
     get_nested_column().insert_indices_from(src_concrete.get_nested_column(), 
indices_begin,
                                             indices_end);
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 953c66e45bc..023a87acab7 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -121,8 +121,8 @@ public:
     void deserialize_vec(std::vector<StringRef>& keys, size_t num_rows) 
override;
 
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override;
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override;
     void insert(const Field& x) override;
     void insert_from(const IColumn& src, size_t n) override;
 
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index f3571c8ba29..a8f99adfdd3 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -666,14 +666,11 @@ void ColumnObject::get(size_t n, Field& res) const {
     }
 }
 
-Status ColumnObject::try_insert_indices_from(const IColumn& src, const int* 
indices_begin,
-                                             const int* indices_end) {
+Status ColumnObject::try_insert_indices_from(const IColumn& src,
+                                             const uint32_t* __restrict 
indices_begin,
+                                             const uint32_t* __restrict 
indices_end) {
     for (auto x = indices_begin; x != indices_end; ++x) {
-        if (*x == -1) {
-            ColumnObject::insert_default();
-        } else {
-            ColumnObject::try_insert_from(src, *x);
-        }
+        ColumnObject::try_insert_from(src, *x);
     }
     finalize();
     return Status::OK();
@@ -992,8 +989,8 @@ void 
ColumnObject::append_data_by_selector(MutableColumnPtr& res,
                                    });
 }
 
-void ColumnObject::insert_indices_from(const IColumn& src, const int* 
indices_begin,
-                                       const int* indices_end) {
+void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* 
__restrict indices_begin,
+                                       const uint32_t* __restrict indices_end) 
{
     // insert_indices_from with alignment
     const ColumnObject& src_column = *check_and_get_column<ColumnObject>(src);
     align_variant_by_name_and_type(*this, src_column, indices_end - 
indices_begin,
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index 6bff69b1e67..de7a5ec3d5d 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -284,8 +284,8 @@ public:
     void append_data_by_selector(MutableColumnPtr& res,
                                  const IColumn::Selector& selector) const 
override;
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override;
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override;
 
     // May throw execption
     void try_insert(const Field& field);
@@ -315,8 +315,8 @@ public:
         return StringRef();
     }
 
-    Status try_insert_indices_from(const IColumn& src, const int* 
indices_begin,
-                                   const int* indices_end);
+    Status try_insert_indices_from(const IColumn& src, const uint32_t* 
__restrict indices_begin,
+                                   const uint32_t* __restrict indices_end);
 
     StringRef serialize_value_into_arena(size_t n, Arena& arena,
                                          char const*& begin) const override {
diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp
index 5d5abd64349..7d7597e7dc3 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -124,8 +124,8 @@ void ColumnString::insert_range_from(const IColumn& src, size_t start, size_t le
     }
 }

-void ColumnString::insert_indices_from(const IColumn& src, const int* indices_begin,
-                                       const int* indices_end) {
+void ColumnString::insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin,
+                                       const uint32_t* __restrict indices_end) {
     const ColumnString& src_str = assert_cast<const ColumnString&>(src);
     auto src_offset_data = src_str.offsets.data();

@@ -136,10 +136,9 @@ void ColumnString::insert_indices_from(const IColumn& src, const int* indices_be
     offsets.resize(offsets.size() + indices_end - indices_begin);
     auto* dst_offsets_data = offsets.data();

-    for (auto x = indices_begin; x != indices_end; ++x) {
-        if (*x != -1) {
-            total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1];
-        }
+    for (auto* __restrict x = indices_begin; x != indices_end; ++x) {
+        const int64_t offset = *x;  // signed: offset - 1 must be -1, not UINT32_MAX, for row 0
+        total_chars_size += src_offset_data[offset] - src_offset_data[offset - 1];
         dst_offsets_data[dst_offsets_pos++] = total_chars_size;
     }
     check_chars_length(total_chars_size, offsets.size());
@@ -150,14 +149,15 @@ void ColumnString::insert_indices_from(const IColumn& src, const int* indices_be
     auto* dst_data_ptr = chars.data();

     size_t dst_chars_pos = old_char_size;
-    for (auto x = indices_begin; x != indices_end; ++x) {
-        if (*x != -1) {
-            const size_t size_to_append = src_offset_data[*x] - src_offset_data[*x - 1];
-            const size_t offset = src_offset_data[*x - 1];
-            memcpy_small_allow_read_write_overflow15(dst_data_ptr + dst_chars_pos,
-                                                     src_data_ptr + offset, size_to_append);
-            dst_chars_pos += size_to_append;
-        }
+    for (auto* __restrict x = indices_begin; x != indices_end; ++x) {
+        const int64_t offset = *x;  // signed: offset - 1 must not wrap to UINT32_MAX for row 0
+        const auto start = src_offset_data[offset - 1];
+        const auto end = src_offset_data[offset];
+
+        const size_t size_to_append = end - start;
+        memcpy_small_allow_read_write_overflow15(dst_data_ptr + dst_chars_pos, src_data_ptr + start,
+                                                 size_to_append);
+        dst_chars_pos += size_to_append;
     }
 }

diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index ae2bb9d25f9..79e649cb642 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -472,8 +472,8 @@ public:
 
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override;
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override;
 
     ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const 
override;
     size_t filter(const Filter& filter) override;
diff --git a/be/src/vec/columns/column_struct.cpp 
b/be/src/vec/columns/column_struct.cpp
index 832bc32189c..f46f01bd16f 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -225,8 +225,8 @@ void ColumnStruct::update_crcs_with_value(uint32_t* 
__restrict hash, PrimitiveTy
     }
 }
 
-void ColumnStruct::insert_indices_from(const IColumn& src, const int* 
indices_begin,
-                                       const int* indices_end) {
+void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* 
__restrict indices_begin,
+                                       const uint32_t* __restrict indices_end) 
{
     const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src);
     for (size_t i = 0; i < columns.size(); ++i) {
         columns[i]->insert_indices_from(src_concrete.get_column(i), 
indices_begin, indices_end);
diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h
index 23f50582780..7567b28f452 100644
--- a/be/src/vec/columns/column_struct.h
+++ b/be/src/vec/columns/column_struct.h
@@ -121,8 +121,8 @@ public:
                                 uint32_t offset = 0,
                                 const uint8_t* __restrict null_data = nullptr) const override;

-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override;
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin,
+                             const uint32_t* __restrict indices_end) override;

     void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
                          Permutation& res) const override {
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index bae633d1490..d206fdc04f9 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -366,8 +366,9 @@ void ColumnVector<T>::insert_range_from(const IColumn& src, 
size_t start, size_t
 }
 
 template <typename T>
-void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* 
indices_begin,
-                                          const int* indices_end) {
+void ColumnVector<T>::insert_indices_from(const IColumn& src,
+                                          const uint32* __restrict 
indices_begin,
+                                          const uint32* __restrict 
indices_end) {
     auto origin_size = size();
     auto new_size = indices_end - indices_begin;
     data.resize(origin_size + new_size);
@@ -375,13 +376,15 @@ void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* indices
     const T* src_data = reinterpret_cast<const T*>(src.get_raw_data().data);
 
     if constexpr (std::is_same_v<T, UInt8>) {
-        // nullmap : indices_begin[i] == -1 means is null at the here, set true here
+        // nullmap: index 0 means no build match, so the row is null (1); else copy src flag.
+        // NOTE(review): callers such as MutableBlock::add_rows (exchange channel2rows) may
+        // pass 0 as a real row index, and this branch also covers UInt8 data columns - confirm.
         for (int i = 0; i < new_size; ++i) {
-            data[origin_size + i] = (indices_begin[i] == -1) +
-                                    (indices_begin[i] != -1) * src_data[indices_begin[i]];
+            data[origin_size + i] =
+                    (indices_begin[i] == 0) + (indices_begin[i] != 0) * src_data[indices_begin[i]];
         }
     } else {
-        // real data : indices_begin[i] == -1 what at is meaningless
+        // real data: index 0 is the placeholder for unmatched rows; the value read there is unused
         for (int i = 0; i < new_size; ++i) {
             data[origin_size + i] = src_data[indices_begin[i]];
         }
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 5f6ff285ab2..553816d1ec6 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -380,8 +380,8 @@ public:
 
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override;
+    void insert_indices_from(const IColumn& src, const uint32* __restrict 
indices_begin,
+                             const uint32* __restrict indices_end) override;
 
     void fill(const value_type& element, size_t num) {
         auto old_size = data.size();
diff --git a/be/src/vec/columns/predicate_column.h 
b/be/src/vec/columns/predicate_column.h
index c6a085acd6a..04318b9c1e0 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -161,8 +161,8 @@ public:
         LOG(FATAL) << "insert_range_from not supported in PredicateColumnType";
     }
 
-    void insert_indices_from(const IColumn& src, const int* indices_begin,
-                             const int* indices_end) override {
+    void insert_indices_from(const IColumn& src, const uint32_t* __restrict 
indices_begin,
+                             const uint32_t* __restrict indices_end) override {
         LOG(FATAL) << "insert_indices_from not supported in 
PredicateColumnType";
     }
 
diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 85110deba62..53cb01dbfaa 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -22,6 +22,7 @@
 
 #include <span>
 
+#include "gen_cpp/PlanNodes_types.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
@@ -215,7 +216,9 @@ public:
         return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
     }
 
-    void build(const Key* __restrict keys, const size_t* __restrict 
hash_values, int num_elem) {
+    void build(const Key* __restrict keys, const size_t* __restrict 
hash_values, size_t num_elem,
+               int batch_size) {
+        max_batch_size = batch_size;
         bucket_size = calc_bucket_size(num_elem + 1);
         first.resize(bucket_size, 0);
         next.resize(num_elem);
@@ -228,14 +231,60 @@ public:
         }
     }
 
+    template <int JoinOpType>
     auto find_batch(const Key* __restrict keys, const size_t* __restrict 
hash_values, int probe_idx,
                     int probe_rows, std::vector<uint32_t>& probe_idxs,
-                    std::vector<int>& build_idxs) {
+                    std::vector<uint32_t>& build_idxs) {
+        if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN ||
+                      JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN) {
+            return _find_batch_inner_outer_join<JoinOpType>(keys, hash_values, 
probe_idx,
+                                                            probe_rows, 
probe_idxs, build_idxs);
+        }
+        if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) {
+            return _find_batch_left_semi_anti<JoinOpType>(keys, hash_values, 
probe_idx, probe_rows,
+                                                          probe_idxs);
+        }
+        return std::pair {0, 0};
+    }
+
+private:
+    template <int JoinOpType>
+    auto _find_batch_left_semi_anti(const Key* __restrict keys,
+                                    const size_t* __restrict hash_values, int 
probe_idx,
+                                    int probe_rows, std::vector<uint32_t>& 
probe_idxs) {
         auto matched_cnt = 0;
-        while (probe_idx < probe_rows && matched_cnt < 4096) {
+        const auto batch_size = max_batch_size;
+
+        while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) {
             uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1);
             auto build_idx = first[bucket_num];
+
             while (build_idx) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    break;
+                }
+                build_idx = next[build_idx];
+            }
+            const bool matched =
+                    JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx 
!= 0 : build_idx == 0;
+            matched_cnt += matched;
+            probe_idxs[matched_cnt - matched] = probe_idx++;
+        }
+        return std::pair {probe_idx, matched_cnt};
+    }
+
+    template <int JoinOpType>
+    auto _find_batch_inner_outer_join(const Key* __restrict keys,
+                                      const size_t* __restrict hash_values, 
int probe_idx,
+                                      int probe_rows, std::vector<uint32_t>& 
probe_idxs,
+                                      std::vector<uint32_t>& build_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+        uint32_t build_idx = 0;
+
+        auto do_the_probe = [&]() {
+            while (build_idx && LIKELY(matched_cnt < batch_size)) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
                     probe_idxs[matched_cnt] = probe_idx;
                     build_idxs[matched_cnt] = build_idx;
@@ -243,14 +292,46 @@ public:
                 }
                 build_idx = next[build_idx];
             }
-            probe_idx++;
+
+            if constexpr (JoinOpType != doris::TJoinOp::INNER_JOIN) {
+                // `(!matched_cnt || probe_idxs[matched_cnt - 1] != 
probe_idx)` means not match one build side
+                if (!build_idx && (!matched_cnt || probe_idxs[matched_cnt - 1] 
!= probe_idx)) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = build_idx;
+                    matched_cnt++;
+                }
+            }
+
+            if (matched_cnt == max_batch_size && build_idx) {
+                current_probe_idx = probe_idx;
+                current_build_idx = build_idx;
+            } else {
+                probe_idx++;
+            }
+        };
+
+        // some row over the batch_size, need dispose first
+        if (probe_idx == current_probe_idx) {
+            current_probe_idx = -1;
+            build_idx = current_build_idx;
+            current_build_idx = 0;
+            do_the_probe();
+        }
+        while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) {
+            uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1);
+            build_idx = first[bucket_num];
+            do_the_probe();
         }
         return std::pair {probe_idx, matched_cnt};
     }
 
-private:
     const Key* __restrict build_keys;
     uint32_t bucket_size = 0;
+    int max_batch_size = 0;
+
+    int current_probe_idx = -1;
+    uint32_t current_build_idx = 0;
+
     std::vector<uint32_t> first;
     std::vector<uint32_t> next;
     Cell cell;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8492044215f..ee889f951a1 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -944,7 +944,8 @@ void MutableBlock::add_row(const Block* block, int row) {
     }
 }
 
-void MutableBlock::add_rows(const Block* block, const int* row_begin, const 
int* row_end) {
+void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
+                            const uint32_t* row_end) {
     DCHECK_LE(columns(), block->columns());
     auto& block_data = block->get_columns_with_type_and_name();
     for (size_t i = 0; i < _columns.size(); ++i) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 927ed5c655c..55f470f67fe 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -592,7 +592,7 @@ public:
     void swap(MutableBlock&& other) noexcept;
 
     void add_row(const Block* block, int row);
-    void add_rows(const Block* block, const int* row_begin, const int* 
row_end);
+    void add_rows(const Block* block, const uint32_t* row_begin, const 
uint32_t* row_end);
     void add_rows(const Block* block, size_t row_begin, size_t length);
 
     /// remove the column with the specified name
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h 
b/be/src/vec/exec/join/process_hash_table_probe.h
index 8aea6492e77..eb1f6b78df5 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -99,7 +99,7 @@ struct ProcessHashTableProbe {
     std::vector<StringRef> _probe_keys;
 
     std::vector<uint32_t> _probe_indexs;
-    std::vector<int> _build_block_rows;
+    std::vector<uint32_t> _build_block_rows;
     std::vector<int> _build_blocks_locs;
     // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
     ColumnUInt8::Container* _tuple_is_null_left_flags;
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index dd6f79b9eb4..3c6ab25c21d 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -203,7 +203,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
     int last_probe_index = probe_index;
 
     int current_offset = 0;
-    bool all_match_one = true;
+    bool all_match_one = false;
     size_t probe_size = 0;
 
     auto& probe_row_match_iter = _probe_row_match<Mapped, 
with_other_conjuncts>(
@@ -232,9 +232,10 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
 
     {
         SCOPED_TIMER(_search_hashtable_timer);
-        auto [new_probe_idx, new_current_offset] = 
hash_table_ctx.hash_table->find_batch(
-                hash_table_ctx.keys, hash_table_ctx.hash_values.data(), 
probe_index, probe_rows,
-                _probe_indexs, _build_block_rows);
+        auto [new_probe_idx, new_current_offset] =
+                hash_table_ctx.hash_table->template find_batch<JoinOpType>(
+                        hash_table_ctx.keys, 
hash_table_ctx.hash_values.data(), probe_index,
+                        probe_rows, _probe_indexs, _build_block_rows);
         probe_index = new_probe_idx;
         current_offset = new_current_offset;
         probe_size = probe_index - last_probe_index;
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index c0d964fd66c..ee941fa5c1b 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -135,7 +135,7 @@ struct ProcessHashTableBuild {
                                             null_map ? null_map->data() : 
nullptr);
         SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->build(hash_table_ctx.keys, 
hash_table_ctx.hash_values.data(),
-                                         _rows);
+                                         _rows, _state->batch_size());
         return Status::OK();
     }
 
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index c409939aa82..fbd64eeb801 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -410,7 +410,7 @@ void 
VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
                             .data();
             const auto num_rows = cur_block.rows();
 
-            std::vector<int> selector(num_rows);
+            std::vector<uint32_t> selector(num_rows);
             size_t selector_idx = 0;
             for (size_t j = 0; j < num_rows; j++) {
                 if constexpr (IsSemi) {
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 66eaed7f284..c1680ad4aec 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -125,7 +125,7 @@ public:
                     hashes[i] = hashes[i] % element_size;
                 }
 
-                std::vector<int> channel2rows[element_size];
+                std::vector<uint32_t> channel2rows[element_size];
                 for (int i = 0; i < rows; i++) {
                     channel2rows[hashes[i]].emplace_back(i);
                 }
@@ -231,10 +231,10 @@ private:
     std::shared_ptr<DataReadyDependency> _data_dependency = nullptr;
 
     void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
-                                   const std::vector<int>& rows) {
+                                   const std::vector<uint32_t>& rows) {
         int row_wait_add = rows.size();
         const int batch_size = _batch_size;
-        const int* begin = &rows[0];
+        const uint32_t* begin = &rows[0];
         std::lock_guard<std::mutex> l(*_colocate_block_mutexs[loc]);
 
         while (row_wait_add > 0) {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 3bce57eda98..5bf76a05f72 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -222,7 +222,7 @@ Status Channel<Parent>::send_remote_block(PBlock* block, 
bool eos, Status exec_s
 }
 
 template <typename Parent>
-Status Channel<Parent>::add_rows(Block* block, const std::vector<int>& rows, 
bool eos) {
+Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>& 
rows, bool eos) {
     if (_fragment_instance_id.lo == -1) {
         return Status::OK();
     }
@@ -707,7 +707,7 @@ BlockSerializer<Parent>::BlockSerializer(Parent* parent, 
bool is_local)
 template <typename Parent>
 Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* 
dest, int num_receivers,
                                                       bool* serialized, bool 
eos,
-                                                      const std::vector<int>* 
rows) {
+                                                      const 
std::vector<uint32_t>* rows) {
     if (_mutable_block == nullptr) {
         SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         _mutable_block = MutableBlock::create_unique(block->clone_empty());
@@ -720,7 +720,7 @@ Status 
BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
                 if constexpr 
(!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
                     
SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
                 }
-                const int* begin = &(*rows)[0];
+                const uint32_t* begin = &(*rows)[0];
                 _mutable_block->add_rows(block, begin, begin + rows->size());
             }
         } else if (!block->empty()) {
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 203d59dc664..73e03b76fd5 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -78,7 +78,7 @@ class BlockSerializer {
 public:
     BlockSerializer(Parent* parent, bool is_local = true);
     Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, 
bool* serialized,
-                                 bool eos, const std::vector<int>* rows = 
nullptr);
+                                 bool eos, const std::vector<uint32_t>* rows = 
nullptr);
     Status serialize_block(PBlock* dest, int num_receivers = 1);
     Status serialize_block(const Block* src, PBlock* dest, int num_receivers = 
1);
 
@@ -275,7 +275,7 @@ public:
         return Status::InternalError("Send BroadcastPBlockHolder is not 
allowed!");
     }
 
-    virtual Status add_rows(Block* block, const std::vector<int>& row, bool 
eos);
+    virtual Status add_rows(Block* block, const std::vector<uint32_t>& row, 
bool eos);
 
     virtual Status send_current_block(bool eos, Status exec_status);
 
@@ -415,7 +415,7 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* 
state, Channels& channe
                                            int num_channels,
                                            const HashValueType* __restrict 
channel_ids, int rows,
                                            Block* block, bool eos) {
-    std::vector<int> channel2rows[num_channels];
+    std::vector<uint32_t> channel2rows[num_channels];
 
     for (int i = 0; i < rows; i++) {
         channel2rows[channel_ids[i]].emplace_back(i);
@@ -507,7 +507,7 @@ public:
         return Status::OK();
     }
 
-    Status add_rows(Block* block, const std::vector<int>& rows, bool eos) 
override {
+    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) 
override {
         if (Channel<Parent>::_fragment_instance_id.lo == -1) {
             return Status::OK();
         }
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/vtablet_sink_v2.h
index f70f74b9da6..2d0b5d7ae94 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -105,7 +105,7 @@ private:
 struct Rows {
     int64_t partition_id;
     int64_t index_id;
-    std::vector<int32_t> row_idxes;
+    std::vector<uint32_t> row_idxes;
 };
 
 using RowsForTablet = std::unordered_map<int64_t, Rows>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to