This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ed47a3bb6d [enhancement](Nereids)refactor sort plan in nereids (#11673)
ed47a3bb6d is described below

commit ed47a3bb6dcc325de66d50ca4a42dec2298fa1a3
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Fri Aug 12 17:08:23 2022 +0800

    [enhancement](Nereids)refactor sort plan in nereids (#11673)
    
    1. rename PhysicalHeapSort to PhysicalQuickSort
    2. add LogicalTopN and PhysicalTopN
    3. add implementation rule for LogicalTopN
    4. add a interface Sort for both logical and physical sort
    5. add a interface TopN for both logical and physical top-n
    6. add a AbstractPhysicalSort as super class of PhysicalQuickSort and 
PhysicalTopN
---
 .../apache/doris/nereids/cost/CostCalculator.java  |  17 ++-
 .../glue/translator/PhysicalPlanTranslator.java    |  51 ++++++---
 .../apache/doris/nereids/properties/OrderSpec.java |   4 +-
 .../org/apache/doris/nereids/rules/RuleSet.java    |   6 +-
 .../org/apache/doris/nereids/rules/RuleType.java   |   3 +-
 ...rt.java => LogicalSortToPhysicalQuickSort.java} |   8 +-
 ...eapSort.java => LogicalTopNToPhysicalTopN.java} |  18 ++--
 .../doris/nereids/stats/StatsCalculator.java       |  22 +++-
 .../apache/doris/nereids/trees/plans/PlanType.java |   4 +-
 .../plans/algebra/Sort.java}                       |  21 ++--
 .../plans/algebra/TopN.java}                       |  22 ++--
 .../nereids/trees/plans/logical/LogicalSort.java   |  19 +---
 .../logical/{LogicalSort.java => LogicalTopN.java} |  48 ++++-----
 ...PhysicalJoin.java => AbstractPhysicalJoin.java} |   6 +-
 ...icalHeapSort.java => AbstractPhysicalSort.java} |  50 ++-------
 .../trees/plans/physical/PhysicalHashJoin.java     |   2 +-
 .../plans/physical/PhysicalNestedLoopJoin.java     |   2 +-
 ...hysicalHeapSort.java => PhysicalQuickSort.java} |  46 ++------
 .../{PhysicalHeapSort.java => PhysicalTopN.java}   |  52 +++++----
 .../nereids/trees/plans/visitor/PlanVisitor.java   |  19 +++-
 .../org/apache/doris/nereids/util/JoinUtils.java   |  10 +-
 .../rules/implementation/ImplementationTest.java   | 120 +++++++++++++++++++++
 .../LogicalLimitToPhysicalLimitTest.java           |  46 --------
 .../LogicalProjectToPhysicalProjectTest.java       |  82 --------------
 .../doris/nereids/stats/StatsCalculatorTest.java   |  42 ++++++--
 .../doris/nereids/trees/plans/PlanEqualsTest.java  |  10 +-
 26 files changed, 379 insertions(+), 351 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
index ccb99debde..d90970f7f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
@@ -24,10 +24,11 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribution;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.statistics.StatsDeriveResult;
 
@@ -79,7 +80,19 @@ public class CostCalculator {
         }
 
         @Override
-        public CostEstimate visitPhysicalHeapSort(PhysicalHeapSort 
physicalHeapSort, PlanContext context) {
+        public CostEstimate visitPhysicalQuickSort(PhysicalQuickSort 
physicalQuickSort, PlanContext context) {
+            // TODO: consider two-phase sort and enforcer.
+            StatsDeriveResult statistics = context.getStatisticsWithCheck();
+            StatsDeriveResult childStatistics = context.getChildStatistics(0);
+
+            return new CostEstimate(
+                    childStatistics.computeSize(),
+                    statistics.computeSize(),
+                    childStatistics.computeSize());
+        }
+
+        @Override
+        public CostEstimate visitPhysicalTopN(PhysicalTopN<Plan> topN, 
PlanContext context) {
             // TODO: consider two-phase sort and enforcer.
             StatsDeriveResult statistics = context.getStatisticsWithCheck();
             StatsDeriveResult childStatistics = context.getChildStatistics(0);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 1799c70123..2efa6dc6ac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -42,15 +42,17 @@ import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
@@ -286,10 +288,42 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     }
 
     @Override
-    public PlanFragment visitPhysicalHeapSort(PhysicalHeapSort<Plan> sort,
+    public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<Plan> sort,
             PlanTranslatorContext context) {
+        PlanFragment childFragment = visitAbstractPhysicalSort(sort, context);
+        SortNode sortNode = (SortNode) childFragment.getPlanRoot();
+        // isPartitioned() == false means there is only one instance, so no 
merge phase
+        if (!childFragment.isPartitioned()) {
+            return childFragment;
+        }
+        PlanFragment mergeFragment = createParentFragment(childFragment, 
DataPartition.UNPARTITIONED, context);
+        ExchangeNode exchangeNode = (ExchangeNode) mergeFragment.getPlanRoot();
+        //exchangeNode.limit/offset will be set in when translating  
PhysicalLimit
+        exchangeNode.setMergeInfo(sortNode.getSortInfo());
+        return mergeFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalTopN(PhysicalTopN<Plan> topN, 
PlanTranslatorContext context) {
+        PlanFragment childFragment = visitAbstractPhysicalSort(topN, context);
+        SortNode sortNode = (SortNode) childFragment.getPlanRoot();
+        sortNode.setOffset(topN.getOffset());
+        sortNode.setLimit(topN.getLimit());
+        // isPartitioned() == false means there is only one instance, so no 
merge phase
+        if (!childFragment.isPartitioned()) {
+            return childFragment;
+        }
+        PlanFragment mergeFragment = createParentFragment(childFragment, 
DataPartition.UNPARTITIONED, context);
+        ExchangeNode exchangeNode = (ExchangeNode) mergeFragment.getPlanRoot();
+        exchangeNode.setMergeInfo(sortNode.getSortInfo());
+        exchangeNode.setOffset(topN.getOffset());
+        exchangeNode.setLimit(topN.getLimit());
+        return mergeFragment;
+    }
+
+    @Override
+    public PlanFragment visitAbstractPhysicalSort(AbstractPhysicalSort<Plan> 
sort, PlanTranslatorContext context) {
         PlanFragment childFragment = sort.child(0).accept(this, context);
-        // TODO: need to discuss how to process field: 
SortNode::resolvedTupleExprs
         List<Expr> oldOrderingExprList = Lists.newArrayList();
         List<Boolean> ascOrderList = Lists.newArrayList();
         List<Boolean> nullsFirstParamList = Lists.newArrayList();
@@ -315,19 +349,10 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         // 4. fill in SortInfo members
         SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, 
nullsFirstParamList, tupleDesc);
         PlanNode childNode = childFragment.getPlanRoot();
-        // TODO: notice topN
         SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, 
sortInfo, true);
         sortNode.finalizeForNereids(tupleDesc, sortTupleOutputList, 
oldOrderingExprList);
         childFragment.addPlanRoot(sortNode);
-        //isPartitioned()==true means there is only one instance, so no merge 
phase
-        if (!childFragment.isPartitioned()) {
-            return childFragment;
-        }
-        PlanFragment mergeFragment = createParentFragment(childFragment, 
DataPartition.UNPARTITIONED, context);
-        ExchangeNode exchangeNode = (ExchangeNode) mergeFragment.getPlanRoot();
-        //exchangeNode.limit/offset will be set in when translating  
PhysicalLimit
-        exchangeNode.setMergeInfo(sortNode.getSortInfo());
-        return mergeFragment;
+        return childFragment;
     }
 
     // TODO: 1. support shuffle join / co-locate / bucket shuffle join later
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
index 4816a28cdf..afd5797011 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
@@ -20,7 +20,7 @@ package org.apache.doris.nereids.properties;
 import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.trees.plans.GroupPlan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
 
 import com.google.common.collect.Lists;
 
@@ -61,7 +61,7 @@ public class OrderSpec {
 
     public GroupExpression addEnforcer(Group child) {
         return new GroupExpression(
-                new PhysicalHeapSort(orderKeys, child.getLogicalProperties(), 
new GroupPlan(child)),
+                new PhysicalQuickSort(orderKeys, child.getLogicalProperties(), 
new GroupPlan(child)),
                 Lists.newArrayList(child)
         );
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index 9cc7bf7449..692c50c3d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -25,7 +25,8 @@ import 
org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin
 import 
org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit;
 import 
org.apache.doris.nereids.rules.implementation.LogicalOlapScanToPhysicalOlapScan;
 import 
org.apache.doris.nereids.rules.implementation.LogicalProjectToPhysicalProject;
-import 
org.apache.doris.nereids.rules.implementation.LogicalSortToPhysicalHeapSort;
+import 
org.apache.doris.nereids.rules.implementation.LogicalSortToPhysicalQuickSort;
+import org.apache.doris.nereids.rules.implementation.LogicalTopNToPhysicalTopN;
 import org.apache.doris.nereids.rules.rewrite.AggregateDisassemble;
 
 import com.google.common.collect.ImmutableList;
@@ -53,7 +54,8 @@ public class RuleSet {
             .add(new LogicalOlapScanToPhysicalOlapScan())
             .add(new LogicalProjectToPhysicalProject())
             .add(new LogicalLimitToPhysicalLimit())
-            .add(new LogicalSortToPhysicalHeapSort())
+            .add(new LogicalSortToPhysicalQuickSort())
+            .add(new LogicalTopNToPhysicalTopN())
             .build();
 
     public List<Rule> getExplorationRules() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 3839745885..f4cf4bd8ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -81,7 +81,8 @@ public enum RuleType {
     LOGICAL_JOIN_TO_NESTED_LOOP_JOIN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_PROJECT_TO_PHYSICAL_PROJECT_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_FILTER_TO_PHYSICAL_FILTER_RULE(RuleTypeClass.IMPLEMENTATION),
-    LOGICAL_SORT_TO_PHYSICAL_HEAP_SORT_RULE(RuleTypeClass.IMPLEMENTATION),
+    LOGICAL_SORT_TO_PHYSICAL_QUICK_SORT_RULE(RuleTypeClass.IMPLEMENTATION),
+    LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     IMPLEMENTATION_SENTINEL(RuleTypeClass.IMPLEMENTATION),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalQuickSort.java
similarity index 80%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalQuickSort.java
index de00a6968b..d197e5bf99 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalQuickSort.java
@@ -19,18 +19,18 @@ package org.apache.doris.nereids.rules.implementation;
 
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
 
 /**
  * Implementation rule that convert logical sort to physical sort.
  */
-public class LogicalSortToPhysicalHeapSort extends 
OneImplementationRuleFactory {
+public class LogicalSortToPhysicalQuickSort extends 
OneImplementationRuleFactory {
     @Override
     public Rule build() {
-        return logicalSort().then(sort -> new PhysicalHeapSort<>(
+        return logicalSort().then(sort -> new PhysicalQuickSort<>(
                 sort.getOrderKeys(),
                 sort.getLogicalProperties(),
                 sort.child())
-            ).toRule(RuleType.LOGICAL_SORT_TO_PHYSICAL_HEAP_SORT_RULE);
+            ).toRule(RuleType.LOGICAL_SORT_TO_PHYSICAL_QUICK_SORT_RULE);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
similarity index 65%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
index de00a6968b..5e222857c0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
@@ -19,18 +19,20 @@ package org.apache.doris.nereids.rules.implementation;
 
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 
 /**
- * Implementation rule that convert logical sort to physical sort.
+ * Implementation rule that convert logical top-n to physical top-n.
  */
-public class LogicalSortToPhysicalHeapSort extends 
OneImplementationRuleFactory {
+public class LogicalTopNToPhysicalTopN extends OneImplementationRuleFactory {
     @Override
     public Rule build() {
-        return logicalSort().then(sort -> new PhysicalHeapSort<>(
-                sort.getOrderKeys(),
-                sort.getLogicalProperties(),
-                sort.child())
-            ).toRule(RuleType.LOGICAL_SORT_TO_PHYSICAL_HEAP_SORT_RULE);
+        return logicalTopN().then(topN -> new PhysicalTopN<>(
+                topN.getOrderKeys(),
+                topN.getLimit(),
+                topN.getOffset(),
+                topN.getLogicalProperties(),
+                topN.child())
+        ).toRule(RuleType.LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index e223b65f57..dcd33f9711 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Filter;
 import org.apache.doris.nereids.trees.plans.algebra.Limit;
 import org.apache.doris.nereids.trees.plans.algebra.Project;
 import org.apache.doris.nereids.trees.plans.algebra.Scan;
+import org.apache.doris.nereids.trees.plans.algebra.TopN;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@@ -36,15 +37,17 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribution;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
 import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.qe.ConnectContext;
@@ -117,6 +120,11 @@ public class StatsCalculator extends 
DefaultPlanVisitor<StatsDeriveResult, Void>
         return groupExpression.getCopyOfChildStats(0);
     }
 
+    @Override
+    public StatsDeriveResult visitLogicalTopN(LogicalTopN<Plan> topN, Void 
context) {
+        return computeTopN(topN);
+    }
+
     @Override
     public StatsDeriveResult visitLogicalJoin(LogicalJoin<Plan, Plan> join, 
Void context) {
         return JoinEstimation.estimate(groupExpression.getCopyOfChildStats(0),
@@ -134,10 +142,15 @@ public class StatsCalculator extends 
DefaultPlanVisitor<StatsDeriveResult, Void>
     }
 
     @Override
-    public StatsDeriveResult visitPhysicalHeapSort(PhysicalHeapSort<Plan> 
sort, Void context) {
+    public StatsDeriveResult visitPhysicalQuickSort(PhysicalQuickSort<Plan> 
sort, Void context) {
         return groupExpression.getCopyOfChildStats(0);
     }
 
+    @Override
+    public StatsDeriveResult visitPhysicalTopN(PhysicalTopN<Plan> topN, Void 
context) {
+        return computeTopN(topN);
+    }
+
     @Override
     public StatsDeriveResult visitPhysicalHashJoin(PhysicalHashJoin<Plan, 
Plan> hashJoin, Void context) {
         return JoinEstimation.estimate(groupExpression.getCopyOfChildStats(0),
@@ -203,6 +216,11 @@ public class StatsCalculator extends 
DefaultPlanVisitor<StatsDeriveResult, Void>
         return stats;
     }
 
+    private StatsDeriveResult computeTopN(TopN topN) {
+        StatsDeriveResult stats = groupExpression.getCopyOfChildStats(0);
+        return stats.updateRowCountByLimit(topN.getLimit());
+    }
+
     private StatsDeriveResult computeLimit(Limit limit) {
         StatsDeriveResult stats = groupExpression.getCopyOfChildStats(0);
         return stats.updateRowCountByLimit(limit.getLimit());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index cfe06b9d97..8361cfeeac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -32,6 +32,7 @@ public enum PlanType {
     LOGICAL_JOIN,
     LOGICAL_AGGREGATE,
     LOGICAL_SORT,
+    LOGICAL_TOP_N,
     LOGICAL_LIMIT,
     LOGICAL_OLAP_SCAN,
     LOGICAL_APPLY,
@@ -46,7 +47,8 @@ public enum PlanType {
     PHYSICAL_FILTER,
     PHYSICAL_BROADCAST_HASH_JOIN,
     PHYSICAL_AGGREGATE,
-    PHYSICAL_SORT,
+    PHYSICAL_QUICK_SORT,
+    PHYSICAL_TOP_N,
     PHYSICAL_LIMIT,
     PHYSICAL_HASH_JOIN,
     PHYSICAL_NESTED_LOOP_JOIN,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Sort.java
similarity index 55%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Sort.java
index de00a6968b..1b3f682416 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Sort.java
@@ -15,22 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.rules.implementation;
+package org.apache.doris.nereids.trees.plans.algebra;
 
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
+import org.apache.doris.nereids.properties.OrderKey;
+
+import java.util.List;
 
 /**
- * Implementation rule that convert logical sort to physical sort.
+ * Common interface for logical/physical sort.
  */
-public class LogicalSortToPhysicalHeapSort extends 
OneImplementationRuleFactory {
-    @Override
-    public Rule build() {
-        return logicalSort().then(sort -> new PhysicalHeapSort<>(
-                sort.getOrderKeys(),
-                sort.getLogicalProperties(),
-                sort.child())
-            ).toRule(RuleType.LOGICAL_SORT_TO_PHYSICAL_HEAP_SORT_RULE);
-    }
+public interface Sort {
+    List<OrderKey> getOrderKeys();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
similarity index 55%
rename from 
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
index de00a6968b..d79fe003ed 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSortToPhysicalHeapSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
@@ -15,22 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.rules.implementation;
-
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
+package org.apache.doris.nereids.trees.plans.algebra;
 
 /**
- * Implementation rule that convert logical sort to physical sort.
+ * Common interface for logical/physical TopN.
  */
-public class LogicalSortToPhysicalHeapSort extends 
OneImplementationRuleFactory {
-    @Override
-    public Rule build() {
-        return logicalSort().then(sort -> new PhysicalHeapSort<>(
-                sort.getOrderKeys(),
-                sort.getLogicalProperties(),
-                sort.child())
-            ).toRule(RuleType.LOGICAL_SORT_TO_PHYSICAL_HEAP_SORT_RULE);
-    }
+public interface TopN extends Sort {
+
+    int getOffset();
+
+    int getLimit();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java
index fc58a9bae9..dbad82fad7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sort;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 
 import com.google.common.base.Preconditions;
@@ -41,14 +42,10 @@ import java.util.Optional;
  * orderKeys: list of column information after order by. eg:[a, asc],[b, desc].
  * OrderKey: Contains order expression information and sorting method. Default 
is ascending.
  */
-public class LogicalSort<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYPE> {
-
-    // Default offset is 0.
-    private int offset = 0;
+public class LogicalSort<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYPE> implements Sort {
 
     private final List<OrderKey> orderKeys;
 
-
     public LogicalSort(List<OrderKey> orderKeys, CHILD_TYPE child) {
         this(orderKeys, Optional.empty(), Optional.empty(), child);
     }
@@ -71,14 +68,6 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
         return orderKeys;
     }
 
-    public int getOffset() {
-        return offset;
-    }
-
-    public void setOffset(int offset) {
-        this.offset = offset;
-    }
-
     @Override
     public String toString() {
         return "LogicalSort (" + StringUtils.join(orderKeys, ", ") + ")";
@@ -93,12 +82,12 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
             return false;
         }
         LogicalSort that = (LogicalSort) o;
-        return offset == that.offset && Objects.equals(orderKeys, 
that.orderKeys);
+        return Objects.equals(orderKeys, that.orderKeys);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(orderKeys, offset);
+        return Objects.hash(orderKeys);
     }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
similarity index 67%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
index fc58a9bae9..d31467f00b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.TopN;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 
 import com.google.common.base.Preconditions;
@@ -35,31 +36,27 @@ import java.util.Objects;
 import java.util.Optional;
 
 /**
- * Logical Sort plan.
- * <p>
- * eg: select * from table order by a, b desc;
- * orderKeys: list of column information after order by. eg:[a, asc],[b, desc].
- * OrderKey: Contains order expression information and sorting method. Default 
is ascending.
+ * Logical top-N plan.
  */
-public class LogicalSort<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYPE> {
-
-    // Default offset is 0.
-    private int offset = 0;
+public class LogicalTopN<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYPE> implements TopN {
 
     private final List<OrderKey> orderKeys;
+    private final int limit;
+    private final int offset;
 
-
-    public LogicalSort(List<OrderKey> orderKeys, CHILD_TYPE child) {
-        this(orderKeys, Optional.empty(), Optional.empty(), child);
+    public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset, 
CHILD_TYPE child) {
+        this(orderKeys, limit, offset, Optional.empty(), Optional.empty(), 
child);
     }
 
     /**
      * Constructor for LogicalSort.
      */
-    public LogicalSort(List<OrderKey> orderKeys, Optional<GroupExpression> 
groupExpression,
+    public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset, 
Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
-        super(PlanType.LOGICAL_SORT, groupExpression, logicalProperties, 
child);
+        super(PlanType.LOGICAL_TOP_N, groupExpression, logicalProperties, 
child);
         this.orderKeys = Objects.requireNonNull(orderKeys, "orderKeys can not 
be null");
+        this.limit = limit;
+        this.offset = offset;
     }
 
     @Override
@@ -75,13 +72,16 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
         return offset;
     }
 
-    public void setOffset(int offset) {
-        this.offset = offset;
+    public int getLimit() {
+        return limit;
     }
 
     @Override
     public String toString() {
-        return "LogicalSort (" + StringUtils.join(orderKeys, ", ") + ")";
+        return "LogicalTopN ("
+                + "limit=" + limit
+                + ", offset=" + offset
+                + ", " + StringUtils.join(orderKeys, ", ") + ")";
     }
 
     @Override
@@ -92,19 +92,19 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        LogicalSort that = (LogicalSort) o;
-        return offset == that.offset && Objects.equals(orderKeys, 
that.orderKeys);
+        LogicalTopN that = (LogicalTopN) o;
+        return this.offset == that.offset && this.limit == that.limit && 
Objects.equals(this.orderKeys, that.orderKeys);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(orderKeys, offset);
+        return Objects.hash(orderKeys, limit, offset);
     }
 
 
     @Override
     public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-        return visitor.visitLogicalSort((LogicalSort<Plan>) this, context);
+        return visitor.visitLogicalTopN((LogicalTopN<Plan>) this, context);
     }
 
     @Override
@@ -117,16 +117,16 @@ public class LogicalSort<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
     @Override
     public LogicalUnary<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new LogicalSort<>(orderKeys, children.get(0));
+        return new LogicalTopN<>(orderKeys, limit, offset, children.get(0));
     }
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new LogicalSort<>(orderKeys, groupExpression, 
Optional.of(logicalProperties), child());
+        return new LogicalTopN<>(orderKeys, limit, offset, groupExpression, 
Optional.of(logicalProperties), child());
     }
 
     @Override
     public Plan withLogicalProperties(Optional<LogicalProperties> 
logicalProperties) {
-        return new LogicalSort<>(orderKeys, Optional.empty(), 
logicalProperties, child());
+        return new LogicalTopN<>(orderKeys, limit, offset, Optional.empty(), 
logicalProperties, child());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
similarity index 93%
rename from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJoin.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
index d47be13f6c..4a0c8300ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
@@ -34,7 +34,7 @@ import java.util.Optional;
 /**
  * Abstract class for all physical join node.
  */
-public abstract class PhysicalJoin<
+public abstract class AbstractPhysicalJoin<
         LEFT_CHILD_TYPE extends Plan,
         RIGHT_CHILD_TYPE extends Plan>
         extends PhysicalBinary<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> implements 
Join {
@@ -48,7 +48,7 @@ public abstract class PhysicalJoin<
      * @param joinType Which join type, left semi join, inner join...
      * @param condition join condition.
      */
-    public PhysicalJoin(PlanType type, JoinType joinType, Optional<Expression> 
condition,
+    public AbstractPhysicalJoin(PlanType type, JoinType joinType, 
Optional<Expression> condition,
             Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
             LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE rightChild) {
         super(type, groupExpression, logicalProperties, leftChild, rightChild);
@@ -77,7 +77,7 @@ public abstract class PhysicalJoin<
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        PhysicalJoin that = (PhysicalJoin) o;
+        AbstractPhysicalJoin that = (AbstractPhysicalJoin) o;
         return joinType == that.joinType && Objects.equals(condition, 
that.condition);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalSort.java
similarity index 57%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalSort.java
index 93cc62a763..82a8320863 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalSort.java
@@ -23,37 +23,34 @@ import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
-import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.trees.plans.algebra.Sort;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
 /**
- * Physical sort plan.
+ * Abstract class for all concrete physical sort plan.
  */
-public class PhysicalHeapSort<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_TYPE> {
+public abstract class AbstractPhysicalSort<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_TYPE> implements Sort {
 
-    private final List<OrderKey> orderKeys;
+    protected final List<OrderKey> orderKeys;
 
-
-    public PhysicalHeapSort(List<OrderKey> orderKeys,
+    public AbstractPhysicalSort(PlanType type, List<OrderKey> orderKeys,
             LogicalProperties logicalProperties, CHILD_TYPE child) {
-        this(orderKeys, Optional.empty(), logicalProperties, child);
+        this(type, orderKeys, Optional.empty(), logicalProperties, child);
     }
 
     /**
      * Constructor of PhysicalHashJoinNode.
      */
-    public PhysicalHeapSort(List<OrderKey> orderKeys,
+    public AbstractPhysicalSort(PlanType type, List<OrderKey> orderKeys,
             Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
             CHILD_TYPE child) {
-        super(PlanType.PHYSICAL_SORT, groupExpression, logicalProperties, 
child);
-        this.orderKeys = orderKeys;
+        super(type, groupExpression, logicalProperties, child);
+        this.orderKeys = 
ImmutableList.copyOf(Objects.requireNonNull(orderKeys, "orderKeys can not be 
null"));
     }
 
     public List<OrderKey> getOrderKeys() {
@@ -68,7 +65,7 @@ public class PhysicalHeapSort<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHI
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        PhysicalHeapSort that = (PhysicalHeapSort) o;
+        AbstractPhysicalSort that = (AbstractPhysicalSort) o;
         return Objects.equals(orderKeys, that.orderKeys);
     }
 
@@ -77,37 +74,10 @@ public class PhysicalHeapSort<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHI
         return Objects.hash(orderKeys);
     }
 
-    @Override
-    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-        return visitor.visitPhysicalHeapSort((PhysicalHeapSort<Plan>) this, 
context);
-    }
-
     @Override
     public List<Expression> getExpressions() {
         return orderKeys.stream()
                 .map(OrderKey::getExpr)
                 .collect(ImmutableList.toImmutableList());
     }
-
-    @Override
-    public PhysicalUnary<Plan> withChildren(List<Plan> children) {
-        Preconditions.checkArgument(children.size() == 1);
-        return new PhysicalHeapSort<>(orderKeys, logicalProperties, 
children.get(0));
-    }
-
-    @Override
-    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new PhysicalHeapSort<>(orderKeys, groupExpression, 
logicalProperties, child());
-    }
-
-    @Override
-    public Plan withLogicalProperties(Optional<LogicalProperties> 
logicalProperties) {
-        return new PhysicalHeapSort<>(orderKeys, Optional.empty(), 
logicalProperties.get(), child());
-    }
-
-    @Override
-    public String toString() {
-        return "PhysicalHeapSort ("
-                + StringUtils.join(orderKeys, ", ") + ")";
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
index 6c30a0c606..5a1d60137f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 public class PhysicalHashJoin<
         LEFT_CHILD_TYPE extends Plan,
         RIGHT_CHILD_TYPE extends Plan>
-        extends PhysicalJoin<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
+        extends AbstractPhysicalJoin<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
 
     public PhysicalHashJoin(JoinType joinType, Optional<Expression> condition, 
LogicalProperties logicalProperties,
                             LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE 
rightChild) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
index 0e3452adc9..a5f887cae8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 public class PhysicalNestedLoopJoin<
         LEFT_CHILD_TYPE extends Plan,
         RIGHT_CHILD_TYPE extends Plan>
-        extends PhysicalJoin<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
+        extends AbstractPhysicalJoin<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
 
     public PhysicalNestedLoopJoin(JoinType joinType, Optional<Expression> 
condition,
             LogicalProperties logicalProperties, LEFT_CHILD_TYPE leftChild, 
RIGHT_CHILD_TYPE rightChild) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java
similarity index 66%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java
index 93cc62a763..b1748480d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java
@@ -30,18 +30,14 @@ import com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 
 /**
- * Physical sort plan.
+ * Physical quick sort plan.
  */
-public class PhysicalHeapSort<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_TYPE> {
+public class PhysicalQuickSort<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<CHILD_TYPE> {
 
-    private final List<OrderKey> orderKeys;
-
-
-    public PhysicalHeapSort(List<OrderKey> orderKeys,
+    public PhysicalQuickSort(List<OrderKey> orderKeys,
             LogicalProperties logicalProperties, CHILD_TYPE child) {
         this(orderKeys, Optional.empty(), logicalProperties, child);
     }
@@ -49,37 +45,15 @@ public class PhysicalHeapSort<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHI
     /**
      * Constructor of PhysicalHashJoinNode.
      */
-    public PhysicalHeapSort(List<OrderKey> orderKeys,
+    public PhysicalQuickSort(List<OrderKey> orderKeys,
             Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
             CHILD_TYPE child) {
-        super(PlanType.PHYSICAL_SORT, groupExpression, logicalProperties, 
child);
-        this.orderKeys = orderKeys;
-    }
-
-    public List<OrderKey> getOrderKeys() {
-        return orderKeys;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        PhysicalHeapSort that = (PhysicalHeapSort) o;
-        return Objects.equals(orderKeys, that.orderKeys);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(orderKeys);
+        super(PlanType.PHYSICAL_QUICK_SORT, orderKeys, groupExpression, 
logicalProperties, child);
     }
 
     @Override
     public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-        return visitor.visitPhysicalHeapSort((PhysicalHeapSort<Plan>) this, 
context);
+        return visitor.visitPhysicalQuickSort((PhysicalQuickSort<Plan>) this, 
context);
     }
 
     @Override
@@ -92,22 +66,22 @@ public class PhysicalHeapSort<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHI
     @Override
     public PhysicalUnary<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new PhysicalHeapSort<>(orderKeys, logicalProperties, 
children.get(0));
+        return new PhysicalQuickSort<>(orderKeys, logicalProperties, 
children.get(0));
     }
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new PhysicalHeapSort<>(orderKeys, groupExpression, 
logicalProperties, child());
+        return new PhysicalQuickSort<>(orderKeys, groupExpression, 
logicalProperties, child());
     }
 
     @Override
     public Plan withLogicalProperties(Optional<LogicalProperties> 
logicalProperties) {
-        return new PhysicalHeapSort<>(orderKeys, Optional.empty(), 
logicalProperties.get(), child());
+        return new PhysicalQuickSort<>(orderKeys, Optional.empty(), 
logicalProperties.get(), child());
     }
 
     @Override
     public String toString() {
-        return "PhysicalHeapSort ("
+        return "PhysicalQuickSort ("
                 + StringUtils.join(orderKeys, ", ") + ")";
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
similarity index 62%
rename from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 93cc62a763..7a93e3b41c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHeapSort.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -23,6 +23,7 @@ import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.TopN;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 
 import com.google.common.base.Preconditions;
@@ -34,30 +35,36 @@ import java.util.Objects;
 import java.util.Optional;
 
 /**
- * Physical sort plan.
+ * Physical top-N plan.
  */
-public class PhysicalHeapSort<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_TYPE> {
+public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<CHILD_TYPE> implements TopN {
 
-    private final List<OrderKey> orderKeys;
+    private final int limit;
+    private final int offset;
 
-
-    public PhysicalHeapSort(List<OrderKey> orderKeys,
+    public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
             LogicalProperties logicalProperties, CHILD_TYPE child) {
-        this(orderKeys, Optional.empty(), logicalProperties, child);
+        this(orderKeys, limit, offset, Optional.empty(), logicalProperties, 
child);
     }
 
     /**
      * Constructor of PhysicalHashJoinNode.
      */
-    public PhysicalHeapSort(List<OrderKey> orderKeys,
+    public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
             Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
             CHILD_TYPE child) {
-        super(PlanType.PHYSICAL_SORT, groupExpression, logicalProperties, 
child);
-        this.orderKeys = orderKeys;
+        super(PlanType.PHYSICAL_TOP_N, orderKeys, groupExpression, 
logicalProperties, child);
+        Objects.requireNonNull(orderKeys, "orderKeys should not be null in 
PhysicalTopN.");
+        this.limit = limit;
+        this.offset = offset;
+    }
+
+    public int getLimit() {
+        return limit;
     }
 
-    public List<OrderKey> getOrderKeys() {
-        return orderKeys;
+    public int getOffset() {
+        return offset;
     }
 
     @Override
@@ -68,18 +75,21 @@ public class PhysicalHeapSort<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHI
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        PhysicalHeapSort that = (PhysicalHeapSort) o;
-        return Objects.equals(orderKeys, that.orderKeys);
+        if (!super.equals(o)) {
+            return false;
+        }
+        PhysicalTopN<?> that = (PhysicalTopN<?>) o;
+        return limit == that.limit && offset == that.offset;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(orderKeys);
+        return Objects.hash(super.hashCode(), limit, offset);
     }
 
     @Override
     public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-        return visitor.visitPhysicalHeapSort((PhysicalHeapSort<Plan>) this, 
context);
+        return visitor.visitPhysicalTopN((PhysicalTopN<Plan>) this, context);
     }
 
     @Override
@@ -92,22 +102,24 @@ public class PhysicalHeapSort<CHILD_TYPE extends Plan> 
extends PhysicalUnary<CHI
     @Override
     public PhysicalUnary<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new PhysicalHeapSort<>(orderKeys, logicalProperties, 
children.get(0));
+        return new PhysicalTopN<>(orderKeys, limit, offset, logicalProperties, 
children.get(0));
     }
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new PhysicalHeapSort<>(orderKeys, groupExpression, 
logicalProperties, child());
+        return new PhysicalTopN<>(orderKeys, limit, offset, groupExpression, 
logicalProperties, child());
     }
 
     @Override
     public Plan withLogicalProperties(Optional<LogicalProperties> 
logicalProperties) {
-        return new PhysicalHeapSort<>(orderKeys, Optional.empty(), 
logicalProperties.get(), child());
+        return new PhysicalTopN<>(orderKeys, limit, offset, Optional.empty(), 
logicalProperties.get(), child());
     }
 
     @Override
     public String toString() {
-        return "PhysicalHeapSort ("
-                + StringUtils.join(orderKeys, ", ") + ")";
+        return "PhysicalTopN ("
+                + "limit=" + limit
+                + ", offset=" + offset
+                + ", " + StringUtils.join(orderKeys, ", ") + ")";
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index c755d35154..6d75bf5819 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -35,16 +35,19 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSelectHint;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribution;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 
 /**
  * Base class for the processing of logical and physical plan.
@@ -109,6 +112,10 @@ public abstract class PlanVisitor<R, C> {
         return visit(sort, context);
     }
 
+    public R visitLogicalTopN(LogicalTopN<Plan> topN, C context) {
+        return visit(topN, context);
+    }
+
     public R visitLogicalLimit(LogicalLimit<Plan> limit, C context) {
         return visit(limit, context);
     }
@@ -149,10 +156,18 @@ public abstract class PlanVisitor<R, C> {
         return visitPhysicalScan(olapScan, context);
     }
 
-    public R visitPhysicalHeapSort(PhysicalHeapSort<Plan> sort, C context) {
+    public R visitAbstractPhysicalSort(AbstractPhysicalSort<Plan> sort, C 
context) {
         return visit(sort, context);
     }
 
+    public R visitPhysicalQuickSort(PhysicalQuickSort<Plan> sort, C context) {
+        return visitAbstractPhysicalSort(sort, context);
+    }
+
+    public R visitPhysicalTopN(PhysicalTopN<Plan> topN, C context) {
+        return visit(topN, context);
+    }
+
     public R visitPhysicalLimit(PhysicalLimit<Plan> limit, C context) {
         return visit(limit, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index 1246838c05..3648d1603f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -24,7 +24,7 @@ import 
org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.Join;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalJoin;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -37,19 +37,19 @@ import java.util.Set;
  * Utils for join
  */
 public class JoinUtils {
-    public static boolean onlyBroadcast(PhysicalJoin join) {
+    public static boolean onlyBroadcast(AbstractPhysicalJoin join) {
         // Cross-join only can be broadcast join.
         return join.getJoinType().isCrossJoin();
     }
 
-    public static boolean onlyShuffle(PhysicalJoin join) {
+    public static boolean onlyShuffle(AbstractPhysicalJoin join) {
         return join.getJoinType().isRightJoin() || 
join.getJoinType().isFullOuterJoin();
     }
 
     /**
      * Get all equalTo from onClause of join
      */
-    public static List<EqualTo> getEqualTo(PhysicalJoin<Plan, Plan> join) {
+    public static List<EqualTo> getEqualTo(AbstractPhysicalJoin<Plan, Plan> 
join) {
         List<EqualTo> eqConjuncts = Lists.newArrayList();
         if (!join.getCondition().isPresent()) {
             return eqConjuncts;
@@ -92,7 +92,7 @@ public class JoinUtils {
      * Return pair of left used slots and right used slots.
      */
     public static Pair<List<SlotReference>, List<SlotReference>> 
getOnClauseUsedSlots(
-            PhysicalJoin<Plan, Plan> join) {
+            AbstractPhysicalJoin<Plan, Plan> join) {
         Pair<List<SlotReference>, List<SlotReference>> childSlots =
                 new Pair<>(Lists.newArrayList(), Lists.newArrayList());
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
new file mode 100644
index 0000000000..d61922db91
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.nereids.types.BigIntType;
+import org.apache.doris.nereids.types.IntegerType;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class ImplementationTest {
+    private static final Map<String, Rule> rulesMap
+            = ImmutableMap.<String, Rule>builder()
+            .put(LogicalProject.class.getName(), (new 
LogicalProjectToPhysicalProject()).build())
+            .put(LogicalAggregate.class.getName(), (new 
LogicalAggToPhysicalHashAgg()).build())
+            .put(LogicalJoin.class.getName(), (new 
LogicalJoinToHashJoin()).build())
+            .put(LogicalOlapScan.class.getName(), (new 
LogicalOlapScanToPhysicalOlapScan()).build())
+            .put(LogicalFilter.class.getName(), (new 
LogicalFilterToPhysicalFilter()).build())
+            .put(LogicalSort.class.getName(), (new 
LogicalSortToPhysicalQuickSort()).build())
+            .put(LogicalTopN.class.getName(), (new 
LogicalTopNToPhysicalTopN()).build())
+            .put(LogicalLimit.class.getName(), (new 
LogicalLimitToPhysicalLimit()).build())
+            .build();
+    @Mocked
+    private CascadesContext cascadesContext;
+    @Mocked
+    private GroupPlan groupPlan;
+
+    public PhysicalPlan executeImplementationRule(LogicalPlan plan) {
+        Rule rule = rulesMap.get(plan.getClass().getName());
+        List<Plan> transform = rule.transform(plan, cascadesContext);
+        Assertions.assertEquals(1, transform.size());
+        Assertions.assertTrue(transform.get(0) instanceof PhysicalPlan);
+        return (PhysicalPlan) transform.get(0);
+    }
+
+    @Test
+    public void toPhysicalProjectTest() {
+        SlotReference col1 = new SlotReference("col1", BigIntType.INSTANCE);
+        SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE);
+        LogicalPlan project = new LogicalProject<>(Lists.newArrayList(col1, 
col2), groupPlan);
+
+        PhysicalPlan physicalPlan = executeImplementationRule(project);
+        Assertions.assertEquals(PlanType.PHYSICAL_PROJECT, 
physicalPlan.getType());
+        PhysicalProject physicalProject = (PhysicalProject) physicalPlan;
+        Assertions.assertEquals(2, physicalProject.getExpressions().size());
+        Assertions.assertEquals(col1, physicalProject.getExpressions().get(0));
+        Assertions.assertEquals(col2, physicalProject.getExpressions().get(1));
+    }
+
+    @Test
+    public void toPhysicalTopNTest() {
+        int limit = 10;
+        int offset = 100;
+        OrderKey key1 = new OrderKey(new SlotReference("col1", 
IntegerType.INSTANCE), true, true);
+        OrderKey key2 = new OrderKey(new SlotReference("col2", 
IntegerType.INSTANCE), true, true);
+        LogicalTopN<GroupPlan> topN = new 
LogicalTopN<>(Lists.newArrayList(key1, key2), limit, offset, groupPlan);
+
+        PhysicalPlan physicalPlan = executeImplementationRule(topN);
+        Assertions.assertEquals(PlanType.PHYSICAL_TOP_N, 
physicalPlan.getType());
+        PhysicalTopN physicalTopN = (PhysicalTopN) physicalPlan;
+        Assertions.assertEquals(limit, physicalTopN.getLimit());
+        Assertions.assertEquals(offset, physicalTopN.getOffset());
+        Assertions.assertEquals(2, physicalTopN.getOrderKeys().size());
+        Assertions.assertEquals(key1, physicalTopN.getOrderKeys().get(0));
+        Assertions.assertEquals(key2, physicalTopN.getOrderKeys().get(1));
+    }
+
+    @Test
+    public void toPhysicalLimitTest() {
+        int limit = 10;
+        int offset = 100;
+        LogicalLimit logicalLimit = new LogicalLimit<>(limit, offset, 
groupPlan);
+        PhysicalPlan physicalPlan = executeImplementationRule(logicalLimit);
+        Assertions.assertEquals(PlanType.PHYSICAL_LIMIT, 
physicalPlan.getType());
+        PhysicalLimit physicalLimit = (PhysicalLimit) physicalPlan;
+        Assertions.assertEquals(limit, physicalLimit.getLimit());
+        Assertions.assertEquals(offset, physicalLimit.getOffset());
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimitTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimitTest.java
deleted file mode 100644
index af3e813bb6..0000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimitTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.rules.implementation;
-
-import org.apache.doris.nereids.CascadesContext;
-import org.apache.doris.nereids.memo.Group;
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.trees.plans.GroupPlan;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.PlanType;
-import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
-import org.apache.doris.nereids.util.MemoTestUtils;
-
-import mockit.Mocked;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-public class LogicalLimitToPhysicalLimitTest {
-    @Test
-    public void toPhysicalLimitTest(@Mocked Group group) {
-        Plan logicalPlan = new LogicalLimit<>(3, 4, new GroupPlan(group));
-        CascadesContext cascadesContext = 
MemoTestUtils.createCascadesContext(logicalPlan);
-        Rule rule = new LogicalLimitToPhysicalLimit().build();
-        List<Plan> physicalPlans = rule.transform(logicalPlan, 
cascadesContext);
-        Assertions.assertEquals(1, physicalPlans.size());
-        Plan impl = physicalPlans.get(0);
-        Assertions.assertEquals(PlanType.PHYSICAL_LIMIT, impl.getType());
-    }
-}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalProjectToPhysicalProjectTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalProjectToPhysicalProjectTest.java
deleted file mode 100644
index 2bc9bdea92..0000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalProjectToPhysicalProjectTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.rules.implementation;
-
-import org.apache.doris.nereids.CascadesContext;
-import org.apache.doris.nereids.memo.Group;
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.PlanType;
-import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
-import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
-import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
-import org.apache.doris.nereids.util.MemoTestUtils;
-import org.apache.doris.nereids.util.PlanConstructor;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.Map;
-
-public class LogicalProjectToPhysicalProjectTest {
-    private static final Map<String, Rule> rulesMap
-            = ImmutableMap.<String, Rule>builder()
-            .put(LogicalProject.class.getName(), (new 
LogicalProjectToPhysicalProject()).build())
-            .put(LogicalAggregate.class.getName(), (new 
LogicalAggToPhysicalHashAgg()).build())
-            .put(LogicalJoin.class.getName(), (new 
LogicalJoinToHashJoin()).build())
-            .put(LogicalOlapScan.class.getName(), (new 
LogicalOlapScanToPhysicalOlapScan()).build())
-            .put(LogicalFilter.class.getName(), (new 
LogicalFilterToPhysicalFilter()).build())
-            .put(LogicalSort.class.getName(), (new 
LogicalSortToPhysicalHeapSort()).build())
-            .build();
-
-    private static PhysicalPlan rewriteLogicalToPhysical(Group group, 
CascadesContext cascadesContext) {
-        List<Plan> children = Lists.newArrayList();
-        for (Group child : group.getLogicalExpression().children()) {
-            children.add(rewriteLogicalToPhysical(child, cascadesContext));
-        }
-
-        Rule rule = 
rulesMap.get(group.getLogicalExpression().getPlan().getClass().getName());
-        List<Plan> transform = 
rule.transform(group.getLogicalExpression().getPlan(), cascadesContext);
-        Assertions.assertEquals(1, transform.size());
-        Assertions.assertTrue(transform.get(0) instanceof PhysicalPlan);
-        PhysicalPlan implPlanNode = (PhysicalPlan) transform.get(0);
-
-        return (PhysicalPlan) implPlanNode.withChildren(children);
-    }
-
-    @Test
-    public void projectionImplTest() {
-        LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "a", 0);
-        LogicalPlan project = new LogicalProject<>(Lists.newArrayList(), scan);
-
-        CascadesContext cascadesContext = 
MemoTestUtils.createCascadesContext(project);
-
-        PhysicalPlan physicalProject = 
rewriteLogicalToPhysical(cascadesContext.getMemo().getRoot(), cascadesContext);
-        Assertions.assertEquals(PlanType.PHYSICAL_PROJECT, 
physicalProject.getType());
-        PhysicalPlan physicalScan = (PhysicalPlan) physicalProject.child(0);
-        Assertions.assertEquals(PlanType.PHYSICAL_OLAP_SCAN, 
physicalScan.getType());
-    }
-}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
index a6ccb7c7b7..628161c971 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
@@ -39,6 +39,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
 import org.apache.doris.nereids.types.IntegerType;
 import org.apache.doris.nereids.util.PlanConstructor;
 import org.apache.doris.qe.ConnectContext;
@@ -252,12 +253,7 @@ public class StatsCalculatorTest {
         StatsDeriveResult childStats = new StatsDeriveResult(10, 
slotColumnStatsMap);
 
         Group childGroup = new Group();
-        childGroup.setLogicalProperties(new LogicalProperties(new 
Supplier<List<Slot>>() {
-            @Override
-            public List<Slot> get() {
-                return Collections.emptyList();
-            }
-        }));
+        childGroup.setLogicalProperties(new 
LogicalProperties(Collections::emptyList));
         GroupPlan groupPlan = new GroupPlan(childGroup);
         childGroup.setStatistics(childStats);
 
@@ -269,9 +265,41 @@ public class StatsCalculatorTest {
         StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
         statsCalculator.estimate();
         StatsDeriveResult limitStats = ownerGroup.getStatistics();
-        Assertions.assertEquals((long) (1), limitStats.getRowCount());
+        Assertions.assertEquals(1, limitStats.getRowCount());
         ColumnStats slot1Stats = limitStats.getSlotToColumnStats().get(slot1);
         Assertions.assertEquals(1, slot1Stats.getNdv());
         Assertions.assertEquals(1, slot1Stats.getNumNulls());
     }
+
+    @Test
+    public void testTopN() {
+        List<String> qualifier = new ArrayList<>();
+        qualifier.add("test");
+        qualifier.add("t");
+        SlotReference slot1 = new SlotReference("c1", IntegerType.INSTANCE, 
true, qualifier);
+        ColumnStats columnStats1 = new ColumnStats();
+        columnStats1.setNdv(10);
+        columnStats1.setNumNulls(5);
+        Map<Slot, ColumnStats> slotColumnStatsMap = new HashMap<>();
+        slotColumnStatsMap.put(slot1, columnStats1);
+        StatsDeriveResult childStats = new StatsDeriveResult(10, 
slotColumnStatsMap);
+
+        Group childGroup = new Group();
+        childGroup.setLogicalProperties(new 
LogicalProperties(Collections::emptyList));
+        GroupPlan groupPlan = new GroupPlan(childGroup);
+        childGroup.setStatistics(childStats);
+
+        LogicalTopN<GroupPlan> logicalTopN = new 
LogicalTopN<>(Collections.emptyList(), 1, 2, groupPlan);
+        GroupExpression groupExpression = new GroupExpression(logicalTopN);
+        groupExpression.addChild(childGroup);
+        Group ownerGroup = new Group();
+        ownerGroup.addGroupExpression(groupExpression);
+        StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
+        statsCalculator.estimate();
+        StatsDeriveResult topNStats = ownerGroup.getStatistics();
+        Assertions.assertEquals(1, topNStats.getRowCount());
+        ColumnStats slot1Stats = topNStats.getSlotToColumnStats().get(slot1);
+        Assertions.assertEquals(1, slot1Stats.getNdv());
+        Assertions.assertEquals(1, slot1Stats.getNumNulls());
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java
index d851435540..36b79e57f2 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java
@@ -35,9 +35,9 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
 import org.apache.doris.nereids.types.BigIntType;
 import org.apache.doris.nereids.util.PlanConstructor;
 
@@ -218,12 +218,12 @@ public class PlanEqualsTest {
         // TODO: Depend on List<OrderKey> Equals
         List<OrderKey> orderKeyList = Lists.newArrayList();
 
-        PhysicalHeapSort physicalHeapSort = new PhysicalHeapSort(orderKeyList, 
logicalProperties, child);
-        Assertions.assertEquals(physicalHeapSort, physicalHeapSort);
+        PhysicalQuickSort physicalQuickSort = new 
PhysicalQuickSort(orderKeyList, logicalProperties, child);
+        Assertions.assertEquals(physicalQuickSort, physicalQuickSort);
 
         List<OrderKey> orderKeyListClone = Lists.newArrayList();
-        PhysicalHeapSort physicalHeapSortClone = new 
PhysicalHeapSort(orderKeyListClone, logicalProperties,
+        PhysicalQuickSort physicalQuickSortClone = new 
PhysicalQuickSort(orderKeyListClone, logicalProperties,
                 child);
-        Assertions.assertEquals(physicalHeapSort, physicalHeapSortClone);
+        Assertions.assertEquals(physicalQuickSort, physicalQuickSortClone);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to