This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit 87c0e730ab6f545d7260699bca6e4e92c24dd9b4
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 12:01:29 2026 +0800

    [fix](local shuffle) spread bucket-shuffle dests across pooled instances by assigned join buckets
---
 .../trees/plans/distribute/DistributePlanner.java  | 46 ++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 0069ddd37db..c6ee632edfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBui
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
@@ -234,9 +235,24 @@ public class DistributePlanner {
             List<AssignedJob> receiverInstances) {
         UnassignedScanBucketOlapTableJob bucketJob = (UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
         int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
+        if (isEnableLocalShufflePlanner()
+                && !joinSide.getInstanceJobs().isEmpty()
+                && joinSide.getInstanceJobs().stream()
+                        .allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) {
+            // When FE local shuffle planner is on, spread bucket destinations across all pooled
+            // instances by their assigned join buckets — the same bucket -> instance mapping as
+            // bucket_seq_to_instance_id sent to BE — instead of funneling every bucket of a worker
+            // into its first instance and relying on BE local exchange to fan out.
+            return sortDestinationInstancesByJoinBuckets(joinSide, bucketNum);
+        }
         return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum);
     }
 
+    private boolean isEnableLocalShufflePlanner() {
+        ConnectContext connectContext = statementContext.getConnectContext();
+        return connectContext != null && connectContext.getSessionVariable().isEnableLocalShufflePlanner();
+    }
+
     private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
             PipelineDistributedPlan receiverPlan,
             boolean enableShareHashTableForBroadcastJoin,
@@ -252,6 +268,36 @@ public class DistributePlanner {
         }
     }
 
+    private List<AssignedJob> sortDestinationInstancesByJoinBuckets(
+            PipelineDistributedPlan plan, int bucketNum) {
+        AssignedJob[] instances = new AssignedJob[bucketNum];
+        for (AssignedJob instanceJob : plan.getInstanceJobs()) {
+            LocalShuffleBucketJoinAssignedJob localShuffleJob = (LocalShuffleBucketJoinAssignedJob) instanceJob;
+            for (Integer bucketIndex : localShuffleJob.getAssignedJoinBucketIndexes()) {
+                if (instances[bucketIndex] != null) {
+                    throw new IllegalStateException(
+                            "Multi instances assigned same join bucket: " + instances[bucketIndex]
+                                    + " and " + instanceJob
+                    );
+                }
+                instances[bucketIndex] = instanceJob;
+            }
+        }
+
+        for (int i = 0; i < instances.length; i++) {
+            if (instances[i] == null) {
+                instances[i] = new StaticAssignedJob(
+                        i,
+                        new TUniqueId(-1, -1),
+                        plan.getFragmentJob(),
+                        DummyWorker.INSTANCE,
+                        new DefaultScanSource(ImmutableMap.of())
+                );
+            }
+        }
+        return Arrays.asList(instances);
+    }
+
     private List<AssignedJob> sortDestinationInstancesByBuckets(
             PipelineDistributedPlan plan, List<AssignedJob> unsorted, int bucketNum) {
         AssignedJob[] instances = new AssignedJob[bucketNum];

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
