This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ac9480123c [refactor](Nereids) push down all non-slot order key in sort and prune them upper sort (#22034) ac9480123c is described below commit ac9480123cc8e073866019f0ea617013f0d46cca Author: morrySnow <101034200+morrys...@users.noreply.github.com> AuthorDate: Mon Jul 24 15:36:33 2023 +0800 [refactor](Nereids) push down all non-slot order key in sort and prune them upper sort (#22034) According to the implementation in the execution engine, all order keys in SortNode are output, so LogicalSort must be normalized to match that behavior. We push down all non-slot order keys of a sort so that they are materialized below (i.e. before) the sort. As a result, every order key is a slot and SortNode no longer needs to do any projection itself. This simplifies the translation of SortNode by avoiding the generation of resolvedTupleExprs and sortTupleDesc. --- .../main/java/org/apache/doris/analysis/Expr.java | 10 + .../glue/translator/PhysicalPlanTranslator.java | 245 ++++++++------------- .../doris/nereids/rules/rewrite/NormalizeSort.java | 47 ++-- .../nereids/trees/plans/logical/LogicalSort.java | 35 +-- .../apache/doris/planner/PartitionSortNode.java | 76 +------ .../java/org/apache/doris/planner/SortNode.java | 28 --- .../nereids_tpcds_shape_sf100_p0/shape/query47.out | 4 +- .../nereids_tpcds_shape_sf100_p0/shape/query57.out | 4 +- 8 files changed, 150 insertions(+), 299 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 3e1f3943a7..24031bfb89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -722,6 +722,16 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl return false; } + public static void extractSlots(Expr root, Set<SlotId> slotIdSet) { + if (root instanceof SlotRef) { + slotIdSet.add(((SlotRef) root).getDesc().getId()); + return; + } + 
for (Expr child : root.getChildren()) { + extractSlots(child, slotIdSet); + } + } + /** * Returns an analyzed clone of 'this' with exprs substituted according to smap. * Removes implicit casts and analysis state while cloning/substituting exprs within diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 0ae104ed5a..d9a8aba8ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -65,7 +65,6 @@ import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWind import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.UnaryNode; import org.apache.doris.nereids.trees.expressions.AggregateExpression; -import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.CTEId; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.ExprId; @@ -176,6 +175,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -1430,12 +1430,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanFragment inputFragment = project.child(0).accept(this, context); - List<Expr> execExprList = project.getProjects() + List<Expr> projectionExprs = project.getProjects() .stream() .map(e -> ExpressionTranslator.translate(e, context)) .collect(Collectors.toList()); - // TODO: fix the project alias of an aliased relation. 
- List<Slot> slotList = project.getProjects() + List<Slot> slots = project.getProjects() .stream() .map(NamedExpression::toSlot) .collect(Collectors.toList()); @@ -1445,45 +1444,45 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); - TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context); - dataStreamSink.setProjections(execExprList); - dataStreamSink.setOutputTupleDesc(tupleDescriptor); + TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context); + dataStreamSink.setProjections(projectionExprs); + dataStreamSink.setOutputTupleDesc(projectionTuple); return inputFragment; } PlanNode inputPlanNode = inputFragment.getPlanRoot(); - List<Expr> predicateList = inputPlanNode.getConjuncts(); + List<Expr> conjuncts = inputPlanNode.getConjuncts(); Set<SlotId> requiredSlotIdSet = Sets.newHashSet(); - for (Expr expr : execExprList) { - extractExecSlot(expr, requiredSlotIdSet); + for (Expr expr : projectionExprs) { + Expr.extractSlots(expr, requiredSlotIdSet); } Set<SlotId> requiredByProjectSlotIdSet = Sets.newHashSet(requiredSlotIdSet); - for (Expr expr : predicateList) { - extractExecSlot(expr, requiredSlotIdSet); + for (Expr expr : conjuncts) { + Expr.extractSlots(expr, requiredSlotIdSet); } // For hash join node, use vSrcToOutputSMap to describe the expression calculation, use // vIntermediateTupleDescList as input, and set vOutputTupleDesc as the final output. // TODO: HashJoinNode's be implementation is not support projection yet, remove this after when supported. 
if (inputPlanNode instanceof JoinNodeBase) { - TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context); - JoinNodeBase hashJoinNode = (JoinNodeBase) inputPlanNode; - hashJoinNode.setvOutputTupleDesc(tupleDescriptor); - hashJoinNode.setvSrcToOutputSMap(execExprList); + TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context); + JoinNodeBase joinNode = (JoinNodeBase) inputPlanNode; + joinNode.setvOutputTupleDesc(tupleDescriptor); + joinNode.setvSrcToOutputSMap(projectionExprs); // prune the hashOutputSlotIds - if (hashJoinNode instanceof HashJoinNode) { - ((HashJoinNode) hashJoinNode).getHashOutputSlotIds().clear(); + if (joinNode instanceof HashJoinNode) { + ((HashJoinNode) joinNode).getHashOutputSlotIds().clear(); Set<ExprId> requiredExprIds = Sets.newHashSet(); Set<SlotId> requiredOtherConjunctsSlotIdSet = Sets.newHashSet(); - List<Expr> otherConjuncts = ((HashJoinNode) hashJoinNode).getOtherJoinConjuncts(); + List<Expr> otherConjuncts = ((HashJoinNode) joinNode).getOtherJoinConjuncts(); for (Expr expr : otherConjuncts) { - extractExecSlot(expr, requiredOtherConjunctsSlotIdSet); + Expr.extractSlots(expr, requiredOtherConjunctsSlotIdSet); } requiredOtherConjunctsSlotIdSet.forEach(e -> requiredExprIds.add(context.findExprId(e))); requiredSlotIdSet.forEach(e -> requiredExprIds.add(context.findExprId(e))); for (ExprId exprId : requiredExprIds) { - SlotId slotId = ((HashJoinNode) hashJoinNode).getHashOutputExprSlotIdMap().get(exprId); + SlotId slotId = ((HashJoinNode) joinNode).getHashOutputExprSlotIdMap().get(exprId); Preconditions.checkState(slotId != null); - ((HashJoinNode) hashJoinNode).addSlotIdToHashOutputSlotIds(slotId); + ((HashJoinNode) joinNode).addSlotIdToHashOutputSlotIds(slotId); } } return inputFragment; @@ -1495,42 +1494,49 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } if (inputPlanNode instanceof ScanNode) { - TupleDescriptor tupleDescriptor = null; + TupleDescriptor 
projectionTuple = null; + // slotIdsByOrder is used to ensure the ScanNode's output order is same with current Project + // if we change the output order in translate project, the upper node will receive wrong order + // tuple, since they get the order from project.getOutput() not scan.getOutput()./ + List<SlotId> slotIdsByOrder = Lists.newArrayList(); if (requiredByProjectSlotIdSet.size() != requiredSlotIdSet.size() - || new HashSet<>(execExprList).size() != execExprList.size() - || execExprList.stream().anyMatch(expr -> !(expr instanceof SlotRef))) { - tupleDescriptor = generateTupleDesc(slotList, null, context); - inputPlanNode.setProjectList(execExprList); - inputPlanNode.setOutputTupleDesc(tupleDescriptor); + || new HashSet<>(projectionExprs).size() != projectionExprs.size() + || projectionExprs.stream().anyMatch(expr -> !(expr instanceof SlotRef))) { + projectionTuple = generateTupleDesc(slots, null, context); + inputPlanNode.setProjectList(projectionExprs); + inputPlanNode.setOutputTupleDesc(projectionTuple); } else { - for (int i = 0; i < slotList.size(); ++i) { - context.addExprIdSlotRefPair(slotList.get(i).getExprId(), - (SlotRef) execExprList.get(i)); + for (int i = 0; i < slots.size(); ++i) { + context.addExprIdSlotRefPair(slots.get(i).getExprId(), + (SlotRef) projectionExprs.get(i)); + slotIdsByOrder.add(((SlotRef) projectionExprs.get(i)).getSlotId()); } } // TODO: this is a temporary scheme to support two phase read when has project. // we need to refactor all topn opt into rbo stage. 
if (inputPlanNode instanceof OlapScanNode) { - ArrayList<SlotDescriptor> slots = + ArrayList<SlotDescriptor> olapScanSlots = context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots(); - SlotDescriptor lastSlot = slots.get(slots.size() - 1); + SlotDescriptor lastSlot = olapScanSlots.get(olapScanSlots.size() - 1); if (lastSlot.getColumn() != null && lastSlot.getColumn().getName().equals(Column.ROWID_COL)) { - if (tupleDescriptor != null) { - injectRowIdColumnSlot(tupleDescriptor); + if (projectionTuple != null) { + injectRowIdColumnSlot(projectionTuple); SlotRef slotRef = new SlotRef(lastSlot); inputPlanNode.getProjectList().add(slotRef); requiredByProjectSlotIdSet.add(lastSlot.getId()); + } else { + slotIdsByOrder.add(lastSlot.getId()); } requiredSlotIdSet.add(lastSlot.getId()); } } - updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdSet, - requiredByProjectSlotIdSet, context); + updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet, + requiredByProjectSlotIdSet, slotIdsByOrder, context); } else { - TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context); - inputPlanNode.setProjectList(execExprList); + TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context); + inputPlanNode.setProjectList(projectionExprs); inputPlanNode.setOutputTupleDesc(tupleDescriptor); } return inputFragment; @@ -1854,34 +1860,24 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla * ******************************************************************************************** */ private PartitionSortNode translatePartitionSortNode(PhysicalPartitionTopN<? extends Plan> partitionTopN, - PlanNode childNode, PlanTranslatorContext context) { - // Generate the SortInfo, similar to 'translateSortNode'. 
- List<Expr> oldOrderingExprList = Lists.newArrayList(); - List<Boolean> ascOrderList = Lists.newArrayList(); - List<Boolean> nullsFirstParamList = Lists.newArrayList(); - List<OrderKey> orderKeyList = partitionTopN.getOrderKeys(); - // 1. Get previous slotRef - orderKeyList.forEach(k -> { - oldOrderingExprList.add(ExpressionTranslator.translate(k.getExpr(), context)); - ascOrderList.add(k.isAsc()); - nullsFirstParamList.add(k.isNullFirst()); + PlanNode childNode, PlanTranslatorContext context) { + TupleDescriptor sortTuple = generateTupleDesc(partitionTopN.child().getOutput(), null, context); + List<Expr> orderingExprs = Lists.newArrayList(); + List<Boolean> ascOrders = Lists.newArrayList(); + List<Boolean> nullsFirstParams = Lists.newArrayList(); + List<OrderKey> orderKeys = partitionTopN.getOrderKeys(); + orderKeys.forEach(k -> { + orderingExprs.add(ExpressionTranslator.translate(k.getExpr(), context)); + ascOrders.add(k.isAsc()); + nullsFirstParams.add(k.isNullFirst()); }); - List<Expr> sortTupleOutputList = new ArrayList<>(); - List<Slot> outputList = partitionTopN.getOutput(); - outputList.forEach(k -> sortTupleOutputList.add(ExpressionTranslator.translate(k, context))); List<Expr> partitionExprs = partitionTopN.getPartitionKeys().stream() .map(e -> ExpressionTranslator.translate(e, context)) .collect(Collectors.toList()); - // 2. Generate new Tuple and get current slotRef for newOrderingExprList - List<Expr> newOrderingExprList = Lists.newArrayList(); - TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null); - // 3. 
fill in SortInfo members - SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc); - + SortInfo sortInfo = new SortInfo(orderingExprs, ascOrders, nullsFirstParams, sortTuple); PartitionSortNode partitionSortNode = new PartitionSortNode(context.nextPlanNodeId(), childNode, partitionTopN.getFunction(), partitionExprs, sortInfo, partitionTopN.hasGlobalLimit(), - partitionTopN.getPartitionLimit(), sortTupleOutputList, oldOrderingExprList); - + partitionTopN.getPartitionLimit()); if (partitionTopN.getStats() != null) { partitionSortNode.setCardinality((long) partitionTopN.getStats().getRowCount()); } @@ -1891,33 +1887,23 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla private SortNode translateSortNode(AbstractPhysicalSort<? extends Plan> sort, PlanNode childNode, PlanTranslatorContext context) { - List<Expr> oldOrderingExprList = Lists.newArrayList(); - List<Boolean> ascOrderList = Lists.newArrayList(); - List<Boolean> nullsFirstParamList = Lists.newArrayList(); - List<OrderKey> orderKeyList = sort.getOrderKeys(); - // 1. Get previous slotRef - orderKeyList.forEach(k -> { - oldOrderingExprList.add(ExpressionTranslator.translate(k.getExpr(), context)); - ascOrderList.add(k.isAsc()); - nullsFirstParamList.add(k.isNullFirst()); + TupleDescriptor sortTuple = generateTupleDesc(sort.child().getOutput(), null, context); + List<Expr> orderingExprs = Lists.newArrayList(); + List<Boolean> ascOrders = Lists.newArrayList(); + List<Boolean> nullsFirstParams = Lists.newArrayList(); + List<OrderKey> orderKeys = sort.getOrderKeys(); + orderKeys.forEach(k -> { + orderingExprs.add(ExpressionTranslator.translate(k.getExpr(), context)); + ascOrders.add(k.isAsc()); + nullsFirstParams.add(k.isNullFirst()); }); - List<Expr> sortTupleOutputList = new ArrayList<>(); - List<Slot> outputList = sort.getOutput(); - outputList.forEach(k -> sortTupleOutputList.add(ExpressionTranslator.translate(k, context))); - // 2. 
Generate new Tuple and get current slotRef for newOrderingExprList - List<Expr> newOrderingExprList = Lists.newArrayList(); - TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null); - // 3. fill in SortInfo members - SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc); - SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, true); - sortNode.finalizeForNereids(tupleDesc, sortTupleOutputList, oldOrderingExprList); + SortInfo sortInfo = new SortInfo(orderingExprs, ascOrders, nullsFirstParams, sortTuple); + SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, sort instanceof PhysicalTopN); if (sort.getMutableState(PhysicalTopN.TWO_PHASE_READ_OPT).isPresent()) { sortNode.setUseTwoPhaseReadOpt(true); sortNode.getSortInfo().setUseTwoPhaseRead(); injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor()); - TupleDescriptor childTuple = childNode.getOutputTupleDesc() != null - ? 
childNode.getOutputTupleDesc() : context.getTupleDesc(childNode.getTupleIds().get(0)); - SlotDescriptor childRowIdDesc = childTuple.getSlots().get(childTuple.getSlots().size() - 1); + SlotDescriptor childRowIdDesc = sortTuple.getSlots().get(sortTuple.getSlots().size() - 1); sortNode.getResolvedTupleExprs().add(new SlotRef(childRowIdDesc)); } if (sort.getStats() != null) { @@ -1927,34 +1913,32 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return sortNode; } - private void updateChildSlotsMaterialization(PlanNode execPlan, + private void updateScanSlotsMaterialization(ScanNode scanNode, Set<SlotId> requiredSlotIdSet, Set<SlotId> requiredByProjectSlotIdSet, - PlanTranslatorContext context) { - Set<SlotRef> slotRefSet = new HashSet<>(); - for (Expr expr : execPlan.getConjuncts()) { - expr.collect(SlotRef.class, slotRefSet); - } - Set<SlotId> slotIdSet = slotRefSet.stream() - .map(SlotRef::getSlotId).collect(Collectors.toSet()); - slotIdSet.addAll(requiredSlotIdSet); - boolean noneMaterialized = execPlan.getTupleIds().stream() - .map(context::getTupleDesc) - .map(TupleDescriptor::getSlots) - .flatMap(List::stream) - .peek(s -> s.setIsMaterialized(slotIdSet.contains(s.getId()))) - .filter(SlotDescriptor::isMaterialized) - .count() == 0; - if (noneMaterialized) { - context.getDescTable() - .getTupleDesc(execPlan.getTupleIds().get(0)).getSlots().get(0).setIsMaterialized(true); - } - if (execPlan instanceof ScanNode) { - try { - ((ScanNode) execPlan).updateRequiredSlots(context, requiredByProjectSlotIdSet); - } catch (UserException e) { - Util.logAndThrowRuntimeException(LOG, - "User Exception while reset external file scan node contexts.", e); + List<SlotId> slotIdsByOrder, PlanTranslatorContext context) { + // TODO: use smallest slot if do not need any slot in upper node + SlotDescriptor smallest = scanNode.getTupleDesc().getSlots().get(0); + if (CollectionUtils.isNotEmpty(slotIdsByOrder)) { + // if we eliminate project above scan, we 
should ensure the slot order of scan's output is same with + // the projection's output. So, we need to reorder the output slot in scan's tuple. + Map<SlotId, SlotDescriptor> idToSlotDescMap = scanNode.getTupleDesc().getSlots().stream() + .filter(s -> requiredSlotIdSet.contains(s.getId())) + .collect(Collectors.toMap(SlotDescriptor::getId, s -> s)); + scanNode.getTupleDesc().getSlots().clear(); + for (SlotId slotId : slotIdsByOrder) { + scanNode.getTupleDesc().getSlots().add(idToSlotDescMap.get(slotId)); } + } else { + scanNode.getTupleDesc().getSlots().removeIf(s -> !requiredSlotIdSet.contains(s.getId())); + } + if (scanNode.getTupleDesc().getSlots().isEmpty()) { + scanNode.getTupleDesc().getSlots().add(smallest); + } + try { + scanNode.updateRequiredSlots(context, requiredByProjectSlotIdSet); + } catch (UserException e) { + Util.logAndThrowRuntimeException(LOG, + "User Exception while reset external file scan node contexts.", e); } } @@ -1967,16 +1951,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla updateLegacyPlanIdToPhysicalPlan(planNode, filter); } - private void extractExecSlot(Expr root, Set<SlotId> slotIdList) { - if (root instanceof SlotRef) { - slotIdList.add(((SlotRef) root).getDesc().getId()); - return; - } - for (Expr child : root.getChildren()) { - extractExecSlot(child, slotIdList); - } - } - private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, Set<ExprId> deferredMaterializedExprIds, PlanTranslatorContext context) { TupleDescriptor tupleDescriptor = context.generateTupleDesc(); @@ -1999,39 +1973,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return tupleDescriptor; } - private TupleDescriptor generateTupleDesc(List<Slot> slotList, List<OrderKey> orderKeyList, - List<Expr> newOrderingExprList, - PlanTranslatorContext context, Table table) { - TupleDescriptor tupleDescriptor = context.generateTupleDesc(); - Set<ExprId> alreadyExists = 
Sets.newHashSet(); - tupleDescriptor.setTable(table); - for (OrderKey orderKey : orderKeyList) { - SlotReference slotReference; - if (orderKey.getExpr() instanceof SlotReference) { - slotReference = (SlotReference) orderKey.getExpr(); - } else { - slotReference = (SlotReference) new Alias(orderKey.getExpr(), orderKey.getExpr().toString()).toSlot(); - } - // TODO: trick here, we need semanticEquals to remove redundant expression - if (alreadyExists.contains(slotReference.getExprId())) { - newOrderingExprList.add(context.findSlotRef(slotReference.getExprId())); - continue; - } - context.createSlotDesc(tupleDescriptor, slotReference); - newOrderingExprList.add(context.findSlotRef(slotReference.getExprId())); - alreadyExists.add(slotReference.getExprId()); - } - for (Slot slot : slotList) { - if (alreadyExists.contains(slot.getExprId())) { - continue; - } - context.createSlotDesc(tupleDescriptor, (SlotReference) slot); - alreadyExists.add(slot.getExprId()); - } - - return tupleDescriptor; - } - private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment leftFragment, PlanFragment rightFragment, PlanTranslatorContext context, AbstractPlan join) { hashJoinNode.setChild(0, leftFragment.getPlanRoot()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeSort.java index 9480a1bb83..5426b5af6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeSort.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeSort.java @@ -17,33 +17,52 @@ package org.apache.doris.nereids.rules.rewrite; +import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; +import 
org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; -import java.util.stream.Collectors; +import java.util.List; +import java.util.stream.Stream; /** - * the sort node will create new slots for order by keys if the order by keys is not in the output - * so need create a project above sort node to prune the unnecessary order by keys. This means the - * Tuple slots size is difference to PhysicalSort.output.size. If not prune and hide the order key, - * the upper plan node will see the temporary slots and treat as output, and then translate failed. - * This is trick, we should add sort output tuple to ensure the tuple slot size is equals, but it - * has large workload. I think we should refactor the PhysicalPlanTranslator in the future, and - * process PhysicalProject(output)/PhysicalDistribute more general. + * SortNode on BE always output order keys because BE needs them to do merge sort. So we normalize LogicalSort as BE + * expected to materialize order key before sort by bottom project and then prune the useless column after sort by + * top project. 
*/ public class NormalizeSort extends OneRewriteRuleFactory { @Override public Rule build() { - return logicalSort() - .when(sort -> !sort.isNormalized() && !sort.getOutputSet() - .containsAll(sort.getOrderKeys().stream() - .map(orderKey -> orderKey.getExpr()).collect(Collectors.toSet()))) + return logicalSort().whenNot(sort -> sort.getOrderKeys().stream() + .map(OrderKey::getExpr).allMatch(Slot.class::isInstance)) .then(sort -> { - return new LogicalProject(sort.getOutput(), ImmutableList.of(), false, - sort.withNormalize(true)); + List<NamedExpression> newProjects = Lists.newArrayList(); + List<OrderKey> newOrderKeys = sort.getOrderKeys().stream() + .map(orderKey -> { + Expression expr = orderKey.getExpr(); + if (!(expr instanceof Slot)) { + Alias alias = new Alias(expr, expr.toSql()); + newProjects.add(alias); + expr = alias.toSlot(); + } + return orderKey.withExpression(expr); + }).collect(ImmutableList.toImmutableList()); + List<NamedExpression> bottomProjections = Stream.concat( + sort.child().getOutput().stream(), + newProjects.stream() + ).collect(ImmutableList.toImmutableList()); + List<NamedExpression> topProjections = sort.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalProject<>(topProjections, sort.withOrderKeysAndChild(newOrderKeys, + new LogicalProject<>(bottomProjections, sort.child()))); }).toRule(RuleType.NORMALIZE_SORT); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java index 5918422966..f139656f01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java @@ -46,29 +46,17 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP private final List<OrderKey> orderKeys; - 
private final boolean normalized; - public LogicalSort(List<OrderKey> orderKeys, CHILD_TYPE child) { this(orderKeys, Optional.empty(), Optional.empty(), child); } - public LogicalSort(List<OrderKey> orderKeys, CHILD_TYPE child, boolean normalized) { - this(orderKeys, Optional.empty(), Optional.empty(), child, normalized); - } - /** * Constructor for LogicalSort. */ public LogicalSort(List<OrderKey> orderKeys, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - this(orderKeys, groupExpression, logicalProperties, child, false); - } - - public LogicalSort(List<OrderKey> orderKeys, Optional<GroupExpression> groupExpression, - Optional<LogicalProperties> logicalProperties, CHILD_TYPE child, boolean normalized) { super(PlanType.LOGICAL_SORT, groupExpression, logicalProperties, child); this.orderKeys = ImmutableList.copyOf(Objects.requireNonNull(orderKeys, "orderKeys can not be null")); - this.normalized = normalized; } @Override @@ -80,10 +68,6 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP return orderKeys; } - public boolean isNormalized() { - return normalized; - } - @Override public String toString() { return Utils.toSqlString("LogicalSort[" + id.asInt() + "]", @@ -98,7 +82,7 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP if (o == null || getClass() != o.getClass()) { return false; } - LogicalSort that = (LogicalSort) o; + LogicalSort<?> that = (LogicalSort<?>) o; return Objects.equals(orderKeys, that.orderKeys); } @@ -122,30 +106,27 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP @Override public LogicalSort<Plan> withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalSort<>(orderKeys, children.get(0), normalized); + return new LogicalSort<>(orderKeys, children.get(0)); } @Override public LogicalSort<Plan> 
withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalSort<>(orderKeys, groupExpression, Optional.of(getLogicalProperties()), child(), - normalized); + return new LogicalSort<>(orderKeys, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override - public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + public LogicalSort<Plan> withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalSort<>(orderKeys, groupExpression, logicalProperties, children.get(0), - normalized); + return new LogicalSort<>(orderKeys, groupExpression, logicalProperties, children.get(0)); } public LogicalSort<Plan> withOrderKeys(List<OrderKey> orderKeys) { return new LogicalSort<>(orderKeys, Optional.empty(), - Optional.of(getLogicalProperties()), child(), false); + Optional.of(getLogicalProperties()), child()); } - public LogicalSort<Plan> withNormalize(boolean orderKeysPruned) { - return new LogicalSort<>(orderKeys, groupExpression, Optional.of(getLogicalProperties()), child(), - orderKeysPruned); + public LogicalSort<Plan> withOrderKeysAndChild(List<OrderKey> orderKeys, Plan child) { + return new LogicalSort<>(orderKeys, child); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java index ef198891cc..9e14b4f267 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java @@ -17,12 +17,8 @@ package org.apache.doris.planner; -import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SortInfo; -import 
org.apache.doris.common.NotImplementedException; import org.apache.doris.nereids.trees.plans.WindowFuncType; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -34,40 +30,29 @@ import org.apache.doris.thrift.TopNAlgorithm; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Set; /** * PartitionSortNode. * PartitionSortNode is only used in the Nereids. */ public class PartitionSortNode extends PlanNode { - private static final Logger LOG = LogManager.getLogger(PartitionSortNode.class); - private List<Expr> resolvedTupleExprs; private final WindowFuncType function; private final List<Expr> partitionExprs; private final SortInfo info; private final boolean hasGlobalLimit; private final long partitionLimit; - private boolean isUnusedExprRemoved = false; - private ArrayList<Boolean> nullabilityChangedFlags = Lists.newArrayList(); - /** * Constructor. 
*/ public PartitionSortNode(PlanNodeId id, PlanNode input, WindowFuncType function, List<Expr> partitionExprs, - SortInfo info, boolean hasGlobalLimit, long partitionLimit, - List<Expr> outputList, List<Expr> orderingExpr) { + SortInfo info, boolean hasGlobalLimit, long partitionLimit) { super(id, "PartitionTopN", StatisticalType.PARTITION_TOPN_NODE); + Preconditions.checkArgument(info.getOrderingExprs().size() == info.getIsAscOrder().size()); this.function = function; this.partitionExprs = partitionExprs; this.info = info; @@ -77,38 +62,12 @@ public class PartitionSortNode extends PlanNode { this.tblRefIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId())); this.nullableTupleIds.addAll(input.getNullableTupleIds()); this.children.add(input); - - List<Expr> resolvedTupleExprs = new ArrayList<>(); - for (Expr order : orderingExpr) { - if (!resolvedTupleExprs.contains(order)) { - resolvedTupleExprs.add(order); - } - } - for (Expr output : outputList) { - if (!resolvedTupleExprs.contains(output)) { - resolvedTupleExprs.add(output); - } - } - this.resolvedTupleExprs = ImmutableList.copyOf(resolvedTupleExprs); - info.setSortTupleSlotExprs(resolvedTupleExprs); - - nullabilityChangedFlags.clear(); - for (int i = 0; i < resolvedTupleExprs.size(); i++) { - nullabilityChangedFlags.add(false); - } - Preconditions.checkArgument(info.getOrderingExprs().size() == info.getIsAscOrder().size()); } public SortInfo getSortInfo() { return info; } - @Override - public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) { - super.getMaterializedIds(analyzer, ids); - Expr.getIds(info.getOrderingExprs(), null, ids); - } - @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { if (detailLevel == TExplainLevel.BRIEF) { @@ -164,34 +123,12 @@ public class PartitionSortNode extends PlanNode { return output.toString(); } - private void removeUnusedExprs() { - if (!isUnusedExprRemoved) { - if (resolvedTupleExprs != null) { - 
List<SlotDescriptor> slotDescriptorList = this.info.getSortTupleDescriptor().getSlots(); - for (int i = slotDescriptorList.size() - 1; i >= 0; i--) { - if (!slotDescriptorList.get(i).isMaterialized()) { - resolvedTupleExprs.remove(i); - nullabilityChangedFlags.remove(i); - } - } - } - isUnusedExprRemoved = true; - } - } - @Override protected void toThrift(TPlanNode msg) { msg.node_type = TPlanNodeType.PARTITION_SORT_NODE; TSortInfo sortInfo = info.toThrift(); Preconditions.checkState(tupleIds.size() == 1, "Incorrect size for tupleIds in PartitionSortNode"); - removeUnusedExprs(); - if (resolvedTupleExprs != null) { - sortInfo.setSortTupleSlotExprs(Expr.treesToThrift(resolvedTupleExprs)); - // FIXME this is a bottom line solution for wrong nullability of resolvedTupleExprs - // remove the following line after nereids online - sortInfo.setSlotExprsNullabilityChangedFlags(nullabilityChangedFlags); - } TopNAlgorithm topNAlgorithm; if (function == WindowFuncType.ROW_NUMBER) { @@ -210,13 +147,4 @@ public class PartitionSortNode extends PlanNode { partitionSortNode.setPartitionInnerLimit(partitionLimit); msg.partition_sort_node = partitionSortNode; } - - @Override - public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { - removeUnusedExprs(); - List<Expr> materializedTupleExprs = new ArrayList<>(resolvedTupleExprs); - List<SlotId> result = Lists.newArrayList(); - Expr.getIds(materializedTupleExprs, null, result); - return new HashSet<>(result); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index aab8f44186..c4dbe60644 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -27,7 +27,6 @@ import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import 
org.apache.doris.analysis.SortInfo; -import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.statistics.StatisticalType; @@ -329,31 +328,4 @@ public class SortNode extends PlanNode { Expr.getIds(materializedTupleExprs, null, result); return new HashSet<>(result); } - - /** - * Supplement the information needed by be for the sort node. - * TODO: currently we only process slotref, so when order key is a + 1, we will failed. - */ - public void finalizeForNereids(TupleDescriptor tupleDescriptor, - List<Expr> outputList, List<Expr> orderingExpr) { - resolvedTupleExprs = Lists.newArrayList(); - // TODO: should fix the duplicate order by exprs in nereids code later - for (Expr order : orderingExpr) { - if (!resolvedTupleExprs.contains(order)) { - resolvedTupleExprs.add(order); - } - } - for (Expr output : outputList) { - if (!resolvedTupleExprs.contains(output)) { - resolvedTupleExprs.add(output); - } - } - info.setSortTupleDesc(tupleDescriptor); - info.setSortTupleSlotExprs(resolvedTupleExprs); - - nullabilityChangedFlags.clear(); - for (int i = 0; i < resolvedTupleExprs.size(); i++) { - nullabilityChangedFlags.add(false); - } - } } diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out index ed8bcb300f..7b27c69128 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out @@ -34,13 +34,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) ------PhysicalDistribute --------PhysicalTopN ----------PhysicalProject -------------hashJoin[INNER_JOIN](s_store_name = v1_lead.s_store_name)(v1.i_category = v1_lead.i_category)(v1.i_brand = v1_lead.i_brand)(v1.s_company_name = v1_lead.s_company_name)(v1.rn = expr_(rn - 1)) +------------hashJoin[INNER_JOIN](v1.i_category = 
v1_lead.i_category)(v1.i_brand = v1_lead.i_brand)(v1.s_store_name = v1_lead.s_store_name)(v1.s_company_name = v1_lead.s_company_name)(v1.rn = expr_(rn - 1)) --------------PhysicalDistribute ----------------PhysicalProject ------------------PhysicalCteConsumer ( cteId=CTEId#0 ) --------------PhysicalDistribute ----------------PhysicalProject -------------------hashJoin[INNER_JOIN](s_store_name = v1_lag.s_store_name)(v1.i_category = v1_lag.i_category)(v1.i_brand = v1_lag.i_brand)(v1.s_company_name = v1_lag.s_company_name)(v1.rn = expr_(rn + 1)) +------------------hashJoin[INNER_JOIN](v1.i_category = v1_lag.i_category)(v1.i_brand = v1_lag.i_brand)(v1.s_store_name = v1_lag.s_store_name)(v1.s_company_name = v1_lag.s_company_name)(v1.rn = expr_(rn + 1)) --------------------PhysicalDistribute ----------------------PhysicalProject ------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out index b54f9d5abc..0e13df083b 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out @@ -34,13 +34,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) ------PhysicalDistribute --------PhysicalTopN ----------PhysicalProject -------------hashJoin[INNER_JOIN](i_brand = v1_lead.i_brand)(v1.i_category = v1_lead.i_category)(v1.cc_name = v1_lead.cc_name)(v1.rn = expr_(rn - 1)) +------------hashJoin[INNER_JOIN](v1.i_category = v1_lead.i_category)(v1.i_brand = v1_lead.i_brand)(v1.cc_name = v1_lead.cc_name)(v1.rn = expr_(rn - 1)) --------------PhysicalDistribute ----------------PhysicalProject ------------------PhysicalCteConsumer ( cteId=CTEId#0 ) --------------PhysicalDistribute ----------------PhysicalProject -------------------hashJoin[INNER_JOIN](i_brand = v1_lag.i_brand)(v1.i_category = v1_lag.i_category)(v1.cc_name = v1_lag.cc_name)(v1.rn = expr_(rn + 
1)) +------------------hashJoin[INNER_JOIN](v1.i_category = v1_lag.i_category)(v1.i_brand = v1_lag.i_brand)(v1.cc_name = v1_lag.cc_name)(v1.rn = expr_(rn + 1)) --------------------PhysicalDistribute ----------------------PhysicalProject ------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org