This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit d9a6f3165527fe8559d160f072862c113b6039b1 Author: 924060929 <[email protected]> AuthorDate: Thu Jun 4 21:25:32 2026 +0800 [fix](local shuffle) do not spread bucket destinations for a serial exchange The bucket destination spread (87c0e730ab6) was gated on the local shuffle planner being enabled and on a pooled bucket fragment, but not on whether the exchange itself is serial. With use_serial_exchange=true a bucket-shuffle exchange is serial: the BE receives through one task per worker and expects funnel destinations; spreading them across bucket owners silently drops every row addressed to a non-first instance (caught by the RQG-bugs regression suite, Bug 20: with 3 BEs and use_serial_exchange, a RIGHT OUTER self-join lost 12 of 20 groups). Gate the spread on !linkNode.isSerialOperatorOnBe(), completing the symmetric guard set around the serial-exchange contract: - FE destination spread: only for non-serial exchanges (this commit) - BE orphan receiver EOS: only for non-serial exchanges (9fc18602238) - serial exchanges keep funnel destinations + fan-out LE (the previous behavior) Verified: Bug 20 returns all 20 groups again and the whole nereids_p0/local_shuffle regression directory passes 7/7 on a 3-BE cluster. 
--- .../nereids/trees/plans/distribute/DistributePlanner.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java index c6ee632edfa..fd1ff52f5f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java @@ -212,7 +212,7 @@ public class DistributePlanner { List<AssignedJob> receiverInstances = filterInstancesWhichCanReceiveDataFromRemote( receiverPlan, enableShareHashTableForBroadcastJoin, linkNode); if (linkNode.getPartitionType() == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) { - receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances); + receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances, linkNode); } DataSink sink = senderPlan.getFragmentJob().getFragment().getSink(); @@ -232,10 +232,17 @@ public class DistributePlanner { private List<AssignedJob> getDestinationsByBuckets( PipelineDistributedPlan joinSide, - List<AssignedJob> receiverInstances) { + List<AssignedJob> receiverInstances, + ExchangeNode linkNode) { UnassignedScanBucketOlapTableJob bucketJob = (UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob(); int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum(); + // The spread is only valid for a NON-serial exchange: a serial exchange + // (use_serial_exchange / UNPARTITIONED) receives through one task per worker and + // expects funnel destinations; spreading them loses every row addressed to a + // non-first instance. Mirrors the !is_serial_operator() gate on the BE orphan + // receiver fix. 
if (isEnableLocalShufflePlanner() + && !linkNode.isSerialOperatorOnBe(statementContext.getConnectContext()) && !joinSide.getInstanceJobs().isEmpty() && joinSide.getInstanceJobs().stream() .allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
