This is an automated email from the ASF dual-hosted git repository. lingmiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 25b9d6eba2 [feature](nereids) Plan Translator (#9993) 25b9d6eba2 is described below commit 25b9d6eba24480993941e7b32062279e325dc4fb Author: Kikyou1997 <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Tue Jun 14 19:39:55 2022 +0800 [feature](nereids) Plan Translator (#9993) Issue Number: close #9621 Add following physical operator: PhysicalAgg PhysicalSort PhysicalHashJoin Add basic logic of plan translator 1. add new agg phase enum for nereids 2. remove the Analyzer from PlanContext.java 3. implement PlanTranslator::visitPhysicalFilter --- .../org/apache/doris/analysis/AggregateInfo.java | 15 +- .../org/apache/doris/analysis/DescriptorTable.java | 10 + .../java/org/apache/doris/analysis/SortInfo.java | 13 + .../apache/doris/nereids/PlanOperatorVisitor.java | 71 +++++ .../doris/nereids/operators/AbstractOperator.java | 27 ++ .../apache/doris/nereids/operators/Operator.java | 5 + .../doris/nereids/operators/OperatorType.java | 4 + .../doris/nereids/operators/plans/AggPhase.java | 53 ++++ .../doris/nereids/operators/plans/JoinType.java | 4 +- .../plans/physical/PhysicalAggregation.java | 87 ++++++ .../operators/plans/physical/PhysicalFilter.java | 8 + .../operators/plans/physical/PhysicalHashJoin.java | 62 ++++ .../operators/plans/physical/PhysicalOlapScan.java | 27 +- .../operators/plans/physical/PhysicalOperator.java | 2 + .../operators/plans/physical/PhysicalProject.java | 8 + .../operators/plans/physical/PhysicalScan.java | 14 +- .../operators/plans/physical/PhysicalSort.java | 77 +++++ ...ysicalProperties.java => DistributionSpec.java} | 22 +- ...alProperties.java => HashDistributionSpec.java} | 20 +- .../{PhysicalProperties.java => OrderKey.java} | 34 ++- .../nereids/properties/PhysicalProperties.java | 9 + ...Properties.java => RandomDistributionDesc.java} | 4 +- .../{Slot.java => ExpressionConverter.java} | 18 +- .../trees/expressions/FunctionCallExpression.java | 62 ++++ 
.../doris/nereids/trees/expressions/Slot.java | 11 + .../nereids/trees/expressions/SlotReference.java | 10 +- .../trees/plans/PhysicalPlanTranslator.java | 322 +++++++++++++++++++++ .../org/apache/doris/nereids/trees/plans/Plan.java | 1 + .../doris/nereids/trees/plans/PlanContext.java | 73 +++++ .../nereids/trees/plans/physical/PhysicalPlan.java | 1 + .../java/org/apache/doris/nereids/util/Utils.java | 15 + .../org/apache/doris/planner/AggregationNode.java | 10 + .../org/apache/doris/planner/HashJoinNode.java | 108 +++++-- 33 files changed, 1139 insertions(+), 68 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java index d2ea7443e9..298837d11e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java @@ -48,6 +48,7 @@ import java.util.List; * SELECT COUNT(*) FROM (SELECT DISTINCT a, b, ..., x, y, ...) GROUP BY x, y, ... * * The tree structure looks as follows: + * <pre> * - for non-distinct aggregation: * - aggInfo: contains the original aggregation functions and grouping exprs * - aggInfo.mergeAggInfo: contains the merging aggregation functions (grouping @@ -61,7 +62,7 @@ import java.util.List; * computation (grouping exprs are identical) * - aggInfo.2ndPhaseDistinctAggInfo.mergeAggInfo: contains the merging aggregate * functions for the phase 2 computation (grouping exprs are identical) - * + * </pre> * In general, merging aggregate computations are idempotent; in other words, * aggInfo.mergeAggInfo == aggInfo.mergeAggInfo.mergeAggInfo. * @@ -224,6 +225,17 @@ public final class AggregateInfo extends AggregateInfoBase { return result; } + /** + * Used by new optimizer. 
+ */ + public static AggregateInfo create( + ArrayList<Expr> groupingExprs, ArrayList<FunctionCallExpr> aggExprs, + TupleDescriptor tupleDesc, TupleDescriptor intermediateTupleDesc, AggPhase phase) { + AggregateInfo result = new AggregateInfo(groupingExprs, aggExprs, phase); + result.outputTupleDesc = tupleDesc; + result.intermediateTupleDesc = intermediateTupleDesc; + return result; + } /** * estimate if functions contains multi distinct @@ -856,4 +868,5 @@ public final class AggregateInfo extends AggregateInfoBase { public List<Expr> getInputPartitionExprs() { return partitionExprs != null ? partitionExprs : groupingExprs; } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index 526c6e4806..f733e3a9a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -73,6 +73,16 @@ public class DescriptorTable { return result; } + /** + * Used by new optimizer. + */ + public SlotDescriptor addSlotDescriptor(TupleDescriptor d, int id) { + SlotDescriptor result = new SlotDescriptor(new SlotId(id), d); + d.addSlot(result); + slotDescs.put(result.getId(), result); + return result; + } + /** * Create copy of src with new id. The returned descriptor has its mem layout * computed. diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java index d57856b08b..fe287e3900 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java @@ -73,6 +73,19 @@ public class SortInfo { materializedOrderingExprs = Lists.newArrayList(); } + /** + * Used by new optimizer. 
+ */ + public SortInfo(List<Expr> orderingExprs, + List<Boolean> isAscOrder, + List<Boolean> nullsFirstParams, + TupleDescriptor sortTupleDesc) { + this.orderingExprs = orderingExprs; + this.isAscOrder = isAscOrder; + this.nullsFirstParams = nullsFirstParams; + this.sortTupleDesc = sortTupleDesc; + } + /** * C'tor for cloning. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java new file mode 100644 index 0000000000..063f92aae7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids; + +import org.apache.doris.nereids.operators.Operator; +import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation; +import org.apache.doris.nereids.operators.plans.physical.PhysicalFilter; +import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.operators.plans.physical.PhysicalProject; +import org.apache.doris.nereids.operators.plans.physical.PhysicalSort; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; + +/** + * Base class for the processing of logical and physical plan. + * + * @param <R> Return type of each visit method. + * @param <C> Context type. + */ +@SuppressWarnings("rawtypes") +public abstract class PlanOperatorVisitor<R, C> { + + public abstract R visit(Plan<? extends Plan, ? extends Operator> plan, C context); + + public R visitPhysicalAggregationPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation> aggPlan, + C context) { + return null; + } + + public R visitPhysicalOlapScanPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan> olapScanPlan, + C context) { + return null; + } + + public R visitPhysicalSortPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalSort> sortPlan, + C context) { + return null; + } + + public R visitPhysicalHashJoinPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalHashJoin> hashJoinPlan, + C context) { + return null; + } + + public R visitPhysicalProject(PhysicalPlan<? extends PhysicalPlan, PhysicalProject> projectPlan, + C context) { + return null; + } + + public R visitPhysicalFilter(PhysicalPlan<? 
extends PhysicalPlan, PhysicalFilter> filterPlan, + C context) { + return null; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java index 54064a6f2f..137317cbeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java @@ -17,6 +17,9 @@ package org.apache.doris.nereids.operators; +import org.apache.doris.nereids.PlanOperatorVisitor; +import org.apache.doris.nereids.trees.plans.Plan; + import java.util.Objects; /** @@ -24,13 +27,37 @@ import java.util.Objects; */ public abstract class AbstractOperator<TYPE extends AbstractOperator<TYPE>> implements Operator<TYPE> { protected final OperatorType type; + protected final long limited; public AbstractOperator(OperatorType type) { this.type = Objects.requireNonNull(type, "type can not be null"); + this.limited = -1; + } + + public AbstractOperator(OperatorType type, long limited) { + this.type = type; + this.limited = limited; } @Override public OperatorType getType() { return type; } + + /** + * Child operators should override this method. + * For example: + * <code> + * visitor.visitPhysicalOlapScanPlan( + * (PhysicalPlan<? 
extends PhysicalPlan, PhysicalOlapScan>) plan, context); + * </code> + */ + public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) { + return null; + } + + public long getLimited() { + return limited; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java index bb2450ce35..efecb3a201 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java @@ -17,8 +17,10 @@ package org.apache.doris.nereids.operators; +import org.apache.doris.nereids.PlanOperatorVisitor; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.trees.TreeNode; +import org.apache.doris.nereids.trees.plans.Plan; /** * interface for all concrete operator. @@ -27,4 +29,7 @@ public interface Operator<TYPE extends Operator<TYPE>> { OperatorType getType(); <NODE_TYPE extends TreeNode> NODE_TYPE toTreeNode(GroupExpression groupExpression); + + public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context); + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java index b506839854..f1a183521f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java @@ -40,6 +40,10 @@ public enum OperatorType { PHYSICAL_PROJECT, PHYSICAL_FILTER, PHYSICAL_BROADCAST_HASH_JOIN, + PHYSICAL_AGGREGATION, + PHYSICAL_SORT, + PHYSICAL_HASH_JOIN, + PHYSICAL_EXCHANGE, // pattern ANY, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java new file mode 100644 index 
0000000000..c365b06d89 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.operators.plans; + +import org.apache.doris.analysis.AggregateInfo; + +/** + * Represents the different phases of aggregation and maps each one to the + * corresponding agg phase enum of the legacy optimizer. 
+ */ +public enum AggPhase { + FIRST("FIRST", AggregateInfo.AggPhase.FIRST), + FIRST_MERGE("FIRST_MERGE", AggregateInfo.AggPhase.FIRST_MERGE), + SECOND("SECOND", AggregateInfo.AggPhase.SECOND), + SECOND_MERGE("SECOND_MERGE", AggregateInfo.AggPhase.SECOND_MERGE); + + private final String name; + + private final AggregateInfo.AggPhase execAggPhase; + + AggPhase(String name, AggregateInfo.AggPhase execAggPhase) { + this.name = name; + this.execAggPhase = execAggPhase; + } + + public boolean isMerge() { + return this == FIRST_MERGE || this == SECOND_MERGE; + } + + public AggregateInfo.AggPhase toExec() { + return this.execAggPhase; + } + + @Override + public String toString() { + return name; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java index f22634d649..9e2b8d41e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java @@ -59,7 +59,7 @@ public enum JoinType { * @return legacy join type in Doris * @throws AnalysisException throw this exception when input join type cannot convert to legacy join type in Doris */ - public static JoinOperator toJoinOperator(JoinType joinType) throws AnalysisException { + public static JoinOperator toJoinOperator(JoinType joinType) { switch (joinType) { case INNER_JOIN: return JoinOperator.INNER_JOIN; @@ -80,7 +80,7 @@ public enum JoinType { case CROSS_JOIN: return JoinOperator.CROSS_JOIN; default: - throw new AnalysisException("Not support join operator: " + joinType.name()); + throw new RuntimeException("Unexpected join operator: " + joinType.name()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java new file mode 
100644 index 0000000000..feb72bd837 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.operators.plans.physical; + +import org.apache.doris.nereids.PlanOperatorVisitor; +import org.apache.doris.nereids.operators.OperatorType; +import org.apache.doris.nereids.operators.plans.AggPhase; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; + +import java.util.List; + +/** + * Physical aggregation plan operator. + */ +public class PhysicalAggregation extends PhysicalUnaryOperator<PhysicalAggregation, PhysicalPlan> { + + private final List<Expression> groupByExprList; + + private final List<Expression> aggExprList; + + private final List<Expression> partitionExprList; + + private final AggPhase aggPhase; + + private final boolean usingStream; + + /** + * Constructor of PhysicalAggNode. + * + * @param groupByExprList group by expr list. + * @param aggExprList agg expr list. 
+ * @param partitionExprList partition expr list, used for analytic agg. + * @param usingStream whether it's stream agg. + */ + public PhysicalAggregation(List<Expression> groupByExprList, List<Expression> aggExprList, + List<Expression> partitionExprList, AggPhase aggPhase, boolean usingStream) { + super(OperatorType.PHYSICAL_AGGREGATION); + this.groupByExprList = groupByExprList; + this.aggExprList = aggExprList; + this.partitionExprList = partitionExprList; + this.aggPhase = aggPhase; + this.usingStream = usingStream; + } + + public List<Expression> getGroupByExprList() { + return groupByExprList; + } + + public List<Expression> getAggExprList() { + return aggExprList; + } + + public AggPhase getAggPhase() { + return aggPhase; + } + + public boolean isUsingStream() { + return usingStream; + } + + public List<Expression> getPartitionExprList() { + return partitionExprList; + } + + @Override + public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) { + return visitor.visitPhysicalAggregationPlan( + (PhysicalPlan<? 
extends PhysicalPlan, PhysicalAggregation>) plan, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java index e3a8229b90..f62b69e3ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java @@ -17,9 +17,11 @@ package org.apache.doris.nereids.operators.plans.physical; +import org.apache.doris.nereids.PlanOperatorVisitor; import org.apache.doris.nereids.operators.OperatorType; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import java.util.Objects; @@ -50,4 +52,10 @@ public class PhysicalFilter<INPUT_TYPE extends Plan> } return "Filter (" + cond + ")"; } + + @Override + public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) { + return visitor.visitPhysicalFilter((PhysicalPlan<? extends PhysicalPlan, PhysicalFilter>) plan, + context); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java new file mode 100644 index 0000000000..41785dbab8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.operators.plans.physical; + +import org.apache.doris.nereids.PlanOperatorVisitor; +import org.apache.doris.nereids.operators.OperatorType; +import org.apache.doris.nereids.operators.plans.JoinType; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; + +/** + * Physical hash join plan operator. + */ +public class PhysicalHashJoin extends PhysicalBinaryOperator<PhysicalHashJoin, PhysicalPlan, PhysicalPlan> { + + private final JoinType joinType; + + private final Expression predicate; + + /** + * Constructor of PhysicalHashJoinNode. + * + * @param joinType Which join type, left semi join, inner join... + * @param predicate join condition. + */ + public PhysicalHashJoin(JoinType joinType, Expression predicate) { + super(OperatorType.PHYSICAL_HASH_JOIN); + this.joinType = joinType; + this.predicate = predicate; + } + + public JoinType getJoinType() { + return joinType; + } + + public Expression getPredicate() { + return predicate; + } + + @Override + public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) { + return visitor.visitPhysicalHashJoinPlan( + (PhysicalPlan<? 
extends PhysicalPlan, PhysicalHashJoin>) plan, context); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java index 62d9b3babc..c56ae50c81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java @@ -19,7 +19,10 @@ package org.apache.doris.nereids.operators.plans.physical; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; +import org.apache.doris.nereids.PlanOperatorVisitor; import org.apache.doris.nereids.operators.OperatorType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import com.clearspring.analytics.util.Lists; import org.apache.commons.lang3.StringUtils; @@ -34,6 +37,8 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> { private final List<Long> selectedTabletId; private final List<Long> selectedPartitionId; + private final OlapTable olapTable; + /** * Constructor for PhysicalOlapScan. 
* @@ -41,7 +46,8 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> { * @param qualifier table's name */ public PhysicalOlapScan(OlapTable olapTable, List<String> qualifier) { - super(OperatorType.PHYSICAL_OLAP_SCAN, olapTable, qualifier); + super(OperatorType.PHYSICAL_OLAP_SCAN, qualifier); + this.olapTable = olapTable; this.selectedIndexId = olapTable.getBaseIndexId(); this.selectedTabletId = Lists.newArrayList(); this.selectedPartitionId = olapTable.getPartitionIds(); @@ -62,12 +68,21 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> { return selectedPartitionId; } + public OlapTable getTable() { + return olapTable; + } + @Override public String toString() { - return "Scan Olap Table " + StringUtils.join(qualifier, ".") + "." + table.getName() - + " (selected index id: " + selectedTabletId - + ", selected partition ids: " + selectedPartitionId - + ", selected tablet ids: " + selectedTabletId - + ")"; + return "Scan Olap Table " + StringUtils.join(qualifier, ".") + "." + olapTable.getName() + + " (selected index id: " + selectedTabletId + ", selected partition ids: " + selectedPartitionId + + ", selected tablet ids: " + selectedTabletId + ")"; } + + @Override + public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) { + return visitor.visitPhysicalOlapScanPlan( + (PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan>) plan, context); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java index 4982ec342d..dd5aa79a9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java @@ -28,5 +28,7 @@ import java.util.List; * interface for all concrete physical operator. 
*/ public interface PhysicalOperator<TYPE extends PhysicalOperator<TYPE>> extends PlanOperator<TYPE> { + List<Slot> computeOutputs(LogicalProperties logicalProperties, Plan... inputs); + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java index bd74fc813b..d5046083bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java @@ -17,9 +17,11 @@ package org.apache.doris.nereids.operators.plans.physical; +import org.apache.doris.nereids.PlanOperatorVisitor; import org.apache.doris.nereids.operators.OperatorType; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.commons.lang3.StringUtils; @@ -47,4 +49,10 @@ public class PhysicalProject<INPUT_TYPE extends Plan> public String toString() { return "Project (" + StringUtils.join(projects, ", ") + ")"; } + + @Override + public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) { + return visitor.visitPhysicalProject( + (PhysicalPlan<? 
extends PhysicalPlan, PhysicalProject>) plan, context); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java index 77ce5570b2..10d434c7e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java @@ -17,7 +17,6 @@ package org.apache.doris.nereids.operators.plans.physical; -import org.apache.doris.catalog.Table; import org.apache.doris.nereids.operators.OperatorType; import java.util.List; @@ -26,26 +25,19 @@ import java.util.Objects; /** * Abstract class for all physical scan operator. */ -public abstract class PhysicalScan<TYPE extends PhysicalScan<TYPE>> - extends PhysicalLeafOperator<TYPE> { +public abstract class PhysicalScan<TYPE extends PhysicalScan<TYPE>> extends PhysicalLeafOperator<TYPE> { + - protected final Table table; protected final List<String> qualifier; /** * Constructor for PhysicalScan. 
* * @param type node type - * @param table scan table * @param qualifier table's name */ - public PhysicalScan(OperatorType type, Table table, List<String> qualifier) { + public PhysicalScan(OperatorType type, List<String> qualifier) { super(type); - this.table = Objects.requireNonNull(table, "table can not be null"); this.qualifier = Objects.requireNonNull(qualifier, "qualifier can not be null"); } - - public Table getTable() { - return table; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java new file mode 100644 index 0000000000..1dcb3d6540 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.operators.plans.physical; + +import org.apache.doris.nereids.PlanOperatorVisitor; +import org.apache.doris.nereids.operators.OperatorType; +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; + +import java.util.List; + +/** + * Physical sort plan operator. + */ +public class PhysicalSort extends PhysicalUnaryOperator<PhysicalSort, PhysicalPlan> { + + private final int offset; + + private final int limit; + + private final List<OrderKey> orderList; + + private final boolean useTopN; + + /** + * Constructor of PhysicalSort. + */ + public PhysicalSort(int offset, int limit, List<OrderKey> orderList, boolean useTopN) { + super(OperatorType.PHYSICAL_SORT); + this.offset = offset; + this.limit = limit; + this.orderList = orderList; + this.useTopN = useTopN; + } + + public int getOffset() { + return offset; + } + + public int getLimit() { + return limit; + } + + public List<OrderKey> getOrderList() { + return orderList; + } + + public boolean isUseTopN() { + return useTopN; + } + + public boolean hasLimit() { + return limit > -1; + } + + @Override + public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) { + return visitor.visitPhysicalSortPlan((PhysicalPlan<? 
extends PhysicalPlan, PhysicalSort>) plan, + context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java similarity index 62% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java index fe0db693a5..4122de6901 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java @@ -17,9 +17,27 @@ package org.apache.doris.nereids.properties; +import org.apache.doris.planner.DataPartition; + /** - * Physical properties used in cascades. + * Base class for data distribution. */ -public class PhysicalProperties { +public class DistributionSpec { + + private DataPartition dataPartition; + + public DistributionSpec() { + } + + public DistributionSpec(DataPartition dataPartition) { + this.dataPartition = dataPartition; + } + + public DataPartition getDataPartition() { + return dataPartition; + } + public void setDataPartition(DataPartition dataPartition) { + this.dataPartition = dataPartition; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java similarity index 68% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java index fe0db693a5..ade20758ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java @@ -17,9 +17,25 @@ package org.apache.doris.nereids.properties; +import 
org.apache.doris.analysis.HashDistributionDesc; + /** - * Physical properties used in cascades. + * Describe hash distribution. */ -public class PhysicalProperties { +public class HashDistributionSpec extends DistributionSpec { + + /** + * Enums for concrete shuffle type. + */ + public enum ShuffleType { + COLOCATE, + BUCKET, + AGG, + NORMAL + } + + private ShuffleType shuffleType; + + private HashDistributionDesc hashDistributionDesc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java similarity index 54% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java index fe0db693a5..b01862dff6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java @@ -17,9 +17,39 @@ package org.apache.doris.nereids.properties; +import org.apache.doris.nereids.trees.expressions.Expression; + /** - * Physical properties used in cascades. + * Represents the order key of a statement. */ -public class PhysicalProperties { +public class OrderKey { + + private Expression expr; + + private boolean isAsc; + + private boolean nullFirst; + + /** + * Constructor of OrderKey. + * + * @param expr expression returned by getExpr() + * @param isAsc value returned by isAsc() + * @param nullFirst True if "NULLS FIRST", false if "NULLS LAST" (a primitive boolean, so it is always specified). 
+ */ + public OrderKey(Expression expr, boolean isAsc, boolean nullFirst) { + this.expr = expr; + this.isAsc = isAsc; + this.nullFirst = nullFirst; + } + + public Expression getExpr() { + return expr; + } + + public boolean isAsc() { + return isAsc; + } + public boolean isNullFirst() { + return nullFirst; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java index fe0db693a5..abe767c40e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java @@ -22,4 +22,13 @@ package org.apache.doris.nereids.properties; */ public class PhysicalProperties { + private DistributionSpec distributionDesc; + + public DistributionSpec getDistributionDesc() { + return distributionDesc; + } + + public void setDistributionDesc(DistributionSpec distributionDesc) { + this.distributionDesc = distributionDesc; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java similarity index 89% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java index fe0db693a5..f54ed12a19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java @@ -18,8 +18,8 @@ package org.apache.doris.nereids.properties; /** - * Physical properties used in cascades. + * Describe random distribution. 
*/ -public class PhysicalProperties { +public class RandomDistributionDesc extends DistributionSpec { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java similarity index 67% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java index 16d23ffd98..40016180f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java @@ -17,20 +17,18 @@ package org.apache.doris.nereids.trees.expressions; -import org.apache.doris.nereids.trees.NodeType; +import org.apache.doris.analysis.Expr; /** - * Abstract class for all slot in expression. + * Used to convert expression of new optimizer to stale expr. */ -public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpression<EXPR_TYPE> - implements LeafExpression<EXPR_TYPE> { +public class ExpressionConverter { - public Slot(NodeType type) { - super(type); - } + public static ExpressionConverter converter = new ExpressionConverter(); - @Override - public Slot toSlot() { - return this; + // TODO: implement this, besides if expression is a slot, should set the slotId to + // converted the org.apache.doris.analysis.Expr + public Expr convert(Expression expression) { + return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java new file mode 100644 index 0000000000..88e4bc6ba5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under 
one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions; + +import org.apache.doris.analysis.FunctionName; +import org.apache.doris.catalog.Function; +import org.apache.doris.nereids.trees.NodeType; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Temp definition of FunctionCallExpression. + */ +public class FunctionCallExpression extends Expression<FunctionCallExpression> { + + private FunctionName functionName; + + private List<Expression> params; + + private Function fn; + + /** + * Constructor of FunctionCallExpression. + */ + public FunctionCallExpression(FunctionName functionName, + Function fn, Expression... 
children) { + super(NodeType.EXPRESSION, children); + this.functionName = functionName; + this.params = Arrays.stream(children).collect(Collectors.toList()); + this.fn = fn; + } + + public FunctionName getFunctionName() { + return functionName; + } + + public List<Expression> getParams() { + return params; + } + + public Function getFn() { + return fn; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java index 16d23ffd98..3143b807f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java @@ -25,6 +25,13 @@ import org.apache.doris.nereids.trees.NodeType; public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpression<EXPR_TYPE> implements LeafExpression<EXPR_TYPE> { + private int id; + + public Slot(NodeType type, int id, Expression... 
children) { + super(type, children); + this.id = id; + } + public Slot(NodeType type) { super(type); } @@ -33,4 +40,8 @@ public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpre public Slot toSlot() { return this; } + + public int getId() { + return id; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java index f779b9eaba..acecad51ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.trees.expressions; import org.apache.doris.catalog.Column; -import org.apache.doris.nereids.exceptions.UnboundException; import org.apache.doris.nereids.trees.NodeType; import org.apache.doris.nereids.types.DataType; @@ -80,12 +79,12 @@ public class SlotReference extends Slot<SlotReference> { } @Override - public DataType getDataType() throws UnboundException { + public DataType getDataType() { return dataType; } @Override - public boolean nullable() throws UnboundException { + public boolean nullable() { return nullable; } @@ -123,4 +122,9 @@ public class SlotReference extends Slot<SlotReference> { public int hashCode() { return Objects.hash(exprId, name, qualifier, nullable); } + + // TODO: return real org.apache.doris.catalog.Column + public Column getColumn() { + return null; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java new file mode 100644 index 0000000000..3c4384b671 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java @@ -0,0 +1,322 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans; + +import org.apache.doris.analysis.AggregateInfo; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SortInfo; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.nereids.PlanOperatorVisitor; +import org.apache.doris.nereids.operators.AbstractOperator; +import org.apache.doris.nereids.operators.Operator; +import org.apache.doris.nereids.operators.plans.JoinType; +import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation; +import org.apache.doris.nereids.operators.plans.physical.PhysicalFilter; +import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator; +import org.apache.doris.nereids.operators.plans.physical.PhysicalProject; +import org.apache.doris.nereids.operators.plans.physical.PhysicalSort; +import org.apache.doris.nereids.properties.OrderKey; +import 
org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.ExpressionConverter; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.AggregationNode; +import org.apache.doris.planner.CrossJoinNode; +import org.apache.doris.planner.DataPartition; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.HashJoinNode; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanNode; +import org.apache.doris.planner.SortNode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Used to translate to physical plan generated by new optimizer to the plan fragments. + */ +@SuppressWarnings("rawtypes") +public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> { + + public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan, + PlanContext context) { + visit(physicalPlan, context); + } + + @Override + public PlanFragment visit(Plan<? extends Plan, ? extends Operator> plan, PlanContext context) { + PhysicalOperator<?> operator = (PhysicalOperator<?>) plan.getOperator(); + return operator.accept(this, plan, context); + } + + /** + * Translate in following steps: + * 1. + * + */ + @Override + public PlanFragment visitPhysicalAggregationPlan( + PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation> aggPlan, PlanContext context) { + + PlanFragment inputPlanFragment = visit( + (PhysicalPlan<? extends PhysicalPlan, ? 
extends PhysicalOperator>) aggPlan.child(0), context); + + AggregationNode aggregationNode = null; + List<Slot> slotList = aggPlan.getOutput(); + TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null); + PhysicalAggregation physicalAggregation = (PhysicalAggregation) aggPlan.getOperator(); + AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase().toExec(); + + List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList(); + ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream() + .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toCollection(ArrayList::new)); + + List<Expression> aggExpressionList = physicalAggregation.getAggExprList(); + // TODO: agg function could be other expr type either + ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream() + .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e)) + .collect(Collectors.toCollection(ArrayList::new)); + + List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList(); + // Partition expressions are collected as plain Expr (the list type is List<Expr>), so no FunctionCallExpr cast. + List<Expr> execPartitionExpressions = partitionExpressionList.stream() + .map(e -> ExpressionConverter.converter.convert(e)).collect(Collectors.toList()); + // todo: support DISTINCT + AggregateInfo aggInfo = null; + switch (phase) { + case FIRST: + aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc, + outputTupleDesc, AggregateInfo.AggPhase.FIRST); + aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo); + aggregationNode.unsetNeedsFinalize(); + aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream()); + aggregationNode.setIntermediateTuple(); + if (!partitionExpressionList.isEmpty()) { + inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions)); + } + break; + case FIRST_MERGE: + aggInfo = AggregateInfo.create(execGroupingExpressions, 
execAggExpressions, outputTupleDesc, + outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE); + aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(), aggInfo); + break; + default: + throw new RuntimeException("Unsupported yet"); + } + inputPlanFragment.setPlanRoot(aggregationNode); + return inputPlanFragment; + } + + @Override + public PlanFragment visitPhysicalOlapScanPlan( + PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan> olapScanPlan, PlanContext context) { + // Create OlapScanNode + List<Slot> slotList = olapScanPlan.getOutput(); + PhysicalOlapScan physicalOlapScan = olapScanPlan.getOperator(); + OlapTable olapTable = physicalOlapScan.getTable(); + TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, context, olapTable); + OlapScanNode olapScanNode = new OlapScanNode(context.nextNodeId(), tupleDescriptor, olapTable.getName()); + // Create PlanFragment + PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), olapScanNode, DataPartition.RANDOM); + context.addPlanFragment(planFragment); + return planFragment; + } + + @Override + public PlanFragment visitPhysicalSortPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalSort> sortPlan, + PlanContext context) { + PlanFragment childFragment = visit( + (PhysicalPlan<? extends PhysicalPlan, ? 
extends PhysicalOperator>) sortPlan.child(0), context); + PhysicalSort physicalSort = sortPlan.getOperator(); + long limit = physicalSort.getLimit(); + + List<Expr> execOrderingExprList = Lists.newArrayList(); + List<Boolean> ascOrderList = Lists.newArrayList(); + List<Boolean> nullsFirstParamList = Lists.newArrayList(); + + List<OrderKey> orderKeyList = physicalSort.getOrderList(); + orderKeyList.forEach(k -> { + execOrderingExprList.add(ExpressionConverter.converter.convert(k.getExpr())); + ascOrderList.add(k.isAsc()); + nullsFirstParamList.add(k.isNullFirst()); + }); + + List<Slot> outputList = sortPlan.getOutput(); + TupleDescriptor tupleDesc = generateTupleDesc(outputList, context, null); + SortInfo sortInfo = new SortInfo(execOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc); + + PlanNode childNode = childFragment.getPlanRoot(); + SortNode sortNode = new SortNode(context.nextNodeId(), childNode, sortInfo, physicalSort.isUseTopN(), + physicalSort.hasLimit(), physicalSort.getOffset()); + // Install the sort as the root of the child fragment before the fragment is returned or wrapped; + // the state check further down expects the child fragment root to be this sort node. + childFragment.setPlanRoot(sortNode); + if (!childFragment.isPartitioned()) { + // Unpartitioned input: return the fragment with the sort on top, no merging exchange is created. + return childFragment; + } + + PlanFragment mergeFragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED, context); + ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot(); + exchNode.unsetLimit(); + if (physicalSort.hasLimit()) { + exchNode.setLimit(limit); + } + long offset = physicalSort.getOffset(); + exchNode.setMergeInfo(sortNode.getSortInfo(), offset); + + // Child nodes should not process the offset. If there is a limit, + // the child nodes need only return (offset + limit) rows. 
+ SortNode childSortNode = (SortNode) childFragment.getPlanRoot(); + Preconditions.checkState(sortNode == childSortNode); + if (sortNode.hasLimit()) { + childSortNode.unsetLimit(); + childSortNode.setLimit(limit + offset); + } + childSortNode.setOffset(0); + return mergeFragment; + } + + // TODO: support broadcast join / co-locate / bucket shuffle join later + @Override + public PlanFragment visitPhysicalHashJoinPlan( + PhysicalPlan<? extends PhysicalPlan, PhysicalHashJoin> hashJoinPlan, PlanContext context) { + PlanFragment leftFragment = visit( + (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) hashJoinPlan.child(0), context); + // The right input is the second child of the join plan. + PlanFragment rightFragment = visit( + (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) hashJoinPlan.child(1), context); + PhysicalHashJoin physicalHashJoin = hashJoinPlan.getOperator(); + Expression predicateExpr = physicalHashJoin.getPredicate(); + List<Expression> eqExprList = Utils.getEqConjuncts(hashJoinPlan.child(0).getOutput(), + hashJoinPlan.child(1).getOutput(), predicateExpr); + JoinType joinType = physicalHashJoin.getJoinType(); + + PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot(); + PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot(); + + if (joinType.equals(JoinType.CROSS_JOIN) + || physicalHashJoin.getJoinType().equals(JoinType.INNER_JOIN) && eqExprList.isEmpty()) { + CrossJoinNode crossJoinNode = new CrossJoinNode(context.nextNodeId(), leftFragment.getPlanRoot(), + rightFragment.getPlanRoot(), null); + crossJoinNode.setLimit(physicalHashJoin.getLimited()); + List<Expr> conjuncts = Utils.extractConjuncts(predicateExpr).stream() + .map(e -> ExpressionConverter.converter.convert(e)) + .collect(Collectors.toCollection(ArrayList::new)); + crossJoinNode.addConjuncts(conjuncts); + ExchangeNode exchangeNode = new ExchangeNode(context.nextNodeId(), rightFragment.getPlanRoot(), false); + exchangeNode.setNumInstances(rightFragmentPlanRoot.getNumInstances()); + 
exchangeNode.setFragment(leftFragment); + // The exchange that receives the right fragment is the second input of the cross join itself. + crossJoinNode.setChild(1, exchangeNode); + rightFragment.setDestination(exchangeNode); + crossJoinNode.setChild(0, leftFragment.getPlanRoot()); + leftFragment.setPlanRoot(crossJoinNode); + return leftFragment; + } + + List<Expression> expressionList = Utils.extractConjuncts(predicateExpr); + expressionList.removeAll(eqExprList); + List<Expr> execOtherConjunctList = expressionList.stream().map(e -> ExpressionConverter.converter.convert(e)) + .collect(Collectors.toCollection(ArrayList::new)); + List<Expr> execEqConjunctList = eqExprList.stream().map(e -> ExpressionConverter.converter.convert(e)) + .collect(Collectors.toCollection(ArrayList::new)); + + HashJoinNode hashJoinNode = new HashJoinNode(context.nextNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot, + JoinType.toJoinOperator(physicalHashJoin.getJoinType()), execEqConjunctList, execOtherConjunctList); + + // One exchange per input: each child fragment sends its rows to its own exchange under the join. + ExchangeNode leftExch = new ExchangeNode(context.nextNodeId(), leftFragmentPlanRoot, false); + leftExch.setNumInstances(leftFragmentPlanRoot.getNumInstances()); + ExchangeNode rightExch = new ExchangeNode(context.nextNodeId(), rightFragmentPlanRoot, false); + rightExch.setNumInstances(rightFragmentPlanRoot.getNumInstances()); + hashJoinNode.setChild(0, leftExch); + hashJoinNode.setChild(1, rightExch); + hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); + hashJoinNode.setLimit(physicalHashJoin.getLimited()); + leftFragment.setDestination(leftExch); + rightFragment.setDestination(rightExch); + return new PlanFragment(context.nextFragmentId(), hashJoinNode, leftFragment.getDataPartition()); + } + + @Override + public PlanFragment visitPhysicalProject( + PhysicalPlan<? extends PhysicalPlan, PhysicalProject> projectPlan, PlanContext context) { + return visit((PhysicalPlan<? extends PhysicalPlan, ? 
extends PhysicalOperator>) projectPlan.child(0), context); + } + + @Override + public PlanFragment visitPhysicalFilter(PhysicalPlan<? extends PhysicalPlan, PhysicalFilter> filterPlan, + PlanContext context) { + PlanFragment inputFragment = visit( + (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) filterPlan.child(0), context); + PlanNode planNode = inputFragment.getPlanRoot(); + PhysicalFilter filter = filterPlan.getOperator(); + Expression expression = filter.getPredicates(); + List<Expression> expressionList = Utils.extractConjuncts(expression); + expressionList.stream().map(ExpressionConverter.converter::convert).forEach(planNode::addConjunct); + return inputFragment; + } + + private TupleDescriptor generateTupleDesc(List<Slot> slotList, PlanContext context, Table table) { + TupleDescriptor tupleDescriptor = context.generateTupleDesc(); + tupleDescriptor.setTable(table); + for (Slot slot : slotList) { + SlotReference slotReference = (SlotReference) slot; + SlotDescriptor slotDescriptor = context.addSlotDesc(tupleDescriptor, slot.getId()); + slotDescriptor.setColumn(slotReference.getColumn()); + slotDescriptor.setType(slotReference.getDataType().toCatalogDataType()); + slotDescriptor.setIsMaterialized(true); + } + return tupleDescriptor; + } + + private PlanFragment createParentFragment(PlanFragment childFragment, DataPartition parentPartition, + PlanContext ctx) { + ExchangeNode exchangeNode = new ExchangeNode(ctx.nextNodeId(), childFragment.getPlanRoot(), false); + exchangeNode.setNumInstances(childFragment.getPlanRoot().getNumInstances()); + PlanFragment parentFragment = new PlanFragment(ctx.nextFragmentId(), exchangeNode, parentPartition); + childFragment.setDestination(exchangeNode); + childFragment.setOutputPartition(parentPartition); + return parentFragment; + } + + /** + * Helper function to eliminate unnecessary checked exception caught requirement from the main logic of translator. 
+ * + * @param f function which would invoke the logic of + * stale code from old optimizer that could throw + * a checked exception + */ + public void exec(FuncWrapper f) { + try { + f.exec(); + } catch (Exception e) { + throw new RuntimeException("Unexpected Exception: ", e); + } + } + + private static interface FuncWrapper { + void exec() throws Exception; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java index c359a93ed7..77bf7ac625 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java @@ -44,4 +44,5 @@ public interface Plan< @Override Plan child(int index); + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java new file mode 100644 index 0000000000..fc43e5fa30 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans; + +import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.PlanNodeId; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Context of physical plan. + * Holds the plan fragments added so far, the descriptor table, and the id generators for fragments and nodes. + */ +public class PlanContext { + private final List<PlanFragment> planFragmentList = Lists.newArrayList(); + + private final DescriptorTable descTable = new DescriptorTable(); + + + private final IdGenerator<PlanFragmentId> fragmentIdGenerator = PlanFragmentId.createGenerator(); + + private final IdGenerator<PlanNodeId> nodeIdGenerator = PlanNodeId.createGenerator(); + + public List<PlanFragment> getPlanFragmentList() { + return planFragmentList; + } + + /** + * Creates a new tuple descriptor in the descriptor table of this context. + */ + public TupleDescriptor generateTupleDesc() { + return descTable.createTupleDescriptor(); + } + + public PlanNodeId nextNodeId() { + return nodeIdGenerator.getNextId(); + } + + public SlotDescriptor addSlotDesc(TupleDescriptor t) { + return descTable.addSlotDescriptor(t); + } + + /** + * Adds a slot descriptor to the given tuple, passing the given id through to the descriptor table. + */ + public SlotDescriptor addSlotDesc(TupleDescriptor t, int id) { + return descTable.addSlotDescriptor(t, id); + } + + public PlanFragmentId nextFragmentId() { + return fragmentIdGenerator.getNextId(); + } + + public void addPlanFragment(PlanFragment planFragment) { + this.planFragmentList.add(planFragment); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java index ed03a95033..9301c294e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java @@ -35,4 +35,5 @@ public interface 
PhysicalPlan< @Override Plan child(int index); + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java index a01a867593..ff66e62191 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java @@ -17,6 +17,11 @@ package org.apache.doris.nereids.util; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; + +import java.util.List; + /** * Utils for Nereids. */ @@ -34,4 +39,14 @@ public class Utils { return part.replace("`", "``"); } } + + // TODO: implement later + public static List<Expression> getEqConjuncts(List<Slot> left, List<Slot> right, Expression eqExpr) { + return null; + } + + // TODO: implement later + public static List<Expression> extractConjuncts(Expression expr) { + return null; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index fb16bdebe6..ac7c7fee96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -103,6 +103,16 @@ public class AggregationNode extends PlanNode { && aggInfo.getGroupingExprs().size() > 0; } + // Used by new optimizer + public void setNeedsFinalize(boolean needsFinalize) { + this.needsFinalize = needsFinalize; + } + + // Used by new optimizer + public void setUseStreamingPreagg(boolean useStreamingPreagg) { + this.useStreamingPreagg = useStreamingPreagg; + } + @Override public void setCompactData(boolean on) { this.compactData = on; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 706a127c0a..0bc86e724a 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -67,7 +67,7 @@ import java.util.stream.Collectors; public class HashJoinNode extends PlanNode { private final static Logger LOG = LogManager.getLogger(HashJoinNode.class); - private final TableRef innerRef; + private TableRef innerRef; private final JoinOperator joinOp; // predicates of the form 'a=b' or 'a<=>b' private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList(); @@ -84,8 +84,11 @@ public class HashJoinNode extends PlanNode { private List<SlotId> hashOutputSlotIds; - public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef, - List<Expr> eqJoinConjuncts, List<Expr> otherJoinConjuncts) { + /** + * Constructor of HashJoinNode. + */ + public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef, List<Expr> eqJoinConjuncts, + List<Expr> otherJoinConjuncts) { super(id, "HASH JOIN", NodeType.HASH_JOIN_NODE); Preconditions.checkArgument(eqJoinConjuncts != null && !eqJoinConjuncts.isEmpty()); Preconditions.checkArgument(otherJoinConjuncts != null); @@ -140,6 +143,63 @@ public class HashJoinNode extends PlanNode { } } + /** + * This constructor is used by new optimizer. 
+ */ + public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator joinOp, List<Expr> eqJoinConjuncts, + List<Expr> otherJoinConjuncts) { + super(id, "HASH JOIN", NodeType.HASH_JOIN_NODE); + Preconditions.checkArgument(eqJoinConjuncts != null && !eqJoinConjuncts.isEmpty()); + Preconditions.checkArgument(otherJoinConjuncts != null); + tblRefIds.addAll(outer.getTblRefIds()); + tblRefIds.addAll(inner.getTblRefIds()); + this.joinOp = joinOp; + // TODO: Support not vec exec engine cut unless tupleid in semi/anti join + if (VectorizedUtil.isVectorized()) { + if (joinOp.equals(JoinOperator.LEFT_ANTI_JOIN) || joinOp.equals(JoinOperator.LEFT_SEMI_JOIN) + || joinOp.equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) { + tupleIds.addAll(outer.getTupleIds()); + } else if (joinOp.equals(JoinOperator.RIGHT_ANTI_JOIN) || joinOp.equals(JoinOperator.RIGHT_SEMI_JOIN)) { + tupleIds.addAll(inner.getTupleIds()); + } else { + tupleIds.addAll(outer.getTupleIds()); + tupleIds.addAll(inner.getTupleIds()); + } + } else { + tupleIds.addAll(outer.getTupleIds()); + tupleIds.addAll(inner.getTupleIds()); + } + + for (Expr eqJoinPredicate : eqJoinConjuncts) { + Preconditions.checkArgument(eqJoinPredicate instanceof BinaryPredicate); + BinaryPredicate eqJoin = (BinaryPredicate) eqJoinPredicate; + if (eqJoin.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) { + Preconditions.checkArgument(eqJoin.getChildren().size() == 2); + if (!eqJoin.getChild(0).isNullable() || !eqJoin.getChild(1).isNullable()) { + eqJoin.setOp(BinaryPredicate.Operator.EQ); + } + } + this.eqJoinConjuncts.add(eqJoin); + } + this.distrMode = DistributionMode.NONE; + this.otherJoinConjuncts = otherJoinConjuncts; + children.add(outer); + children.add(inner); + + // Inherits all the nullable tuple from the children + // Mark tuples that form the "nullable" side of the outer join as nullable. 
+ nullableTupleIds.addAll(inner.getNullableTupleIds()); + nullableTupleIds.addAll(outer.getNullableTupleIds()); + if (joinOp.equals(JoinOperator.FULL_OUTER_JOIN)) { + nullableTupleIds.addAll(outer.getTupleIds()); + nullableTupleIds.addAll(inner.getTupleIds()); + } else if (joinOp.equals(JoinOperator.LEFT_OUTER_JOIN)) { + nullableTupleIds.addAll(inner.getTupleIds()); + } else if (joinOp.equals(JoinOperator.RIGHT_OUTER_JOIN)) { + nullableTupleIds.addAll(outer.getTupleIds()); + } + } + public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts; } @@ -176,7 +236,7 @@ public class HashJoinNode extends PlanNode { /** * Calculate the slots output after going through the hash table in the hash join node. * The most essential difference between 'hashOutputSlots' and 'outputSlots' is that - * it's output needs to contain other conjunct and conjunct columns. + * it's output needs to contain other conjunct and conjunct columns. * hash output slots = output slots + conjunct slots + other conjunct slots * For example: * select b.k1 from test.t1 a right join test.t1 b on a.k1=b.k1 and b.k2>1 where a.k2>1; @@ -185,6 +245,7 @@ public class HashJoinNode extends PlanNode { * conjuncts: b.k2>1 * hash output slots: a.k2, b.k2, b.k1 * eq conjuncts: a.k1=b.k1 + * * @param slotIdList */ private void initHashOutputSlotIds(List<SlotId> slotIdList) { @@ -204,8 +265,8 @@ public class HashJoinNode extends PlanNode { outputSlotIds = Lists.newArrayList(); for (TupleId tupleId : tupleIds) { for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getSlots()) { - if (slotDescriptor.isMaterialized() - && (requiredSlotIdSet == null || requiredSlotIdSet.contains(slotDescriptor.getId()))) { + if (slotDescriptor.isMaterialized() && (requiredSlotIdSet == null || requiredSlotIdSet.contains( + slotDescriptor.getId()))) { outputSlotIds.add(slotDescriptor.getId()); } } @@ -244,13 +305,11 @@ public class HashJoinNode extends PlanNode { computeStats(analyzer); ExprSubstitutionMap 
combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap(); - List<Expr> newEqJoinConjuncts = - Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false); - eqJoinConjuncts = newEqJoinConjuncts.stream() - .map(entity -> (BinaryPredicate) entity).collect(Collectors.toList()); + List<Expr> newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false); + eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity) + .collect(Collectors.toList()); assignedConjuncts = analyzer.getAssignedConjuncts(); - otherJoinConjuncts = - Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false); + otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false); } private void replaceOutputSmapForOuterJoin() { @@ -286,8 +345,7 @@ public class HashJoinNode extends PlanNode { private final SlotDescriptor lhs; private final SlotDescriptor rhs; - private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs, - SlotDescriptor rhs) { + private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs, SlotDescriptor rhs) { this.eqJoinConjunct = eqJoinConjunct; this.lhs = lhs; this.rhs = rhs; @@ -359,8 +417,7 @@ public class HashJoinNode extends PlanNode { */ public static Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> groupByJoinedTupleIds( List<EqJoinConjunctScanSlots> eqJoinConjunctSlots) { - Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> scanSlotsByJoinedTids = - new LinkedHashMap<>(); + Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> scanSlotsByJoinedTids = new LinkedHashMap<>(); for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) { Pair<TupleId, TupleId> tids = Pair.create(slots.lhsTid(), slots.rhsTid()); List<EqJoinConjunctScanSlots> scanSlots = scanSlotsByJoinedTids.get(tids); @@ -420,7 +477,8 @@ public class HashJoinNode extends PlanNode { * - we adjust the NDVs from both sides to account for predicates 
that may * might have reduce the cardinality and NDVs */ - private long getGenericJoinCardinality(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots, long lhsCard, long rhsCard) { + private long getGenericJoinCardinality(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots, long lhsCard, + long rhsCard) { Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin()); Preconditions.checkState(!eqJoinConjunctSlots.isEmpty()); Preconditions.checkState(lhsCard >= 0 && rhsCard >= 0); @@ -531,8 +589,8 @@ public class HashJoinNode extends PlanNode { // FK/PK join (which doesn't alter the cardinality of the left-hand side) cardinality = getChild(0).cardinality; } else { - cardinality = Math.round((double) getChild(0).cardinality * (double) getChild( - 1).cardinality / (double) maxNumDistinct); + cardinality = Math.round( + (double) getChild(0).cardinality * (double) getChild(1).cardinality / (double) maxNumDistinct); LOG.debug("lhs card: {}, rhs card: {}", getChild(0).cardinality, getChild(1).cardinality); } LOG.debug("stats HashJoin: cardinality {}", cardinality); @@ -585,8 +643,7 @@ public class HashJoinNode extends PlanNode { // Return -1 if the cardinality of the returned side is unknown. 
long cardinality; - if (joinOp == JoinOperator.RIGHT_SEMI_JOIN - || joinOp == JoinOperator.RIGHT_ANTI_JOIN) { + if (joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp == JoinOperator.RIGHT_ANTI_JOIN) { if (getChild(1).cardinality == -1) { return -1; } @@ -640,8 +697,8 @@ public class HashJoinNode extends PlanNode { @Override protected String debugString() { - return MoreObjects.toStringHelper(this).add("eqJoinConjuncts", - eqJoinConjunctsDebugString()).addValue(super.debugString()).toString(); + return MoreObjects.toStringHelper(this).add("eqJoinConjuncts", eqJoinConjunctsDebugString()) + .addValue(super.debugString()).toString(); } private String eqJoinConjunctsDebugString() { @@ -750,10 +807,7 @@ public class HashJoinNode extends PlanNode { } public enum DistributionMode { - NONE("NONE"), - BROADCAST("BROADCAST"), - PARTITIONED("PARTITIONED"), - BUCKET_SHUFFLE("BUCKET_SHUFFLE"); + NONE("NONE"), BROADCAST("BROADCAST"), PARTITIONED("PARTITIONED"), BUCKET_SHUFFLE("BUCKET_SHUFFLE"); private final String description; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org