This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ae0db0a5140115c930445427bd8e6471cbf19a29
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Feb 28 11:54:40 2024 +0800

    Collect high priority columns. (#31235)
---
 .../doris/nereids/jobs/executor/Rewriter.java      |   4 +-
 .../org/apache/doris/nereids/rules/RuleType.java   |   1 +
 .../expression/HighPriorityColumnCollector.java    | 202 +++++++++++++++++++++
 .../apache/doris/statistics/AnalysisManager.java   |  44 +++++
 .../doris/statistics/HighPriorityColumn.java       |  55 ++++++
 5 files changed, 305 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index e2d386dc910..fdba72fbf6e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -32,6 +32,7 @@ import 
org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite;
 import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
 import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
 import org.apache.doris.nereids.rules.expression.ExpressionRewrite;
+import org.apache.doris.nereids.rules.expression.HighPriorityColumnCollector;
 import org.apache.doris.nereids.rules.rewrite.AddDefaultLimit;
 import org.apache.doris.nereids.rules.rewrite.AdjustConjunctsReturnType;
 import org.apache.doris.nereids.rules.rewrite.AdjustNullable;
@@ -440,7 +441,8 @@ public class Rewriter extends AbstractBatchJobExecutor {
                             new CollectFilterAboveConsumer(),
                             new CollectProjectAboveConsumer()
                     )
-            )
+            ),
+            topic("Collect used column", custom(RuleType.COLLECT_COLUMNS, 
HighPriorityColumnCollector::new))
     );
 
     private static final List<RewriteJob> WHOLE_TREE_REWRITE_JOBS
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index ea76eddd1d7..363a97db0a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -296,6 +296,7 @@ public enum RuleType {
 
     LEADING_JOIN(RuleTypeClass.REWRITE),
     REWRITE_SENTINEL(RuleTypeClass.REWRITE),
+    COLLECT_COLUMNS(RuleTypeClass.REWRITE),
 
     // topn opts
     DEFER_MATERIALIZE_TOP_N_RESULT(RuleTypeClass.REWRITE),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java
new file mode 100644
index 00000000000..ed67ad97005
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.expression;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.nereids.jobs.JobContext;
+import 
org.apache.doris.nereids.rules.expression.HighPriorityColumnCollector.CollectorContext;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.AnalysisManager;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
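+/**
+ * Collects the columns referenced by a query plan and reports them to the AnalysisManager:
+ * columns used in filter, having, join and group-by expressions are reported as predicate columns,
+ * the remaining scanned columns as queried columns. The plan itself is returned unchanged.
+ */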
+public class HighPriorityColumnCollector extends 
DefaultPlanRewriter<CollectorContext> implements CustomRewriter {
+
+    /**
+     * Walks {@code plan} to collect the columns it uses and hands them to the {@link AnalysisManager}.
+     * The plan is never modified; the same instance that was passed in is always returned.
+     *
+     * <p>Nothing is collected when the current connection is an internal session or when
+     * {@code StatisticsUtil.enableAutoAnalyze()} is false.
+     *
+     * @param plan root of the plan to inspect
+     * @param jobContext unused by this collector
+     * @return {@code plan}, unchanged
+     */
+    @Override
+    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext != null && connectContext.getSessionVariable().internalSession) {
+            return plan;
+        }
+        // The collected slots are only consumed when auto analyze is enabled,
+        // so avoid traversing the whole plan when the result would be thrown away.
+        if (!StatisticsUtil.enableAutoAnalyze()) {
+            return plan;
+        }
+        CollectorContext context = new CollectorContext();
+        plan.accept(this, context);
+        // A column that is used in a predicate is reported only once, as a predicate column.
+        context.queried.removeAll(context.usedInPredicate);
+        AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
+        analysisManager.updateColumnUsedInPredicate(context.usedInPredicate);
+        analysisManager.updateQueriedColumn(context.queried);
+        return plan;
+    }
+
+    /**
+     * Mutable state shared by all visit methods during one traversal of a plan.
+     */
+    public static class CollectorContext {
+        // Filled by visitLogicalProject and read by backtrace to resolve a project output slot
+        // to the expression that produces it.
+        public Map<Slot/*project output column*/, NamedExpression/*Actual 
project expr*/> projects = new HashMap<>();
+
+        // Slots collected from filter, having, join-condition and group-by expressions after backtracing.
+        public Set<Slot> usedInPredicate = new HashSet<>();
+
+        // Output slots of the visited olap/file scans; visitLogicalProject removes scan slots
+        // that the project directly above the scan does not reference.
+        public Set<Slot> queried = new HashSet<>();
+    }
+
+    /**
+     * Visits the child first, then records the project's output-slot to expression mapping in
+     * {@code context.projects}. When the project sits directly on a catalog relation, or on a filter
+     * that sits directly on a catalog relation, scan output slots that no project expression references
+     * are removed from {@code context.queried}.
+     *
+     * @param project the project node being visited
+     * @param context traversal state that is updated in place
+     * @return {@code project}, unchanged
+     */
+    @Override
+    public Plan visitLogicalProject(LogicalProject<? extends Plan> project, 
CollectorContext context) {
+        project.child().accept(this, context);
+        List<NamedExpression> projects = project.getOutputs();
+        List<Slot> slots = project.computeOutput();
+        // NOTE(review): assumes computeOutput() yields one slot per entry of getOutputs(), in the same order.
+        for (int i = 0; i < slots.size(); i++) {
+            context.projects.put(slots.get(i), projects.get(i));
+        }
+        // '&&' binds tighter than '||': child is a relation, OR (child is a filter AND its child is a relation).
+        if (project.child() instanceof LogicalCatalogRelation
+                || project.child() instanceof LogicalFilter
+                && ((LogicalFilter) project.child()).child() instanceof 
LogicalCatalogRelation) {
+            // Every SlotReference that appears anywhere in the project's expressions.
+            Set<Slot> allUsed = project.getExpressions()
+                    .stream().flatMap(e -> e.<Set<SlotReference>>collect(n -> 
n instanceof SlotReference).stream())
+                    .collect(Collectors.toSet());
+            // The relation is either the direct child or the child of the filter in between.
+            LogicalCatalogRelation scan = project.child() instanceof 
LogicalCatalogRelation
+                    ? (LogicalCatalogRelation) project.child()
+                    : (LogicalCatalogRelation) project.child().child(0);
+            List<Slot> outputOfScan = scan.getOutput();
+            // Scan columns the project does not reference are dropped from the queried set.
+            for (Slot slot : outputOfScan) {
+                if (!allUsed.contains(slot)) {
+                    context.queried.remove(slot);
+                }
+            }
+        }
+        return project;
+    }
+
+    /**
+     * Visits both join inputs, then adds the columns behind the join's condition slots to
+     * {@code context.usedInPredicate}. A mark join contributes {@code getLeftConditionSlot()},
+     * any other join contributes {@code getConditionSlot()}.
+     *
+     * @param join the join node being visited
+     * @param context traversal state that is updated in place
+     * @return {@code join}, unchanged
+     */
+    @Override
+    public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, CollectorContext context) {
+        join.child(0).accept(this, context);
+        join.child(1).accept(this, context);
+        Set<Slot> tracedColumns = new HashSet<>();
+        for (Slot conditionSlot : join.isMarkJoin() ? join.getLeftConditionSlot() : join.getConditionSlot()) {
+            tracedColumns.addAll(backtrace(conditionSlot, context));
+        }
+        context.usedInPredicate.addAll(tracedColumns);
+        return join;
+    }
+
+    /**
+     * Visits the child, then adds the columns behind every slot referenced by the group-by
+     * expressions to {@code context.usedInPredicate}.
+     *
+     * @param aggregate the aggregate node being visited
+     * @param context traversal state that is updated in place
+     * @return {@code aggregate}, unchanged
+     */
+    @Override
+    public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate, CollectorContext context) {
+        aggregate.child(0).accept(this, context);
+        Set<Slot> groupByColumns = new HashSet<>();
+        aggregate.getGroupByExpressions().forEach(groupBy -> {
+            Set<SlotReference> referenced = groupBy.<Set<SlotReference>>collect(n -> n instanceof SlotReference);
+            for (SlotReference reference : referenced) {
+                groupByColumns.addAll(backtrace(reference, context));
+            }
+        });
+        context.usedInPredicate.addAll(groupByColumns);
+        return aggregate;
+    }
+
+    /**
+     * Visits the child, then adds the columns behind every slot referenced by the having
+     * expressions to {@code context.usedInPredicate}.
+     *
+     * @param having the having node being visited
+     * @param context traversal state that is updated in place
+     * @return {@code having}, unchanged
+     */
+    @Override
+    public Plan visitLogicalHaving(LogicalHaving<? extends Plan> having, CollectorContext context) {
+        having.child(0).accept(this, context);
+        Set<Slot> havingColumns = new HashSet<>();
+        having.getExpressions().forEach(expr -> {
+            Set<SlotReference> referenced = expr.<Set<SlotReference>>collect(n -> n instanceof SlotReference);
+            for (SlotReference reference : referenced) {
+                havingColumns.addAll(backtrace(reference, context));
+            }
+        });
+        context.usedInPredicate.addAll(havingColumns);
+        return having;
+    }
+
+    /**
+     * Registers every output slot of the olap scan in {@code context.queried}.
+     *
+     * @param olapScan the scan node being visited
+     * @param context traversal state that is updated in place
+     * @return {@code olapScan}, unchanged
+     */
+    @Override
+    public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, CollectorContext context) {
+        context.queried.addAll(olapScan.getOutput());
+        return olapScan;
+    }
+
+    /**
+     * Registers every output slot of the file scan in {@code context.queried}.
+     *
+     * @param fileScan the scan node being visited
+     * @param context traversal state that is updated in place
+     * @return {@code fileScan}, unchanged
+     */
+    @Override
+    public Plan visitLogicalFileScan(LogicalFileScan fileScan, CollectorContext context) {
+        context.queried.addAll(fileScan.getOutput());
+        return fileScan;
+    }
+
+    /**
+     * Visits the child, then adds the columns behind every slot referenced by the filter
+     * expressions to {@code context.usedInPredicate}.
+     *
+     * @param filter the filter node being visited
+     * @param context traversal state that is updated in place
+     * @return {@code filter}, unchanged
+     */
+    @Override
+    public Plan visitLogicalFilter(LogicalFilter<? extends Plan> filter, CollectorContext context) {
+        filter.child(0).accept(this, context);
+        Set<Slot> filterColumns = new HashSet<>();
+        filter.getExpressions().forEach(expr -> {
+            Set<SlotReference> referenced = expr.<Set<SlotReference>>collect(n -> n instanceof SlotReference);
+            for (SlotReference reference : referenced) {
+                filterColumns.addAll(backtrace(reference, context));
+            }
+        });
+        context.usedInPredicate.addAll(filterColumns);
+        return filter;
+    }
+
+    /**
+     * Resolves {@code slot} to the slots that carry both a column and a table, following the
+     * project expressions recorded in {@code context.projects}.
+     *
+     * @param slot the slot to resolve
+     * @param context traversal state holding the recorded project expressions
+     * @return the resolved slots; empty when nothing can be resolved
+     */
+    private Set<Slot> backtrace(Slot slot, CollectorContext context) {
+        Set<Slot> visited = new HashSet<>();
+        return backtrace(slot, visited, context);
+    }
+
+    /**
+     * Recursive worker for {@link #backtrace(Slot, CollectorContext)}.
+     *
+     * @param slot the slot to resolve
+     * @param path slots already visited during this resolution; guards against revisiting a slot
+     * @param context traversal state holding the recorded project expressions
+     * @return the resolved slots; empty for an already-visited slot or one with no recorded project expression
+     */
+    private Set<Slot> backtrace(Slot slot, Set<Slot> path, CollectorContext context) {
+        // Set.add returns false when the slot has already been visited.
+        if (!path.add(slot)) {
+            return Collections.emptySet();
+        }
+        if (slot instanceof SlotReference) {
+            SlotReference reference = (SlotReference) slot;
+            Optional<Column> column = reference.getColumn();
+            Optional<TableIf> owner = reference.getTable();
+            boolean resolved = column.isPresent() && owner.isPresent();
+            if (resolved) {
+                return Collections.singleton(slot);
+            }
+        }
+        NamedExpression producer = context.projects.get(slot);
+        if (producer == null) {
+            return Collections.emptySet();
+        }
+        // Resolve every slot the producing expression references and merge the results.
+        return producer.<Set<SlotReference>>collect(n -> n instanceof SlotReference)
+                .stream()
+                .flatMap(input -> backtrace(input, path, context).stream())
+                .collect(Collectors.toSet());
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index eac50b40757..da80a48081b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -47,6 +47,8 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.persist.AnalyzeDeletionLog;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
@@ -89,9 +91,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
 import java.util.StringJoiner;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -104,6 +108,16 @@ public class AnalysisManager implements Writable {
 
     private static final Logger LOG = 
LogManager.getLogger(AnalysisManager.class);
 
+    /**
+     * Columns reported through updateColumnUsedInPredicate. Kept in memory only.
+     * Bounded to 100 entries; updateColumn adds with offer(), which does not insert when the queue is full.
+     */
+    public final Queue<HighPriorityColumn> predicateColumns = new 
ArrayBlockingQueue<>(100);
+
+    /**
+     * Columns reported through updateQueriedColumn. Kept in memory only.
+     * Bounded to 100 entries; updateColumn adds with offer(), which does not insert when the queue is full.
+     */
+    public final Queue<HighPriorityColumn> queryColumns = new 
ArrayBlockingQueue<>(100);
+
     // Tracking running manually submitted async tasks, keep in mem only
     protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> 
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
 
@@ -1086,4 +1100,34 @@ public class AnalysisManager implements Writable {
         }
         return false;
     }
+
+
+    /**
+     * Records the given slots in {@code predicateColumns} by delegating to {@code updateColumn}.
+     *
+     * @param slotReferences slots to record
+     */
+    public void updateColumnUsedInPredicate(Set<Slot> slotReferences) {
+        updateColumn(slotReferences, predicateColumns);
+    }
+
+    /**
+     * Records the given slots in {@code queryColumns} by delegating to {@code updateColumn}.
+     *
+     * @param slotReferences slots to record
+     */
+    public void updateQueriedColumn(Collection<Slot> slotReferences) {
+        updateColumn(slotReferences, queryColumns);
+    }
+
+    /**
+     * Converts each slot into a {@link HighPriorityColumn} and offers it to {@code queue}.
+     *
+     * <p>A slot is skipped when it is not a {@link SlotReference}, when it carries no column or no table,
+     * or when the table's database or that database's catalog is null. Skipping one slot does not
+     * affect the remaining slots.
+     *
+     * @param slotReferences slots to record
+     * @param queue destination queue; elements are added with {@code offer}, so nothing is inserted
+     *              when a capacity-restricted queue is full
+     */
+    protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityColumn> queue) {
+        for (Slot s : slotReferences) {
+            if (!(s instanceof SlotReference)) {
+                // Skip just this slot; returning here would silently drop every slot after it.
+                continue;
+            }
+            SlotReference slotReference = (SlotReference) s;
+            Optional<Column> optionalColumn = slotReference.getColumn();
+            Optional<TableIf> optionalTable = slotReference.getTable();
+            if (!optionalColumn.isPresent() || !optionalTable.isPresent()) {
+                continue;
+            }
+            TableIf table = optionalTable.get();
+            DatabaseIf database = table.getDatabase();
+            if (database == null) {
+                continue;
+            }
+            CatalogIf catalog = database.getCatalog();
+            if (catalog == null) {
+                continue;
+            }
+            // offer() instead of add(): a full queue drops the entry rather than throwing.
+            queue.offer(new HighPriorityColumn(catalog.getId(), database.getId(),
+                    table.getId(), optionalColumn.get().getName()));
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
new file mode 100644
index 00000000000..c4bc20c399a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import java.util.Objects;
+
+/**
+ * Identifies a column by catalog id, database id, table id and column name.
+ *
+ * <p>All fields are final. Two instances are equal when all four fields are equal;
+ * {@code equals} and {@code hashCode} both tolerate a null {@code colName}.
+ */
+public class HighPriorityColumn {
+
+    public final long catalogId;
+    public final long dbId;
+    public final long tblId;
+    public final String colName;
+
+    /**
+     * @param catalogId id of the catalog that owns the column's database
+     * @param dbId id of the database that owns the column's table
+     * @param tblId id of the table that owns the column
+     * @param colName name of the column
+     */
+    public HighPriorityColumn(long catalogId, long dbId, long tblId, String colName) {
+        this.catalogId = catalogId;
+        this.dbId = dbId;
+        this.tblId = tblId;
+        this.colName = colName;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(catalogId, dbId, tblId, colName);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof HighPriorityColumn)) {
+            return false;
+        }
+        HighPriorityColumn that = (HighPriorityColumn) other;
+        // Objects.equals is null-safe, keeping equals consistent with Objects.hash in hashCode().
+        return catalogId == that.catalogId
+                && dbId == that.dbId
+                && tblId == that.tblId
+                && Objects.equals(colName, that.colName);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to