This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new b6fb227402 [fix](nereids) fix cte bucket shuffle path (#22311)
b6fb227402 is described below

commit b6fb227402bd9a408da0845c8365297c635a009c
Author: xzj7019 <131111794+xzj7...@users.noreply.github.com>
AuthorDate: Thu Jul 27 22:44:51 2023 +0800

    [fix](nereids) fix cte bucket shuffle path (#22311)
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 40 +++++++++++++++++++++-
 1 file changed, 39 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 39c94320c7..f4a03f9c07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1432,7 +1432,45 @@ public class Coordinator {
                 }
 
                 List<TPlanFragmentDestination> destinations = 
multiSink.getDestinations().get(i);
-                if (enablePipelineEngine && 
enableShareHashTableForBroadcastJoin
+                if (sink.getOutputPartition() != null
+                        && 
sink.getOutputPartition().isBucketShuffleHashPartition()) {
+                    // the destFragment must be bucket shuffle
+                    Preconditions.checkState(bucketShuffleJoinController
+                            
.isBucketShuffleJoin(destFragment.getFragmentId().asInt()), "Sink is"
+                            + "Bucket Shuffle Partition, The destFragment must 
have bucket shuffle join node ");
+
+                    int bucketSeq = 0;
+                    int bucketNum = 
bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
+
+                    // when left table is empty, its bucket set is empty.
+                    // set right table destination address to the address of 
left table
+                    if (destParams.instanceExecParams.size() == 1 && 
(bucketNum == 0
+                            || 
destParams.instanceExecParams.get(0).bucketSeqSet.isEmpty())) {
+                        bucketNum = 1;
+                        
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
+                    }
+                    // process bucket shuffle join on fragment without scan 
node
+                    TNetworkAddress dummyServer = new 
TNetworkAddress("0.0.0.0", 0);
+                    while (bucketSeq < bucketNum) {
+                        TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
+
+                        dest.fragment_instance_id = new TUniqueId(-1, -1);
+                        dest.server = dummyServer;
+                        dest.setBrpcServer(dummyServer);
+
+                        for (FInstanceExecParam instanceExecParams : 
destParams.instanceExecParams) {
+                            if 
(instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
+                                dest.fragment_instance_id = 
instanceExecParams.instanceId;
+                                dest.server = 
toRpcHost(instanceExecParams.host);
+                                
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                                break;
+                            }
+                        }
+
+                        bucketSeq++;
+                        destinations.add(dest);
+                    }
+                } else if (enablePipelineEngine && 
enableShareHashTableForBroadcastJoin
                         && ((ExchangeNode) 
exchNode).isRightChildOfBroadcastHashJoin()) {
                     // here choose the first instance to build hash table.
                     Map<TNetworkAddress, FInstanceExecParam> destHosts = new 
HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to