This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 87c0e730ab6f545d7260699bca6e4e92c24dd9b4
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 12:01:29 2026 +0800

    [fix](local shuffle) spread bucket-shuffle dests across pooled instances by assigned join buckets
---
 .../trees/plans/distribute/DistributePlanner.java  | 46 ++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 0069ddd37db..c6ee632edfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -31,6 +31,7 @@ import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBui
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
@@ -234,9 +235,24 @@ public class DistributePlanner {
             List<AssignedJob> receiverInstances) {
         UnassignedScanBucketOlapTableJob bucketJob = 
(UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
         int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
+        if (isEnableLocalShufflePlanner()
+                && !joinSide.getInstanceJobs().isEmpty()
+                && joinSide.getInstanceJobs().stream()
+                        
.allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) {
+            // When FE local shuffle planner is on, spread bucket destinations across all pooled
+            // instances by their assigned join buckets — the same bucket -> instance mapping as
+            // bucket_seq_to_instance_id sent to BE — instead of funneling every bucket of a worker
+            // into its first instance and relying on BE local exchange to fan out.
+            return sortDestinationInstancesByJoinBuckets(joinSide, bucketNum);
+        }
         return sortDestinationInstancesByBuckets(joinSide, receiverInstances, 
bucketNum);
     }
 
+    private boolean isEnableLocalShufflePlanner() {
+        ConnectContext connectContext = statementContext.getConnectContext();
+        return connectContext != null && 
connectContext.getSessionVariable().isEnableLocalShufflePlanner();
+    }
+
     private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
             PipelineDistributedPlan receiverPlan,
             boolean enableShareHashTableForBroadcastJoin,
@@ -252,6 +268,36 @@ public class DistributePlanner {
         }
     }
 
+    private List<AssignedJob> sortDestinationInstancesByJoinBuckets(
+            PipelineDistributedPlan plan, int bucketNum) {
+        AssignedJob[] instances = new AssignedJob[bucketNum];
+        for (AssignedJob instanceJob : plan.getInstanceJobs()) {
+            LocalShuffleBucketJoinAssignedJob localShuffleJob = 
(LocalShuffleBucketJoinAssignedJob) instanceJob;
+            for (Integer bucketIndex : 
localShuffleJob.getAssignedJoinBucketIndexes()) {
+                if (instances[bucketIndex] != null) {
+                    throw new IllegalStateException(
+                            "Multi instances assigned same join bucket: " + 
instances[bucketIndex]
+                                    + " and " + instanceJob
+                    );
+                }
+                instances[bucketIndex] = instanceJob;
+            }
+        }
+
+        for (int i = 0; i < instances.length; i++) {
+            if (instances[i] == null) {
+                instances[i] = new StaticAssignedJob(
+                        i,
+                        new TUniqueId(-1, -1),
+                        plan.getFragmentJob(),
+                        DummyWorker.INSTANCE,
+                        new DefaultScanSource(ImmutableMap.of())
+                );
+            }
+        }
+        return Arrays.asList(instances);
+    }
+
     private List<AssignedJob> sortDestinationInstancesByBuckets(
             PipelineDistributedPlan plan, List<AssignedJob> unsorted, int 
bucketNum) {
         AssignedJob[] instances = new AssignedJob[bucketNum];


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to