This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7dd0b12a16401a2be48592a9b57361f6635e732e Author: 924060929 <[email protected]> AuthorDate: Thu Jun 4 19:43:13 2026 +0800 [opt](nereids) make bucket shuffle downgrade threshold tunable The bucket-shuffle-join downgrade (give up bucket shuffle and fall back to shuffle join when the base table side's totalBucketNum < totalInstanceNum * 0.8) hardcoded the 0.8 threshold. Make it a session variable, bucket_shuffle_downgrade_ratio (default 0.8 keeps the original behavior; <= 0 never downgrades). Rationale: the downgrade and the FE local shuffle planner's bucket -> local hash upgrade (local_shuffle_bucket_upgrade_ratio) solve the same problem — a bucket shuffle over few buckets runs at only bucket-count parallelism — from opposite ends. The downgrade pays for parallelism with a full re-shuffle of BOTH sides; the upgrade keeps the anchored side in place and restores parallelism with a local exchange. With the upgrade available, the downgrade window can be narrowed or disabled. Measured on a 4-bucket 20M x 20M single bucket-shuffle join (200M-row output, 16 instances): bucket 4-way 0.445s, upgraded 16-way 0.40s, downgraded shuffle 0.395s (single-BE, where shuffle pays no real network cost — multi-BE favors bucket+upgrade further). 
--- .../properties/ChildrenPropertiesRegulator.java | 10 +++++++++- .../java/org/apache/doris/qe/SessionVariable.java | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 845c87eea9c..e5e0d9d1bd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -311,7 +311,15 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<List<List<PhysicalP int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum(); int totalBucketNum = prunedPartNum * bucketNum; ConnectContext connectContext = ConnectContext.get(); - return totalBucketNum < connectContext.getTotalInstanceNum() * 0.8; + // <= 0 disables the downgrade entirely: with the FE local shuffle planner's + // bucket -> local-hash upgrade (local_shuffle_bucket_upgrade_ratio), few-bucket + // bucket shuffle no longer funnels, so keeping bucket shuffle (anchored side + // needs no re-shuffle) can beat downgrading to shuffle join. 
+ double downgradeRatio = connectContext.getSessionVariable().getBucketShuffleDowngradeRatio(); + if (downgradeRatio <= 0) { + return false; + } + return totalBucketNum < connectContext.getTotalInstanceNum() * downgradeRatio; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 66f80718ee4..2a5e895acf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -341,6 +341,8 @@ public class SessionVariable implements Serializable, Writable { public static final String LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO = "local_shuffle_bucket_upgrade_ratio"; + public static final String BUCKET_SHUFFLE_DOWNGRADE_RATIO = "bucket_shuffle_downgrade_ratio"; + public static final String FORCE_TO_LOCAL_SHUFFLE = "force_to_local_shuffle"; public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort"; @@ -1654,6 +1656,15 @@ public class SessionVariable implements Serializable, Writable { + " and negatives) disable the upgrade."}, needForward = true) private double localShuffleBucketUpgradeRatio = 1.5; + @VarAttrDef.VarAttr( + name = BUCKET_SHUFFLE_DOWNGRADE_RATIO, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + description = {"当一侧基表总桶数小于总实例数的该倍数时, 放弃bucket shuffle join降级为shuffle join。" + + "小于等于0时永不降级。默认0.8保持原有行为", + "Downgrade bucket shuffle join to shuffle join when the base table side's total" + + " bucket count is less than total instance count times this ratio. Values <= 0" + + " never downgrade. 
Default 0.8 keeps the original behavior."}, needForward = true) + private double bucketShuffleDowngradeRatio = 0.8; + @VarAttrDef.VarAttr(name = ENABLE_LOCAL_MERGE_SORT) private boolean enableLocalMergeSort = true; @@ -4818,6 +4829,14 @@ public class SessionVariable implements Serializable, Writable { this.localShuffleBucketUpgradeRatio = localShuffleBucketUpgradeRatio; } + public double getBucketShuffleDowngradeRatio() { + return bucketShuffleDowngradeRatio; + } + + public void setBucketShuffleDowngradeRatio(double bucketShuffleDowngradeRatio) { + this.bucketShuffleDowngradeRatio = bucketShuffleDowngradeRatio; + } + public boolean enablePushDownNoGroupAgg() { return enablePushDownNoGroupAgg; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
