This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_join in repository https://gitbox.apache.org/repos/asf/doris.git
commit d15c3f064a26199c7c66c5605e2f638daed1b622 Author: BiteTheDDDDt <pxl...@qq.com> AuthorDate: Wed Oct 18 14:22:02 2023 +0800 update rf --- be/src/exprs/runtime_filter_slots.h | 35 ++++++++--------- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- be/src/vec/exec/join/vhash_join_node.h | 61 ++++++++---------------------- 3 files changed, 35 insertions(+), 63 deletions(-) diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index e0ff2cb0067..307253f430c 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -161,7 +161,7 @@ public: return Status::OK(); } - void insert(std::unordered_map<const vectorized::Block*, std::vector<int>>& datas) { + void insert(const std::unordered_set<const vectorized::Block*>& datas) { for (int i = 0; i < _build_expr_context.size(); ++i) { auto iter = _runtime_filters.find(i); if (iter == _runtime_filters.end()) { @@ -169,30 +169,31 @@ public: } int result_column_id = _build_expr_context[i]->get_last_result_column_id(); - for (auto it : datas) { - auto& column = it.first->get_by_position(result_column_id).column; + for (const auto* it : datas) { + auto column = it->get_by_position(result_column_id).column; - if (auto* nullable = + std::vector<int> indexs; + if (const auto* nullable = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) { - auto& column_nested = nullable->get_nested_column_ptr(); - auto& column_nullmap = nullable->get_null_map_column_ptr(); - std::vector<int> indexs; - for (int row_num : it.second) { - if (assert_cast<const vectorized::ColumnUInt8*>(column_nullmap.get()) - ->get_bool(row_num)) { + column = nullable->get_nested_column_ptr(); + const uint8_t* null_map = assert_cast<const vectorized::ColumnUInt8*>( + nullable->get_null_map_column_ptr().get()) + ->get_data() + .data(); + for (int i = 0; i < column->size(); i++) { + if (null_map[i]) { continue; } - indexs.push_back(row_num); + indexs.push_back(i); } - for (auto filter : iter->second) { - filter->insert_batch(column_nested, indexs); - } - } else { - for (auto filter : iter->second) { - filter->insert_batch(column, it.second); + for (int i = 0; i < column->size(); i++) { + indexs.push_back(i); } } + for (auto* filter : iter->second) { + filter->insert_batch(column, indexs); + } } } } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 9cf559588cc..49c1a459b70 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -101,7 +101,7 @@ protected: bool _has_set_need_null_map_for_build = false; bool _build_side_ignore_null = false; size_t _build_rf_cardinality = 0; - std::unordered_map<const vectorized::Block*, std::vector<int>> _inserted_rows; + std::unordered_set<const vectorized::Block*> _inserted_blocks; std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency; RuntimeProfile::Counter* _build_table_timer; diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index ef5a61eae17..c0d964fd66c 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -86,10 +86,10 @@ struct ProcessRuntimeFilterBuild { RETURN_IF_ERROR(parent->_runtime_filter_slots->init( state, hash_table_ctx.hash_table->size(), parent->_build_rf_cardinality)); - if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_rows.empty()) { + if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) { { SCOPED_TIMER(parent->_push_compute_timer); - parent->_runtime_filter_slots->insert(parent->_inserted_rows); + parent->_runtime_filter_slots->insert(parent->_inserted_blocks); } } { @@ -117,54 +117,25 @@ struct ProcessHashTableBuild { template <bool ignore_null, bool short_circuit_for_null> Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { - using KeyGetter = typename HashTableContext::State; - - Defer defer {[&]() { - int64_t bucket_size = hash_table_ctx.hash_table->get_buffer_size_in_cells(); - int64_t filled_bucket_size = hash_table_ctx.hash_table->size(); - int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); - COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes); - COUNTER_SET(_parent->_build_buckets_counter, bucket_size); - COUNTER_SET(_parent->_build_collisions_counter, - hash_table_ctx.hash_table->get_collisions()); - COUNTER_SET(_parent->_build_buckets_fill_counter, filled_bucket_size); - - std::string hash_table_buckets_info; - - hash_table_buckets_info += - std::to_string(hash_table_ctx.hash_table->get_buffer_size_in_cells()) + ", "; - _parent->add_hash_buckets_info(hash_table_buckets_info); - - hash_table_buckets_info.clear(); - hash_table_buckets_info += std::to_string(hash_table_ctx.hash_table->size()) + ", "; - _parent->add_hash_buckets_filled_info(hash_table_buckets_info); - }}; - - KeyGetter key_getter(_build_raw_ptrs); - - SCOPED_TIMER(_parent->_build_table_insert_timer); - hash_table_ctx.hash_table->reset_resize_timer(); - - vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block]; - bool has_runtime_filter = !_parent->runtime_filter_descs().empty(); - if (has_runtime_filter) { - inserted_rows.reserve(_batch_size); + if (short_circuit_for_null || ignore_null) { + for (int i = 0; i < _rows; i++) { + if ((*null_map)[i]) { + *has_null_key = true; + } + } + if (short_circuit_for_null && *has_null_key) { + return Status::OK(); + } } + if (!_parent->runtime_filter_descs().empty()) { + _parent->_inserted_blocks.insert(&_acquired_block); + } hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows, null_map ? null_map->data() : nullptr); - - auto& arena = *_parent->arena(); - auto old_build_arena_memory = arena.size(); + SCOPED_TIMER(_parent->_build_table_insert_timer); hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(), _rows); - _parent->_build_rf_cardinality += inserted_rows.size(); - - _parent->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory); - - COUNTER_UPDATE(_parent->_build_table_expanse_timer, - hash_table_ctx.hash_table->get_resize_timer_value()); - return Status::OK(); } @@ -471,7 +442,7 @@ private: friend struct ProcessRuntimeFilterBuild; std::vector<TRuntimeFilterDesc> _runtime_filter_descs; - std::unordered_map<const Block*, std::vector<int>> _inserted_rows; + std::unordered_set<const Block*> _inserted_blocks; std::vector<IRuntimeFilter*> _runtime_filters; size_t _build_rf_cardinality = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org