morningman commented on a change in pull request #5521: URL: https://github.com/apache/incubator-doris/pull/5521#discussion_r608779297
########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -294,16 +298,52 @@ private boolean isBroadcastCostSmaller(long broadcastCost, long partitionCost) } /** - * Creates either a broadcast join or a repartitioning join, depending on the expected cost. If any of the inputs to - * the cost computation is unknown, it assumes the cost will be 0. Costs being equal, it'll favor partitioned over - * broadcast joins. If perNodeMemLimit > 0 and the size of the hash table for a broadcast join is expected to exceed - * that mem limit, switches to partitioned join instead. TODO: revisit the choice of broadcast as the default TODO: - * don't create a broadcast join if we already anticipate that this will exceed the query's memory budget. + * There are 4 kinds of distributed hash join methods in Doris: + * Colocate, Bucket Shuffle, Broadcast, Shuffle + * The priority between these four distributed execution methods is following: + * Colocate > Bucket Shuffle > Broadcast > Shuffle + * This function is mainly used to choose the most suitable distributed method for the 'node', + * and transform it into PlanFragment. */ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws UserException { + List<String> reason = Lists.newArrayList(); + if (canColocateJoin(node, leftChildFragment, rightChildFragment, reason)) { + node.setColocate(true, ""); + //node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); Review comment: remove unused code ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -471,58 +468,167 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ rightChildFragment.setDestination(rhsExchange); rightChildFragment.setOutputPartition(rhsJoinPartition); - // Before we support global runtime filter, only shuffle join do not enable local runtime filter + // TODO: Before we support global runtime filter, only shuffle join do not enable local runtime filter node.setIsPushDown(false); return joinFragment; } } + /** + * Colocate Join can be performed when the following 4 conditions are met at the same time. + * 1. Session variables disable_colocate_plan = true + * 2. There is no join hints in HashJoinNode + * 3. There are no exchange node between source scan node and HashJoinNode. + * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched. + */ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment, - List<String> cannotReason) { - if (Config.disable_colocate_join) { - cannotReason.add("Disabled"); + List<String> cannotReason) { + // Condition1 + if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { + cannotReason.add(DistributedPlanColocateRule.SESSION_DISABLED); return false; } - if (ConnectContext.get().getSessionVariable().isDisableColocateJoin()) { - cannotReason.add("Session disabled"); + // Condition2: If user have a join hint to use proper way of join, can not be colocate join + if (node.getInnerRef().hasJoinHints()) { + cannotReason.add(DistributedPlanColocateRule.HAS_JOIN_HINT); return false; } - // If user have a join hint to use proper way of join, can not be colocate join - if (node.getInnerRef().hasJoinHints()) { - cannotReason.add("Has join hint"); - return false; + // Condition3: + // If there is an exchange node between the HashJoinNode and their real associated ScanNode, + // it means that the data has been rehashed. + // The rehashed data can no longer be guaranteed to correspond to the left and right buckets, + // and naturally cannot be colocate + Map<Pair<OlapScanNode, OlapScanNode>, List<BinaryPredicate>> scanNodeWithJoinConjuncts = Maps.newHashMap(); + for (BinaryPredicate eqJoinPredicate : node.getEqJoinConjuncts()) { + OlapScanNode leftScanNode = genSrcScanNode(eqJoinPredicate.getChild(0), leftChildFragment, cannotReason); + if (leftScanNode == null) { + return false; + } + OlapScanNode rightScanNode = genSrcScanNode(eqJoinPredicate.getChild(1), rightChildFragment, cannotReason); + if (rightScanNode == null) { + return false; + } + Pair<OlapScanNode, OlapScanNode> eqPair = new Pair<>(leftScanNode, rightScanNode); + List<BinaryPredicate> predicateList = scanNodeWithJoinConjuncts.get(eqPair); + if (predicateList == null) { + predicateList = Lists.newArrayList(); + scanNodeWithJoinConjuncts.put(eqPair, predicateList); + } + predicateList.add(eqJoinPredicate); } - PlanNode leftRoot = leftChildFragment.getPlanRoot(); - PlanNode rightRoot = rightChildFragment.getPlanRoot(); + // Condition4 + return dataDistributionMatchEqPredicate(scanNodeWithJoinConjuncts, cannotReason); + } - //leftRoot should be ScanNode or HashJoinNode, rightRoot should be ScanNode - if (leftRoot instanceof OlapScanNode && rightRoot instanceof OlapScanNode) { - return canColocateJoin(node, leftRoot, rightRoot, cannotReason); + private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment, List<String> cannotReason) { + SlotRef slotRef = expr.getSrcSlotRef(); + if (slotRef == null) { + cannotReason.add(DistributedPlanColocateRule.TRANSFORMED_SRC_COLUMN); + return null; + } + ScanNode scanNode = planFragment.getPlanRoot() + .getScanNodeInOneFragmentByTupleId(slotRef.getDesc().getParent().getId()); + if (scanNode == null) { + cannotReason.add(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA); + return null; } + if (scanNode instanceof OlapScanNode) { + return ((OlapScanNode) scanNode); Review comment: ```suggestion return (OlapScanNode) scanNode; ``` ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -1175,8 +1207,9 @@ private PlanFragment createAnalyticFragment( // required if the sort partition exprs reference a tuple that is made nullable in // 'childFragment' to bring NULLs from outer-join non-matches together. DataPartition sortPartition = sortNode.getInputPartition(); + // TODO(ML): here Review comment: ? ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -812,6 +843,7 @@ private PlanFragment createSetOperationNodeFragment( } // There is at least one partitioned child fragment. + // TODO(ML): here Review comment: TODO what? ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -471,58 +468,167 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ rightChildFragment.setDestination(rhsExchange); rightChildFragment.setOutputPartition(rhsJoinPartition); - // Before we support global runtime filter, only shuffle join do not enable local runtime filter + // TODO: Before we support global runtime filter, only shuffle join do not enable local runtime filter node.setIsPushDown(false); return joinFragment; } } + /** + * Colocate Join can be performed when the following 4 conditions are met at the same time. + * 1. Session variables disable_colocate_plan = true Review comment: ```suggestion * 1. Session variables disable_colocate_plan = false ``` ########## File path: fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java ########## @@ -932,9 +964,9 @@ private PlanFragment createAggregationFragment( if (isDistinct) { return createPhase2DistinctAggregationFragment(node, childFragment, fragments); } else { - // Check table's distribution. See #4481. PlanNode childPlan = childFragment.getPlanRoot(); + // TODO(ML): here Review comment: ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org