This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bd12a49baf [feature](Nereids) enable bucket shuffle join on fragment 
without scan node (#12891)
bd12a49baf is described below

commit bd12a49bafd68cc60b8615656184a9194539544c
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Fri Sep 23 15:01:50 2022 +0800

    [feature](Nereids) enable bucket shuffle join on fragment without scan node 
(#12891)
    
    In the past, the legacy planner could only use bucket shuffle join for a join
    node belonging to a fragment that contains at least one scan node.
    However, bucket shuffle join should be applicable to any join node whose left
    child's data distribution satisfies the join's requirement.
    In Nereids, every node carries data distribution info, so we can enable
    bucket shuffle join on fragments that have no scan node.
---
 .../doris/nereids/glue/translator/PhysicalPlanTranslator.java | 11 +++++++++--
 .../doris/nereids/properties/ChildOutputPropertyDeriver.java  |  2 +-
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java |  6 +++---
 .../nereids/properties/ChildOutputPropertyDeriverTest.java    |  2 +-
 4 files changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 9d8da28230..dd7204a3c2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.properties.DistributionSpecHash;
+import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
 import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
 import org.apache.doris.nereids.trees.expressions.ExprId;
@@ -969,10 +970,16 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
         // assemble fragment
         
hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
+        if (leftDistributionSpec.getShuffleType() != ShuffleType.NATURAL) {
+            hashJoinNode.setDistributionMode(DistributionMode.PARTITIONED);
+        }
         connectChildFragment(hashJoinNode, 1, leftFragment, rightFragment, 
context);
         leftFragment.setPlanRoot(hashJoinNode);
-        // TODO: use left fragment d
-        DataPartition rhsJoinPartition = new 
DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED,
+        TPartitionType partitionType = 
TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
+        if (leftDistributionSpec.getShuffleType() != ShuffleType.NATURAL) {
+            partitionType = TPartitionType.HASH_PARTITIONED;
+        }
+        DataPartition rhsJoinPartition = new DataPartition(partitionType,
                 
rightPartitionExprIds.stream().map(context::findSlotRef).collect(Collectors.toList()));
         rightFragment.setOutputPartition(rhsJoinPartition);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 715bf5e5d8..43cedd78a4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -87,7 +87,7 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
                     return PhysicalProperties.GATHER;
                 }
                 // TODO: change ENFORCED back to bucketed, when coordinator 
could process bucket on agg correctly.
-                return PhysicalProperties.createHash(new 
DistributionSpecHash(columns, ShuffleType.ENFORCED));
+                return PhysicalProperties.createHash(new 
DistributionSpecHash(columns, ShuffleType.BUCKETED));
             case DISTINCT_GLOBAL:
             default:
                 throw new RuntimeException("Could not derive output properties 
for agg phase: " + agg.getAggPhase());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7755d61ece..f0d6b3e36a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -420,7 +420,6 @@ public class Coordinator {
             }
             FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getDestFragment().getFragmentId());
             params.inputFragments.add(fragment.getFragmentId());
-
         }
 
         coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
@@ -1017,7 +1016,6 @@ public class Coordinator {
 
                 int bucketSeq = 0;
                 int bucketNum = 
bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
-                TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 
0);
 
                 // when left table is empty, it's bucketset is empty.
                 // set right table destination address to the address of left 
table
@@ -1026,6 +1024,8 @@ public class Coordinator {
                     bucketNum = 1;
                     destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
                 }
+                // process bucket shuffle join on fragment without scan node
+                TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 
0);
                 while (bucketSeq < bucketNum) {
                     TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
 
@@ -1521,7 +1521,7 @@ public class Coordinator {
                 
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) 
scanNode,
                         idToBackend, addressToBackendID);
             }
-            if (!(fragmentContainsColocateJoin | 
fragmentContainsBucketShuffleJoin)) {
+            if (!(fragmentContainsColocateJoin || 
fragmentContainsBucketShuffleJoin)) {
                 computeScanRangeAssignmentByScheduler(scanNode, locations, 
assignment, assignedBytesPerHost);
             }
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
index 052537c755..6e53265dfb 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
@@ -311,7 +311,7 @@ public class ChildOutputPropertyDeriverTest {
         Assertions.assertTrue(result.getOrderSpec().getOrderKeys().isEmpty());
         Assertions.assertTrue(result.getDistributionSpec() instanceof 
DistributionSpecHash);
         DistributionSpecHash actual = (DistributionSpecHash) 
result.getDistributionSpec();
-        Assertions.assertEquals(ShuffleType.ENFORCED, actual.getShuffleType());
+        Assertions.assertEquals(ShuffleType.BUCKETED, actual.getShuffleType());
         Assertions.assertEquals(Lists.newArrayList(partition).stream()
                         
.map(SlotReference::getExprId).collect(Collectors.toList()),
                 actual.getOrderedShuffledColumns());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to