This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e64df362958 [fix](spill) use different algorithm to avoid partition data skew (#34162)
e64df362958 is described below
commit e64df3629588a98fd0daeb5257f393e4a0610b54
Author: Jerry Hu <[email protected]>
AuthorDate: Sat Apr 27 10:43:02 2024 +0800
[fix](spill) use different algorithm to avoid partition data skew (#34162)
---
be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 3 +--
be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 3 +--
be/src/vec/runtime/partitioner.cpp | 1 +
be/src/vec/runtime/partitioner.h | 7 +++++++
4 files changed, 10 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3702c2e1a6b..d650dd1590d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -26,14 +26,13 @@
#include "pipeline/exec/join_build_sink_operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/runtime/partitioner.h"
-#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
namespace doris {
class RuntimeState;
namespace pipeline {
-using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
+using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
class PartitionedHashJoinProbeOperatorX;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 5c6b7e1f74f..9120392e104 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -26,14 +26,13 @@
#include "pipeline/exec/join_build_sink_operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/runtime/partitioner.h"
-#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
namespace doris {
class RuntimeState;
namespace pipeline {
-using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
+using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
class PartitionedHashJoinSinkOperatorX;
diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp
index fadf6d73b95..bbb6ebfc1a8 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -103,5 +103,6 @@ template class Partitioner<size_t, ShuffleChannelIds>;
template class XXHashPartitioner<ShuffleChannelIds>;
template class Partitioner<uint32_t, ShuffleChannelIds>;
template class Crc32HashPartitioner<ShuffleChannelIds>;
+template class Crc32HashPartitioner<SpillPartitionChannelIds>;
} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index 8d715a41285..3152edb5cb5 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -125,5 +125,33 @@ private:
void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override;
};
+/// Channel-id functor used to spread rows across spill partitions: maps a
+/// hash value `l` to a partition index in [0, r). `r` must be non-zero.
+///
+/// The high and low halves of the hash are swapped (a rotation by half the
+/// bit width) before the modulus, so for a power-of-two `r` the index comes
+/// from the hash's original high-order bits rather than its low-order bits.
+/// NOTE(review): presumably this avoids data skew because rows reaching a
+/// spill partitioner were already distributed by a plain `hash % n` of the
+/// same hash upstream; confirm against ShuffleChannelIds.
+///
+/// HashValueType must be an unsigned integer type. Rotating by half its width
+/// (instead of a fixed 16 bits) keeps this a true rotation for 64-bit hashes;
+/// for uint32_t the result equals `((l >> 16) | (l << 16)) % r`.
+struct SpillPartitionChannelIds {
+    template <typename HashValueType>
+    HashValueType operator()(HashValueType l, size_t r) const {
+        static_assert(static_cast<HashValueType>(-1) > static_cast<HashValueType>(0),
+                      "SpillPartitionChannelIds requires an unsigned hash type");
+        // Half the type's width in bits: 16 for uint32_t, 32 for uint64_t.
+        constexpr size_t half_bits = sizeof(HashValueType) * 8 / 2;
+        // Narrow types are promoted to int before shifting; casting back drops
+        // any bits shifted above the type's own width.
+        const auto rotated =
+                static_cast<HashValueType>((l >> half_bits) | (l << half_bits));
+        return rotated % r;
+    }
+};
+
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]