This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ff56605c2eb [pipelineX](bug) Fix hash partition shuffle (#28071)
ff56605c2eb is described below
commit ff56605c2ebc26c535933573d5b675b484d799b0
Author: Gabriel <[email protected]>
AuthorDate: Wed Dec 6 19:26:46 2023 +0800
[pipelineX](bug) Fix hash partition shuffle (#28071)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index f8c3e392184..c4319abf03c 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -213,11 +213,11 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner.reset(
- new
vectorized::XXHashPartitioner<LocalExchangeChannelIds>(channels.size()));
+ new
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(channels.size()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
- fmt::format("XXHashPartitioner({})",
_partition_count));
+ fmt::format("Crc32HashPartitioner({})",
_partition_count));
} else if (p._part_type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channel_shared_ptrs.size();
_partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
@@ -410,7 +410,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
if (_part_type == TPartitionType::HASH_PARTITIONED) {
RETURN_IF_ERROR(channel_add_rows(state, local_state.channels,
local_state._partition_count,
-
(uint64_t*)local_state._partitioner->get_channel_ids(),
+
(uint32_t*)local_state._partitioner->get_channel_ids(),
rows, block, source_state ==
SourceState::FINISHED));
} else {
RETURN_IF_ERROR(channel_add_rows(state,
local_state.channel_shared_ptrs,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]