This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 318398a24efb2f1d3bf7c20d68cc155be6376b12 Author: minghong <[email protected]> AuthorDate: Sat Mar 14 14:30:10 2026 +0800 branch-4.1 [feat](topn lazy materialize)using index topn lazy (#59572) (#61309) ### What problem does this PR solve? pick #59572 Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- .../doris/nereids/processor/post/Validator.java | 27 ++------ .../post/materialize/LazyMaterializeTopN.java | 80 ++++++++++++---------- .../post/materialize/LazySlotPruning.java | 51 +++++++++++++- .../post/materialize/MaterializeProbeVisitor.java | 23 ++++++- .../java/org/apache/doris/qe/SessionVariable.java | 13 ++++ .../topNLazyMaterializationUsingIndex.out | 43 ++++++++++++ .../topNLazyMaterializationUsingIndex.groovy | 77 +++++++++++++++++++++ 7 files changed, 255 insertions(+), 59 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java index 40777e26faa..926e12cd279 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java @@ -32,11 +32,7 @@ import org.apache.doris.nereids.util.LazyCompute; import com.google.common.base.Preconditions; import java.util.BitSet; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import java.util.stream.Collectors; /** * validator plan. @@ -63,27 +59,20 @@ public class Validator extends PlanPostProcessor { child.accept(this, context); } - Optional<Slot> opt = checkAllSlotFromChildren(plan); - if (opt.isPresent()) { - List<Slot> childrenOutput = plan.children().stream().flatMap(p -> p.getOutput().stream()).collect( - Collectors.toList()); - throw new AnalysisException("A expression contains slot not from children\n" - + "Slot: " + opt.get() + " Children Output:" + childrenOutput + "\n" - + "Plan: " + plan.treeString() + "\n"); - } + checkAllSlotFromChildren(plan); return plan; } /** * Check all slot must from children. */ - public static Optional<Slot> checkAllSlotFromChildren(Plan plan) { + public static void checkAllSlotFromChildren(Plan plan) { if (plan.arity() == 0) { - return Optional.empty(); + return; } // agg exist multi-phase if (plan instanceof Aggregate) { - return Optional.empty(); + return; } Supplier<BitSet> childrenOutputIds = LazyCompute.of(() -> { @@ -97,7 +86,6 @@ public class Validator extends PlanPostProcessor { }); for (Expression expression : plan.getExpressions()) { - AtomicReference<Slot> invalidSlot = new AtomicReference<>(); expression.anyMatch(e -> { if (e instanceof Slot) { Slot slot = (Slot) e; @@ -105,14 +93,13 @@ public class Validator extends PlanPostProcessor { return false; } if (!childrenOutputIds.get().get(slot.getExprId().asInt())) { - invalidSlot.set(slot); - return true; + throw new AnalysisException("A expression contains slot not from children\n" + + "Slot: " + slot + " Children Output:" + childrenOutputIds.get() + "\n" + + "Plan: " + plan.treeString() + "\n"); } } return false; }); - return Optional.ofNullable(invalidSlot.get()); } - return Optional.empty(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java index 3626c380b8a..6105b652445 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java @@ -40,6 +40,8 @@ import org.apache.doris.qe.SessionVariable; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.HashMap; @@ -53,17 +55,20 @@ import java.util.Set; * post rule to do lazy materialize */ public class LazyMaterializeTopN extends PlanPostProcessor { - /* BE do not support pattern: - union - -->materialize - -->topn - -->scan1 - -->materialize - -->topn - -->scan2 - when we create materializeNode for the first union child, set hasMaterialized=true - to avoid generating materializeNode for other union's children - */ + /* + * BE do not support pattern: + * union + * -->materialize + * -->topn + * -->scan1 + * -->materialize + * -->topn + * -->scan2 + * when we create materializeNode for the first union child, set + * hasMaterialized=true + * to avoid generating materializeNode for other union's children + */ + private static final Logger LOG = LogManager.getLogger(LazyMaterializeTopN.class); private boolean hasMaterialized = false; @Override @@ -75,11 +80,11 @@ public class LazyMaterializeTopN extends PlanPostProcessor { return topN; } /* - topn(output=[x] orderkey=[b]) - ->project(a as x) - ->T(a, b) - 'x' can be lazy materialized. - materializeMap: x->(T, a) + * topn(output=[x] orderkey=[b]) + * ->project(a as x) + * ->T(a, b) + * 'x' can be lazy materialized. + * materializeMap: x->(T, a) */ Map<Slot, MaterializeSource> materializeMap = new HashMap<>(); List<Slot> materializedSlots = new ArrayList<>(); @@ -112,9 +117,12 @@ public class LazyMaterializeTopN extends PlanPostProcessor { List<Slot> originOutput = topN.getOutput(); BiMap<Relation, SlotReference> relationToRowId = HashBiMap.create(relationToLazySlotMap.size()); HashSet<SlotReference> rowIdSet = new HashSet<>(); - // we should use threadStatementContext, not ctx.getStatementContext(), because the StatisticsCleaner - // will generate two statementContext, and reuse the plan which generated by outer StatementContext, - // so we should generate exprId by the outer StatementContext, or else generate conflict expr id + // we should use threadStatementContext, not ctx.getStatementContext(), because + // the StatisticsCleaner + // will generate two statementContext, and reuse the plan which generated by + // outer StatementContext, + // so we should generate exprId by the outer StatementContext, or else generate + // conflict expr id StatementContext threadStatementContext = StatementScopeIdGenerator.getStatementContext(); for (Relation relation : relationToLazySlotMap.keySet()) { if (relation instanceof CatalogRelation) { @@ -154,13 +162,13 @@ public class LazyMaterializeTopN extends PlanPostProcessor { if (materializeInput == null) { /* - topn - -->any - => - project - -->materialize - -->topn - -->any + * topn + * -->any + * => + * project + * -->materialize + * -->topn + * -->any */ result = new PhysicalLazyMaterialize(result, result.getOutput(), materializedSlots, relationToLazySlotMap, relationToRowId, materializeMap, @@ -168,14 +176,14 @@ public class LazyMaterializeTopN extends PlanPostProcessor { hasMaterialized = true; } else { /* - topn - -->any - => - project - -->materialize - -->project - -->topn - -->any + * topn + * -->any + * => + * project + * -->materialize + * -->project + * -->topn + * -->any */ List<Slot> reOrderedMaterializedSlots = new ArrayList<>(); for (Slot slot : materializeInput) { @@ -195,8 +203,8 @@ public class LazyMaterializeTopN extends PlanPostProcessor { } /* - [a, r1, r2, b, r2] => [a, b, r1, r2] - move all rowIds to tail, and remove duplicated rowIds + * [a, r1, r2, b, r2] => [a, b, r1, r2] + * move all rowIds to tail, and remove duplicated rowIds */ private List<SlotReference> moveRowIdsToTail(List<Slot> slots, Set<SlotReference> rowIds) { List<SlotReference> reArrangedSlots = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java index bc54aa84bd2..c7b89519f1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java @@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterializeOlapScan; @@ -39,20 +40,24 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; +import org.apache.doris.qe.SessionVariable; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; /** - * prune lazy materialized slot + prune lazy materialized slot */ public class LazySlotPruning extends DefaultPlanRewriter<LazySlotPruning.Context> { /** - * Context + Context */ public static class Context { private PhysicalRelation scan; @@ -98,6 +103,48 @@ public class LazySlotPruning extends DefaultPlanRewriter<LazySlotPruning.Context return plan; } + @Override + public Plan visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Context context) { + if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) { + /* + materialization(materializedSlots=[a, b], lazy=[c]) + ->topn(b) + ->filter(a=1, output=(rowid, a, b)) + ->materializeOlapScan(rowid, lazy=[c], T[a,b,c]) + => + materialization(materializedSlots=[b], lazy=[a, c]) + ->topn(b) + ->project(rowid, b) + ->filter(a=1, output=(rowid, a, b)) + ->materializeOlapScan(rowid, lazy=[a,c], T[a,b,c]) + */ + List<Slot> lazySlotsToScan = new ArrayList<>(); + boolean lazySlotsChanged = false; + + for (Slot slot : context.lazySlots) { + if (!filter.getInputSlots().contains(slot)) { + lazySlotsToScan.add(slot); + } else { + lazySlotsChanged = true; + } + } + if (lazySlotsChanged) { + Context contextForScan = context.withLazySlots(lazySlotsToScan); + filter = (PhysicalFilter<? extends Plan>) filter.withChildren( + filter.child().accept(this, contextForScan)); + filter = (PhysicalFilter<? extends Plan>) filter + .copyStatsAndGroupIdFrom(filter).resetLogicalProperties(); + List<Slot> filterOutput = Lists.newArrayList(filter.getOutput()); + filterOutput.removeAll(filter.getInputSlots()); + return new PhysicalProject<>( + filterOutput.stream().map(s -> (SlotReference) s).collect(Collectors.toList()), + Optional.empty(), null, + filter.getPhysicalProperties(), filter.getStats(), filter); + } + } + return visit(filter, context); + } + @Override public Plan visitPhysicalOlapScan(PhysicalOlapScan scan, Context context) { if (scan.getOutput().containsAll(context.lazySlots)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java index c6ff7709a7d..b6b4b31d8b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/MaterializeProbeVisitor.java @@ -28,13 +28,16 @@ import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.Relation; import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; +import org.apache.doris.qe.SessionVariable; import com.google.common.collect.ImmutableSet; import org.apache.logging.log4j.LogManager; @@ -71,6 +74,24 @@ public class MaterializeProbeVisitor extends DefaultPlanVisitor<Optional<Materia } } + @Override + public Optional<MaterializeSource> visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, + ProbeContext context) { + if (SessionVariable.getTopNLazyMaterializationUsingIndex() && filter.child() instanceof PhysicalOlapScan) { + // agg table do not support lazy materialize + OlapTable table = ((PhysicalOlapScan) filter.child()).getTable(); + if (KeysType.AGG_KEYS.equals(table.getKeysType())) { + return Optional.empty(); + } + if (filter.getInputSlots().contains(context.slot)) { + return Optional.of(new MaterializeSource((Relation) filter.child(), context.slot)); + } else { + return filter.child().accept(this, context); + } + } + return this.visit(filter, context); + } + @Override public Optional<MaterializeSource> visit(Plan plan, ProbeContext context) { if (plan.getInputSlots().contains(context.slot)) { @@ -195,7 +216,7 @@ public class MaterializeProbeVisitor extends DefaultPlanVisitor<Optional<Materia } else { // projectExpr is alias Alias alias = (Alias) projectExpr; - if (alias.child() instanceof SlotReference) { + if (alias.child() instanceof SlotReference && !SessionVariable.getTopNLazyMaterializationUsingIndex()) { ProbeContext childContext = new ProbeContext((SlotReference) alias.child()); return project.child().accept(this, childContext); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 334fe566248..b0d880e6dc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1632,6 +1632,11 @@ public class SessionVariable implements Serializable, Writable { varType = VariableAnnotation.EXPERIMENTAL) public int topNLazyMaterializationThreshold = 1024; + @VariableMgr.VarAttr(name = "topn_lazy_materialization_using_index", needForward = true, + fuzzy = false, + varType = VariableAnnotation.EXPERIMENTAL) + public boolean topNLazyMaterializationUsingIndex = false; + @VariableMgr.VarAttr(name = ENABLE_PRUNE_NESTED_COLUMN, needForward = true, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, @@ -1652,6 +1657,14 @@ public class SessionVariable implements Serializable, Writable { } } + public static boolean getTopNLazyMaterializationUsingIndex() { + if (ConnectContext.get() != null) { + return ConnectContext.get().getSessionVariable().topNLazyMaterializationUsingIndex; + } else { + return VariableMgr.getDefaultSessionVariable().topNLazyMaterializationUsingIndex; + } + } + @VariableMgr.VarAttr(name = DISABLE_INVERTED_INDEX_V1_FOR_VARIANT, needForward = true) private boolean disableInvertedIndexV1ForVaraint = true; diff --git a/regression-test/data/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.out b/regression-test/data/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.out new file mode 100644 index 00000000000..d966e52f11e --- /dev/null +++ b/regression-test/data/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.out @@ -0,0 +1,43 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !plan -- +PhysicalResultSink +--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username] +----PhysicalLazyMaterialize[materializedSlots:(t1.username) lazySlots:(t1.addr,t1.age,t1.user_id)] +------PhysicalTopN[MERGE_SORT] +--------PhysicalDistribute[DistributionSpecGather] +----------PhysicalTopN[LOCAL_SORT] +------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t1, t1.username] +--------------filter((t1.user_id = 1)) +----------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.age,t1.addr)] + +-- !exec -- +1 a 10 cd + +-- !plan2 -- +PhysicalResultSink +--PhysicalProject[t2.addr, t2.age, t2.user_id, t2.username] +----PhysicalLazyMaterialize[materializedSlots:(t2.username) lazySlots:(t2.addr,t2.age,t2.user_id)] +------PhysicalTopN[MERGE_SORT] +--------PhysicalDistribute[DistributionSpecGather] +----------PhysicalTopN[LOCAL_SORT] +------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username] +--------------hashJoin[INNER_JOIN broadcast] hashCondition=((t1.username = t2.username)) otherCondition=() build RFs:RF0 username->[username];RF1 username->[username] +----------------PhysicalProject[t1.username] +------------------PhysicalOlapScan[t1] apply RFs: RF0 RF1 +----------------PhysicalProject[regression_test_query_p0_topn_lazy_usingIndex.__DORIS_GLOBAL_ROWID_COL__t2, t2.username] +------------------filter((t2.user_id > 0)) +--------------------PhysicalLazyMaterializeOlapScan[t2 lazySlots:(t2.age,t2.addr)] + +-- !exe2 -- +1 a 10 cd 1 a 10 cd + +-- !plan_no_effect -- +PhysicalResultSink +--PhysicalProject[t1.addr, t1.age, t1.user_id, t1.username] +----PhysicalLazyMaterialize[materializedSlots:(t1.user_id) lazySlots:(t1.addr,t1.age,t1.username)] +------PhysicalTopN[MERGE_SORT] +--------PhysicalDistribute[DistributionSpecGather] +----------PhysicalTopN[LOCAL_SORT] +------------filter((t1.user_id > 0)) +--------------PhysicalLazyMaterializeOlapScan[t1 lazySlots:(t1.username,t1.age,t1.addr)] + diff --git a/regression-test/suites/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.groovy b/regression-test/suites/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.groovy new file mode 100644 index 00000000000..8b839adfac7 --- /dev/null +++ b/regression-test/suites/query_p0/topn_lazy/usingIndex/topNLazyMaterializationUsingIndex.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("topNLazyMaterializationUsingIndex.groovy") { + sql """ + drop table if exists t1; + CREATE TABLE t1 + ( + `user_id` LARGEINT NOT NULL, + `username` VARCHAR(50) NOT NULL, + age int, + addr VARCHAR(50) NOT NULL + ) + duplicate KEY(user_id, username) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1"); + + insert into t1 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq'); + + + drop table if exists t2; + CREATE TABLE t2 + ( + `user_id` LARGEINT NOT NULL, + `username` VARCHAR(50) NOT NULL, + age int, + addr VARCHAR(50) NOT NULL + ) + duplicate KEY(user_id, username) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1"); + + insert into t2 values ( 1, 'a', 10, 'cd'),(1,'b', 20, 'cq'); + + set topn_lazy_materialization_using_index = true; + SET detail_shape_nodes='PhysicalProject'; + """ + qt_plan """ + explain shape plan + select * from t1 where user_id = 1 order by username limit 1; + """ + qt_exec """ + select * from t1 where user_id = 1 order by username limit 1; + """ + + qt_plan2 """ + explain shape plan + select t2.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1; + """ + + qt_exe2 """ + select t2.*, t1.* from t1 join t2 on t1.username=t2.username where t2.user_id > 0 order by username limit 1; + """ + + qt_plan_no_effect """ + explain shape plan + select * from t1 where + user_id > 0 order by user_id limit 1; + """ + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
