This is an automated email from the ASF dual-hosted git repository. lingmiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 51855633e4 [feature](Nereids): cost and enforcer job in cascades. (#10657) 51855633e4 is described below commit 51855633e47b6eb6c7b2618bf16d4d073d073168 Author: jakevin <jakevin...@gmail.com> AuthorDate: Mon Jul 11 15:01:59 2022 +0800 [feature](Nereids): cost and enforcer job in cascades. (#10657) Issue Number: close #9640 Add enforcer job for cascades. Inspired by the *NoisePage enforcer job* and the *ORCA paper*. During this period, we will derive physical property for plan tree, and prune the plan according to the cost. --- .../apache/doris/nereids/cost/CostCalculator.java | 4 +- .../org/apache/doris/nereids/jobs/JobContext.java | 6 +- .../nereids/jobs/cascades/CostAndEnforcerJob.java | 189 +++++++++++++++++++++ .../java/org/apache/doris/nereids/memo/Group.java | 31 ++++ .../apache/doris/nereids/memo/GroupExpression.java | 34 ++++ .../java/org/apache/doris/nereids/memo/Memo.java | 7 + .../doris/nereids/operators/OperatorType.java | 7 +- .../plans/physical/PhysicalDistribution.java} | 29 ++-- .../properties/ChildrenOutputPropertyDeriver.java | 74 ++++++++ .../doris/nereids/properties/DistributionSpec.java | 29 +++- .../properties/EnforceMissingPropertiesHelper.java | 125 ++++++++++++++ ...Properties.java => GatherDistributionSpec.java} | 15 +- .../apache/doris/nereids/properties/OrderKey.java | 9 + .../apache/doris/nereids/properties/OrderSpec.java | 65 +++++++ .../properties/ParentRequiredPropertyDeriver.java | 58 +++++++ .../nereids/properties/PhysicalProperties.java | 35 +++- 16 files changed, 678 insertions(+), 39 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java index 0960dcb703..6e7bfd895e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java @@ -40,14
+40,14 @@ public class CostCalculator { /** * Constructor. */ - public double calculateCost(GroupExpression groupExpression) { + public static double calculateCost(GroupExpression groupExpression) { PlanContext planContext = new PlanContext(groupExpression); CostEstimator costCalculator = new CostEstimator(); CostEstimate costEstimate = groupExpression.getOperator().accept(costCalculator, planContext); return costFormula(costEstimate); } - private double costFormula(CostEstimate costEstimate) { + private static double costFormula(CostEstimate costEstimate) { double cpuCostWeight = 1; double memoryCostWeight = 1; double networkCostWeight = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java index 0aee081b9b..62143398e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java @@ -26,7 +26,7 @@ import org.apache.doris.nereids.properties.PhysicalProperties; public class JobContext { private final PlannerContext plannerContext; private final PhysicalProperties requiredProperties; - private final double costUpperBound; + private double costUpperBound; public JobContext(PlannerContext plannerContext, PhysicalProperties requiredProperties, double costUpperBound) { this.plannerContext = plannerContext; @@ -45,4 +45,8 @@ public class JobContext { public double getCostUpperBound() { return costUpperBound; } + + public void setCostUpperBound(double costUpperBound) { + this.costUpperBound = costUpperBound; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java index ee72328afc..5516525233 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java @@ -17,18 +17,49 @@ package org.apache.doris.nereids.jobs.cascades; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.PlanContext; +import org.apache.doris.nereids.cost.CostCalculator; import org.apache.doris.nereids.jobs.Job; import org.apache.doris.nereids.jobs.JobContext; import org.apache.doris.nereids.jobs.JobType; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.ChildrenOutputPropertyDeriver; +import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper; +import org.apache.doris.nereids.properties.ParentRequiredPropertyDeriver; +import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.plans.Plan; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Optional; + /** * Job to compute cost and add enforcer. */ public class CostAndEnforcerJob extends Job<Plan> { + // GroupExpression to optimize private final GroupExpression groupExpression; + // Current total cost + private double curTotalCost; + + // Properties from parent plan node. 
+ // Like: Physical Hash Join + // [ [Properties ["", ANY], Properties ["", BROADCAST]], + // [Properties ["", SHUFFLE_JOIN], Properties ["", SHUFFLE_JOIN]] ] + private List<List<PhysicalProperties>> propertiesListList; + + private List<GroupExpression> childrenBestGroupExprList; + private List<PhysicalProperties> childrenOutputProperties = Lists.newArrayList(); + + // Current stage of enumeration through child groups + private int curChildIndex = -1; + // Indicator of last child group that we waited for optimization + private int prevChildIndex = -1; + // Current stage of enumeration through outputInputProperties + private int curPropertyPairIndex = 0; public CostAndEnforcerJob(GroupExpression groupExpression, JobContext context) { super(JobType.OPTIMIZE_CHILDREN, context); @@ -47,4 +78,162 @@ public class CostAndEnforcerJob extends Job<Plan> { } } } + + /** + * execute. + */ + public void execute1() { + // Do init logic of root operator/groupExpr of `subplan`, only run once per task. + if (curChildIndex != -1) { + curTotalCost = 0; + + // Get property from groupExpression operator (it's root of subplan). 
+ ParentRequiredPropertyDeriver parentRequiredPropertyDeriver = new ParentRequiredPropertyDeriver(context); + propertiesListList = parentRequiredPropertyDeriver.getRequiredPropertyListList(groupExpression); + + curChildIndex = 0; + } + + for (; curPropertyPairIndex < propertiesListList.size(); curPropertyPairIndex++) { + // children input properties + List<PhysicalProperties> childrenInputProperties = propertiesListList.get(curPropertyPairIndex); + + // Calculate cost of groupExpression and update total cost + if (curChildIndex == 0 && prevChildIndex == -1) { + curTotalCost += CostCalculator.calculateCost(groupExpression); + } + + for (; curChildIndex < groupExpression.arity(); curChildIndex++) { + PhysicalProperties childInputProperties = childrenInputProperties.get(curChildIndex); + Group childGroup = groupExpression.child(curChildIndex); + + // Whether the child group was optimized for this childInputProperties according to + // the result of returning. + Optional<Pair<Double, GroupExpression>> lowestCostPlanOpt = childGroup.getLowestCostPlan( + childInputProperties); + + if (!lowestCostPlanOpt.isPresent()) { + // The child should be pruned due to cost prune. + if (prevChildIndex >= curChildIndex) { + break; + } + + // This child isn't optimized, create new tasks to optimize it. + // Meaning that optimize recursively by derive tasks. 
+ prevChildIndex = curChildIndex; + pushTask((CostAndEnforcerJob) clone()); + double newCostUpperBound = context.getCostUpperBound() - curTotalCost; + JobContext jobContext = new JobContext(context.getPlannerContext(), childInputProperties, + newCostUpperBound); + pushTask(new OptimizeGroupJob(childGroup, jobContext)); + return; + } + + GroupExpression lowestCostExpr = lowestCostPlanOpt.get().second; + + PhysicalProperties childOutputProperty = lowestCostExpr.getPropertyFromMap(childInputProperties); + // TODO: maybe need to record children lowestCostExpr + childrenInputProperties.set(curChildIndex, childOutputProperty); + + // todo: check whether split agg broadcast row count limit. + + curTotalCost += lowestCostExpr.getLowestCostTable().get(childInputProperties).first; + if (curTotalCost > context.getCostUpperBound()) { + break; + } + } + + // When we successfully optimize all child group, it's last child. + if (curChildIndex == groupExpression.arity()) { + // Not need to do pruning here because it has been done when we get the + // best expr from the child group + + // TODO: it could update the cost. + PhysicalProperties outputProperty = ChildrenOutputPropertyDeriver.getProperties( + context.getRequiredProperties(), + childrenOutputProperties, groupExpression); + + if (curTotalCost > context.getCostUpperBound()) { + break; + } + + /* update current group statistics and re-compute costs. */ + if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() != null)) { + return; + } + PlanContext planContext = new PlanContext(groupExpression); + // TODO: calculate stats. 
+ groupExpression.getParent().setStatistics(planContext.getStatistics()); + + enforce(outputProperty, childrenInputProperties); + } + + // Reset child idx and total cost + prevChildIndex = -1; + curChildIndex = 0; + curTotalCost = 0; + } + } + + private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> inputProperties) { + + // groupExpression can satisfy its own output property + putProperty(groupExpression, outputProperty, outputProperty, inputProperties); + // groupExpression can satisfy the ANY type output property + putProperty(groupExpression, outputProperty, new PhysicalProperties(), inputProperties); + + EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new EnforceMissingPropertiesHelper(context, + groupExpression, curTotalCost); + + PhysicalProperties requiredProperties = context.getRequiredProperties(); + if (outputProperty.meet(requiredProperties)) { + Pair<PhysicalProperties, Double> pair = enforceMissingPropertiesHelper.enforceProperty(outputProperty, + requiredProperties); + PhysicalProperties addEnforcedProperty = pair.first; + curTotalCost = pair.second; + + // enforcedProperty is superset of requiredProperty + if (!addEnforcedProperty.equals(requiredProperties)) { + putProperty(groupExpression.getParent().getBestExpression(addEnforcedProperty), + requiredProperties, requiredProperties, Lists.newArrayList(outputProperty)); + } + } else { + if (!outputProperty.equals(requiredProperties)) { + putProperty(groupExpression, outputProperty, requiredProperties, inputProperties); + } + } + + if (curTotalCost < context.getCostUpperBound()) { + context.setCostUpperBound(curTotalCost); + } + } + + private void putProperty(GroupExpression groupExpression, + PhysicalProperties outputProperty, + PhysicalProperties requiredProperty, + List<PhysicalProperties> inputProperties) { + if (groupExpression.updateLowestCostTable(requiredProperty, inputProperties, curTotalCost)) { + // Each group expression need to record the 
outputProperty satisfy what requiredProperty, + // because group expression can generate multi outputProperty. eg. Join may have shuffle local + // and shuffle join two types outputProperty. + groupExpression.putOutputPropertiesMap(outputProperty, requiredProperty); + } + this.groupExpression.getParent().setBestPlan(groupExpression, + curTotalCost, requiredProperty); + } + + + /** + * Shallow clone (ignore clone propertiesListList and groupExpression). + */ + @Override + public Object clone() { + CostAndEnforcerJob task; + try { + task = (CostAndEnforcerJob) super.clone(); + } catch (CloneNotSupportedException ignored) { + return null; + } + return task; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java index d2e3e2e86c..a9343cdd97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; +import org.apache.doris.statistics.StatsDeriveResult; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -50,6 +51,7 @@ public class Group { private double costLowerBound = -1; private boolean isExplored = false; private boolean hasCost = false; + private StatsDeriveResult statistics; /** * Constructor for Group. 
@@ -135,6 +137,35 @@ public class Group { this.costLowerBound = costLowerBound; } + /** + * Set or update lowestCostPlans: properties --> new Pair<>(cost, expression) + */ + public void setBestPlan(GroupExpression expression, double cost, PhysicalProperties properties) { + if (lowestCostPlans.containsKey(properties)) { + if (lowestCostPlans.get(properties).first > cost) { + lowestCostPlans.put(properties, new Pair<>(cost, expression)); + } + } else { + lowestCostPlans.put(properties, new Pair<>(cost, expression)); + } + } + + public GroupExpression getBestExpression(PhysicalProperties properties) { + if (lowestCostPlans.containsKey(properties)) { + return lowestCostPlans.get(properties).second; + } + return null; + } + + public StatsDeriveResult getStatistics() { + return statistics; + } + + public void setStatistics(StatsDeriveResult statistics) { + this.statistics = statistics; + } + + public List<GroupExpression> getLogicalExpressions() { return logicalExpressions; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java index bb7da075cd..da4373de66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java @@ -44,6 +44,8 @@ public class GroupExpression { // Mapping from output properties to the corresponding best cost, statistics, and child properties. private final Map<PhysicalProperties, Pair<Double, List<PhysicalProperties>>> lowestCostTable; + // Each physical group expression maintains mapping incoming requests to the corresponding child requests. 
+ private final Map<PhysicalProperties, PhysicalProperties> requestPropertiesMap; public GroupExpression(Operator op) { this(op, Lists.newArrayList()); @@ -61,6 +63,14 @@ public class GroupExpression { this.ruleMasks = new BitSet(RuleType.SENTINEL.ordinal()); this.statDerived = false; this.lowestCostTable = Maps.newHashMap(); + this.requestPropertiesMap = Maps.newHashMap(); + } + + // TODO: rename + public PhysicalProperties getPropertyFromMap(PhysicalProperties requiredPropertySet) { + PhysicalProperties outputProperty = requestPropertiesMap.get(requiredPropertySet); + Preconditions.checkState(outputProperty != null); + return outputProperty; } public int arity() { @@ -124,6 +134,30 @@ public class GroupExpression { return lowestCostTable.get(require).second; } + /** + * Add a (parentOutputProperties) -> (cost, childrenInputProperties) in lowestCostTable. + */ + public boolean updateLowestCostTable( + PhysicalProperties parentOutputProperties, + List<PhysicalProperties> childrenInputProperties, + double cost) { + if (lowestCostTable.containsKey(parentOutputProperties)) { + if (lowestCostTable.get(parentOutputProperties).first > cost) { + lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties)); + return true; + } + } else { + lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties)); + return true; + } + return false; + } + + public void putOutputPropertiesMap(PhysicalProperties outputPropertySet, + PhysicalProperties requiredPropertySet) { + this.requestPropertiesMap.put(requiredPropertySet, outputPropertySet); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java index 5924a75bdc..e07f4e46ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java @@ -193,4 
+193,11 @@ public class Memo { } groups.remove(source); } + + /** + * Add enforcer expression into the target group. + */ + public void addEnforcerPlan(GroupExpression groupExpression, Group group) { + groupExpression.setParent(group); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java index ad112935cf..a7f72ca3b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java @@ -23,9 +23,9 @@ package org.apache.doris.nereids.operators; * 1. ANY: match any operator * 2. MULTI: match multiple operators * 3. FIXED: the leaf node of pattern tree, which can be matched by a single operator - * but this operator cannot be used in rules + * but this operator cannot be used in rules * 4. MULTI_FIXED: the leaf node of pattern tree, which can be matched by multiple operators, - * but these operators cannot be used in rules + * but these operators cannot be used in rules */ public enum OperatorType { UNKNOWN, @@ -48,5 +48,6 @@ public enum OperatorType { PHYSICAL_AGGREGATION, PHYSICAL_SORT, PHYSICAL_HASH_JOIN, - PHYSICAL_EXCHANGE; + PHYSICAL_EXCHANGE, + PHYSICAL_DISTRIBUTION; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java similarity index 58% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java index 4122de6901..64c23ee024 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java @@ -15,29 +15,28 @@ // 
specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.properties; +package org.apache.doris.nereids.operators.plans.physical; -import org.apache.doris.planner.DataPartition; +import org.apache.doris.nereids.operators.OperatorType; +import org.apache.doris.nereids.properties.DistributionSpec; +import org.apache.doris.nereids.trees.expressions.Expression; + +import java.util.List; /** - * Base class for data distribution. + * Enforcer operator. */ -public class DistributionSpec { - - private DataPartition dataPartition; +public class PhysicalDistribution extends PhysicalUnaryOperator { - public DistributionSpec() { - } + protected DistributionSpec distributionSpec; - public DistributionSpec(DataPartition dataPartition) { - this.dataPartition = dataPartition; - } - public DataPartition getDataPartition() { - return dataPartition; + public PhysicalDistribution(DistributionSpec spec) { + super(OperatorType.PHYSICAL_DISTRIBUTION); } - public void setDataPartition(DataPartition dataPartition) { - this.dataPartition = dataPartition; + @Override + public List<Expression> getExpressions() { + return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java new file mode 100644 index 0000000000..6df2cfe15c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.properties; + +import org.apache.doris.nereids.PlanContext; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.operators.Operator; +import org.apache.doris.nereids.operators.OperatorVisitor; + +import java.util.List; + +/** + * Used for property drive. + */ +public class ChildrenOutputPropertyDeriver extends OperatorVisitor<PhysicalProperties, PlanContext> { + PhysicalProperties requirements; + List<PhysicalProperties> childrenOutputProperties; + + public ChildrenOutputPropertyDeriver(PhysicalProperties requirements, + List<PhysicalProperties> childrenOutputProperties) { + this.childrenOutputProperties = childrenOutputProperties; + this.requirements = requirements; + } + + public static PhysicalProperties getProperties( + PhysicalProperties requirements, + List<PhysicalProperties> childrenOutputProperties, + GroupExpression groupExpression) { + + ChildrenOutputPropertyDeriver childrenOutputPropertyDeriver = new ChildrenOutputPropertyDeriver(requirements, + childrenOutputProperties); + + return groupExpression.getOperator().accept(childrenOutputPropertyDeriver, new PlanContext(groupExpression)); + } + + public PhysicalProperties getRequirements() { + return requirements; + } + + // public List<List<PhysicalProperties>> getProperties(GroupExpression groupExpression) { + // properties = Lists.newArrayList(); + // groupExpression.getOperator().accept(this, new PlanContext(groupExpression)); + // return properties; + // } + + // @Override + // public Void visitOperator(Operator 
operator, PlanContext context) { + // List<PhysicalProperties> props = Lists.newArrayList(); + // for (int childIndex = 0; childIndex < context.getGroupExpression().arity(); ++childIndex) { + // props.add(new PhysicalProperties()); + // } + // properties.add(props); + // return null; + // } + @Override + public PhysicalProperties visitOperator(Operator node, PlanContext context) { + return new PhysicalProperties(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java index 4122de6901..cd0ea5a438 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java @@ -17,15 +17,21 @@ package org.apache.doris.nereids.properties; +import org.apache.doris.nereids.memo.Group; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.operators.plans.physical.PhysicalDistribution; import org.apache.doris.planner.DataPartition; +import com.google.common.collect.Lists; + /** - * Base class for data distribution. + * Spec of data distribution. */ public class DistributionSpec { private DataPartition dataPartition; + // TODO: why exist? public DistributionSpec() { } @@ -33,6 +39,16 @@ public class DistributionSpec { this.dataPartition = dataPartition; } + /** + * TODO: need read ORCA. + * Whether other `DistributionSpec` is satisfied the current `DistributionSpec`. + * + * @param other another DistributionSpec. 
+ */ + public boolean meet(DistributionSpec other) { + return false; + } + public DataPartition getDataPartition() { return dataPartition; } @@ -40,4 +56,15 @@ public class DistributionSpec { public void setDataPartition(DataPartition dataPartition) { this.dataPartition = dataPartition; } + + public GroupExpression addEnforcer(Group child) { + return new GroupExpression(new PhysicalDistribution(new DistributionSpec(dataPartition)), + Lists.newArrayList(child)); + } + + // TODO + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java new file mode 100644 index 0000000000..aaa7a0d113 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.nereids.properties; + +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.cost.CostCalculator; +import org.apache.doris.nereids.jobs.JobContext; +import org.apache.doris.nereids.memo.GroupExpression; + +import com.google.common.collect.Lists; + +/** + * When parent node request some properties but children don't have. + * Enforce add missing properties for child. + */ +public class EnforceMissingPropertiesHelper { + + private JobContext context; + private GroupExpression groupExpression; + private double curTotalCost; + + public EnforceMissingPropertiesHelper(JobContext context, GroupExpression groupExpression, + double curTotalCost) { + this.context = context; + this.groupExpression = groupExpression; + this.curTotalCost = curTotalCost; + } + + /** + * Enforce missing property. + */ + public Pair<PhysicalProperties, Double> enforceProperty(PhysicalProperties output, PhysicalProperties required) { + boolean isMeetOrder = output.getOrderSpec().meet(required.getOrderSpec()); + boolean isMeetDistribution = output.getDistributionSpec().meet(required.getDistributionSpec()); + + if (!isMeetDistribution && !isMeetOrder) { + return new Pair<>(enforceSortAndDistribution(output, required), curTotalCost); + } else if (isMeetDistribution && isMeetOrder) { + return new Pair<>(null, curTotalCost); + } else if (!isMeetDistribution) { + if (required.getOrderSpec().getOrderKeys().isEmpty()) { + return new Pair<>(enforceDistribution(output), curTotalCost); + } else { + // TODO + // It's wrong that SortSpec is empty. + // After redistribute data , original order requirement may be wrong. Need enforce "SortNode" here. + // PhysicalProperties newProperty = + // new PhysicalProperties(new DistributionSpec(), new OrderSpec(Lists.newArrayList())); + // groupExpression.getParent(). 
+ // return enforceSortAndDistribution(newProperty, required); + return new Pair<>(enforceDistribution(output), curTotalCost); + } + } else { + return new Pair<>(enforceSort(output), curTotalCost); + } + } + + private PhysicalProperties enforceSort(PhysicalProperties oldOutputProperty) { + // clone + PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(), + oldOutputProperty.getOrderSpec()); + newOutputProperty.setOrderSpec(context.getRequiredProperties().getOrderSpec()); + GroupExpression enforcer = + context.getRequiredProperties().getOrderSpec().addEnforcer(groupExpression.getParent()); + + updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty); + + return newOutputProperty; + } + + private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputProperty) { + PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(), + oldOutputProperty.getOrderSpec()); + newOutputProperty.setDistributionSpec(context.getRequiredProperties().getDistributionSpec()); + GroupExpression enforcer = + context.getRequiredProperties().getDistributionSpec().addEnforcer(groupExpression.getParent()); + + updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty); + + return newOutputProperty; + } + + private void updateCostWithEnforcer(GroupExpression enforcer, + PhysicalProperties oldOutputProperty, + PhysicalProperties newOutputProperty) { + context.getPlannerContext().getMemo().addEnforcerPlan(enforcer, groupExpression.getParent()); + curTotalCost += CostCalculator.calculateCost(enforcer); + + if (enforcer.updateLowestCostTable(newOutputProperty, Lists.newArrayList(oldOutputProperty), curTotalCost)) { + enforcer.putOutputPropertiesMap(newOutputProperty, newOutputProperty); + } + groupExpression.getParent().setBestPlan(enforcer, curTotalCost, newOutputProperty); + } + + private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty, + 
PhysicalProperties requiredProperty) { + PhysicalProperties enforcedProperty; + if (requiredProperty.getDistributionSpec() + .equals(new GatherDistributionSpec())) { + enforcedProperty = enforceSort(outputProperty); + enforcedProperty = enforceDistribution(enforcedProperty); + } else { + enforcedProperty = enforceDistribution(outputProperty); + enforcedProperty = enforceSort(enforcedProperty); + } + + return enforcedProperty; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java similarity index 69% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java index 50899b7f31..e197ade368 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java @@ -18,18 +18,11 @@ package org.apache.doris.nereids.properties; /** - * Physical properties used in cascades. + * Re-shuffle. 
*/ -public class PhysicalProperties { - private DistributionSpec distributionDesc; +public class GatherDistributionSpec extends DistributionSpec { - public PhysicalProperties() {} - - public DistributionSpec getDistributionDesc() { - return distributionDesc; - } - - public void setDistributionDesc(DistributionSpec distributionDesc) { - this.distributionDesc = distributionDesc; + public GatherDistributionSpec() { + super(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java index b9923db80a..7d6ee6eb8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java @@ -42,6 +42,15 @@ public class OrderKey { this.nullFirst = nullFirst; } + /** + * Whether other `OrderKey` is satisfied the current `OrderKey`. + * + * @param other another OrderKey. + */ + public boolean matches(OrderKey other) { + return expr.equals(other.expr) && isAsc == other.isAsc && nullFirst == other.nullFirst; + } + public Expression getExpr() { return expr; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java new file mode 100644 index 0000000000..1d1c3777ae --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.properties; + +import org.apache.doris.nereids.memo.Group; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.operators.plans.physical.PhysicalHeapSort; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Spec of sort order. + */ +public class OrderSpec { + private final List<OrderKey> orderKeys; + + public OrderSpec(List<OrderKey> orderKeys) { + this.orderKeys = orderKeys; + } + + /** + * Whether the other `OrderSpec` is satisfied by the current `OrderSpec`, i.e. the other's order keys are a matching prefix of the current order keys. + * + * @param other another OrderSpec. 
+ */ + public boolean meet(OrderSpec other) { + if (this.orderKeys.size() < other.getOrderKeys().size()) { + return false; + } + for (int i = 0; i < other.getOrderKeys().size(); ++i) { + if (!this.orderKeys.get(i).matches(other.getOrderKeys().get(i))) { + return false; + } + } + return true; + } + + public GroupExpression addEnforcer(Group child) { + return new GroupExpression( + new PhysicalHeapSort(orderKeys, -1, 0), + Lists.newArrayList(child) + ); + } + + public List<OrderKey> getOrderKeys() { + return orderKeys; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java new file mode 100644 index 0000000000..681500ed69 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.properties; + +import org.apache.doris.nereids.PlanContext; +import org.apache.doris.nereids.jobs.JobContext; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.operators.Operator; +import org.apache.doris.nereids.operators.OperatorVisitor; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Used to derive the physical properties that a parent operator requires from its children. + */ +public class ParentRequiredPropertyDeriver extends OperatorVisitor<Void, PlanContext> { + + PhysicalProperties requestPropertyFromParent; + List<List<PhysicalProperties>> requiredPropertyListList; + + public ParentRequiredPropertyDeriver(JobContext context) { + this.requestPropertyFromParent = context.getRequiredProperties(); + } + + public List<List<PhysicalProperties>> getRequiredPropertyListList(GroupExpression groupExpression) { + requiredPropertyListList = Lists.newArrayList(); + groupExpression.getOperator().accept(this, new PlanContext(groupExpression)); + return requiredPropertyListList; + } + + @Override + public Void visitOperator(Operator operator, PlanContext context) { + List<PhysicalProperties> requiredPropertyList = Lists.newArrayList(); + for (int i = 0; i < context.getGroupExpression().arity(); i++) { + requiredPropertyList.add(new PhysicalProperties()); + } + requiredPropertyListList.add(requiredPropertyList); + return null; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java index 50899b7f31..75b02d7945 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java @@ -19,17 +19,40 @@ package org.apache.doris.nereids.properties; /** * Physical properties used in cascades. + * TODO(wj): Do we need a `PhysicalPropertySpec` interface like NoisePage? 
*/ public class PhysicalProperties { - private DistributionSpec distributionDesc; + private OrderSpec orderSpec; - public PhysicalProperties() {} + private DistributionSpec distributionSpec; - public DistributionSpec getDistributionDesc() { - return distributionDesc; + public PhysicalProperties() { } - public void setDistributionDesc(DistributionSpec distributionDesc) { - this.distributionDesc = distributionDesc; + public PhysicalProperties(DistributionSpec distributionSpec, OrderSpec orderSpec) { + this.distributionSpec = distributionSpec; + this.orderSpec = orderSpec; + } + + public boolean meet(PhysicalProperties other) { + // TODO: handle distributionSpec meet() + return orderSpec.meet(other.orderSpec) && distributionSpec.meet(other.distributionSpec); + } + + + public OrderSpec getOrderSpec() { + return orderSpec; + } + + public void setOrderSpec(OrderSpec orderSpec) { + this.orderSpec = orderSpec; + } + + public DistributionSpec getDistributionSpec() { + return distributionSpec; + } + + public void setDistributionSpec(DistributionSpec distributionSpec) { + this.distributionSpec = distributionSpec; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org