This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d9a6f3165527fe8559d160f072862c113b6039b1
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 21:25:32 2026 +0800

    [fix](local shuffle) do not spread bucket destinations for a serial exchange
    
    The bucket destination spread (87c0e730ab6) was gated on the local shuffle
    planner being enabled and on a pooled bucket fragment, but not on whether
    the exchange itself is serial. With use_serial_exchange=true a
    bucket-shuffle exchange is serial: BE receives
    through one task per worker and expects funnel destinations; spreading them
    across bucket owners silently drops every row addressed to a non-first
    instance (caught by the RQG-bugs regression suite, Bug 20: 3-BE
    use_serial_exchange + RIGHT OUTER self-join lost 12 of 20 groups).
    
    Gate the spread on !linkNode.isSerialOperatorOnBe(), completing the
    symmetric guard set around the serial-exchange contract:
    - FE dest spread: only for non-serial exchanges (this commit)
    - BE orphan receiver EOS: only for non-serial exchanges (9fc18602238)
    - serial exchanges keep funnel destinations + fan-out LE (the old world)
    
    Verified: Bug 20 returns all 20 groups again and the whole
    nereids_p0/local_shuffle regression directory passes 7/7 on a 3-BE cluster.
---
 .../nereids/trees/plans/distribute/DistributePlanner.java     | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index c6ee632edfa..fd1ff52f5f9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -212,7 +212,7 @@ public class DistributePlanner {
         List<AssignedJob> receiverInstances = 
filterInstancesWhichCanReceiveDataFromRemote(
                 receiverPlan, enableShareHashTableForBroadcastJoin, linkNode);
         if (linkNode.getPartitionType() == 
TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) {
-            receiverInstances = getDestinationsByBuckets(receiverPlan, 
receiverInstances);
+            receiverInstances = getDestinationsByBuckets(receiverPlan, 
receiverInstances, linkNode);
         }
 
         DataSink sink = senderPlan.getFragmentJob().getFragment().getSink();
@@ -232,10 +232,17 @@ public class DistributePlanner {
 
     private List<AssignedJob> getDestinationsByBuckets(
             PipelineDistributedPlan joinSide,
-            List<AssignedJob> receiverInstances) {
+            List<AssignedJob> receiverInstances,
+            ExchangeNode linkNode) {
         UnassignedScanBucketOlapTableJob bucketJob = 
(UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
         int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
+        // The spread is only valid for a NON-serial exchange: a serial 
exchange
+        // (use_serial_exchange / UNPARTITIONED) receives through one task per 
worker and
+        // expects funnel destinations; spreading them loses every row 
addressed to a
+        // non-first instance. Mirrors the !is_serial_operator() gate on the 
BE orphan
+        // receiver fix.
         if (isEnableLocalShufflePlanner()
+                && 
!linkNode.isSerialOperatorOnBe(statementContext.getConnectContext())
                 && !joinSide.getInstanceJobs().isEmpty()
                 && joinSide.getInstanceJobs().stream()
                         
.allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to