This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d15c3f064a26199c7c66c5605e2f638daed1b622
Author: BiteTheDDDDt <pxl...@qq.com>
AuthorDate: Wed Oct 18 14:22:02 2023 +0800

    update rf
---
 be/src/exprs/runtime_filter_slots.h        | 35 ++++++++---------
 be/src/pipeline/exec/hashjoin_build_sink.h |  2 +-
 be/src/vec/exec/join/vhash_join_node.h     | 61 ++++++++----------------------
 3 files changed, 35 insertions(+), 63 deletions(-)

diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index e0ff2cb0067..307253f430c 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -161,7 +161,7 @@ public:
         return Status::OK();
     }
 
-    void insert(std::unordered_map<const vectorized::Block*, std::vector<int>>& datas) {
+    void insert(const std::unordered_set<const vectorized::Block*>& datas) {
         for (int i = 0; i < _build_expr_context.size(); ++i) {
             auto iter = _runtime_filters.find(i);
             if (iter == _runtime_filters.end()) {
@@ -169,30 +169,31 @@ public:
             }
 
             int result_column_id = _build_expr_context[i]->get_last_result_column_id();
-            for (auto it : datas) {
-                auto& column = it.first->get_by_position(result_column_id).column;
+            for (const auto* it : datas) {
+                auto column = it->get_by_position(result_column_id).column;
 
-                if (auto* nullable =
+                std::vector<int> indexs;
+                if (const auto* nullable =
                             
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
-                    auto& column_nested = nullable->get_nested_column_ptr();
-                    auto& column_nullmap = nullable->get_null_map_column_ptr();
-                    std::vector<int> indexs;
-                    for (int row_num : it.second) {
-                        if (assert_cast<const vectorized::ColumnUInt8*>(column_nullmap.get())
-                                    ->get_bool(row_num)) {
+                    column = nullable->get_nested_column_ptr();
+                    const uint8_t* null_map = assert_cast<const vectorized::ColumnUInt8*>(
+                                                      
nullable->get_null_map_column_ptr().get())
+                                                      ->get_data()
+                                                      .data();
+                    for (int i = 0; i < column->size(); i++) {
+                        if (null_map[i]) {
                             continue;
                         }
-                        indexs.push_back(row_num);
+                        indexs.push_back(i);
                     }
-                    for (auto filter : iter->second) {
-                        filter->insert_batch(column_nested, indexs);
-                    }
-
                 } else {
-                    for (auto filter : iter->second) {
-                        filter->insert_batch(column, it.second);
+                    for (int i = 0; i < column->size(); i++) {
+                        indexs.push_back(i);
                     }
                 }
+                for (auto* filter : iter->second) {
+                    filter->insert_batch(column, indexs);
+                }
             }
         }
     }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 9cf559588cc..49c1a459b70 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -101,7 +101,7 @@ protected:
     bool _has_set_need_null_map_for_build = false;
     bool _build_side_ignore_null = false;
     size_t _build_rf_cardinality = 0;
-    std::unordered_map<const vectorized::Block*, std::vector<int>> _inserted_rows;
+    std::unordered_set<const vectorized::Block*> _inserted_blocks;
     std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
 
     RuntimeProfile::Counter* _build_table_timer;
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index ef5a61eae17..c0d964fd66c 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -86,10 +86,10 @@ struct ProcessRuntimeFilterBuild {
         RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
                 state, hash_table_ctx.hash_table->size(), parent->_build_rf_cardinality));
 
-        if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_rows.empty()) {
+        if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) {
             {
                 SCOPED_TIMER(parent->_push_compute_timer);
-                parent->_runtime_filter_slots->insert(parent->_inserted_rows);
+                parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
             }
         }
         {
@@ -117,54 +117,25 @@ struct ProcessHashTableBuild {
 
     template <bool ignore_null, bool short_circuit_for_null>
     Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) {
-        using KeyGetter = typename HashTableContext::State;
-
-        Defer defer {[&]() {
-            int64_t bucket_size = hash_table_ctx.hash_table->get_buffer_size_in_cells();
-            int64_t filled_bucket_size = hash_table_ctx.hash_table->size();
-            int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-            COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes);
-            COUNTER_SET(_parent->_build_buckets_counter, bucket_size);
-            COUNTER_SET(_parent->_build_collisions_counter,
-                        hash_table_ctx.hash_table->get_collisions());
-            COUNTER_SET(_parent->_build_buckets_fill_counter, filled_bucket_size);
-
-            std::string hash_table_buckets_info;
-
-            hash_table_buckets_info +=
-                    std::to_string(hash_table_ctx.hash_table->get_buffer_size_in_cells()) + ", ";
-            _parent->add_hash_buckets_info(hash_table_buckets_info);
-
-            hash_table_buckets_info.clear();
-            hash_table_buckets_info += std::to_string(hash_table_ctx.hash_table->size()) + ", ";
-            _parent->add_hash_buckets_filled_info(hash_table_buckets_info);
-        }};
-
-        KeyGetter key_getter(_build_raw_ptrs);
-
-        SCOPED_TIMER(_parent->_build_table_insert_timer);
-        hash_table_ctx.hash_table->reset_resize_timer();
-
-        vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block];
-        bool has_runtime_filter = !_parent->runtime_filter_descs().empty();
-        if (has_runtime_filter) {
-            inserted_rows.reserve(_batch_size);
+        if (short_circuit_for_null || ignore_null) {
+            for (int i = 0; i < _rows; i++) {
+                if ((*null_map)[i]) {
+                    *has_null_key = true;
+                }
+            }
+            if (short_circuit_for_null && *has_null_key) {
+                return Status::OK();
+            }
         }
 
+        if (!_parent->runtime_filter_descs().empty()) {
+            _parent->_inserted_blocks.insert(&_acquired_block);
+        }
         hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
                                             null_map ? null_map->data() : nullptr);
-
-        auto& arena = *_parent->arena();
-        auto old_build_arena_memory = arena.size();
+        SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(),
                                          _rows);
-        _parent->_build_rf_cardinality += inserted_rows.size();
-
-        _parent->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory);
-
-        COUNTER_UPDATE(_parent->_build_table_expanse_timer,
-                       hash_table_ctx.hash_table->get_resize_timer_value());
-
         return Status::OK();
     }
 
@@ -471,7 +442,7 @@ private:
     friend struct ProcessRuntimeFilterBuild;
 
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-    std::unordered_map<const Block*, std::vector<int>> _inserted_rows;
+    std::unordered_set<const Block*> _inserted_blocks;
 
     std::vector<IRuntimeFilter*> _runtime_filters;
     size_t _build_rf_cardinality = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to