This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch high-priority-column in repository https://gitbox.apache.org/repos/asf/doris.git
commit ae0db0a5140115c930445427bd8e6471cbf19a29 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Wed Feb 28 11:54:40 2024 +0800 Collect high priority columns. (#31235) --- .../doris/nereids/jobs/executor/Rewriter.java | 4 +- .../org/apache/doris/nereids/rules/RuleType.java | 1 + .../expression/HighPriorityColumnCollector.java | 202 +++++++++++++++++++++ .../apache/doris/statistics/AnalysisManager.java | 44 +++++ .../doris/statistics/HighPriorityColumn.java | 55 ++++++ 5 files changed, 305 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index e2d386dc910..fdba72fbf6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -32,6 +32,7 @@ import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite; import org.apache.doris.nereids.rules.expression.ExpressionNormalization; import org.apache.doris.nereids.rules.expression.ExpressionOptimization; import org.apache.doris.nereids.rules.expression.ExpressionRewrite; +import org.apache.doris.nereids.rules.expression.HighPriorityColumnCollector; import org.apache.doris.nereids.rules.rewrite.AddDefaultLimit; import org.apache.doris.nereids.rules.rewrite.AdjustConjunctsReturnType; import org.apache.doris.nereids.rules.rewrite.AdjustNullable; @@ -440,7 +441,8 @@ public class Rewriter extends AbstractBatchJobExecutor { new CollectFilterAboveConsumer(), new CollectProjectAboveConsumer() ) - ) + ), + topic("Collect used column", custom(RuleType.COLLECT_COLUMNS, HighPriorityColumnCollector::new)) ); private static final List<RewriteJob> WHOLE_TREE_REWRITE_JOBS diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index ea76eddd1d7..363a97db0a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -296,6 +296,7 @@ public enum RuleType { LEADING_JOIN(RuleTypeClass.REWRITE), REWRITE_SENTINEL(RuleTypeClass.REWRITE), + COLLECT_COLUMNS(RuleTypeClass.REWRITE), // topn opts DEFER_MATERIALIZE_TOP_N_RESULT(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java new file mode 100644 index 00000000000..ed67ad97005 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.expression; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.jobs.JobContext; +import org.apache.doris.nereids.rules.expression.HighPriorityColumnCollector.CollectorContext; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalHaving; +import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.AnalysisManager; +import org.apache.doris.statistics.util.StatisticsUtil; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Used to collect High priority column. + */ +public class HighPriorityColumnCollector extends DefaultPlanRewriter<CollectorContext> implements CustomRewriter { + + @Override + public Plan rewriteRoot(Plan plan, JobContext jobContext) { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext != null && connectContext.getSessionVariable().internalSession) { + return plan; + } + CollectorContext context = new CollectorContext(); + plan.accept(this, context); + if (StatisticsUtil.enableAutoAnalyze()) { + context.queried.removeAll(context.usedInPredicate); + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + analysisManager.updateColumnUsedInPredicate(context.usedInPredicate); + analysisManager.updateQueriedColumn(context.queried); + } + return plan; + } + + /** + * Context. + */ + public static class CollectorContext { + public Map<Slot/*project output column*/, NamedExpression/*Actual project expr*/> projects = new HashMap<>(); + + public Set<Slot> usedInPredicate = new HashSet<>(); + + public Set<Slot> queried = new HashSet<>(); + } + + @Override + public Plan visitLogicalProject(LogicalProject<? extends Plan> project, CollectorContext context) { + project.child().accept(this, context); + List<NamedExpression> projects = project.getOutputs(); + List<Slot> slots = project.computeOutput(); + for (int i = 0; i < slots.size(); i++) { + context.projects.put(slots.get(i), projects.get(i)); + } + if (project.child() instanceof LogicalCatalogRelation + || project.child() instanceof LogicalFilter + && ((LogicalFilter) project.child()).child() instanceof LogicalCatalogRelation) { + Set<Slot> allUsed = project.getExpressions() + .stream().flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream()) + .collect(Collectors.toSet()); + LogicalCatalogRelation scan = project.child() instanceof LogicalCatalogRelation + ? (LogicalCatalogRelation) project.child() + : (LogicalCatalogRelation) project.child().child(0); + List<Slot> outputOfScan = scan.getOutput(); + for (Slot slot : outputOfScan) { + if (!allUsed.contains(slot)) { + context.queried.remove(slot); + } + } + } + return project; + } + + @Override + public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, CollectorContext context) { + join.child(0).accept(this, context); + join.child(1).accept(this, context); + context.usedInPredicate.addAll( + (join.isMarkJoin() ? join.getLeftConditionSlot() : join.getConditionSlot()) + .stream().flatMap(s -> backtrace(s, context).stream()) + .collect(Collectors.toSet()) + ); + return join; + } + + @Override + public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate, CollectorContext context) { + aggregate.child(0).accept(this, context); + context.usedInPredicate.addAll(aggregate.getGroupByExpressions() + .stream() + .flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream()) + .flatMap(s -> backtrace(s, context).stream()) + .collect(Collectors.toSet())); + return aggregate; + } + + @Override + public Plan visitLogicalHaving(LogicalHaving<? extends Plan> having, CollectorContext context) { + having.child(0).accept(this, context); + context.usedInPredicate.addAll( + having.getExpressions().stream() + .flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream()) + .flatMap(s -> backtrace(s, context).stream()) + .collect(Collectors.toSet())); + return having; + } + + @Override + public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, CollectorContext context) { + List<Slot> slots = olapScan.getOutput(); + context.queried.addAll(slots); + return olapScan; + } + + @Override + public Plan visitLogicalFileScan(LogicalFileScan fileScan, CollectorContext context) { + List<Slot> slots = fileScan.getOutput(); + context.queried.addAll(slots); + return fileScan; + } + + @Override + public Plan visitLogicalFilter(LogicalFilter<? extends Plan> filter, CollectorContext context) { + filter.child(0).accept(this, context); + context.usedInPredicate.addAll(filter + .getExpressions() + .stream() + .flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream()) + .flatMap(s -> backtrace(s, context).stream()) + .collect(Collectors.toSet())); + return filter; + } + + private Set<Slot> backtrace(Slot slot, CollectorContext context) { + return backtrace(slot, new HashSet<>(), context); + } + + private Set<Slot> backtrace(Slot slot, Set<Slot> path, CollectorContext context) { + if (path.contains(slot)) { + return Collections.emptySet(); + } + path.add(slot); + if (slot instanceof SlotReference) { + SlotReference slotReference = (SlotReference) slot; + Optional<Column> col = slotReference.getColumn(); + Optional<TableIf> table = slotReference.getTable(); + if (col.isPresent() && table.isPresent()) { + return Collections.singleton(slot); + } + } + NamedExpression namedExpression = context.projects.get(slot); + if (namedExpression == null) { + return Collections.emptySet(); + } + Set<SlotReference> slotReferences + = namedExpression.<Set<SlotReference>>collect(n -> n instanceof SlotReference); + Set<Slot> refCol = new HashSet<>(); + for (SlotReference slotReference : slotReferences) { + refCol.addAll(backtrace(slotReference, path, context)); + } + return refCol; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index eac50b40757..da80a48081b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -47,6 +47,8 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.persist.AnalyzeDeletionLog; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; @@ -89,9 +91,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.StringJoiner; import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -104,6 +108,16 @@ public class AnalysisManager implements Writable { private static final Logger LOG = LogManager.getLogger(AnalysisManager.class); + /** + * Mem only. + */ + public final Queue<HighPriorityColumn> predicateColumns = new ArrayBlockingQueue<>(100); + + /** + * Mem only. + */ + public final Queue<HighPriorityColumn> queryColumns = new ArrayBlockingQueue<>(100); + // Tracking running manually submitted async tasks, keep in mem only protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); @@ -1086,4 +1100,34 @@ public class AnalysisManager implements Writable { } return false; } + + + public void updateColumnUsedInPredicate(Set<Slot> slotReferences) { + updateColumn(slotReferences, predicateColumns); + } + + public void updateQueriedColumn(Collection<Slot> slotReferences) { + updateColumn(slotReferences, queryColumns); + } + + protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityColumn> queue) { + for (Slot s : slotReferences) { + if (!(s instanceof SlotReference)) { + return; + } + Optional<Column> optionalColumn = ((SlotReference) s).getColumn(); + Optional<TableIf> optionalTable = ((SlotReference) s).getTable(); + if (optionalColumn.isPresent() && optionalTable.isPresent()) { + TableIf table = optionalTable.get(); + DatabaseIf database = table.getDatabase(); + if (database != null) { + CatalogIf catalog = database.getCatalog(); + if (catalog != null) { + queue.offer(new HighPriorityColumn(catalog.getId(), database.getId(), + table.getId(), optionalColumn.get().getName())); + } + } + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java new file mode 100644 index 00000000000..c4bc20c399a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import java.util.Objects; + +public class HighPriorityColumn { + + public final long catalogId; + public final long dbId; + public final long tblId; + public final String colName; + + public HighPriorityColumn(long catalogId, long dbId, long tblId, String colName) { + this.catalogId = catalogId; + this.dbId = dbId; + this.tblId = tblId; + this.colName = colName; + } + + @Override + public int hashCode() { + return Objects.hash(catalogId, dbId, tblId, colName); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof HighPriorityColumn)) { + return false; + } + HighPriorityColumn otherCriticalColumn = (HighPriorityColumn) other; + return this.catalogId == otherCriticalColumn.catalogId + && this.dbId == otherCriticalColumn.dbId + && this.tblId == otherCriticalColumn.tblId + && this.colName.equals(otherCriticalColumn.colName); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org