This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1ffecfd196c [fix](local exchange) Fix unbalance data distribution (#44421) 1ffecfd196c is described below commit 1ffecfd196c0848faad031f498359c7f506688b5 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Fri Nov 22 15:48:08 2024 +0800 [fix](local exchange) Fix unbalance data distribution (#44421) Follow-up for #44137 --- be/src/pipeline/local_exchange/local_exchanger.h | 16 +++++++--------- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 4912ab33698..f518e2649f8 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -53,7 +53,7 @@ public: ExchangerBase(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : _running_sink_operators(running_sink_operators), - _running_source_operators(num_partitions), + _running_source_operators(num_sources), _num_partitions(num_partitions), _num_senders(running_sink_operators), _num_sources(num_sources), @@ -202,10 +202,13 @@ struct BlockWrapper { class ShuffleExchanger : public Exchanger<PartitionedBlock> { public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); - ShuffleExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<PartitionedBlock>(running_sink_operators, num_partitions, + ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, + int free_block_limit) + : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions, free_block_limit) { - _data_queue.resize(num_partitions); + DCHECK_GT(num_partitions, 0); + DCHECK_GT(num_sources, 0); + _data_queue.resize(num_sources); } ~ShuffleExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, @@ -217,10 +220,6 @@ 
public: ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } protected: - ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, - int free_block_limit) - : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions, - free_block_limit) {} Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, LocalExchangeSinkLocalState& local_state); }; @@ -232,7 +231,6 @@ class BucketShuffleExchanger final : public ShuffleExchanger { : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, free_block_limit) { DCHECK_GT(num_partitions, 0); - _data_queue.resize(std::max(num_partitions, num_sources)); } ~BucketShuffleExchanger() override = default; ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7572b20d341..0775ef3fb19 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -747,7 +747,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( switch (data_distribution.distribution_type) { case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( - std::max(cur_pipe->num_tasks(), _num_instances), + std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, use_global_hash_shuffle ? _total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? cast_set<int>( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org