morningman commented on a change in pull request #5521:
URL: https://github.com/apache/incubator-doris/pull/5521#discussion_r608779297



##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -294,16 +298,52 @@ private boolean isBroadcastCostSmaller(long 
broadcastCost, long partitionCost)
     }
 
     /**
-     * Creates either a broadcast join or a repartitioning join, depending on 
the expected cost. If any of the inputs to
-     * the cost computation is unknown, it assumes the cost will be 0. Costs 
being equal, it'll favor partitioned over
-     * broadcast joins. If perNodeMemLimit > 0 and the size of the hash table 
for a broadcast join is expected to exceed
-     * that mem limit, switches to partitioned join instead. TODO: revisit the 
choice of broadcast as the default TODO:
-     * don't create a broadcast join if we already anticipate that this will 
exceed the query's memory budget.
+     * There are 4 kinds of distributed hash join methods in Doris:
+     * Colocate, Bucket Shuffle, Broadcast, Shuffle
+     * The priority between these four distributed execution methods is 
following:
+     * Colocate > Bucket Shuffle > Broadcast > Shuffle
+     * This function is mainly used to choose the most suitable distributed 
method for the 'node',
+     * and transform it into PlanFragment.
      */
     private PlanFragment createHashJoinFragment(HashJoinNode node, 
PlanFragment rightChildFragment,
                                                 PlanFragment 
leftChildFragment, long perNodeMemLimit,
                                                 ArrayList<PlanFragment> 
fragments)
             throws UserException {
+        List<String> reason = Lists.newArrayList();
+        if (canColocateJoin(node, leftChildFragment, rightChildFragment, 
reason)) {
+            node.setColocate(true, "");
+            
//node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);

Review comment:
       Remove this commented-out `setDistributionMode` call; it is unused code.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -471,58 +468,167 @@ private PlanFragment createHashJoinFragment(HashJoinNode 
node, PlanFragment righ
             rightChildFragment.setDestination(rhsExchange);
             rightChildFragment.setOutputPartition(rhsJoinPartition);
 
-            // Before we support global runtime filter, only shuffle join do 
not enable local runtime filter
+            // TODO: Before we support global runtime filter, only shuffle 
join do not enable local runtime filter
             node.setIsPushDown(false);
             return joinFragment;
         }
     }
 
+    /**
+     * Colocate Join can be performed when the following 4 conditions are met 
at the same time.
+     * 1. Session variables disable_colocate_plan = true
+     * 2. There is no join hints in HashJoinNode
+     * 3. There are no exchange node between source scan node and HashJoinNode.
+     * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are 
colocate and group can be matched.
+     */
     private boolean canColocateJoin(HashJoinNode node, PlanFragment 
leftChildFragment, PlanFragment rightChildFragment,
-            List<String> cannotReason) {
-        if (Config.disable_colocate_join) {
-            cannotReason.add("Disabled");
+                                    List<String> cannotReason) {
+        // Condition1
+        if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) 
{
+            cannotReason.add(DistributedPlanColocateRule.SESSION_DISABLED);
             return false;
         }
 
-        if (ConnectContext.get().getSessionVariable().isDisableColocateJoin()) 
{
-            cannotReason.add("Session disabled");
+        // Condition2: If user have a join hint to use proper way of join, can 
not be colocate join
+        if (node.getInnerRef().hasJoinHints()) {
+            cannotReason.add(DistributedPlanColocateRule.HAS_JOIN_HINT);
             return false;
         }
 
-        // If user have a join hint to use proper way of join, can not be 
colocate join
-        if (node.getInnerRef().hasJoinHints()) {
-            cannotReason.add("Has join hint");
-            return false;
+        // Condition3:
+        // If there is an exchange node between the HashJoinNode and their 
real associated ScanNode,
+        //   it means that the data has been rehashed.
+        // The rehashed data can no longer be guaranteed to correspond to the 
left and right buckets,
+        //   and naturally cannot be colocate
+        Map<Pair<OlapScanNode, OlapScanNode>, List<BinaryPredicate>> 
scanNodeWithJoinConjuncts = Maps.newHashMap();
+        for (BinaryPredicate eqJoinPredicate : node.getEqJoinConjuncts()) {
+            OlapScanNode leftScanNode = 
genSrcScanNode(eqJoinPredicate.getChild(0), leftChildFragment, cannotReason);
+            if (leftScanNode == null) {
+                return false;
+            }
+            OlapScanNode rightScanNode = 
genSrcScanNode(eqJoinPredicate.getChild(1), rightChildFragment, cannotReason);
+            if (rightScanNode == null) {
+                return false;
+            }
+            Pair<OlapScanNode, OlapScanNode> eqPair = new Pair<>(leftScanNode, 
rightScanNode);
+            List<BinaryPredicate> predicateList = 
scanNodeWithJoinConjuncts.get(eqPair);
+            if (predicateList == null) {
+                predicateList = Lists.newArrayList();
+                scanNodeWithJoinConjuncts.put(eqPair, predicateList);
+            }
+            predicateList.add(eqJoinPredicate);
         }
 
-        PlanNode leftRoot = leftChildFragment.getPlanRoot();
-        PlanNode rightRoot = rightChildFragment.getPlanRoot();
+        // Condition4
+        return dataDistributionMatchEqPredicate(scanNodeWithJoinConjuncts, 
cannotReason);
+    }
 
-        //leftRoot should be ScanNode or HashJoinNode, rightRoot should be 
ScanNode
-        if (leftRoot instanceof OlapScanNode && rightRoot instanceof 
OlapScanNode) {
-            return canColocateJoin(node, leftRoot, rightRoot, cannotReason);
+    private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment, 
List<String> cannotReason) {
+        SlotRef slotRef = expr.getSrcSlotRef();
+        if (slotRef == null) {
+            
cannotReason.add(DistributedPlanColocateRule.TRANSFORMED_SRC_COLUMN);
+            return null;
+        }
+        ScanNode scanNode = planFragment.getPlanRoot()
+                
.getScanNodeInOneFragmentByTupleId(slotRef.getDesc().getParent().getId());
+        if (scanNode == null) {
+            
cannotReason.add(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA);
+            return null;
         }
+        if (scanNode instanceof OlapScanNode) {
+            return ((OlapScanNode) scanNode);

Review comment:
       ```suggestion
               return (OlapScanNode) scanNode;
       ```

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -1175,8 +1207,9 @@ private PlanFragment createAnalyticFragment(
             // required if the sort partition exprs reference a tuple that is 
made nullable in
             // 'childFragment' to bring NULLs from outer-join non-matches 
together.
             DataPartition sortPartition = sortNode.getInputPartition();
+            // TODO(ML): here

Review comment:
       What is this TODO for? Please describe the pending work or remove the comment.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -812,6 +843,7 @@ private PlanFragment createSetOperationNodeFragment(
         }
 
         // There is at least one partitioned child fragment.
+        // TODO(ML): here

Review comment:
       TODO for what? Please state what still needs to be done here, or remove the comment.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -471,58 +468,167 @@ private PlanFragment createHashJoinFragment(HashJoinNode 
node, PlanFragment righ
             rightChildFragment.setDestination(rhsExchange);
             rightChildFragment.setOutputPartition(rhsJoinPartition);
 
-            // Before we support global runtime filter, only shuffle join do 
not enable local runtime filter
+            // TODO: Before we support global runtime filter, only shuffle 
join do not enable local runtime filter
             node.setIsPushDown(false);
             return joinFragment;
         }
     }
 
+    /**
+     * Colocate Join can be performed when the following 4 conditions are met 
at the same time.
+     * 1. Session variables disable_colocate_plan = true

Review comment:
       ```suggestion
        * 1. Session variables disable_colocate_plan = false
       ```

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
##########
@@ -932,9 +964,9 @@ private PlanFragment createAggregationFragment(
         if (isDistinct) {
             return createPhase2DistinctAggregationFragment(node, 
childFragment, fragments);
         } else {
-
             // Check table's distribution. See #4481.
             PlanNode childPlan = childFragment.getPlanRoot();
+            // TODO(ML): here

Review comment:
       What is this TODO for? Please describe the pending work or remove the comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to