This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d9924c9b8e [Improvement](topn) add limit threshold session variable 
and fuzzy for topn optimizations (#16514)
d9924c9b8e is described below

commit d9924c9b8ec04c5c44c302f3a7638d4cc47148fc
Author: Kang <kxiao.ti...@gmail.com>
AuthorDate: Fri Feb 10 12:56:33 2023 +0800

    [Improvement](topn) add limit threshold session variable and fuzzy for 
topn optimizations (#16514)
    
    1. add limit threshold for topn runtime pushdown and key topn optimization
    2. use unified session variable topn_opt_limit_threshold for all topn 
optimizations
    3. add fuzzy support for topn_opt_limit_threshold
---
 docs/en/docs/advanced/variables.md                 |  4 ++
 docs/zh-CN/docs/advanced/variables.md              |  3 +
 .../main/java/org/apache/doris/common/Config.java  |  3 -
 .../java/org/apache/doris/analysis/SelectStmt.java |  4 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  6 ++
 .../org/apache/doris/planner/OriginalPlanner.java  |  2 +
 .../java/org/apache/doris/qe/SessionVariable.java  | 10 ++-
 .../java/org/apache/doris/planner/PlannerTest.java | 80 +++++++++++++++++-----
 8 files changed, 88 insertions(+), 24 deletions(-)

diff --git a/docs/en/docs/advanced/variables.md 
b/docs/en/docs/advanced/variables.md
index 9725672cc3..b995bc3d44 100644
--- a/docs/en/docs/advanced/variables.md
+++ b/docs/en/docs/advanced/variables.md
@@ -571,3 +571,7 @@ Translated with www.DeepL.com/Translator (free version)
 * `enable_file_cache`
 
     Set wether to use block file cache. This variable takes effect only if the 
BE config enable_file_cache=true. The cache is not used when BE config 
enable_file_cache=false.
+
+* `topn_opt_limit_threshold`
+
+    Set the limit threshold for topn queries (e.g. SELECT * FROM t ORDER BY k LIMIT n). If n <= threshold, the topn optimizations (runtime predicate pushdown, two-phase result fetch, and reading data in key order) are enabled automatically; otherwise they are disabled. The default value is 1024.
diff --git a/docs/zh-CN/docs/advanced/variables.md 
b/docs/zh-CN/docs/advanced/variables.md
index 4c8e8ee06e..dcbdb501e3 100644
--- a/docs/zh-CN/docs/advanced/variables.md
+++ b/docs/zh-CN/docs/advanced/variables.md
@@ -560,3 +560,6 @@ SELECT /*+ SET_VAR(query_timeout = 1, 
enable_partition_cache=true) */ sleep(3);
 
     控制是否启用block file 
cache。该变量只有在be.conf中enable_file_cache=true时才有效,如果be.conf中enable_file_cache=false,则block
 file cache一直处于禁用状态。
        
+* `topn_opt_limit_threshold`
+
+    设置topn优化的limit阈值 (例如:SELECT * FROM t ORDER BY k LIMIT n). 
如果limit的n小于等于阈值,topn相关优化(动态过滤下推、两阶段获取结果、按key的顺序读数据)会自动启用,否则会禁用。默认值是1024。
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ef325d357c..6ef6184e81 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1969,9 +1969,6 @@ public class Config extends ConfigBase {
     @ConfField(masterOnly = true, mutable = true)
     public static int max_error_tablet_of_broker_load = 3;
 
-    @ConfField(mutable = false)
-    public static int topn_two_phase_limit_threshold = 512;
-
     /**
      * Used to set session variables randomly to check more issues in github 
workflow
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index a8b572d040..d0819885db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -708,8 +708,8 @@ public class SelectStmt extends QueryStmt {
         // Only TOPN query at present
         if (getOrderByElements() == null
                     || !hasLimit()
-                    || getLimit() == 0
-                    || getLimit() > 
ConnectContext.get().getSessionVariable().twoPhaseReadLimitThreshold) {
+                    || getLimit() <= 0
+                    || getLimit() > 
ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
             return false;
         }
         // Check order by exprs are all slot refs
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index ccc1d14b0e..2d6bd2d12e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -940,6 +940,12 @@ public class OlapScanNode extends ScanNode {
      * Check Parent sort node can push down to child olap scan.
      */
     public boolean checkPushSort(SortNode sortNode) {
+        // Only push the sort down when 0 < limit <= topnOptLimitThreshold
+        if (sortNode.getLimit() <= 0
+                || sortNode.getLimit() > 
ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
+            return false;
+        }
+
         // Ensure all isAscOrder is same, ande length != 0.
         // Can't be zorder.
         if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() 
!= 1
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 5f0abd125d..00328d1ef9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -448,6 +448,8 @@ public class OriginalPlanner extends Planner {
             SortNode sortNode = (SortNode) node;
             PlanNode child = sortNode.getChild(0);
             if (child instanceof OlapScanNode && sortNode.getLimit() > 0
+                    && ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null
+                    && sortNode.getLimit() <= 
ConnectContext.get().getSessionVariable().topnOptLimitThreshold
                     && 
sortNode.getSortInfo().getMaterializedOrderingExprs().size() > 0) {
                 Expr firstSortExpr = 
sortNode.getSortInfo().getMaterializedOrderingExprs().get(0);
                 if (firstSortExpr instanceof SlotRef && 
!firstSortExpr.getType().isStringType()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e85a4dcb33..28c2028226 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -259,7 +259,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String EXTERNAL_SORT_BYTES_THRESHOLD = 
"external_sort_bytes_threshold";
 
     public static final String ENABLE_TWO_PHASE_READ_OPT = 
"enable_two_phase_read_opt";
-    public static final String TWO_PHASE_READ_OPT_LIMIT_THRESHOLD = 
"two_phase_read_opt_limit_threshold";
+    public static final String TOPN_OPT_LIMIT_THRESHOLD = 
"topn_opt_limit_threshold";
+
     public static final String ENABLE_FILE_CACHE = "enable_file_cache";
 
     public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = 
"group_by_and_having_use_alias_first";
@@ -681,8 +682,8 @@ public class SessionVariable implements Serializable, 
Writable {
     // 2. spawn fetch RPC to other nodes to get related data by sorted rowids
     @VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_READ_OPT, fuzzy = true)
     public boolean enableTwoPhaseReadOpt = true;
-    @VariableMgr.VarAttr(name = TWO_PHASE_READ_OPT_LIMIT_THRESHOLD)
-    public long twoPhaseReadLimitThreshold = 512;
+    @VariableMgr.VarAttr(name = TOPN_OPT_LIMIT_THRESHOLD)
+    public long topnOptLimitThreshold = 1024;
 
     // Default value is false, which means the group by and having clause
     // should first use column name not alias. According to mysql.
@@ -735,6 +736,9 @@ public class SessionVariable implements Serializable, 
Writable {
             // this.enableFoldConstantByBe = false;
             // this.enableTwoPhaseReadOpt = true;
         }
+
+        // pick one of 1, 10, 100, 1000, 10000 at random (Java's ^ is XOR, so use Math.pow)
+        this.topnOptLimitThreshold = (long) Math.pow(10, random.nextInt(5));
     }
 
     public String printFuzzyVariables() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
index 2b5e0c3266..a23aab5d6c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
@@ -508,17 +508,53 @@ public class PlannerTest extends TestWithFeService {
 
     @Test
     public void testPushSortToOlapScan() throws Exception {
-        // Push sort successfully
-        String sql1 = "explain select k1 from db1.tbl1 order by k1, k2";
+        // Push sort fail without limit
+        String sql1 = "explain select k1 from db1.tbl3 order by k1, k2";
         StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
         stmtExecutor1.execute();
         Planner planner1 = stmtExecutor1.planner();
         String plan1 = planner1.getExplainString(new ExplainOptions(false, 
false));
+        Assertions.assertFalse(plan1.contains("SORT INFO:\n          `k1`\n    
      `k2`"));
+        Assertions.assertFalse(plan1.contains("SORT LIMIT:"));
+        Assertions.assertFalse(plan1.contains("TOPN OPT"));
+
+        // Push sort fail limit > topnOptLimitThreshold
+        sql1 = "explain select k1 from db1.tbl3 order by k1, k2 limit "
+            + (connectContext.getSessionVariable().topnOptLimitThreshold + 1);
+        stmtExecutor1 = new StmtExecutor(connectContext, sql1);
+        stmtExecutor1.execute();
+        planner1 = stmtExecutor1.planner();
+        plan1 = planner1.getExplainString(new ExplainOptions(false, false));
+        Assertions.assertFalse(plan1.contains("SORT INFO:\n          `k1`\n    
      `k2`"));
+        Assertions.assertFalse(plan1.contains("SORT LIMIT:"));
+        Assertions.assertFalse(plan1.contains("TOPN OPT"));
+
+        // Push sort success limit = topnOptLimitThreshold
+        sql1 = "explain select k1 from db1.tbl3 order by k1, k2 limit "
+            + (connectContext.getSessionVariable().topnOptLimitThreshold);
+        stmtExecutor1 = new StmtExecutor(connectContext, sql1);
+        stmtExecutor1.execute();
+        planner1 = stmtExecutor1.planner();
+        plan1 = planner1.getExplainString(new ExplainOptions(false, false));
         Assertions.assertTrue(plan1.contains("SORT INFO:\n          `k1`\n     
     `k2`"));
         Assertions.assertTrue(plan1.contains("SORT LIMIT:"));
+        Assertions.assertTrue(plan1.contains("TOPN OPT"));
+
+        // Push sort success limit < topnOptLimitThreshold
+        if (connectContext.getSessionVariable().topnOptLimitThreshold > 1) {
+            sql1 = "explain select k1 from db1.tbl3 order by k1, k2 limit "
+                + (connectContext.getSessionVariable().topnOptLimitThreshold - 
1);
+            stmtExecutor1 = new StmtExecutor(connectContext, sql1);
+            stmtExecutor1.execute();
+            planner1 = stmtExecutor1.planner();
+            plan1 = planner1.getExplainString(new ExplainOptions(false, 
false));
+            Assertions.assertTrue(plan1.contains("SORT INFO:\n          `k1`\n 
         `k2`"));
+            Assertions.assertTrue(plan1.contains("SORT LIMIT:"));
+            Assertions.assertTrue(plan1.contains("TOPN OPT"));
+        }
 
         // Push sort failed
-        String sql2 = "explain select k1, k2, k3 from db1.tbl1 order by k1, 
k3, k2";
+        String sql2 = "explain select k1, k2, k3 from db1.tbl3 order by k1, 
k3, k2";
         StmtExecutor stmtExecutor2 = new StmtExecutor(connectContext, sql2);
         stmtExecutor2.execute();
         Planner planner2 = stmtExecutor2.planner();
@@ -529,9 +565,10 @@ public class PlannerTest extends TestWithFeService {
 
     @Test
     public void testEliminatingSortNode() throws Exception {
-            // fail case 1
+            // success case 1
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 order 
by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 order 
by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -542,7 +579,8 @@ public class PlannerTest extends TestWithFeService {
 
             // fail case 2
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k3 
= 2 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k3 
= 2 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -553,7 +591,8 @@ public class PlannerTest extends TestWithFeService {
 
             // fail case 3
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 
!= 2 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 
!= 2 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -564,7 +603,8 @@ public class PlannerTest extends TestWithFeService {
 
             // fail case 4
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 or k2 
= 2 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 or k2 
= 2 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -575,7 +615,8 @@ public class PlannerTest extends TestWithFeService {
 
             // fail case 5
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 
= 2 or k3 = 3 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 
= 2 or k3 = 3 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -587,7 +628,8 @@ public class PlannerTest extends TestWithFeService {
             // fail case 6
             // TODO, support: in (select 1)
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 in (select 
1) and k2 = 2 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 in (select 
1) and k2 = 2 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -597,7 +639,8 @@ public class PlannerTest extends TestWithFeService {
 
             // fail case 7
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 not in (1) 
and k2 = 2 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 not in (1) 
and k2 = 2 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -607,7 +650,8 @@ public class PlannerTest extends TestWithFeService {
 
             // success case 1
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 
= 2 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 
= 2 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -618,7 +662,8 @@ public class PlannerTest extends TestWithFeService {
 
             // success case 2
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k3 = 3 and k2 
= 2 and k1 = 1 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k3 = 3 and k2 
= 2 and k1 = 1 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -629,7 +674,8 @@ public class PlannerTest extends TestWithFeService {
 
             // success case 3
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 in (1) and 
k2 in (2) and k2 !=2 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 in (1) and 
k2 in (2) and k2 !=2 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -640,7 +686,8 @@ public class PlannerTest extends TestWithFeService {
 
             // success case 4
             {
-            String sql1 = "explain select k1 from db1.tbl1 where k1 in 
(concat('1','2')) and k2 = 2 order by k1, k2";
+            String sql1 = "explain select k1 from db1.tbl1 where k1 in 
(concat('1','2')) and k2 = 2 order by k1, k2 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();
@@ -652,7 +699,8 @@ public class PlannerTest extends TestWithFeService {
             // success case 5
             {
             String sql1 = "explain select tbl1.k1 from db1.tbl1 join db1.tbl2 
on tbl1.k1 = tbl2.k1"
-                    + " where tbl1.k1 = 1 and tbl2.k1 = 2 and tbl1.k2 = 3 
order by tbl1.k1, tbl2.k1";
+                    + " where tbl1.k1 = 1 and tbl2.k1 = 2 and tbl1.k2 = 3 
order by tbl1.k1, tbl2.k1 limit "
+                    + 
connectContext.getSessionVariable().topnOptLimitThreshold;
             StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, 
sql1);
             stmtExecutor1.execute();
             Planner planner1 = stmtExecutor1.planner();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to