This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e64df362958 [fix](spill) use different algorithm to avoid partition data skew (#34162)
e64df362958 is described below

commit e64df3629588a98fd0daeb5257f393e4a0610b54
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Sat Apr 27 10:43:02 2024 +0800

    [fix](spill) use different algorithm to avoid partition data skew (#34162)
---
 be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 3 +--
 be/src/pipeline/exec/partitioned_hash_join_sink_operator.h  | 3 +--
 be/src/vec/runtime/partitioner.cpp                          | 1 +
 be/src/vec/runtime/partitioner.h                            | 7 +++++++
 4 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3702c2e1a6b..d650dd1590d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -26,14 +26,13 @@
 #include "pipeline/exec/join_build_sink_operator.h"
 #include "pipeline/pipeline_x/operator.h"
 #include "vec/runtime/partitioner.h"
-#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
 
 namespace doris {
 class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
+using PartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>; // spill-specific hash->partition mapping; see SpillPartitionChannelIds in vec/runtime/partitioner.h
 
 class PartitionedHashJoinProbeOperatorX;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 5c6b7e1f74f..9120392e104 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -26,14 +26,13 @@
 #include "pipeline/exec/join_build_sink_operator.h"
 #include "pipeline/pipeline_x/operator.h"
 #include "vec/runtime/partitioner.h"
-#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
 
 namespace doris {
 class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
+using PartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>; // spill-specific hash->partition mapping; see SpillPartitionChannelIds in vec/runtime/partitioner.h
 
 class PartitionedHashJoinSinkOperatorX;
 
diff --git a/be/src/vec/runtime/partitioner.cpp 
b/be/src/vec/runtime/partitioner.cpp
index fadf6d73b95..bbb6ebfc1a8 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -103,5 +103,6 @@ template class Partitioner<size_t, ShuffleChannelIds>;
 template class XXHashPartitioner<ShuffleChannelIds>;
 template class Partitioner<uint32_t, ShuffleChannelIds>;
 template class Crc32HashPartitioner<ShuffleChannelIds>;
+template class Crc32HashPartitioner<SpillPartitionChannelIds>; // instantiated for the partitioned hash join spill operators' PartitionerType
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index 8d715a41285..3152edb5cb5 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -125,5 +125,12 @@ private:
     void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int 
idx) const override;
 };
 
+struct SpillPartitionChannelIds { // Maps a row hash value to a spill partition index in [0, r).
+    template <typename HashValueType> // NOTE(review): used via Crc32HashPartitioner, presumably with uint32_t hashes -- confirm before using wider types.
+    HashValueType operator()(HashValueType l, size_t r) { // l: row hash; r: partition count -- must be non-zero (`% r` below is undefined for r == 0).
+        return ((l >> 16) | (l << 16)) % r; // Swap the 16-bit halves (a rotate-by-16 for a 32-bit hash) so the hash's high bits feed the modulo; presumably decorrelates from the plain-modulo ShuffleChannelIds mapping to avoid partition skew.
+    } // NOTE(review): for a 64-bit HashValueType this shift pair is NOT a rotate -- bits 16..47 of both halves are OR-ed together, losing entropy.
+};
+
 } // namespace vectorized
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to