This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d9924c9b8e [Improvement](topn) add limit threashold session variable and fuzzy for topn optimizations (#16514) d9924c9b8e is described below commit d9924c9b8ec04c5c44c302f3a7638d4cc47148fc Author: Kang <kxiao.ti...@gmail.com> AuthorDate: Fri Feb 10 12:56:33 2023 +0800 [Improvement](topn) add limit threashold session variable and fuzzy for topn optimizations (#16514) 1. add limit threshold for topn runtime pushdown and key topn optimization 2. use unified session variable topn_opt_limit_threshold for all topn optimizations 3. add fuzzy support for topn_opt_limit_threshold --- docs/en/docs/advanced/variables.md | 4 ++ docs/zh-CN/docs/advanced/variables.md | 3 + .../main/java/org/apache/doris/common/Config.java | 3 - .../java/org/apache/doris/analysis/SelectStmt.java | 4 +- .../org/apache/doris/planner/OlapScanNode.java | 6 ++ .../org/apache/doris/planner/OriginalPlanner.java | 2 + .../java/org/apache/doris/qe/SessionVariable.java | 10 ++- .../java/org/apache/doris/planner/PlannerTest.java | 80 +++++++++++++++++----- 8 files changed, 88 insertions(+), 24 deletions(-) diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index 9725672cc3..b995bc3d44 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -571,3 +571,7 @@ Translated with www.DeepL.com/Translator (free version) * `enable_file_cache` Set whether to use block file cache. This variable takes effect only if the BE config enable_file_cache=true. The cache is not used when BE config enable_file_cache=false. + +* `topn_opt_limit_threshold` + + Sets the limit threshold for topn queries (e.g. SELECT * FROM t ORDER BY k LIMIT n). If n <= threshold, the topn optimizations (runtime predicate pushdown, two-phase result fetch, and reading data in key order) are enabled automatically; otherwise they are disabled. The default value is 1024. 
diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index 4c8e8ee06e..dcbdb501e3 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -560,3 +560,6 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3); 控制是否启用block file cache。该变量只有在be.conf中enable_file_cache=true时才有效,如果be.conf中enable_file_cache=false,则block file cache一直处于禁用状态。 +* `topn_opt_limit_threshold` + + 设置topn优化的limit阈值 (例如:SELECT * FROM t ORDER BY k LIMIT n). 如果limit的n小于等于阈值,topn相关优化(动态过滤下推、两阶段获取结果、按key的顺序读数据)会自动启用,否则会禁用。默认值是1024。 diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index ef325d357c..6ef6184e81 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1969,9 +1969,6 @@ public class Config extends ConfigBase { @ConfField(masterOnly = true, mutable = true) public static int max_error_tablet_of_broker_load = 3; - @ConfField(mutable = false) - public static int topn_two_phase_limit_threshold = 512; - /** * Used to set session variables randomly to check more issues in github workflow */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index a8b572d040..d0819885db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -708,8 +708,8 @@ public class SelectStmt extends QueryStmt { // Only TOPN query at present if (getOrderByElements() == null || !hasLimit() - || getLimit() == 0 - || getLimit() > ConnectContext.get().getSessionVariable().twoPhaseReadLimitThreshold) { + || getLimit() <= 0 + || getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) { return false; } // Check order by exprs are all 
slot refs diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index ccc1d14b0e..2d6bd2d12e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -940,6 +940,12 @@ public class OlapScanNode extends ScanNode { * Check Parent sort node can push down to child olap scan. */ public boolean checkPushSort(SortNode sortNode) { + // Ensure limit is less then threshold + if (sortNode.getLimit() <= 0 + || sortNode.getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) { + return false; + } + // Ensure all isAscOrder is same, ande length != 0. // Can't be zorder. if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() != 1 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 5f0abd125d..00328d1ef9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -448,6 +448,8 @@ public class OriginalPlanner extends Planner { SortNode sortNode = (SortNode) node; PlanNode child = sortNode.getChild(0); if (child instanceof OlapScanNode && sortNode.getLimit() > 0 + && ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null + && sortNode.getLimit() <= ConnectContext.get().getSessionVariable().topnOptLimitThreshold && sortNode.getSortInfo().getMaterializedOrderingExprs().size() > 0) { Expr firstSortExpr = sortNode.getSortInfo().getMaterializedOrderingExprs().get(0); if (firstSortExpr instanceof SlotRef && !firstSortExpr.getType().isStringType() diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index e85a4dcb33..28c2028226 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -259,7 +259,8 @@ public class SessionVariable implements Serializable, Writable { public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold"; public static final String ENABLE_TWO_PHASE_READ_OPT = "enable_two_phase_read_opt"; - public static final String TWO_PHASE_READ_OPT_LIMIT_THRESHOLD = "two_phase_read_opt_limit_threshold"; + public static final String TOPN_OPT_LIMIT_THRESHOLD = "topn_opt_limit_threshold"; + public static final String ENABLE_FILE_CACHE = "enable_file_cache"; public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = "group_by_and_having_use_alias_first"; @@ -681,8 +682,8 @@ public class SessionVariable implements Serializable, Writable { // 2. spawn fetch RPC to other nodes to get related data by sorted rowids @VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_READ_OPT, fuzzy = true) public boolean enableTwoPhaseReadOpt = true; - @VariableMgr.VarAttr(name = TWO_PHASE_READ_OPT_LIMIT_THRESHOLD) - public long twoPhaseReadLimitThreshold = 512; + @VariableMgr.VarAttr(name = TOPN_OPT_LIMIT_THRESHOLD) + public long topnOptLimitThreshold = 1024; // Default value is false, which means the group by and having clause // should first use column name not alias. According to mysql. 
@@ -735,6 +736,9 @@ public class SessionVariable implements Serializable, Writable { // this.enableFoldConstantByBe = false; // this.enableTwoPhaseReadOpt = true; } + + // set random 1, 10, 100, 1000, 10000 + this.topnOptLimitThreshold = 10 ^ (random.nextInt(5)); } public String printFuzzyVariables() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java index 2b5e0c3266..a23aab5d6c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java @@ -508,17 +508,53 @@ public class PlannerTest extends TestWithFeService { @Test public void testPushSortToOlapScan() throws Exception { - // Push sort successfully - String sql1 = "explain select k1 from db1.tbl1 order by k1, k2"; + // Push sort fail without limit + String sql1 = "explain select k1 from db1.tbl3 order by k1, k2"; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); String plan1 = planner1.getExplainString(new ExplainOptions(false, false)); + Assertions.assertFalse(plan1.contains("SORT INFO:\n `k1`\n `k2`")); + Assertions.assertFalse(plan1.contains("SORT LIMIT:")); + Assertions.assertFalse(plan1.contains("TOPN OPT")); + + // Push sort fail limit > topnOptLimitThreshold + sql1 = "explain select k1 from db1.tbl3 order by k1, k2 limit " + + (connectContext.getSessionVariable().topnOptLimitThreshold + 1); + stmtExecutor1 = new StmtExecutor(connectContext, sql1); + stmtExecutor1.execute(); + planner1 = stmtExecutor1.planner(); + plan1 = planner1.getExplainString(new ExplainOptions(false, false)); + Assertions.assertFalse(plan1.contains("SORT INFO:\n `k1`\n `k2`")); + Assertions.assertFalse(plan1.contains("SORT LIMIT:")); + Assertions.assertFalse(plan1.contains("TOPN OPT")); + + // Push sort success limit = topnOptLimitThreshold + sql1 = "explain 
select k1 from db1.tbl3 order by k1, k2 limit " + + (connectContext.getSessionVariable().topnOptLimitThreshold); + stmtExecutor1 = new StmtExecutor(connectContext, sql1); + stmtExecutor1.execute(); + planner1 = stmtExecutor1.planner(); + plan1 = planner1.getExplainString(new ExplainOptions(false, false)); Assertions.assertTrue(plan1.contains("SORT INFO:\n `k1`\n `k2`")); Assertions.assertTrue(plan1.contains("SORT LIMIT:")); + Assertions.assertTrue(plan1.contains("TOPN OPT")); + + // Push sort success limit < topnOptLimitThreshold + if (connectContext.getSessionVariable().topnOptLimitThreshold > 1) { + sql1 = "explain select k1 from db1.tbl3 order by k1, k2 limit " + + (connectContext.getSessionVariable().topnOptLimitThreshold - 1); + stmtExecutor1 = new StmtExecutor(connectContext, sql1); + stmtExecutor1.execute(); + planner1 = stmtExecutor1.planner(); + plan1 = planner1.getExplainString(new ExplainOptions(false, false)); + Assertions.assertTrue(plan1.contains("SORT INFO:\n `k1`\n `k2`")); + Assertions.assertTrue(plan1.contains("SORT LIMIT:")); + Assertions.assertTrue(plan1.contains("TOPN OPT")); + } // Push sort failed - String sql2 = "explain select k1, k2, k3 from db1.tbl1 order by k1, k3, k2"; + String sql2 = "explain select k1, k2, k3 from db1.tbl3 order by k1, k3, k2"; StmtExecutor stmtExecutor2 = new StmtExecutor(connectContext, sql2); stmtExecutor2.execute(); Planner planner2 = stmtExecutor2.planner(); @@ -529,9 +565,10 @@ public class PlannerTest extends TestWithFeService { @Test public void testEliminatingSortNode() throws Exception { - // fail case 1 + // success case 1 { - String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -542,7 +579,8 @@ 
public class PlannerTest extends TestWithFeService { // fail case 2 { - String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k3 = 2 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k3 = 2 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -553,7 +591,8 @@ public class PlannerTest extends TestWithFeService { // fail case 3 { - String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 != 2 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 != 2 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -564,7 +603,8 @@ public class PlannerTest extends TestWithFeService { // fail case 4 { - String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 or k2 = 2 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 or k2 = 2 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -575,7 +615,8 @@ public class PlannerTest extends TestWithFeService { // fail case 5 { - String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 = 2 or k3 = 3 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 = 2 or k3 = 3 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -587,7 +628,8 @@ public class PlannerTest extends TestWithFeService { // fail 
case 6 // TODO, support: in (select 1) { - String sql1 = "explain select k1 from db1.tbl1 where k1 in (select 1) and k2 = 2 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 in (select 1) and k2 = 2 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -597,7 +639,8 @@ public class PlannerTest extends TestWithFeService { // fail case 7 { - String sql1 = "explain select k1 from db1.tbl1 where k1 not in (1) and k2 = 2 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 not in (1) and k2 = 2 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -607,7 +650,8 @@ public class PlannerTest extends TestWithFeService { // success case 1 { - String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 = 2 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2 = 2 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -618,7 +662,8 @@ public class PlannerTest extends TestWithFeService { // success case 2 { - String sql1 = "explain select k1 from db1.tbl1 where k3 = 3 and k2 = 2 and k1 = 1 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k3 = 3 and k2 = 2 and k1 = 1 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -629,7 +674,8 @@ public class PlannerTest extends TestWithFeService 
{ // success case 3 { - String sql1 = "explain select k1 from db1.tbl1 where k1 in (1) and k2 in (2) and k2 !=2 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 in (1) and k2 in (2) and k2 !=2 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -640,7 +686,8 @@ public class PlannerTest extends TestWithFeService { // success case 4 { - String sql1 = "explain select k1 from db1.tbl1 where k1 in (concat('1','2')) and k2 = 2 order by k1, k2"; + String sql1 = "explain select k1 from db1.tbl1 where k1 in (concat('1','2')) and k2 = 2 order by k1, k2 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); @@ -652,7 +699,8 @@ public class PlannerTest extends TestWithFeService { // success case 5 { String sql1 = "explain select tbl1.k1 from db1.tbl1 join db1.tbl2 on tbl1.k1 = tbl2.k1" - + " where tbl1.k1 = 1 and tbl2.k1 = 2 and tbl1.k2 = 3 order by tbl1.k1, tbl2.k1"; + + " where tbl1.k1 = 1 and tbl2.k1 = 2 and tbl1.k2 = 3 order by tbl1.k1, tbl2.k1 limit " + + connectContext.getSessionVariable().topnOptLimitThreshold; StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1); stmtExecutor1.execute(); Planner planner1 = stmtExecutor1.planner(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org