This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new b6fb227402 [fix](nereids) fix cte bucket shuffle path (#22311) b6fb227402 is described below commit b6fb227402bd9a408da0845c8365297c635a009c Author: xzj7019 <131111794+xzj7...@users.noreply.github.com> AuthorDate: Thu Jul 27 22:44:51 2023 +0800 [fix](nereids) fix cte bucket shuffle path (#22311) --- .../main/java/org/apache/doris/qe/Coordinator.java | 40 +++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 39c94320c7..f4a03f9c07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1432,7 +1432,45 @@ public class Coordinator { } List<TPlanFragmentDestination> destinations = multiSink.getDestinations().get(i); - if (enablePipelineEngine && enableShareHashTableForBroadcastJoin + if (sink.getOutputPartition() != null + && sink.getOutputPartition().isBucketShuffleHashPartition()) { + // the destFragment must be bucket shuffle + Preconditions.checkState(bucketShuffleJoinController + .isBucketShuffleJoin(destFragment.getFragmentId().asInt()), "Sink is" + + "Bucket Shuffle Partition, The destFragment must have bucket shuffle join node "); + + int bucketSeq = 0; + int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId()); + + // when left table is empty, it's bucketset is empty. + // set right table destination address to the address of left table + if (destParams.instanceExecParams.size() == 1 && (bucketNum == 0 + || destParams.instanceExecParams.get(0).bucketSeqSet.isEmpty())) { + bucketNum = 1; + destParams.instanceExecParams.get(0).bucketSeqSet.add(0); + } + // process bucket shuffle join on fragment without scan node + TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); + while (bucketSeq < bucketNum) { + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + + dest.fragment_instance_id = new TUniqueId(-1, -1); + dest.server = dummyServer; + dest.setBrpcServer(dummyServer); + + for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) { + if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { + dest.fragment_instance_id = instanceExecParams.instanceId; + dest.server = toRpcHost(instanceExecParams.host); + dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); + break; + } + } + + bucketSeq++; + destinations.add(dest); + } + } else if (enablePipelineEngine && enableShareHashTableForBroadcastJoin && ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) { // here choose the first instance to build hash table. Map<TNetworkAddress, FInstanceExecParam> destHosts = new HashMap<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org