This is an automated email from the ASF dual-hosted git repository.
924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/fe_local_shuffle_optimize by
this push:
new 2dbc6f0fbc6 [opt](local shuffle) cores-aware bucket upgrade gate
2dbc6f0fbc6 is described below
commit 2dbc6f0fbc68ba2fda8643c716611f3ae615e962
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 22:12:03 2026 +0800
[opt](local shuffle) cores-aware bucket upgrade gate
The upgrade gate compared the raw instance count with the bucket count. The real
question, however, is whether bucket-level parallelism already saturates the
backend's execution threads. Local calibration (20M x 20M join, bucket counts
4/6/8/10/13 at 16 instances on an oversubscribed host) showed that the bucket
baseline keeps improving as the bucket count grows, while the upgraded plan stays
flat. Once min(buckets, threads) is close to min(instances, threads), the extra
local exchange is pure overhead (measured at -9% to -22%).
Gate each worker on its EFFECTIVE parallelism gain:

  min(instances, threads) > min(buckets, threads) * ratio

where threads = pipelineExecutorSize from the backend heartbeat, falling back to
cpuCores. A value <= 1 means the value has not been reported yet, and the capacity
is then treated as uncapped. Every worker that owns buckets must clear the bar
(conservative min-gain). On normal deployments (threads >= instances on every
worker), the thread cap drops out. The check then reduces to the previous
instance-vs-bucket comparison, now applied to each worker individually instead of
to the per-worker maxima.
Tests: UT 16/16 passed; nereids_p0/local_shuffle 7/7 passed on 3 BEs.
---
.../org/apache/doris/planner/AddLocalExchange.java | 50 ++++++++++++++++++++--
1 file changed, 46 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
index 92925292f12..dcbabc856f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
@@ -126,13 +126,55 @@ public class AddLocalExchange {
return false;
}
Map<Long, Set<Integer>> bucketsPerWorker = new HashMap<>();
+ Map<Long, Integer> instancesPerWorker = new HashMap<>();
+ Map<Long, Integer> coresPerWorker = new HashMap<>();
for (AssignedJob job : instanceJobs) {
- bucketsPerWorker.computeIfAbsent(job.getAssignedWorker().id(), k
-> new HashSet<>())
+ long workerId = job.getAssignedWorker().id();
+ bucketsPerWorker.computeIfAbsent(workerId, k -> new HashSet<>())
.addAll(((LocalShuffleBucketJoinAssignedJob)
job).getAssignedJoinBucketIndexes());
+ instancesPerWorker.merge(workerId, 1, Integer::sum);
+ coresPerWorker.computeIfAbsent(workerId, k ->
resolveWorkerCores(job.getAssignedWorker()));
}
- long maxBucketsPerWorker = bucketsPerWorker.values().stream()
- .mapToLong(Set::size).max().orElse(0);
- return shouldUpgradeBucketParallelism(ratio, maxPerBeInstances,
maxBucketsPerWorker);
+ // Conservative: every worker that owns buckets must clear the gain
bar. The gain is
+ // computed on EFFECTIVE parallelism (capped by the BE's executor
threads): when the
+ // bucket count already saturates the cores, adding instances cannot
speed the join
+ // up and the extra local exchange is a pure cost.
+ boolean anyBuckets = false;
+ for (Map.Entry<Long, Set<Integer>> entry :
bucketsPerWorker.entrySet()) {
+ int buckets = entry.getValue().size();
+ if (buckets == 0) {
+ continue;
+ }
+ anyBuckets = true;
+ int instances = instancesPerWorker.getOrDefault(entry.getKey(), 0);
+ int cores = coresPerWorker.getOrDefault(entry.getKey(),
Integer.MAX_VALUE);
+ if (!shouldUpgradeBucketParallelism(ratio,
+ Math.min(instances, cores), Math.min(buckets, cores))) {
+ return false;
+ }
+ }
+ return anyBuckets;
+ }
+
+ /**
+ * Resolves how many execution threads the worker's backend can run. The caller uses
+ * this value to cap both the instance count and the bucket count when it compares
+ * effective parallelism.
+ *
+ * <p>{@code pipelineExecutorSize} is read first. If it is {@code <= 1}, the method falls
+ * back to {@code cpuCores}. If the resulting value is still {@code <= 1}, the heartbeat is
+ * treated as not having reported yet: the capacity is considered unknown/uncapped rather
+ * than blocking the upgrade. Workers that are not {@code BackendWorker}s are also treated
+ * as uncapped.
+ *
+ * @param worker the worker an instance job is assigned to
+ * @return the backend's thread count when it is {@code > 1}; otherwise
+ * {@link Integer#MAX_VALUE}, so that the caller's {@code Math.min(x, cores)} leaves
+ * {@code x} unchanged
+ */
+ private static int resolveWorkerCores(
+
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker
worker) {
+ if (worker instanceof
org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) {
+ org.apache.doris.system.Backend backend =
+
((org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker)
worker).getBackend();
+ int size = backend.getPipelineExecutorSize();
+ // <= 1: executor size not reported yet; fall back to the CPU core count.
+ // NOTE(review): the 'getCputCores' spelling presumably matches the Backend accessor's
+ // actual name; check Backend before "fixing" it here.
+ if (size <= 1) {
+ size = backend.getCputCores();
+ }
+ if (size > 1) {
+ return size;
+ }
+ }
+ // Sentinel for "no known cap": Math.min(x, Integer.MAX_VALUE) == x for any int x.
+ return Integer.MAX_VALUE;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]