This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc6b355326 [Exec](mem) Reduce the memory usage during JOIN operations (#41388)
1fc6b355326 is described below

commit 1fc6b355326721d22069c082eb56caa6954190c4
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Tue Oct 8 09:55:39 2024 +0800

    [Exec](mem) Reduce the memory usage during JOIN operations (#41388)
    
    ## Proposed changes
    ```
    select t1.a from t1, t2 where t1.a = t2.a
    ```
    Memory usage of the query above before this change:
    ```
    38G
    ```
    
    Memory usage of the query above after this change:
    ```
    30.8G
    ```
    
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 32 +++++++++++++++++++++-
 be/src/pipeline/exec/hashjoin_build_sink.h         |  4 +++
 .../exec/join/process_hash_table_probe_impl.h      | 10 +++++--
 be/src/vec/core/block.cpp                          | 18 ++++++++++++
 be/src/vec/core/block.h                            |  3 ++
 .../vec/runtime/shared_hash_table_controller.cpp   |  6 +---
 be/src/vec/runtime/shared_hash_table_controller.h  |  1 -
 7 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8f7b176a979..b6cd342d398 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -113,6 +113,20 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
 Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
     auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
     Defer defer {[&]() {
+        if (_should_build_hash_table) {
+            // The build side hash key column maybe no need output, but we 
need to keep the column in block
+            // because it is used to compare with probe side hash key column
+            if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
+                p._should_keep_column_flags[_build_col_ids[0]] = true;
+            }
+
+            if (_shared_state->build_block) {
+                // release the memory of unused column in probe stage
+                _shared_state->build_block->clear_column_mem_not_keep(
+                        p._should_keep_column_flags, 
bool(p._shared_hashtable_controller));
+            }
+        }
+
         if (_should_build_hash_table && p._shared_hashtable_controller) {
             p._shared_hashtable_controller->signal_finish(p.node_id());
         }
@@ -386,7 +400,9 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
                     default:
                         _shared_state->hash_table_variants
                                 
->emplace<vectorized::SerializedHashTableContext>();
+                        return;
                     }
+                    p._should_keep_hash_key_column = true;
                     return;
                 }
 
@@ -433,6 +449,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
     RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state));
     DCHECK(tnode.__isset.hash_join_node);
 
+    if (tnode.hash_join_node.__isset.hash_output_slot_ids) {
+        _hash_output_slot_ids = tnode.hash_join_node.hash_output_slot_ids;
+    }
+
     const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
                                    _join_op == TJoinOp::FULL_OUTER_JOIN ||
                                    _join_op == TJoinOp::RIGHT_ANTI_JOIN;
@@ -494,6 +514,17 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* 
state) {
             _shared_hash_table_context = 
_shared_hashtable_controller->get_context(node_id());
         }
     }
+    auto init_keep_column_flags = [&](auto& tuple_descs, auto& 
output_slot_flags) {
+        for (const auto& tuple_desc : tuple_descs) {
+            for (const auto& slot_desc : tuple_desc->slots()) {
+                output_slot_flags.emplace_back(
+                        _hash_output_slot_ids.empty() ||
+                        std::find(_hash_output_slot_ids.begin(), 
_hash_output_slot_ids.end(),
+                                  slot_desc->id()) != 
_hash_output_slot_ids.end());
+            }
+        }
+    };
+    init_keep_column_flags(row_desc().tuple_descriptors(), 
_should_keep_column_flags);
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, 
_child->row_desc()));
     return vectorized::VExpr::open(_build_expr_ctxs, state);
 }
@@ -565,7 +596,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
             _shared_hash_table_context->build_indexes_null =
                     local_state._shared_state->build_indexes_null;
             
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
-            _shared_hashtable_controller->signal(node_id());
         }
     } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index cf677833fb5..a544cdcf456 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -173,6 +173,10 @@ private:
     const std::vector<TExpr> _partition_exprs;
 
     const bool _need_local_merge;
+
+    std::vector<SlotId> _hash_output_slot_ids;
+    std::vector<bool> _should_keep_column_flags;
+    bool _should_keep_hash_key_column = false;
 };
 
 template <class HashTableContext>
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 653cc8ab447..6bb5a2006ab 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -640,9 +640,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
                     mcol.size(), _right_col_len, _right_col_idx);
         }
         for (size_t j = 0; j < _right_col_len; ++j) {
-            const auto& column = *_build_block->safe_get_by_position(j).column;
-            mcol[j + _right_col_idx]->insert_indices_from(column, 
_build_indexs.data(),
-                                                          _build_indexs.data() 
+ block_size);
+            if (_right_output_slot_flags->at(j)) {
+                const auto& column = 
*_build_block->safe_get_by_position(j).column;
+                mcol[j + _right_col_idx]->insert_indices_from(column, 
_build_indexs.data(),
+                                                              
_build_indexs.data() + block_size);
+            } else {
+                mcol[j + _right_col_idx]->resize(block_size);
+            }
         }
 
         // just resize the left table column in case with other conjunct to 
make block size is not zero
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 6efec59544e..2e54bd76fff 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -753,6 +753,24 @@ void Block::erase_tmp_columns() noexcept {
     }
 }
 
+void Block::clear_column_mem_not_keep(const std::vector<bool>& 
column_keep_flags,
+                                      bool need_keep_first) {
+    if (data.size() >= column_keep_flags.size()) {
+        auto origin_rows = rows();
+        for (size_t i = 0; i < column_keep_flags.size(); ++i) {
+            if (!column_keep_flags[i]) {
+                data[i].column = data[i].column->clone_empty();
+            }
+        }
+
+        if (need_keep_first && !column_keep_flags[0]) {
+            auto first_column = data[0].column->clone_empty();
+            first_column->resize(origin_rows);
+            data[0].column = std::move(first_column);
+        }
+    }
+}
+
 void Block::swap(Block& other) noexcept {
     SCOPED_SKIP_MEMORY_CHECK();
     data.swap(other.data);
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 9abc7514129..f1804601693 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -404,6 +404,9 @@ public:
     // we built some temporary columns into block
     void erase_tmp_columns() noexcept;
 
+    void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
+                                   bool need_keep_first);
+
 private:
     void erase_impl(size_t position);
 };
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp 
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 4b77b1ed8a3..e5a28bed6ce 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -42,7 +42,7 @@ SharedHashTableContextPtr 
SharedHashTableController::get_context(int my_node_id)
     return _shared_contexts[my_node_id];
 }
 
-void SharedHashTableController::signal(int my_node_id) {
+void SharedHashTableController::signal_finish(int my_node_id) {
     std::lock_guard<std::mutex> lock(_mutex);
     auto it = _shared_contexts.find(my_node_id);
     if (it != _shared_contexts.cend()) {
@@ -52,10 +52,6 @@ void SharedHashTableController::signal(int my_node_id) {
     for (auto& dep : _dependencies[my_node_id]) {
         dep->set_ready();
     }
-}
-
-void SharedHashTableController::signal_finish(int my_node_id) {
-    std::lock_guard<std::mutex> lock(_mutex);
     for (auto& dep : _finish_dependencies[my_node_id]) {
         dep->set_ready();
     }
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index b04b1cdba06..173f9d46e89 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -76,7 +76,6 @@ public:
     void set_builder_and_consumers(TUniqueId builder, int node_id);
     TUniqueId get_builder_fragment_instance_id(int my_node_id);
     SharedHashTableContextPtr get_context(int my_node_id);
-    void signal(int my_node_id);
     void signal_finish(int my_node_id);
     void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> 
dep,
                            std::shared_ptr<pipeline::Dependency> finish_dep) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to