This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 8d182a1029a [feature](Nereids): Pushdown LimitDistinct Through Join 
(#25113) (#27288)
8d182a1029a is described below

commit 8d182a1029a03febd91e5a37cf3b90b340ba2d6c
Author: jakevin <[email protected]>
AuthorDate: Mon Nov 20 19:59:28 2023 +0800

    [feature](Nereids): Pushdown LimitDistinct Through Join (#25113) (#27288)
    
    Push down limit-distinct through left/right outer join or cross join.
    For example: select t1.c1 from t1 left join t2 on t1.c1 = t2.c1 order by t1.c1 
limit 1;
---
 .../doris/nereids/jobs/executor/Rewriter.java      |   2 +
 .../org/apache/doris/nereids/rules/RuleType.java   |   3 +
 .../rewrite/PushdownLimitDistinctThroughJoin.java  | 109 +++++++++++++++++++++
 .../rules/rewrite/PushdownTopNThroughJoin.java     |  34 ++++---
 .../trees/plans/logical/LogicalAggregate.java      |   5 +
 .../nereids/trees/plans/logical/LogicalLimit.java  |   6 ++
 .../nereids/trees/plans/logical/LogicalTopN.java   |   7 ++
 .../data/nereids_p0/join/test_limit_join.out       |  23 +++++
 .../suites/nereids_p0/join/test_limit_join.groovy  | 105 ++++++++++++++++++++
 9 files changed, 279 insertions(+), 15 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index a3f800340b9..fcbbdee6ee6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -91,6 +91,7 @@ import 
org.apache.doris.nereids.rules.rewrite.PushProjectIntoOneRowRelation;
 import org.apache.doris.nereids.rules.rewrite.PushProjectThroughUnion;
 import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughProject;
 import org.apache.doris.nereids.rules.rewrite.PushdownLimit;
+import org.apache.doris.nereids.rules.rewrite.PushdownLimitDistinctThroughJoin;
 import org.apache.doris.nereids.rules.rewrite.PushdownTopNThroughJoin;
 import org.apache.doris.nereids.rules.rewrite.PushdownTopNThroughWindow;
 import org.apache.doris.nereids.rules.rewrite.ReorderJoin;
@@ -272,6 +273,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
                     topDown(new SplitLimit()),
                     topDown(new PushdownLimit(),
                             new PushdownTopNThroughJoin(),
+                            new PushdownLimitDistinctThroughJoin(),
                             new PushdownTopNThroughWindow(),
                             new CreatePartitionTopNFromWindow()
                     )
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index dcf13637c96..84ce5506eb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -250,6 +250,9 @@ public enum RuleType {
     PUSH_TOP_N_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
     PUSH_TOP_N_THROUGH_PROJECT_WINDOW(RuleTypeClass.REWRITE),
     PUSH_TOP_N_THROUGH_WINDOW(RuleTypeClass.REWRITE),
+    // limit distinct push down
+    PUSH_LIMIT_DISTINCT_THROUGH_JOIN(RuleTypeClass.REWRITE),
+    PUSH_LIMIT_DISTINCT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
     // adjust nullable
     ADJUST_NULLABLE(RuleTypeClass.REWRITE),
     ADJUST_CONJUNCTS_RETURN_TYPE(RuleTypeClass.REWRITE),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownLimitDistinctThroughJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownLimitDistinctThroughJoin.java
new file mode 100644
index 00000000000..24737b63625
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownLimitDistinctThroughJoin.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Push a limit that sits on top of a distinct aggregate down through a join.
+ *
+ * <p>Two shapes are matched: {@code limit -> distinct -> join} and
+ * {@code limit -> distinct -> project -> join} (the project must contain slots only).
+ * When the group-by slots are all produced by one join child and that child's output set
+ * equals the aggregate's output set, the child is replaced by {@code limit(distinct(child))}
+ * whose limit is {@code limit + offset} and whose offset is 0:
+ * <ul>
+ *   <li>LEFT_OUTER_JOIN: only the left child is considered;</li>
+ *   <li>RIGHT_OUTER_JOIN: only the right child is considered;</li>
+ *   <li>CROSS_JOIN: the left child is tried first, then the right child.</li>
+ * </ul>
+ * The original limit and aggregate are kept above the join.
+ */
+public class PushdownLimitDistinctThroughJoin implements RewriteRuleFactory {
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                // limit -> distinct -> join
+                logicalLimit(logicalAggregate(logicalJoin())
+                        .when(LogicalAggregate::isDistinct))
+                        .then(limit -> {
+                            LogicalAggregate<LogicalJoin<Plan, Plan>> agg = limit.child();
+                            LogicalJoin<Plan, Plan> join = agg.child();
+
+                            Plan newJoin = pushLimitThroughJoin(limit, join);
+                            // nothing could be pushed, or the join children are unchanged: no new plan
+                            if (newJoin == null || join.children().equals(newJoin.children())) {
+                                return null;
+                            }
+                            return limit.withChildren(agg.withChildren(newJoin));
+                        })
+                        .toRule(RuleType.PUSH_LIMIT_DISTINCT_THROUGH_JOIN),
+
+                // limit -> distinct -> project -> join
+                logicalLimit(logicalAggregate(logicalProject(logicalJoin()).when(LogicalProject::isAllSlots))
+                        .when(LogicalAggregate::isDistinct))
+                        .then(limit -> {
+                            LogicalAggregate<LogicalProject<LogicalJoin<Plan, Plan>>> agg = limit.child();
+                            LogicalProject<LogicalJoin<Plan, Plan>> project = agg.child();
+                            LogicalJoin<Plan, Plan> join = project.child();
+
+                            Plan newJoin = pushLimitThroughJoin(limit, join);
+                            // nothing could be pushed, or the join children are unchanged: no new plan
+                            if (newJoin == null || join.children().equals(newJoin.children())) {
+                                return null;
+                            }
+                            return limit.withChildren(agg.withChildren(project.withChildren(newJoin)));
+                        })
+                        // use the dedicated rule type so the two rules can be told apart
+                        .toRule(RuleType.PUSH_LIMIT_DISTINCT_THROUGH_PROJECT_JOIN)
+        );
+    }
+
+    /**
+     * Build a copy of {@code join} in which one child is wrapped by limit(distinct(child)).
+     *
+     * @param limit the matched limit; its child must be the distinct aggregate
+     * @param join the join below the aggregate (possibly below a slot-only project)
+     * @return the join with one child replaced, or {@code null} when the join type is not
+     *         supported or no child satisfies the push-down condition
+     */
+    private Plan pushLimitThroughJoin(LogicalLimit<?> limit, LogicalJoin<Plan, Plan> join) {
+        LogicalAggregate<?> agg = (LogicalAggregate<?>) limit.child();
+        List<Slot> groupBySlots = agg.getGroupByExpressions().stream()
+                .flatMap(e -> e.getInputSlots().stream()).collect(Collectors.toList());
+        switch (join.getJoinType()) {
+            case LEFT_OUTER_JOIN:
+                if (canPushTo(join.left(), groupBySlots, agg)) {
+                    return join.withChildren(limitDistinct(limit, agg, join.left()), join.right());
+                }
+                return null;
+            case RIGHT_OUTER_JOIN:
+                if (canPushTo(join.right(), groupBySlots, agg)) {
+                    return join.withChildren(join.left(), limitDistinct(limit, agg, join.right()));
+                }
+                return null;
+            case CROSS_JOIN:
+                if (canPushTo(join.left(), groupBySlots, agg)) {
+                    return join.withChildren(limitDistinct(limit, agg, join.left()), join.right());
+                } else if (canPushTo(join.right(), groupBySlots, agg)) {
+                    return join.withChildren(join.left(), limitDistinct(limit, agg, join.right()));
+                } else {
+                    return null;
+                }
+            default:
+                return null;
+        }
+    }
+
+    /** True when {@code child} produces every group-by slot and its output set equals the aggregate's. */
+    private boolean canPushTo(Plan child, List<Slot> groupBySlots, LogicalAggregate<?> agg) {
+        return child.getOutputSet().containsAll(groupBySlots)
+                && child.getOutputSet().equals(agg.getOutputSet());
+    }
+
+    /** Wrap {@code child} as limit(distinct(child)) with limit = limit + offset and offset = 0. */
+    private Plan limitDistinct(LogicalLimit<?> limit, LogicalAggregate<?> agg, Plan child) {
+        return limit.withLimitChild(limit.getLimit() + limit.getOffset(), 0, agg.withChildren(child));
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownTopNThroughJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownTopNThroughJoin.java
index ac179864393..b025d40b6d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownTopNThroughJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownTopNThroughJoin.java
@@ -25,7 +25,6 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
-import org.apache.doris.nereids.util.Utils;
 
 import com.google.common.collect.ImmutableList;
 
@@ -85,28 +84,33 @@ public class PushdownTopNThroughJoin implements 
RewriteRuleFactory {
     }
 
     private Plan pushLimitThroughJoin(LogicalTopN<? extends Plan> topN, 
LogicalJoin<Plan, Plan> join) {
+        List<Slot> orderbySlots = 
topN.getOrderKeys().stream().map(OrderKey::getExpr)
+                .flatMap(e -> 
e.getInputSlots().stream()).collect(Collectors.toList());
         switch (join.getJoinType()) {
             case LEFT_OUTER_JOIN:
-                Set<Slot> rightOutputSet = join.right().getOutputSet();
-                if (topN.getOrderKeys().stream().map(OrderKey::getExpr)
-                        .anyMatch(e -> Utils.isIntersecting(rightOutputSet, 
e.getInputSlots()))) {
-                    return null;
+                if (join.left().getOutputSet().containsAll(orderbySlots)) {
+                    return join.withChildren(
+                            topN.withLimitChild(topN.getLimit() + 
topN.getOffset(), 0, join.left()),
+                            join.right());
                 }
-                return join.withChildren(topN.withChildren(join.left()), 
join.right());
+                return null;
             case RIGHT_OUTER_JOIN:
-                Set<Slot> leftOutputSet = join.left().getOutputSet();
-                if (topN.getOrderKeys().stream().map(OrderKey::getExpr)
-                        .anyMatch(e -> Utils.isIntersecting(leftOutputSet, 
e.getInputSlots()))) {
-                    return null;
+                if (join.right().getOutputSet().containsAll(orderbySlots)) {
+                    return join.withChildren(
+                            join.left(),
+                            topN.withLimitChild(topN.getLimit() + 
topN.getOffset(), 0, join.right()));
                 }
-                return join.withChildren(join.left(), 
topN.withChildren(join.right()));
+                return null;
             case CROSS_JOIN:
-                List<Slot> orderbySlots = 
topN.getOrderKeys().stream().map(OrderKey::getExpr)
-                        .flatMap(e -> 
e.getInputSlots().stream()).collect(Collectors.toList());
+
                 if (join.left().getOutputSet().containsAll(orderbySlots)) {
-                    return join.withChildren(topN.withChildren(join.left()), 
join.right());
+                    return join.withChildren(
+                            topN.withLimitChild(topN.getLimit() + 
topN.getOffset(), 0, join.left()),
+                            join.right());
                 } else if 
(join.right().getOutputSet().containsAll(orderbySlots)) {
-                    return join.withChildren(join.left(), 
topN.withChildren(join.right()));
+                    return join.withChildren(
+                            join.left(),
+                            topN.withLimitChild(topN.getLimit() + 
topN.getOffset(), 0, join.right()));
                 } else {
                     return null;
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
index 38f2b161900..ef278c61fbe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
@@ -140,6 +140,11 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
         return sourceRepeat;
     }
 
+    public boolean isDistinct() {
+        return outputExpressions.stream().allMatch(e -> e instanceof Slot)
+                && groupByExpressions.stream().allMatch(e -> e instanceof 
Slot);
+    }
+
     public boolean hasRepeat() {
         return sourceRepeat.isPresent();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
index f03f335f018..e2bd6998a53 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
@@ -117,6 +117,12 @@ public class LogicalLimit<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TY
         return ImmutableList.of();
     }
 
+    /**
+     * Create a LogicalLimit with the given limit, offset and child, keeping this node's phase.
+     *
+     * @param limit row limit of the new node
+     * @param offset row offset of the new node
+     * @param child plan placed below the new node
+     * @return a new LogicalLimit; this node is not modified
+     */
+    public LogicalLimit<Plan> withLimitChild(long limit, long offset, Plan child) {
+        // NOTE(review): this checks the current node's children, not the passed-in child
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalLimit should have 1 child, but input is %s", children.size());
+        return new LogicalLimit<>(limit, offset, phase, child);
+    }
+
     public LogicalLimit<Plan> withLimitPhase(LimitPhase phase) {
         return new LogicalLimit<>(limit, offset, phase, child());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
index 80de9d6215a..02b239f1735 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
@@ -122,6 +122,13 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
                 Optional.empty(), Optional.of(getLogicalProperties()), 
child());
     }
 
+    /**
+     * Create a LogicalTopN with the same order keys but a new limit, offset and child.
+     *
+     * @param limit row limit of the new node
+     * @param offset row offset of the new node
+     * @param child plan placed below the new node
+     * @return a new LogicalTopN; this node is not modified
+     */
+    public LogicalTopN<Plan> withLimitChild(long limit, long offset, Plan child) {
+        // NOTE(review): this checks the current node's children, not the passed-in child
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalTopN should have 1 child, but input is %s", children.size());
+        // The child is replaced, so the logical properties cached for the current child must
+        // not be carried over; pass empty so they are derived again for the new child.
+        return new LogicalTopN<>(orderKeys, limit, offset,
+                Optional.empty(), Optional.empty(), child);
+    }
+
     @Override
     public LogicalTopN<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1,
diff --git a/regression-test/data/nereids_p0/join/test_limit_join.out 
b/regression-test/data/nereids_p0/join/test_limit_join.out
new file mode 100644
index 00000000000..311d110c2bb
--- /dev/null
+++ b/regression-test/data/nereids_p0/join/test_limit_join.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !join1 --
+1
+2
+
+-- !join3 --
+0
+1
+
+-- !join5 --
+1
+1
+
+-- !join6 --
+1
+
+-- !join7 --
+0
+0
+
+-- !join8 --
+0
+
diff --git a/regression-test/suites/nereids_p0/join/test_limit_join.groovy 
b/regression-test/suites/nereids_p0/join/test_limit_join.groovy
new file mode 100644
index 00000000000..8f4cbf88f34
--- /dev/null
+++ b/regression-test/suites/nereids_p0/join/test_limit_join.groovy
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression suite for pushing limit-distinct and topN through outer joins.
+// Tagged queries (join1, join3, join5..join8) have expected rows recorded in test_limit_join.out.
+suite("test_limit_join", "nereids_p0") {
+    // Work in a dedicated database that is dropped again at the end of the suite.
+    def DBname = "nereids_regression_test_limit_join"
+    sql "DROP DATABASE IF EXISTS ${DBname}"
+    sql "CREATE DATABASE IF NOT EXISTS ${DBname}"
+    sql "use ${DBname}"
+    // Force the Nereids planner and forbid fallback to the original planner.
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+
+    def tbName1 = "t1"
+    def tbName2 = "t2"
+
+    sql "DROP TABLE IF EXISTS ${tbName1};"
+    sql "DROP TABLE IF EXISTS ${tbName2};"
+
+    sql """create table if not exists ${tbName1} (c1 int, c2 int) DISTRIBUTED 
+BY HASH(c1) properties("replication_num" = "1");"""
+    sql """create table if not exists ${tbName2} (c1 int, c2 int, c3 int) 
+DISTRIBUTED BY HASH(c1) properties("replication_num" = "1");"""
+
+    // t1.c1 holds {1, 2} and t2.c1 holds {0, 1}, each value twice, so the join key
+    // matches only on c1 = 1 and both tables contain duplicate c1 values.
+    sql "insert into ${tbName1} values (1,1);"
+    sql "insert into ${tbName1} values (2,2);"
+    sql "insert into ${tbName1} values (1,null);"
+    sql "insert into ${tbName1} values (2,null);"
+    sql "insert into ${tbName2} values (0,1,9999);"
+    sql "insert into ${tbName2} values (1,1,9999);"
+    sql "insert into ${tbName2} values (0,null,9999);"
+    sql "insert into ${tbName2} values (1,null,9999);"
+
+
+    /* test push limit-distinct through join */
+    order_qt_join1 """
+        SELECT t1.c1
+        FROM ${tbName1} t1 left join ${tbName2} t2 on t1.c1 = t2.c1
+        GROUP BY t1.c1
+        limit 2;
+        """
+
+    // Executed without a result tag: no expected rows are recorded for this query.
+    // NOTE(review): presumably because LIMIT/OFFSET without ORDER BY has no stable result - confirm.
+    sql """
+        SELECT t1.c1
+        FROM ${tbName1} t1 left join ${tbName2} t2 on t1.c1 = t2.c1
+        GROUP BY t1.c1
+        LIMIT 1 OFFSET 1;
+        """
+
+    order_qt_join3 """
+        SELECT t2.c1
+        FROM ${tbName1} t1 right join ${tbName2} t2 on t1.c1 = t2.c1
+        GROUP BY t2.c1
+        limit 2;
+        """
+    
+    // Executed without a result tag, like the left-join LIMIT/OFFSET query above.
+    sql """
+        SELECT t2.c1
+        FROM ${tbName1} t1 right join ${tbName2} t2 on t1.c1 = t2.c1
+        GROUP BY t2.c1
+        LIMIT 1 OFFSET 1;
+        """
+
+    /* test push topN through join */
+    qt_join5 """
+        SELECT t1.c1
+        FROM ${tbName1} t1 left join ${tbName2} t2 on t1.c1 = t2.c1
+        ORDER BY t1.c1
+        limit 2;
+        """
+
+    qt_join6 """
+        SELECT t1.c1
+        FROM ${tbName1} t1 left join ${tbName2} t2 on t1.c1 = t2.c1
+        ORDER BY t1.c1
+        LIMIT 1 OFFSET 1;
+        """
+
+    qt_join7 """
+        SELECT t2.c1
+        FROM ${tbName1} t1 right join ${tbName2} t2 on t1.c1 = t2.c1
+        ORDER BY t2.c1
+        limit 2;
+        """
+
+    qt_join8 """
+        SELECT t2.c1
+        FROM ${tbName1} t1 right join ${tbName2} t2 on t1.c1 = t2.c1
+        ORDER BY t2.c1
+        LIMIT 1 OFFSET 1;
+        """
+
+    // Clean up the dedicated database.
+    sql "DROP DATABASE IF EXISTS ${DBname};"
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to