This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 22f1cd1f641 [Improvement](local exchange) Reuse memory in PassToOneExchanger (#48745)
22f1cd1f641 is described below

commit 22f1cd1f6418ab31e510c00b289da3c3909983aa
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Mar 10 09:49:26 2025 +0800

    [Improvement](local exchange) Reuse memory in PassToOneExchanger (#48745)
    
    ### What problem does this PR solve?
    
    pick #39031
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 .../pipeline_x/local_exchange/local_exchanger.cpp     | 15 +++++++++++++--
 .../pipeline_x/pipeline_x_fragment_context.cpp        | 19 ++++++++++++++-----
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 33312201f4e..1dbb2a72ff6 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -270,7 +270,13 @@ Status PassthroughExchanger::get_block(RuntimeState* 
state, vectorized::Block* b
 
 Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
                                 LocalExchangeSinkLocalState& local_state) {
-    vectorized::Block new_block(in_block->clone_empty());
+    if (in_block->empty()) {
+        return Status::OK();
+    }
+    vectorized::Block new_block;
+    if (!_free_blocks.try_dequeue(new_block)) {
+        new_block = {in_block->clone_empty()};
+    }
     new_block.swap(*in_block);
     _enqueue_data_and_set_ready(0, local_state, std::move(new_block));
 
@@ -285,7 +291,12 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
     }
     vectorized::Block next_block;
     if (_dequeue_data(local_state, next_block, eos)) {
-        *block = std::move(next_block);
+        block->swap(next_block);
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
+        if (_free_block_limit == 0 ||
+            _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+            _free_blocks.enqueue(std::move(next_block));
+        }
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1eedd1af92e..d54633de29b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -937,11 +937,20 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
                         : 0);
         break;
     case ExchangeType::PASS_TO_ONE:
-        shared_state->exchanger = BroadcastExchanger::create_unique(
-                cur_pipe->num_tasks(), _num_instances,
-                
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
-                        ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
-                        : 0);
+        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+            // If shared hash table is enabled for BJ, hash table will be 
built by only one task
+            shared_state->exchanger = PassToOneExchanger::create_unique(
+                    cur_pipe->num_tasks(), _num_instances,
+                    
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                            ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
+                            : 0);
+        } else {
+            shared_state->exchanger = BroadcastExchanger::create_unique(
+                    cur_pipe->num_tasks(), _num_instances,
+                    
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                            ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
+                            : 0);
+        }
         break;
     case ExchangeType::ADAPTIVE_PASSTHROUGH:
         shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to