This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ffecfd196c [fix](local exchange) Fix unbalanced data distribution (#44421)
1ffecfd196c is described below

commit 1ffecfd196c0848faad031f498359c7f506688b5
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Fri Nov 22 15:48:08 2024 +0800

    [fix](local exchange) Fix unbalanced data distribution (#44421)
    
    Follow-up for #44137
---
 be/src/pipeline/local_exchange/local_exchanger.h | 16 +++++++---------
 be/src/pipeline/pipeline_fragment_context.cpp    |  2 +-
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 4912ab33698..f518e2649f8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -53,7 +53,7 @@ public:
     ExchangerBase(int running_sink_operators, int num_sources, int 
num_partitions,
                   int free_block_limit)
             : _running_sink_operators(running_sink_operators),
-              _running_source_operators(num_partitions),
+              _running_source_operators(num_sources),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
               _num_sources(num_sources),
@@ -202,10 +202,13 @@ struct BlockWrapper {
 class ShuffleExchanger : public Exchanger<PartitionedBlock> {
 public:
     ENABLE_FACTORY_CREATOR(ShuffleExchanger);
-    ShuffleExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<PartitionedBlock>(running_sink_operators, 
num_partitions,
+    ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
+                     int free_block_limit)
+            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, 
num_partitions,
                                           free_block_limit) {
-        _data_queue.resize(num_partitions);
+        DCHECK_GT(num_partitions, 0);
+        DCHECK_GT(num_sources, 0);
+        _data_queue.resize(num_sources);
     }
     ~ShuffleExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -217,10 +220,6 @@ public:
     ExchangeType get_type() const override { return 
ExchangeType::HASH_SHUFFLE; }
 
 protected:
-    ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
-                     int free_block_limit)
-            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, 
num_partitions,
-                                          free_block_limit) {}
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict 
channel_ids,
                        vectorized::Block* block, LocalExchangeSinkLocalState& 
local_state);
 };
@@ -232,7 +231,6 @@ class BucketShuffleExchanger final : public 
ShuffleExchanger {
             : ShuffleExchanger(running_sink_operators, num_sources, 
num_partitions,
                                free_block_limit) {
         DCHECK_GT(num_partitions, 0);
-        _data_queue.resize(std::max(num_partitions, num_sources));
     }
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 7572b20d341..0775ef3fb19 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -747,7 +747,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     switch (data_distribution.distribution_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
-                std::max(cur_pipe->num_tasks(), _num_instances),
+                std::max(cur_pipe->num_tasks(), _num_instances), 
_num_instances,
                 use_global_hash_shuffle ? _total_instances : _num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? cast_set<int>(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to