This is an automated email from the ASF dual-hosted git repository.

zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dc6d28a3b6b [improvement](be) Eliminate redundant MultiCast block copies (#63580)
dc6d28a3b6b is described below

commit dc6d28a3b6bd10a6905ff0c2e139046479b14f44
Author: zclllyybb <[email protected]>
AuthorDate: Mon May 25 21:34:30 2026 +0800

    [improvement](be) Eliminate redundant MultiCast block copies (#63580)
    
    Related PR: #60386
    
    Problem Summary: MultiCastDataStreamer already shares pulled blocks
    among consumers through the column copy-on-write contract. However, the
    pull path still cloned every column for each consumer in _copy_block(),
    adding unnecessary per-consumer allocation and copy work. The
    copy-on-write assertion changes and regression coverage from the original
    attempt are already on current master, so this change replaces
    _copy_block() with _finish_pull(), which keeps only the pull-completion
    accounting, and returns the shared block to each consumer directly.
    
    Performance was measured with:
    ```sql
    SET enable_profile = true;
      SET profile_level = 2;
      SET inline_cte_referenced_threshold = 0;
      SET parallel_pipeline_task_num = 16;
    
      WITH base AS (
        SELECT id, k1, ..., k8, v01, ..., v24, s1, s2, s3, s4
        FROM bench_cte_multicast_wide
        WHERE id >= 0
      )
      SELECT 0 AS branch, COUNT(*),
             SUM(id + k1 + ... + k8 + v01 + ... + v24),
             SUM(LENGTH(s1) + LENGTH(s2) + LENGTH(s3) + LENGTH(s4))
      FROM base WHERE k1 % 16 = 0
      UNION ALL
      ...
      UNION ALL
      SELECT 15 AS branch, COUNT(*),
             SUM(id + k1 + ... + k8 + v01 + ... + v24),
             SUM(LENGTH(s1) + LENGTH(s2) + LENGTH(s3) + LENGTH(s4))
      FROM base WHERE k1 % 16 = 15
      ORDER BY branch;
    ```
    
    Results:
    | version | min | median | avg | max |
    |---|---:|---:|---:|---:|
    | before | 1.450s | 1.470s | 1.472s | 1.500s |
    | after | 0.720s | 0.735s | 0.737s | 0.760s |
---
 be/src/exec/operator/multi_cast_data_streamer.cpp | 12 +++---------
 be/src/exec/operator/multi_cast_data_streamer.h   |  5 ++---
 2 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp b/be/src/exec/operator/multi_cast_data_streamer.cpp
index 538af373b4a..403d8111018 100644
--- a/be/src/exec/operator/multi_cast_data_streamer.cpp
+++ b/be/src/exec/operator/multi_cast_data_streamer.cpp
@@ -151,16 +151,10 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b
         }
     }
 
-    return _copy_block(state, sender_idx, block, *multi_cast_block);
+    return _finish_pull(state, *multi_cast_block);
 }
 
-Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t 
sender_idx, Block* block,
-                                          MultiCastBlock& multi_cast_block) {
-    const auto rows = block->rows();
-    for (int i = 0; i < block->columns(); ++i) {
-        block->get_by_position(i).column = 
block->get_by_position(i).column->clone_resized(rows);
-    }
-
+Status MultiCastDataStreamer::_finish_pull(RuntimeState* state, 
MultiCastBlock& multi_cast_block) {
     LockGuard l(_mutex);
     multi_cast_block._un_finish_copy--;
     auto copying_count = _copying_count.fetch_sub(1) - 1;
@@ -402,4 +396,4 @@ std::string MultiCastDataStreamer::debug_string() {
     return fmt::to_string(debug_string_buffer);
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/operator/multi_cast_data_streamer.h b/be/src/exec/operator/multi_cast_data_streamer.h
index 461e7ff34d2..01264301b77 100644
--- a/be/src/exec/operator/multi_cast_data_streamer.h
+++ b/be/src/exec/operator/multi_cast_data_streamer.h
@@ -97,8 +97,7 @@ private:
     void _set_ready_for_read(int sender_idx);
     void _block_reading(int sender_idx);
 
-    Status _copy_block(RuntimeState* state, int32_t sender_idx, Block* block,
-                       MultiCastBlock& multi_cast_block);
+    Status _finish_pull(RuntimeState* state, MultiCastBlock& multi_cast_block);
 
     Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file) 
REQUIRES(_mutex);
 
@@ -128,4 +127,4 @@ private:
     // operator_profile of each source operator
     std::vector<RuntimeProfile*> _source_operator_profiles;
 };
-} // namespace doris
\ No newline at end of file
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to