This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 22f1cd1f641 [Improvement](local exchange) Reuse memory in PassToOneExchanger (#48745) 22f1cd1f641 is described below commit 22f1cd1f6418ab31e510c00b289da3c3909983aa Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Mon Mar 10 09:49:26 2025 +0800 [Improvement](local exchange) Reuse memory in PassToOneExchanger (#48745) ### What problem does this PR solve? pick #39031 ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- .../pipeline_x/local_exchange/local_exchanger.cpp | 15 +++++++++++++-- .../pipeline_x/pipeline_x_fragment_context.cpp | 19 ++++++++++++++----- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 33312201f4e..1dbb2a72ff6 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -270,7 +270,13 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) { - vectorized::Block new_block(in_block->clone_empty()); + if (in_block->empty()) { + return Status::OK(); + } + vectorized::Block new_block; + if (!_free_blocks.try_dequeue(new_block)) { + new_block = {in_block->clone_empty()}; + } new_block.swap(*in_block); _enqueue_data_and_set_ready(0, local_state, std::move(new_block)); @@ -285,7 +291,12 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo } vectorized::Block next_block; if (_dequeue_data(local_state, next_block, eos)) { - *block = std::move(next_block); + block->swap(next_block); + local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); + if (_free_block_limit == 0 || + _free_blocks.size_approx() < _free_block_limit * _num_sources) { + _free_blocks.enqueue(std::move(next_block)); + } } return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 1eedd1af92e..d54633de29b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -937,11 +937,20 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( : 0); break; case ExchangeType::PASS_TO_ONE: - shared_state->exchanger = BroadcastExchanger::create_unique( - cur_pipe->num_tasks(), _num_instances, - _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit - : 0); + if (_runtime_state->enable_share_hash_table_for_broadcast_join()) { + // If shared hash table is enabled for BJ, hash table will be built by only one task + shared_state->exchanger = PassToOneExchanger::create_unique( + cur_pipe->num_tasks(), _num_instances, + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? _runtime_state->query_options().local_exchange_free_blocks_limit + : 0); + } else { + shared_state->exchanger = BroadcastExchanger::create_unique( + cur_pipe->num_tasks(), _num_instances, + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? _runtime_state->query_options().local_exchange_free_blocks_limit + : 0); + } break; case ExchangeType::ADAPTIVE_PASSTHROUGH: shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org