This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_join in repository https://gitbox.apache.org/repos/asf/doris.git
commit a0d727cccac81ec4761e4de9d63913839979a7b2 Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri Oct 20 10:33:31 2023 +0800 support batch size (#25629) * support batch size * support left anti/semi/outer join --- be/src/olap/delta_writer.cpp | 2 +- be/src/olap/delta_writer.h | 2 +- be/src/olap/delta_writer_v2.cpp | 2 +- be/src/olap/delta_writer_v2.h | 2 +- be/src/olap/memtable.cpp | 4 +- be/src/olap/memtable.h | 2 +- be/src/olap/memtable_writer.cpp | 2 +- be/src/olap/memtable_writer.h | 2 +- be/src/olap/tablet.cpp | 2 +- be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +- .../exec/nested_loop_join_probe_operator.cpp | 2 +- be/src/runtime/tablets_channel.cpp | 7 +- be/src/vec/columns/column.h | 6 +- be/src/vec/columns/column_array.cpp | 10 +-- be/src/vec/columns/column_array.h | 4 +- be/src/vec/columns/column_complex.h | 10 +-- be/src/vec/columns/column_const.h | 4 +- be/src/vec/columns/column_decimal.h | 6 +- be/src/vec/columns/column_dictionary.h | 4 +- be/src/vec/columns/column_fixed_length_object.h | 12 +-- be/src/vec/columns/column_map.cpp | 10 +-- be/src/vec/columns/column_map.h | 4 +- be/src/vec/columns/column_nullable.cpp | 5 +- be/src/vec/columns/column_nullable.h | 4 +- be/src/vec/columns/column_object.cpp | 15 ++-- be/src/vec/columns/column_object.h | 8 +- be/src/vec/columns/column_string.cpp | 28 +++---- be/src/vec/columns/column_string.h | 4 +- be/src/vec/columns/column_struct.cpp | 4 +- be/src/vec/columns/column_struct.h | 4 +- be/src/vec/columns/column_vector.cpp | 13 ++-- be/src/vec/columns/column_vector.h | 4 +- be/src/vec/columns/predicate_column.h | 4 +- be/src/vec/common/hash_table/hash_map.h | 91 ++++++++++++++++++++-- be/src/vec/core/block.cpp | 3 +- be/src/vec/core/block.h | 2 +- be/src/vec/exec/join/process_hash_table_probe.h | 2 +- .../vec/exec/join/process_hash_table_probe_impl.h | 9 ++- be/src/vec/exec/join/vhash_join_node.h | 2 +- be/src/vec/exec/join/vnested_loop_join_node.cpp | 2 +- be/src/vec/exec/scan/pip_scanner_context.h | 6 +- be/src/vec/sink/vdata_stream_sender.cpp | 6 +- be/src/vec/sink/vdata_stream_sender.h | 8 +- be/src/vec/sink/vtablet_sink_v2.h | 2 +- 44 files changed, 197 insertions(+), 130 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 23e1718cb7d..4098f512120 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -108,7 +108,7 @@ Status DeltaWriter::append(const vectorized::Block* block) { return write(block, {}, true); } -Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs, +Status DeltaWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, bool is_append) { if (UNLIKELY(row_idxs.empty() && !is_append)) { return Status::OK(); diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 303f17f14f8..4b2db0c9cc3 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -67,7 +67,7 @@ public: Status init(); - Status write(const vectorized::Block* block, const std::vector<int>& row_idxs, + Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, bool is_append = false); Status append(const vectorized::Block* block); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index ef3ff23f9d8..47723f26d72 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -138,7 +138,7 @@ Status DeltaWriterV2::append(const vectorized::Block* block) { return write(block, {}, true); } -Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<int>& row_idxs, +Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, bool is_append) { if (UNLIKELY(row_idxs.empty() && !is_append)) { return Status::OK(); diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index b2b1f5f1c19..8f92ec74273 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -71,7 +71,7 @@ public: Status init(); - Status write(const vectorized::Block* block, const std::vector<int>& row_idxs, + Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, bool is_append = false); Status append(const vectorized::Block* block); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index d163abd26a7..bf4175c541b 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -166,7 +166,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r *_pblock, -1); } -void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs, +void MemTable::insert(const vectorized::Block* input_block, const std::vector<uint32_t>& row_idxs, bool is_append) { SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); vectorized::Block target_block = *input_block; @@ -239,7 +239,7 @@ void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo } void MemTable::_put_into_output(vectorized::Block& in_block) { SCOPED_RAW_TIMER(&_stat.put_into_output_ns); - std::vector<int> row_pos_vec; + std::vector<uint32_t> row_pos_vec; DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); row_pos_vec.reserve(in_block.rows()); for (int i = 0; i < _row_in_blocks.size(); i++) { diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index b98e7411e3b..2b36d1bf8fd 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -180,7 +180,7 @@ public: _flush_mem_tracker->consumption(); } // insert tuple from (row_pos) to (row_pos+num_rows) - void insert(const vectorized::Block* block, const std::vector<int>& row_idxs, + void insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, bool is_append = false); void shrink_memtable_by_agg(); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 2ef704f075a..75d2bdc8716 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -89,7 +89,7 @@ Status MemTableWriter::append(const vectorized::Block* block) { return write(block, {}, true); } -Status MemTableWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs, +Status MemTableWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, bool is_append) { if (UNLIKELY(row_idxs.empty() && !is_append)) { return Status::OK(); diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index 3491f72abd5..fedf4f7c148 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -71,7 +71,7 @@ public: std::shared_ptr<PartialUpdateInfo> partial_update_info, bool unique_key_mow = false); - Status write(const vectorized::Block* block, const std::vector<int>& row_idxs, + Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, bool is_append = false); Status append(const vectorized::Block* block); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 5b7c07fbd1a..24db68d02a5 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2928,7 +2928,7 @@ void Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& output_b << " r_pos: " << r->_row_pos; return value < 0; }); - std::vector<int> row_pos_vec; + std::vector<uint32_t> row_pos_vec; row_pos_vec.reserve(in_block.rows()); for (int i = 0; i < row_in_blocks.size(); i++) { row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 89679ceff89..91c352989a0 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -445,7 +445,7 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch int num_channels, const HashValueType* __restrict channel_ids, int rows, vectorized::Block* block, bool eos) { - std::vector<int> channel2rows[num_channels]; + std::vector<uint32_t> channel2rows[num_channels]; for (int i = 0; i < rows; i++) { channel2rows[channel_ids[i]].emplace_back(i); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index e1454c8c3c0..ed7ad93726c 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -255,7 +255,7 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB .data(); const auto num_rows = cur_block.rows(); - std::vector<int> selector(num_rows); + std::vector<uint32_t> selector(num_rows); size_t selector_idx = 0; for (size_t j = 0; j < num_rows; j++) { if constexpr (IsSemi) { diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 91294135a06..eebfeb69f5b 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -497,8 +497,9 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, return Status::OK(); } - std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs; - for (int i = 0; i < request.tablet_ids_size(); ++i) { + std::unordered_map<int64_t /* tablet_id */, std::vector<uint32_t> /* row index */> + tablet_to_rowidxs; + for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) { if (request.is_single_tablet_block()) { break; } @@ -510,7 +511,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, } auto it = tablet_to_rowidxs.find(tablet_id); if (it == tablet_to_rowidxs.end()) { - tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int> {i}); + tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<uint32_t> {i}); } else { it->second.emplace_back(i); } diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 68afe3947d9..29f7ed109ea 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -238,9 +238,9 @@ public: /// Appends a batch elements from other column with the same type /// indices_begin + indices_end represent the row indices of column src /// Warning: - /// if *indices == -1 means the row is null, only use in outer join, do not use in any other place - virtual void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) = 0; + /// if *indices == 0 means the row is null, only use in outer join, do not use in any other place + virtual void insert_indices_from(const IColumn& src, const uint32* __restrict indices_begin, + const uint32_t* __restrict indices_end) = 0; /// Appends data located in specified memory chunk if it is possible (throws an exception if it cannot be implemented). /// Is used to optimize some computations (in aggregation, for example). diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 47949580bea..cdfcfa22c7b 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -793,14 +793,10 @@ size_t ColumnArray::filter_nullable(const Filter& filter) { return result_size; } -void ColumnArray::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +void ColumnArray::insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) { for (auto x = indices_begin; x != indices_end; ++x) { - if (*x == -1) { - ColumnArray::insert_default(); - } else { - ColumnArray::insert_from(src, *x); - } + ColumnArray::insert_from(src, *x); } } diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 44391ae8c74..93ef3a3e324 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -220,8 +220,8 @@ public: callback(data); } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override; void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override { DCHECK(size() > self_row); diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index b25ae9f1577..a8f8189bef7 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -184,18 +184,14 @@ public: data.insert(data.end(), st, ed); } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override { const Self& src_vec = assert_cast<const Self&>(src); auto new_size = indices_end - indices_begin; for (int i = 0; i < new_size; ++i) { auto offset = *(indices_begin + i); - if (offset == -1) { - data.emplace_back(T {}); - } else { - data.emplace_back(src_vec.get_element(offset)); - } + data.emplace_back(src_vec.get_element(offset)); } } diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 307066a7ae9..8d03087cc3d 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -111,8 +111,8 @@ public: s += length; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { s += (indices_end - indices_begin); } diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index dcd135d46b9..ecec54a4b52 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -120,12 +120,12 @@ public: data.push_back(assert_cast<const Self&>(src).get_data()[n]); } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override { auto origin_size = size(); auto new_size = indices_end - indices_begin; data.resize(origin_size + new_size); - const T* src_data = reinterpret_cast<const T*>(src.get_raw_data().data); + const T* __restrict src_data = reinterpret_cast<const T*>(src.get_raw_data().data); for (int i = 0; i < new_size; ++i) { data[origin_size + i] = src_data[indices_begin[i]]; diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h index e00e5b425f8..6d3a2f46c71 100644 --- a/be/src/vec/columns/column_dictionary.h +++ b/be/src/vec/columns/column_dictionary.h @@ -77,8 +77,8 @@ public: LOG(FATAL) << "insert_range_from not supported in ColumnDictionary"; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { LOG(FATAL) << "insert_indices_from not supported in ColumnDictionary"; } diff --git a/be/src/vec/columns/column_fixed_length_object.h b/be/src/vec/columns/column_fixed_length_object.h index dce6666f132..8c42ea2325a 100644 --- a/be/src/vec/columns/column_fixed_length_object.h +++ b/be/src/vec/columns/column_fixed_length_object.h @@ -83,8 +83,8 @@ public: return res; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override { const Self& src_vec = assert_cast<const Self&>(src); auto origin_size = size(); auto new_size = indices_end - indices_begin; @@ -96,12 +96,8 @@ public: for (int i = 0; i < new_size; ++i) { int offset = indices_begin[i]; - if (offset > -1) { - memcpy(&_data[(origin_size + i) * _item_size], &src_vec._data[offset * _item_size], - _item_size); - } else { - memset(&_data[(origin_size + i) * _item_size], 0, _item_size); - } + memcpy(&_data[(origin_size + i) * _item_size], &src_vec._data[offset * _item_size], + _item_size); } } diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index e25cfd52dd8..38d83d93ffa 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -185,14 +185,10 @@ void ColumnMap::insert_from(const IColumn& src_, size_t n) { get_offsets().push_back(get_offsets().back() + size); } -void ColumnMap::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +void ColumnMap::insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) { for (auto x = indices_begin; x != indices_end; ++x) { - if (*x == -1) { - ColumnMap::insert_default(); - } else { - ColumnMap::insert_from(src, *x); - } + ColumnMap::insert_from(src, *x); } } diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 7da2200fe2d..d5d58095c41 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -128,8 +128,8 @@ public: Permutation& res) const override { LOG(FATAL) << "get_permutation not implemented"; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override; void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector) const override { diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 42b88ac7ae9..c0022a29a2a 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -298,8 +298,9 @@ void ColumnNullable::insert_range_from(const IColumn& src, size_t start, size_t _has_null |= simd::contain_byte(src_null_map_data.data() + start, length, 1); } -void ColumnNullable::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +void ColumnNullable::insert_indices_from(const IColumn& src, + const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) { const auto& src_concrete = assert_cast<const ColumnNullable&>(src); get_nested_column().insert_indices_from(src_concrete.get_nested_column(), indices_begin, indices_end); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 953c66e45bc..023a87acab7 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -121,8 +121,8 @@ public: void deserialize_vec(std::vector<StringRef>& keys, size_t num_rows) override; void insert_range_from(const IColumn& src, size_t start, size_t length) override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override; void insert(const Field& x) override; void insert_from(const IColumn& src, size_t n) override; diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index f3571c8ba29..a8f99adfdd3 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -666,14 +666,11 @@ void ColumnObject::get(size_t n, Field& res) const { } } -Status ColumnObject::try_insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +Status ColumnObject::try_insert_indices_from(const IColumn& src, + const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) { for (auto x = indices_begin; x != indices_end; ++x) { - if (*x == -1) { - ColumnObject::insert_default(); - } else { - ColumnObject::try_insert_from(src, *x); - } + ColumnObject::try_insert_from(src, *x); } finalize(); return Status::OK(); @@ -992,8 +989,8 @@ void ColumnObject::append_data_by_selector(MutableColumnPtr& res, }); } -void ColumnObject::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) { // insert_indices_from with alignment const ColumnObject& src_column = *check_and_get_column<ColumnObject>(src); align_variant_by_name_and_type(*this, src_column, indices_end - indices_begin, diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 6bff69b1e67..de7a5ec3d5d 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -284,8 +284,8 @@ public: void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector) const override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override; // May throw execption void try_insert(const Field& field); @@ -315,8 +315,8 @@ public: return StringRef(); } - Status try_insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end); + Status try_insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end); StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override { diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 5d5abd64349..7d7597e7dc3 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -124,8 +124,8 @@ void ColumnString::insert_range_from(const IColumn& src, size_t start, size_t le } } -void ColumnString::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +void ColumnString::insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) { const ColumnString& src_str = assert_cast<const ColumnString&>(src); auto src_offset_data = src_str.offsets.data(); @@ -136,10 +136,9 @@ void ColumnString::insert_indices_from(const IColumn& src, const int* indices_be offsets.resize(offsets.size() + indices_end - indices_begin); auto* dst_offsets_data = offsets.data(); - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x != -1) { - total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1]; - } + for (auto* __restrict x = indices_begin; x != indices_end; ++x) { + const auto offset = *x; + total_chars_size += src_offset_data[offset] - src_offset_data[offset - 1]; dst_offsets_data[dst_offsets_pos++] = total_chars_size; } check_chars_length(total_chars_size, offsets.size()); @@ -150,14 +149,15 @@ void ColumnString::insert_indices_from(const IColumn& src, const int* indices_be auto* dst_data_ptr = chars.data(); size_t dst_chars_pos = old_char_size; - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x != -1) { - const size_t size_to_append = src_offset_data[*x] - src_offset_data[*x - 1]; - const size_t offset = src_offset_data[*x - 1]; - memcpy_small_allow_read_write_overflow15(dst_data_ptr + dst_chars_pos, - src_data_ptr + offset, size_to_append); - dst_chars_pos += size_to_append; - } + for (auto* __restrict x = indices_begin; x != indices_end; ++x) { + const auto offset = *x; + const auto start = src_offset_data[offset - 1]; + const auto end = src_offset_data[offset]; + + const size_t size_to_append = end - start; + memcpy_small_allow_read_write_overflow15(dst_data_ptr + dst_chars_pos, src_data_ptr + start, + size_to_append); + dst_chars_pos += size_to_append; } } diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index ae2bb9d25f9..79e649cb642 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -472,8 +472,8 @@ public: void insert_range_from(const IColumn& src, size_t start, size_t length) override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override; ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; size_t filter(const Filter& filter) override; diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 832bc32189c..f46f01bd16f 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -225,8 +225,8 @@ void ColumnStruct::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTy } } -void ColumnStruct::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) { const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src); for (size_t i = 0; i < columns.size(); ++i) { columns[i]->insert_indices_from(src_concrete.get_column(i), indices_begin, indices_end); diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index 23f50582780..7567b28f452 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -121,8 +121,8 @@ public: uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32* __restrict indices_begin, + const uint32_t* __restrict indices_end) override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, Permutation& res) const override { diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index bae633d1490..d206fdc04f9 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -366,8 +366,9 @@ void ColumnVector<T>::insert_range_from(const IColumn& src, size_t start, size_t } template <typename T> -void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +void ColumnVector<T>::insert_indices_from(const IColumn& src, + const uint32* __restrict indices_begin, + const uint32* __restrict indices_end) { auto origin_size = size(); auto new_size = indices_end - indices_begin; data.resize(origin_size + new_size); @@ -375,13 +376,13 @@ void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* indices const T* src_data = reinterpret_cast<const T*>(src.get_raw_data().data); if constexpr (std::is_same_v<T, UInt8>) { - // nullmap : indices_begin[i] == -1 means is null at the here, set true here + // nullmap : indices_begin[i] == 0 means is null at the here, set true here for (int i = 0; i < new_size; ++i) { - data[origin_size + i] = (indices_begin[i] == -1) + - (indices_begin[i] != -1) * src_data[indices_begin[i]]; + data[origin_size + i] = + (indices_begin[i] == 0) + (indices_begin[i] == 0) * src_data[indices_begin[i]]; } } else { - // real data : indices_begin[i] == -1 what at is meaningless + // real data : indices_begin[i] == 0 what at is meaningless for (int i = 0; i < new_size; ++i) { data[origin_size + i] = src_data[indices_begin[i]]; } diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 5f6ff285ab2..553816d1ec6 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -380,8 +380,8 @@ public: void insert_range_from(const IColumn& src, size_t start, size_t length) override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32* __restrict indices_begin, + const uint32* __restrict indices_end) override; void fill(const value_type& element, size_t num) { auto old_size = data.size(); diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h index c6a085acd6a..04318b9c1e0 100644 --- a/be/src/vec/columns/predicate_column.h +++ b/be/src/vec/columns/predicate_column.h @@ -161,8 +161,8 @@ public: LOG(FATAL) << "insert_range_from not supported in PredicateColumnType"; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* __restrict indices_begin, + const uint32_t* __restrict indices_end) override { LOG(FATAL) << "insert_indices_from not supported in PredicateColumnType"; } diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 85110deba62..53cb01dbfaa 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -22,6 +22,7 @@ #include <span> +#include "gen_cpp/PlanNodes_types.h" #include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_table.h" #include "vec/common/hash_table/hash_table_allocator.h" @@ -215,7 +216,9 @@ public: return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1; } - void build(const Key* __restrict keys, const size_t* __restrict hash_values, int num_elem) { + void build(const Key* __restrict keys, const size_t* __restrict hash_values, size_t num_elem, + int batch_size) { + max_batch_size = batch_size; bucket_size = calc_bucket_size(num_elem + 1); first.resize(bucket_size, 0); next.resize(num_elem); @@ -228,14 +231,60 @@ public: } } + template <int JoinOpType> auto find_batch(const Key* __restrict keys, const size_t* __restrict hash_values, int probe_idx, int probe_rows, std::vector<uint32_t>& probe_idxs, - std::vector<int>& build_idxs) { + std::vector<uint32_t>& build_idxs) { + if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN || + JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN) { + return _find_batch_inner_outer_join<JoinOpType>(keys, hash_values, probe_idx, + probe_rows, probe_idxs, build_idxs); + } + if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) { + return _find_batch_left_semi_anti<JoinOpType>(keys, hash_values, probe_idx, probe_rows, + probe_idxs); + } + return std::pair {0, 0}; + } + +private: + template <int JoinOpType> + auto _find_batch_left_semi_anti(const Key* __restrict keys, + const size_t* __restrict hash_values, int probe_idx, + int probe_rows, std::vector<uint32_t>& probe_idxs) { auto matched_cnt = 0; - while (probe_idx < probe_rows && matched_cnt < 4096) { + const auto batch_size = max_batch_size; + + while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) { uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1); auto build_idx = first[bucket_num]; + while (build_idx) { + if (keys[probe_idx] == build_keys[build_idx]) { + break; + } + build_idx = next[build_idx]; + } + const bool matched = + JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx != 0 : build_idx == 0; + matched_cnt += matched; + probe_idxs[matched_cnt - matched] = probe_idx++; + } + return std::pair {probe_idx, matched_cnt}; + } + + template <int JoinOpType> + auto _find_batch_inner_outer_join(const Key* __restrict keys, + const size_t* __restrict hash_values, int probe_idx, + int probe_rows, std::vector<uint32_t>& probe_idxs, + std::vector<uint32_t>& build_idxs) { + auto matched_cnt = 0; + const auto batch_size = max_batch_size; + uint32_t build_idx = 0; + + auto do_the_probe = [&]() { + while (build_idx && LIKELY(matched_cnt < batch_size)) { if (keys[probe_idx] == build_keys[build_idx]) { probe_idxs[matched_cnt] = probe_idx; build_idxs[matched_cnt] = build_idx; @@ -243,14 +292,46 @@ public: } build_idx = next[build_idx]; } - probe_idx++; + + if constexpr (JoinOpType != doris::TJoinOp::INNER_JOIN) { + // `(!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)` means not match one build side + if (!build_idx && (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)) { + probe_idxs[matched_cnt] = probe_idx; + build_idxs[matched_cnt] = build_idx; + matched_cnt++; + } + } + + if (matched_cnt == max_batch_size && build_idx) { + current_probe_idx = probe_idx; + current_build_idx = build_idx; + } else { + probe_idx++; + } + }; + + // some row over the batch_size, need dispose first + if (probe_idx == current_probe_idx) { + current_probe_idx = -1; + build_idx = current_build_idx; + current_build_idx = 0; + do_the_probe(); + } + while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) { + uint32_t bucket_num = hash_values[probe_idx] & (bucket_size - 1); + build_idx = first[bucket_num]; + do_the_probe(); } return std::pair {probe_idx, matched_cnt}; } -private: const Key* __restrict build_keys; uint32_t bucket_size = 0; + int max_batch_size = 0; + + int current_probe_idx = -1; + uint32_t current_build_idx = 0; + std::vector<uint32_t> first; std::vector<uint32_t> next; Cell cell; diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 8492044215f..ee889f951a1 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -944,7 +944,8 @@ void MutableBlock::add_row(const Block* block, int row) { } } -void MutableBlock::add_rows(const Block* block, const int* row_begin, const int* row_end) { +void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, + const uint32_t* row_end) { DCHECK_LE(columns(), block->columns()); auto& block_data = block->get_columns_with_type_and_name(); for (size_t i = 0; i < _columns.size(); ++i) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 927ed5c655c..55f470f67fe 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -592,7 +592,7 @@ public: void swap(MutableBlock&& other) noexcept; void add_row(const Block* block, int row); - void add_rows(const Block* block, const int* row_begin, const int* row_end); + void add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end); void add_rows(const Block* block, size_t row_begin, size_t length); /// remove the column with the specified name diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index 8aea6492e77..eb1f6b78df5 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -99,7 +99,7 @@ struct ProcessHashTableProbe { std::vector<StringRef> _probe_keys; std::vector<uint32_t> _probe_indexs; - std::vector<int> _build_block_rows; + std::vector<uint32_t> _build_block_rows; std::vector<int> _build_blocks_locs; // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN ColumnUInt8::Container* _tuple_is_null_left_flags; diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index dd6f79b9eb4..3c6ab25c21d 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -203,7 +203,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash int last_probe_index = probe_index; int current_offset = 0; - bool all_match_one = true; + bool all_match_one = false; size_t probe_size = 0; auto& probe_row_match_iter = _probe_row_match<Mapped, with_other_conjuncts>( @@ -232,9 +232,10 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash { SCOPED_TIMER(_search_hashtable_timer); - auto [new_probe_idx, new_current_offset] = hash_table_ctx.hash_table->find_batch( - hash_table_ctx.keys, hash_table_ctx.hash_values.data(), probe_index, probe_rows, - _probe_indexs, _build_block_rows); + auto [new_probe_idx, new_current_offset] = + hash_table_ctx.hash_table->template find_batch<JoinOpType>( + hash_table_ctx.keys, hash_table_ctx.hash_values.data(), probe_index, + probe_rows, _probe_indexs, _build_block_rows); probe_index = new_probe_idx; current_offset = new_current_offset; probe_size = probe_index - last_probe_index; diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index c0d964fd66c..ee941fa5c1b 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -135,7 +135,7 @@ struct ProcessHashTableBuild { null_map ? null_map->data() : nullptr); SCOPED_TIMER(_parent->_build_table_insert_timer); hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(), - _rows); + _rows, _state->batch_size()); return Status::OK(); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index c409939aa82..fbd64eeb801 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -410,7 +410,7 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s .data(); const auto num_rows = cur_block.rows(); - std::vector<int> selector(num_rows); + std::vector<uint32_t> selector(num_rows); size_t selector_idx = 0; for (size_t j = 0; j < num_rows; j++) { if constexpr (IsSemi) { diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 66eaed7f284..c1680ad4aec 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -125,7 +125,7 @@ public: hashes[i] = hashes[i] % element_size; } - std::vector<int> channel2rows[element_size]; + std::vector<uint32_t> channel2rows[element_size]; for (int i = 0; i < rows; i++) { channel2rows[hashes[i]].emplace_back(i); } @@ -231,10 +231,10 @@ private: std::shared_ptr<DataReadyDependency> _data_dependency = nullptr; void _add_rows_colocate_blocks(vectorized::Block* block, int loc, - const std::vector<int>& rows) { + const std::vector<uint32_t>& rows) { int row_wait_add = rows.size(); const int batch_size = _batch_size; - const int* begin = &rows[0]; + const uint32_t* begin = &rows[0]; std::lock_guard<std::mutex> l(*_colocate_block_mutexs[loc]); while (row_wait_add > 0) { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 3bce57eda98..5bf76a05f72 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -222,7 +222,7 @@ Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_s } template <typename Parent> -Status Channel<Parent>::add_rows(Block* block, const std::vector<int>& rows, bool eos) { +Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) { if (_fragment_instance_id.lo == -1) { return Status::OK(); } @@ -707,7 +707,7 @@ BlockSerializer<Parent>::BlockSerializer(Parent* parent, bool is_local) template <typename Parent> Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest, int num_receivers, bool* serialized, bool eos, - const std::vector<int>* rows) { + const std::vector<uint32_t>* rows) { if (_mutable_block == nullptr) { SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); _mutable_block = MutableBlock::create_unique(block->clone_empty()); @@ -720,7 +720,7 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) { SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer()); } - const int* begin = &(*rows)[0]; + const uint32_t* begin = &(*rows)[0]; _mutable_block->add_rows(block, begin, begin + rows->size()); } } else if (!block->empty()) { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 203d59dc664..73e03b76fd5 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -78,7 +78,7 @@ class BlockSerializer { public: BlockSerializer(Parent* parent, bool is_local = true); Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized, - bool eos, const std::vector<int>* rows = nullptr); + bool eos, const std::vector<uint32_t>* rows = nullptr); Status serialize_block(PBlock* dest, int num_receivers = 1); Status serialize_block(const Block* src, PBlock* dest, int num_receivers = 1); @@ -275,7 +275,7 @@ public: return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); } - virtual Status add_rows(Block* block, const std::vector<int>& row, bool eos); + virtual Status add_rows(Block* block, const std::vector<uint32_t>& row, bool eos); virtual Status send_current_block(bool eos, Status exec_status); @@ -415,7 +415,7 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channe int num_channels, const HashValueType* __restrict channel_ids, int rows, Block* block, bool eos) { - std::vector<int> channel2rows[num_channels]; + std::vector<uint32_t> channel2rows[num_channels]; for (int i = 0; i < rows; i++) { channel2rows[channel_ids[i]].emplace_back(i); @@ -507,7 +507,7 @@ public: return Status::OK(); } - Status add_rows(Block* block, const std::vector<int>& rows, bool eos) override { + Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) override { if (Channel<Parent>::_fragment_instance_id.lo == -1) { return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index f70f74b9da6..2d0b5d7ae94 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -105,7 +105,7 @@ private: struct Rows { int64_t partition_id; int64_t index_id; - std::vector<int32_t> row_idxes; + std::vector<uint32_t> row_idxes; }; using RowsForTablet = std::unordered_map<int64_t, Rows>; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org