This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1121e7d0c3 [feature](Nereids): pushdown distinct through join. (#21437) 1121e7d0c3 is described below commit 1121e7d0c35285673890169e8975bc91e5d7b0da Author: jakevin <jakevin...@gmail.com> AuthorDate: Wed Jul 5 15:55:21 2023 +0800 [feature](Nereids): pushdown distinct through join. (#21437) --- .../doris/nereids/jobs/executor/Rewriter.java | 4 + .../org/apache/doris/nereids/rules/RuleType.java | 2 + .../analysis/ProjectWithDistinctToAggregate.java | 2 +- .../rules/rewrite/InferSetOperatorDistinct.java | 2 +- .../rules/rewrite/PushdownDistinctThroughJoin.java | 92 ++++++++++++++++++++++ .../trees/plans/logical/LogicalAggregate.java | 68 ++++++++++------ .../rewrite/PushdownDistinctThroughJoinTest.java | 84 ++++++++++++++++++++ .../doris/nereids/util/LogicalPlanBuilder.java | 13 ++- 8 files changed, 242 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index a97fc4a2d4..97af7cc14d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -239,6 +239,10 @@ public class Rewriter extends AbstractBatchJobExecutor { topDown(new BuildAggForUnion()) ), + // topic("Distinct", + // costBased(custom(RuleType.PUSH_DOWN_DISTINCT_THROUGH_JOIN, PushdownDistinctThroughJoin::new)) + // ), + topic("Window optimization", topDown( new PushdownLimit(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index d70b45d1f0..9427296d34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -147,6 +147,8 @@ public enum RuleType { PUSHDOWN_FILTER_THROUGH_CTE(RuleTypeClass.REWRITE), PUSHDOWN_FILTER_THROUGH_CTE_ANCHOR(RuleTypeClass.REWRITE), + PUSH_DOWN_DISTINCT_THROUGH_JOIN(RuleTypeClass.REWRITE), + COLUMN_PRUNING(RuleTypeClass.REWRITE), PUSHDOWN_TOP_N_THROUGH_PROJECTION_WINDOW(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ProjectWithDistinctToAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ProjectWithDistinctToAggregate.java index 230eef9e1f..f858820d61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ProjectWithDistinctToAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ProjectWithDistinctToAggregate.java @@ -47,7 +47,7 @@ public class ProjectWithDistinctToAggregate extends OneAnalysisRuleFactory { logicalProject() .when(LogicalProject::isDistinct) .whenNot(project -> project.getProjects().stream().anyMatch(this::hasAggregateFunction)) - .then(project -> new LogicalAggregate<>(project.getProjects(), project.child())) + .then(project -> new LogicalAggregate<>(project.getProjects(), false, project.child())) ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InferSetOperatorDistinct.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InferSetOperatorDistinct.java index 8b9f46408f..9bb24869e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InferSetOperatorDistinct.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InferSetOperatorDistinct.java @@ -50,7 +50,7 @@ public class InferSetOperatorDistinct extends OneRewriteRuleFactory { } List<Plan> newChildren = setOperation.children().stream() - .map(child -> new LogicalAggregate<>(ImmutableList.copyOf(child.getOutput()), child)) + .map(child -> new LogicalAggregate<>(ImmutableList.copyOf(child.getOutput()), true, child)) .collect(ImmutableList.toImmutableList()); if (newChildren.equals(setOperation.children())) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownDistinctThroughJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownDistinctThroughJoin.java new file mode 100644 index 0000000000..7cfc9f35aa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownDistinctThroughJoin.java @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.jobs.JobContext; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; + +import com.google.common.collect.ImmutableList; + +import java.util.function.Function; + +/** + * PushdownDistinctThroughJoin + */ +public class PushdownDistinctThroughJoin extends DefaultPlanRewriter<JobContext> implements CustomRewriter { + @Override + public Plan rewriteRoot(Plan plan, JobContext context) { + return plan.accept(this, context); + } + + @Override + public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> agg, JobContext context) { + agg = visitChildren(this, agg, context); + if (agg.hasPushed() || !agg.isDistinct() || isLeaf(agg.child())) { + return agg; + } + + // After we push down distinct, if this distinct is generated, we will eliminate this distinct + if (agg.isGenerated()) { + return skipProjectPushDistinct(agg.child()); + } else { + return agg.withChildren(skipProjectPushDistinct(agg.child())); + } + } + + private Plan skipProjectPushDistinct(Plan plan) { + if (plan instanceof LogicalProject) { + LogicalProject project = (LogicalProject) plan; + Plan pushJoin = pushDistinct((LogicalJoin<? extends Plan, ? extends Plan>) project.child()); + return project.withChildren(ImmutableList.of(pushJoin)); + } else { + Plan pushJoin = pushDistinct((LogicalJoin<? extends Plan, ? extends Plan>) plan); + return pushJoin; + } + } + + private Plan pushDistinct(LogicalJoin<? extends Plan, ? extends Plan> join) { + Function<Plan, Plan> pushChild = (Plan plan) -> { + if (isLeaf(plan)) { + return withDistinct(plan); + } else { + // Due to there isn't statistics during Rewrite, so we just push down through 1 join. + // return skipProjectPushDistinct(plan); + return withDistinct(plan); + } + }; + Plan left = pushChild.apply(join.left()); + Plan right = pushChild.apply(join.right()); + return join.withChildren(ImmutableList.of(left, right)); + } + + private Plan withDistinct(Plan plan) { + return new LogicalAggregate<>(ImmutableList.copyOf(plan.getOutput()), true, true, plan); + } + + private boolean isLeaf(Plan plan) { + if (plan instanceof LogicalProject && ((LogicalProject<?>) plan).isAllSlots()) { + plan = plan.child(0); + } + return !(plan instanceof LogicalJoin); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java index d6136a0812..8056e03c42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java @@ -56,14 +56,16 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements Aggregate<CHILD_TYPE> { - private final boolean normalized; private final List<Expression> groupByExpressions; private final List<NamedExpression> outputExpressions; // When there are grouping sets/rollup/cube, LogicalAgg is generated by LogicalRepeat. private final Optional<LogicalRepeat> sourceRepeat; + private final boolean normalized; private final boolean ordinalIsResolved; + private final boolean generated; + private final boolean hasPushed; /** * Desc: Constructor for LogicalAggregate. @@ -79,13 +81,20 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> /** * Distinct Agg */ - public LogicalAggregate(List<NamedExpression> namedExpressions, CHILD_TYPE child) { - this(ImmutableList.copyOf(namedExpressions), namedExpressions, false, Optional.empty(), child); + public LogicalAggregate(List<NamedExpression> namedExpressions, boolean generated, CHILD_TYPE child) { + this(ImmutableList.copyOf(namedExpressions), namedExpressions, false, true, generated, false, Optional.empty(), + Optional.empty(), Optional.empty(), child); + } + + public LogicalAggregate(List<NamedExpression> namedExpressions, boolean generated, boolean hasPushed, + CHILD_TYPE child) { + this(ImmutableList.copyOf(namedExpressions), namedExpressions, false, true, generated, hasPushed, + Optional.empty(), Optional.empty(), Optional.empty(), child); } public LogicalAggregate(List<Expression> groupByExpressions, List<NamedExpression> outputExpressions, boolean ordinalIsResolved, CHILD_TYPE child) { - this(groupByExpressions, outputExpressions, false, ordinalIsResolved, Optional.empty(), + this(groupByExpressions, outputExpressions, false, ordinalIsResolved, false, false, Optional.empty(), Optional.empty(), Optional.empty(), child); } @@ -107,18 +116,20 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> boolean normalized, Optional<LogicalRepeat> sourceRepeat, CHILD_TYPE child) { - this(groupByExpressions, outputExpressions, normalized, false, sourceRepeat, + this(groupByExpressions, outputExpressions, normalized, false, false, false, sourceRepeat, Optional.empty(), Optional.empty(), child); } /** * Whole parameters constructor for LogicalAggregate. */ - public LogicalAggregate( + private LogicalAggregate( List<Expression> groupByExpressions, List<NamedExpression> outputExpressions, boolean normalized, boolean ordinalIsResolved, + boolean generated, + boolean hasPushed, Optional<LogicalRepeat> sourceRepeat, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, @@ -128,6 +139,8 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> this.outputExpressions = ImmutableList.copyOf(outputExpressions); this.normalized = normalized; this.ordinalIsResolved = ordinalIsResolved; + this.generated = generated; + this.hasPushed = hasPushed; this.sourceRepeat = Objects.requireNonNull(sourceRepeat, "sourceRepeat cannot be null"); } @@ -151,6 +164,18 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> return sourceRepeat.isPresent(); } + public boolean isDistinct() { + return outputExpressions.equals(groupByExpressions); + } + + public boolean isGenerated() { + return generated; + } + + public boolean hasPushed() { + return hasPushed; + } + @Override public String toString() { return Utils.toSqlString("LogicalAggregate[" + id.asInt() + "]", @@ -203,6 +228,7 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> && Objects.equals(outputExpressions, that.outputExpressions) && normalized == that.normalized && ordinalIsResolved == that.ordinalIsResolved + && generated == that.generated && Objects.equals(sourceRepeat, that.sourceRepeat); } @@ -214,28 +240,26 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> @Override public LogicalAggregate<Plan> withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalAggregate<>(groupByExpressions, outputExpressions, - normalized, ordinalIsResolved, sourceRepeat, Optional.empty(), Optional.empty(), children.get(0)); + return new LogicalAggregate<>(groupByExpressions, outputExpressions, normalized, ordinalIsResolved, generated, + hasPushed, sourceRepeat, Optional.empty(), Optional.empty(), children.get(0)); } @Override public LogicalAggregate<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalAggregate<>(groupByExpressions, outputExpressions, - normalized, ordinalIsResolved, sourceRepeat, groupExpression, Optional.of(getLogicalProperties()), - children.get(0)); + return new LogicalAggregate<>(groupByExpressions, outputExpressions, normalized, ordinalIsResolved, generated, + hasPushed, sourceRepeat, groupExpression, Optional.of(getLogicalProperties()), children.get(0)); } @Override public LogicalAggregate<Plan> withLogicalProperties(Optional<LogicalProperties> logicalProperties) { - return new LogicalAggregate<>(groupByExpressions, outputExpressions, - normalized, ordinalIsResolved, sourceRepeat, - Optional.empty(), logicalProperties, children.get(0)); + return new LogicalAggregate<>(groupByExpressions, outputExpressions, normalized, ordinalIsResolved, generated, + hasPushed, sourceRepeat, Optional.empty(), logicalProperties, children.get(0)); } public LogicalAggregate<Plan> withGroupByAndOutput(List<Expression> groupByExprList, List<NamedExpression> outputExpressionList) { - return new LogicalAggregate<>(groupByExprList, outputExpressionList, normalized, ordinalIsResolved, - sourceRepeat, Optional.empty(), Optional.empty(), child()); + return new LogicalAggregate<>(groupByExprList, outputExpressionList, normalized, ordinalIsResolved, generated, + hasPushed, sourceRepeat, Optional.empty(), Optional.empty(), child()); } @Override @@ -245,18 +269,18 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> @Override public LogicalAggregate<CHILD_TYPE> withAggOutput(List<NamedExpression> newOutput) { - return new LogicalAggregate<>(groupByExpressions, newOutput, normalized, ordinalIsResolved, - sourceRepeat, Optional.empty(), Optional.empty(), child()); + return new LogicalAggregate<>(groupByExpressions, newOutput, normalized, ordinalIsResolved, generated, + hasPushed, sourceRepeat, Optional.empty(), Optional.empty(), child()); } public LogicalAggregate<Plan> withAggOutputChild(List<NamedExpression> newOutput, Plan newChild) { - return new LogicalAggregate<>(groupByExpressions, newOutput, normalized, ordinalIsResolved, - sourceRepeat, Optional.empty(), Optional.empty(), newChild); + return new LogicalAggregate<>(groupByExpressions, newOutput, normalized, ordinalIsResolved, generated, + hasPushed, sourceRepeat, Optional.empty(), Optional.empty(), newChild); } public LogicalAggregate<Plan> withNormalized(List<Expression> normalizedGroupBy, List<NamedExpression> normalizedOutput, Plan normalizedChild) { - return new LogicalAggregate<>(normalizedGroupBy, normalizedOutput, true, ordinalIsResolved, - sourceRepeat, Optional.empty(), Optional.empty(), normalizedChild); + return new LogicalAggregate<>(normalizedGroupBy, normalizedOutput, true, ordinalIsResolved, generated, + hasPushed, sourceRepeat, Optional.empty(), Optional.empty(), normalizedChild); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushdownDistinctThroughJoinTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushdownDistinctThroughJoinTest.java new file mode 100644 index 0000000000..cc2e08d188 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushdownDistinctThroughJoinTest.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.util.LogicalPlanBuilder; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.MemoTestUtils; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.nereids.util.PlanConstructor; + +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; + +class PushdownDistinctThroughJoinTest implements MemoPatternMatchSupported { + private static final LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0); + private static final LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0); + private static final LogicalOlapScan scan3 = PlanConstructor.newLogicalOlapScan(2, "t3", 0); + private static final LogicalOlapScan scan4 = PlanConstructor.newLogicalOlapScan(3, "t4", 0); + + @Test + void testPushdownJoin() { + LogicalPlan plan = new LogicalPlanBuilder(scan1) + .join(scan2, JoinType.INNER_JOIN, Pair.of(0, 0)) + .join(scan3, JoinType.INNER_JOIN, Pair.of(0, 0)) + .join(scan4, JoinType.INNER_JOIN, Pair.of(0, 0)) + .distinct(ImmutableList.of(1, 3, 5, 7)) + .build(); + + PlanChecker.from(MemoTestUtils.createConnectContext(), plan) + .applyTopDown(new PushdownDistinctThroughJoin()) + .matches( + logicalAggregate( + logicalJoin( + logicalAggregate(logicalJoin()), + logicalAggregate(logicalOlapScan()) + ) + ) + ) + .printlnTree(); + } + + @Test + void testPushdownProjectJoin() { + LogicalPlan plan = new LogicalPlanBuilder(scan1) + .join(scan2, JoinType.INNER_JOIN, Pair.of(0, 0)) + .project(ImmutableList.of(0, 2)) + .join(scan3, JoinType.INNER_JOIN, Pair.of(0, 0)) + .project(ImmutableList.of(0, 2, 3)) + .join(scan4, JoinType.INNER_JOIN, Pair.of(0, 0)) + .distinct(ImmutableList.of(1, 2, 3)) + .build(); + + PlanChecker.from(MemoTestUtils.createConnectContext(), plan) + .applyTopDown(new PushdownDistinctThroughJoin()) + .matches( + logicalAggregate( + logicalJoin( + logicalAggregate(logicalProject(logicalJoin())), + logicalAggregate(logicalOlapScan()) + ) + ) + ) + .printlnTree(); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java index 7ae95e1511..1857652bde 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java @@ -171,12 +171,23 @@ public class LogicalPlanBuilder { for (Integer index : groupByKeysIndex) { groupByBuilder.add(this.plan.getOutput().get(index)); } - ImmutableList<Expression> groupByKeys = groupByBuilder.build(); + List<Expression> groupByKeys = groupByBuilder.build(); LogicalAggregate<Plan> agg = new LogicalAggregate<>(groupByKeys, outputExprsList, this.plan); return from(agg); } + public LogicalPlanBuilder distinct(List<Integer> groupByKeysIndex) { + Builder<NamedExpression> groupByBuilder = ImmutableList.builder(); + for (Integer index : groupByKeysIndex) { + groupByBuilder.add(this.plan.getOutput().get(index)); + } + List<NamedExpression> groupByKeys = groupByBuilder.build(); + + LogicalAggregate<Plan> agg = new LogicalAggregate<>(groupByKeys, false, this.plan); + return from(agg); + } + public LogicalPlanBuilder agg(List<Expression> groupByKeys, List<NamedExpression> outputExprsList) { LogicalAggregate<Plan> agg = new LogicalAggregate<>(groupByKeys, outputExprsList, this.plan); return from(agg); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org