This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1ebdfd2ab302ccd6d8b2037ad8cab4766b5113b4 Author: 924060929 <[email protected]> AuthorDate: Thu Jun 4 20:12:30 2026 +0800 [opt](local shuffle) parallel pooled bucket scan: spread ranges to bucket owners (DORIS-24902) In a pooled bucket-join fragment all scan ranges went to instance 0 (serial scan), which dominated fragment latency regardless of join parallelism. When the FE planner's bucket upgrade is enabled (enable_local_shuffle and enable_local_shuffle_planner are on, and local_shuffle_bucket_upgrade_ratio > 1): 1. assignLocalShuffleJobs gives each bucket-owner instance its own buckets' scan ranges (the parallelize() split already matches assignedJoinBucketIndexes). 2. ScanNode.isSerialOperatorOnBe returns false for bucket-assigned fragments (hasColocatePlanNode || hasBucketShuffleNode — the same predicate UnassignedJobBuilder uses), because BE runs one scan task per instance and a serial flag would leave only instance 0's ranges scanned (verified: without this override, 3/4 of the join rows were silently dropped). isSerialNode() is untouched so pooling decisions (useSerialSource) stay unchanged. Plain pooled fragments keep all ranges on instance 0 + serial flag + PASSTHROUGH fan-out: the planner=false destination path (sortDestinationInstancesByBuckets) and BE-planned local exchange depend on that contract, hence the gates. 3-BE verification (4-bucket 20M x 20M single bucket-shuffle join, 200M-row output): all arms match local-shuffle-off baseline; upgraded+parallel-scan 0.39s vs bucket 0.48s (19% less time) vs downgraded shuffle 0.44s (11% less time). 
--- .../job/UnassignedScanBucketOlapTableJob.java | 15 +++++++++-- .../java/org/apache/doris/planner/ScanNode.java | 29 ++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index b7f654cd023..24c96fd6960 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -217,13 +217,24 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob if (nonSerialScanSource.isEmpty()) { List<BucketScanSource> assignedJoinBuckets = (List) serialScanSource.parallelize(scanNodes, instanceNum); + // With the FE local shuffle planner's bucket upgrade enabled, give every + // bucket-owner instance its own buckets' scan ranges so the scan runs in + // parallel across owners, instead of funneling all ranges into instance 0 + // (the serial-scan constant that dominates pooled bucket fragments). The + // planner=false dest path (sortDestinationInstancesByBuckets) depends on the + // first instance holding all buckets, so this must stay gated. + boolean spreadScanRanges = context.getSessionVariable().isEnableLocalShuffle() + && context.getSessionVariable().isEnableLocalShufflePlanner() + && context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0; for (int i = 0; i < assignedJoinBuckets.size(); i++) { BucketScanSource assignedJoinBucket = assignedJoinBuckets.get(i); LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob( instances.size(), shareScanId, context.nextInstanceId(), this, worker, - // only first instance to scan all data - i == 0 ? 
serialScanSource : emptyShareScanSource, + // spread mode: each instance scans its assigned join buckets; + // legacy mode: only first instance to scan all data + spreadScanRanges ? assignedJoinBucket + : (i == 0 ? serialScanSource : emptyShareScanSource), // but join can assign to multiple instances Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet()) ); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index e69d3101548..9b57474639f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -749,6 +749,35 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { || context.getSessionVariable().isForceToLocalShuffle(); } + /** + * Spread-scan mode (mirrors the gate in + * {@code UnassignedScanBucketOlapTableJob.assignLocalShuffleJobs}): when the FE local + * shuffle planner's bucket upgrade is enabled, a pooled bucket-assigned fragment gives + * every bucket-owner instance its own buckets' scan ranges. The scan must then NOT be + * reported serial — BE runs one scan task per instance, and a serial flag would leave + * only instance 0's ranges scanned (silently dropping the other buckets). + * + * Plain pooled fragments (non-bucket assignment) keep all ranges on instance 0 and + * still rely on the serial flag + PASSTHROUGH fan-out, so the bucket-fragment shape + * check ({@code hasColocatePlanNode || hasBucketShuffleNode}, the same predicate + * UnassignedJobBuilder uses to pick the bucket job) is essential. + * + * Note this does NOT touch {@link #isSerialNode()}: pooling decisions + * (fragment.useSerialSource via hasSerialScanNode) stay unchanged. 
+ */ + @Override + public boolean isSerialOperatorOnBe(ConnectContext context) { + if (context != null + && context.getSessionVariable().isEnableLocalShuffle() + && context.getSessionVariable().isEnableLocalShufflePlanner() + && context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0 + && fragment != null + && (fragment.hasColocatePlanNode() || fragment.hasBucketShuffleNode())) { + return false; + } + return super.isSerialOperatorOnBe(context); + } + @Override public boolean hasSerialScanChildren() { return isSerialNode(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
