This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6fc7aa15ad515f2119f6e399d5cf0398ab395554
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Tue Mar 5 16:19:08 2024 +0800

    Support column level health value. (#31794)
---
 .../apache/doris/analysis/ShowColumnStatsStmt.java |  4 +
 .../apache/doris/datasource/InternalCatalog.java   |  6 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  7 ++
 .../org/apache/doris/statistics/AnalysisInfo.java  | 17 ++--
 .../doris/statistics/AnalysisInfoBuilder.java      | 17 ++--
 .../org/apache/doris/statistics/AnalysisJob.java   |  8 +-
 .../apache/doris/statistics/AnalysisManager.java   | 13 ++-
 .../org/apache/doris/statistics/ColStatsMeta.java  | 16 ++--
 .../doris/statistics/StatisticsAutoCollector.java  | 95 ++++++++++++++++++++--
 .../doris/statistics/StatisticsJobAppender.java    | 28 +++++--
 .../apache/doris/statistics/TableStatsMeta.java    | 33 ++++----
 .../doris/statistics/util/StatisticsUtil.java      | 10 +++
 .../apache/doris/statistics/AnalysisJobTest.java   |  4 +-
 .../doris/statistics/AnalysisManagerTest.java      |  2 +-
 14 files changed, 197 insertions(+), 63 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
index 37be76b20df..749bfa7d360 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
@@ -61,6 +61,8 @@ public class ShowColumnStatsStmt extends ShowStmt {
                     .add("trigger")
                     .add("query_times")
                     .add("updated_time")
+                    .add("update_rows")
+                    .add("last_analyze_row_count")
                     .build();
 
     private final TableName tableName;
@@ -160,6 +162,8 @@ public class ShowColumnStatsStmt extends ShowStmt {
             row.add(String.valueOf(colStatsMeta == null ? "N/A" : 
colStatsMeta.jobType));
             row.add(String.valueOf(colStatsMeta == null ? "N/A" : 
colStatsMeta.queriedTimes));
             row.add(String.valueOf(p.second.updatedTime));
+            row.add(String.valueOf(colStatsMeta == null ? "N/A" : 
colStatsMeta.updatedRows));
+            row.add(String.valueOf(colStatsMeta == null ? "N/A" : 
colStatsMeta.rowCount));
             result.add(row);
         });
         return new ShowResultSet(getMetaData(), result);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 42a350163f6..d6bbfeeba06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3149,7 +3149,6 @@ public class InternalCatalog implements CatalogIf<Database> {
                     rowsToTruncate += partition.getBaseIndex().getRowCount();
                 }
             } else {
-                rowsToTruncate = olapTable.getRowCount();
                 for (Partition partition : olapTable.getPartitions()) {
                     // If need absolutely correct, should check running txn 
here.
                     // But if the txn is in prepare state, cann't known which 
partitions had load data.
@@ -3158,6 +3157,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                     }
                     origPartitions.put(partition.getName(), partition.getId());
                     partitionsDistributionInfo.put(partition.getId(), 
partition.getDistributionInfo());
+                    rowsToTruncate += partition.getBaseIndex().getRowCount();
                 }
             }
             // if table currently has no partitions, this sql like empty 
command and do nothing, should return directly.
@@ -3318,10 +3318,8 @@ public class InternalCatalog implements CatalogIf<Database> {
         if (truncateEntireTable) {
             // Drop the whole table stats after truncate the entire table
             Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
-        } else {
-            // Update the updated rows in table stats after truncate some 
partitions.
-            
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords);
         }
+        
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords);
         LOG.info("finished to truncate table {}, partitions: {}", 
tblRef.getName().toSql(), tblRef.getPartitionNames());
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8bfd0d7f1a4..4b8b89ff6a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -471,6 +471,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_AUTO_ANALYZE = "enable_auto_analyze";
 
+    public static final String ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG = 
"enable_auto_analyze_internal_catalog";
+
     public static final String AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = 
"auto_analyze_table_width_threshold";
 
     public static final String FASTER_FLOAT_CONVERT = "faster_float_convert";
@@ -1504,6 +1506,11 @@ public class SessionVariable implements Serializable, Writable {
             flag = VariableMgr.GLOBAL)
     public boolean enableAutoAnalyze = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG,
+            description = {"临时参数,收否自动收集所有内表", "Temp variable, enable to auto 
collect all OlapTable."},
+            flag = VariableMgr.GLOBAL)
+    public boolean enableAutoAnalyzeInternalCatalog = false;
+
     @VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
             description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
                 "Maximum table width to enable auto analyze, "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index c707107e0e0..0c5047a53c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -188,8 +188,11 @@ public class AnalysisInfo implements Writable {
     @SerializedName("endTime")
     public long endTime;
 
-    @SerializedName("emptyJob")
-    public final boolean emptyJob;
+    @SerializedName("rowCount")
+    public final long rowCount;
+
+    @SerializedName("updateRows")
+    public final long updateRows;
     /**
      *
      * Used to store the newest partition version of tbl when creating this 
job.
@@ -206,7 +209,8 @@ public class AnalysisInfo implements Writable {
             long lastExecTimeInMs, long timeCostInMs, AnalysisState state, 
ScheduleType scheduleType,
             boolean isExternalTableLevelTask, boolean partitionOnly, boolean 
samplingPartition,
             boolean isAllPartition, long partitionCount, CronExpression 
cronExpression, boolean forceFull,
-            boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean 
emptyJob, boolean userInject) {
+            boolean usingSqlForPartitionColumn, long tblUpdateTime, long 
rowCount, boolean userInject,
+            long updateRows) {
         this.jobId = jobId;
         this.taskId = taskId;
         this.taskIds = taskIds;
@@ -242,8 +246,9 @@ public class AnalysisInfo implements Writable {
         this.forceFull = forceFull;
         this.usingSqlForPartitionColumn = usingSqlForPartitionColumn;
         this.tblUpdateTime = tblUpdateTime;
-        this.emptyJob = emptyJob;
+        this.rowCount = rowCount;
         this.userInject = userInject;
+        this.updateRows = updateRows;
     }
 
     @Override
@@ -285,7 +290,9 @@ public class AnalysisInfo implements Writable {
         }
         sj.add("forceFull: " + forceFull);
         sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn);
-        sj.add("emptyJob: " + emptyJob);
+        sj.add("rowCount: " + rowCount);
+        sj.add("userInject: " + userInject);
+        sj.add("updateRows: " + updateRows);
         return sj.toString();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
index 22f3d22b3ce..527d503fd52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
@@ -62,8 +62,9 @@ public class AnalysisInfoBuilder {
     private boolean forceFull;
     private boolean usingSqlForPartitionColumn;
     private long tblUpdateTime;
-    private boolean emptyJob;
+    private long rowCount;
     private boolean userInject;
+    private long updateRows;
 
     public AnalysisInfoBuilder() {
     }
@@ -101,8 +102,9 @@ public class AnalysisInfoBuilder {
         forceFull = info.forceFull;
         usingSqlForPartitionColumn = info.usingSqlForPartitionColumn;
         tblUpdateTime = info.tblUpdateTime;
-        emptyJob = info.emptyJob;
+        rowCount = info.rowCount;
         userInject = info.userInject;
+        updateRows = info.updateRows;
     }
 
     public AnalysisInfoBuilder setJobId(long jobId) {
@@ -265,8 +267,8 @@ public class AnalysisInfoBuilder {
         return this;
     }
 
-    public AnalysisInfoBuilder setEmptyJob(boolean emptyJob) {
-        this.emptyJob = emptyJob;
+    public AnalysisInfoBuilder setRowCount(long rowCount) {
+        this.rowCount = rowCount;
         return this;
     }
 
@@ -275,12 +277,17 @@ public class AnalysisInfoBuilder {
         return this;
     }
 
+    public AnalysisInfoBuilder setUpdateRows(long updateRows) {
+        this.updateRows = updateRows;
+        return this;
+    }
+
     public AnalysisInfo build() {
         return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, 
tblId, colToPartitions, partitionNames,
                 colName, indexId, jobType, analysisMode, analysisMethod, 
analysisType, samplePercent,
                 sampleRows, maxBucketNum, periodTimeInMs, message, 
lastExecTimeInMs, timeCostInMs, state, scheduleType,
                 externalTableLevelTask, partitionOnly, samplingPartition, 
isAllPartition, partitionCount,
-                cronExpression, forceFull, usingSqlForPartitionColumn, 
tblUpdateTime, emptyJob, userInject);
+                cronExpression, forceFull, usingSqlForPartitionColumn, 
tblUpdateTime, rowCount, userInject, updateRows);
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
index f52764bd6c9..600d95a6278 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -84,14 +84,12 @@ public class AnalysisJob {
     protected void markOneTaskDone() {
         if (queryingTask.isEmpty()) {
             try {
-                writeBuf();
-                updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
-                        + (System.currentTimeMillis() - start) / 1000);
+                flushBuffer();
             } finally {
                 deregisterJob();
             }
         } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
-            writeBuf();
+            flushBuffer();
         }
     }
 
@@ -115,7 +113,7 @@ public class AnalysisJob {
         }
     }
 
-    protected void writeBuf() {
+    protected void flushBuffer() {
         if (killed) {
             return;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index ab883c79f8f..bac84996dd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -416,7 +416,10 @@ public class AnalysisManager implements Writable {
         infoBuilder.setColToPartitions(colToPartitions);
         infoBuilder.setTaskIds(Lists.newArrayList());
         infoBuilder.setTblUpdateTime(table.getUpdateTime());
-        infoBuilder.setEmptyJob(table instanceof OlapTable && 
table.getRowCount() == 0);
+        long rowCount = table.getRowCount();
+        infoBuilder.setRowCount(rowCount);
+        TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
+        infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : 
tableStatsStatus.updatedRows.get());
         return infoBuilder.build();
     }
 
@@ -572,7 +575,7 @@ public class AnalysisManager implements Writable {
         }
         TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
         if (tableStats == null) {
-            updateTableStatsStatus(new TableStatsMeta(jobInfo.emptyJob ? 0 : 
tbl.getRowCount(), jobInfo, tbl));
+            updateTableStatsStatus(new TableStatsMeta(jobInfo.rowCount, 
jobInfo, tbl));
         } else {
             tableStats.update(jobInfo, tbl);
             logCreateTableStats(tableStats);
@@ -807,7 +810,7 @@ public class AnalysisManager implements Writable {
                     analysisInfo.dbId, analysisInfo.tblId);
             return table.createAnalysisTask(analysisInfo);
         } catch (Throwable t) {
-            LOG.warn("Failed to find table", t);
+            LOG.warn("Failed to create task.", t);
             throw new DdlException("Failed to create task", t);
         }
     }
@@ -1156,10 +1159,12 @@ public class AnalysisManager implements Writable {
 
 
     public void updateColumnUsedInPredicate(Set<Slot> slotReferences) {
+        LOG.info("Add slots to high priority queues.");
         updateColumn(slotReferences, highPriorityColumns);
     }
 
     public void updateQueriedColumn(Collection<Slot> slotReferences) {
+        LOG.info("Add slots to mid priority queues.");
         updateColumn(slotReferences, midPriorityColumns);
     }
 
@@ -1179,6 +1184,8 @@ public class AnalysisManager implements Writable {
                     if (catalog != null) {
                         queue.offer(new HighPriorityColumn(catalog.getId(), 
database.getId(),
                                 table.getId(), 
optionalColumn.get().getName()));
+                        LOG.info("Offer column " + table.getName() + "(" + 
table.getId() + ")."
+                                + optionalColumn.get().getName());
                     }
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
index 445641b2505..7e317d67bd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
@@ -43,16 +43,20 @@ public class ColStatsMeta {
     @SerializedName("trigger")
     public JobType jobType;
 
-    public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod,
-            AnalysisType analysisType, JobType jobType, long queriedTimes) {
+    @SerializedName("updatedRows")
+    public long updatedRows;
+
+    @SerializedName("rowCount")
+    public long rowCount;
+
+    public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, 
AnalysisType analysisType, JobType jobType,
+            long queriedTimes, long rowCount, long updatedRows) {
         this.updatedTime = updatedTime;
         this.analysisMethod = analysisMethod;
         this.analysisType = analysisType;
         this.jobType = jobType;
         this.queriedTimes.addAndGet(queriedTimes);
-    }
-
-    public void clear() {
-        updatedTime = 0;
+        this.updatedRows = updatedRows;
+        this.rowCount = rowCount;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 19b9c69db08..c498881bfbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -23,6 +23,8 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
@@ -33,6 +35,8 @@ import org.apache.logging.log4j.Logger;
 
 import java.time.LocalTime;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -60,9 +64,16 @@ public class StatisticsAutoCollector extends StatisticsCollector {
             }
             try {
                 TableIf table = job.getKey();
+                if (!supportAutoAnalyze(table)) {
+                    continue;
+                }
                 Set<String> columns = job.getValue()
                         .stream()
-                        .filter(c -> needAnalyzeColumn(table, c))
+                        .filter(c -> {
+                            boolean needAnalyzeColumn = 
needAnalyzeColumn(table, c);
+                            LOG.info("Need analyze column " + c + " ? " + 
needAnalyzeColumn);
+                            return needAnalyzeColumn;
+                        })
                         .collect(Collectors.toSet());
                 processOneJob(table, columns);
             } catch (Exception e) {
@@ -100,22 +111,92 @@ public class StatisticsAutoCollector extends StatisticsCollector {
     }
 
     protected void processOneJob(TableIf table, Set<String> columns) throws 
DdlException {
-        Set<String> collect = columns.stream().filter(c -> 
needAnalyzeColumn(table, c)).collect(Collectors.toSet());
-        if (collect.isEmpty()) {
+        appendPartitionColumns(table, columns);
+        if (columns.isEmpty()) {
             return;
         }
         AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns);
+        LOG.info("Analyze job : {}", analyzeJob.toString());
         createSystemAnalysisJob(analyzeJob);
     }
 
+    protected void appendPartitionColumns(TableIf table, Set<String> columns) {
+        if (!(table instanceof OlapTable)) {
+            return;
+        }
+        AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+        TableStatsMeta tableStatsStatus = 
manager.findTableStatsStatus(table.getId());
+        if (tableStatsStatus != null && 
tableStatsStatus.newPartitionLoaded.get()) {
+            OlapTable olapTable = (OlapTable) table;
+            columns.addAll(olapTable.getPartitionNames());
+        }
+    }
+
     protected boolean needAnalyzeColumn(TableIf table, String column) {
-        //TODO: Calculate column health value.
-        return true;
+        AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+        TableStatsMeta tableStatsStatus = 
manager.findTableStatsStatus(table.getId());
+        if (tableStatsStatus == null) {
+            return true;
+        }
+        if (tableStatsStatus.userInjected) {
+            return false;
+        }
+        ColStatsMeta columnStatsMeta = 
tableStatsStatus.findColumnStatsMeta(column);
+        if (columnStatsMeta == null) {
+            return true;
+        }
+        if (table instanceof OlapTable) {
+            long currentUpdatedRows = tableStatsStatus.updatedRows.get();
+            long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows;
+            if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) {
+                return true;
+            }
+            OlapTable olapTable = (OlapTable) table;
+            if (tableStatsStatus.newPartitionLoaded.get() && 
olapTable.isPartitionColumn(column)) {
+                return true;
+            }
+            if (columnStatsMeta.rowCount == 0 && olapTable.getRowCount() > 0) {
+                return true;
+            }
+            if (currentUpdatedRows == lastAnalyzeUpdateRows) {
+                return false;
+            }
+            double healthValue = ((double) (currentUpdatedRows - 
lastAnalyzeUpdateRows)
+                    / (double) currentUpdatedRows) * 100.0;
+            LOG.info("Column " + column + " health value is " + healthValue);
+            return healthValue < StatisticsUtil.getTableStatsHealthThreshold();
+        } else {
+            if (!(table instanceof HMSExternalTable)) {
+                return false;
+            }
+            HMSExternalTable hmsTable = (HMSExternalTable) table;
+            if (!hmsTable.getDlaType().equals(DLAType.HIVE)) {
+                return false;
+            }
+            return System.currentTimeMillis()
+                    - tableStatsStatus.updatedTime > 
StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
+        }
+    }
+
+    protected boolean supportAutoAnalyze(TableIf tableIf) {
+        if (tableIf == null) {
+            return false;
+        }
+        return tableIf instanceof OlapTable
+                || tableIf instanceof HMSExternalTable
+                && ((HMSExternalTable) 
tableIf).getDlaType().equals(HMSExternalTable.DLAType.HIVE);
     }
 
     protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String> 
columns) {
         AnalysisMethod analysisMethod = table.getDataSize(true) >= 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
                 ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
+        AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+        TableStatsMeta tableStatsStatus = 
manager.findTableStatsStatus(table.getId());
+        long rowCount = table.getRowCount();
+        Map<String, Set<String>> colToPartitions = new HashMap<>();
+        Set<String> dummyPartition = new HashSet<>();
+        dummyPartition.add("dummy partition");
+        columns.stream().forEach(c -> colToPartitions.put(c, dummyPartition));
         return new AnalysisInfoBuilder()
                 .setJobId(Env.getCurrentEnv().getNextId())
                 .setCatalogId(table.getDatabase().getCatalog().getId())
@@ -133,7 +214,9 @@ public class StatisticsAutoCollector extends StatisticsCollector {
                 .setLastExecTimeInMs(System.currentTimeMillis())
                 .setJobType(JobType.SYSTEM)
                 .setTblUpdateTime(table.getUpdateTime())
-                .setEmptyJob(table instanceof OlapTable && table.getRowCount() 
== 0)
+                .setRowCount(rowCount)
+                .setUpdateRows(tableStatsStatus == null ? 0 : 
tableStatsStatus.updatedRows.get())
+                .setColToPartitions(colToPartitions)
                 .build();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
index 73d0d1340ad..71bb71d3cda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -19,6 +19,7 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.util.MasterDaemon;
@@ -28,7 +29,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -70,34 +71,44 @@ public class StatisticsJobAppender extends MasterDaemon {
 
     protected void appendJobs() {
         AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager();
+        // LOG.info("Append column to high priority job map.");
         appendColumnsToJobs(manager.highPriorityColumns, 
manager.highPriorityJobs);
+        // LOG.info("Append column to mid priority job map.");
         appendColumnsToJobs(manager.midPriorityColumns, 
manager.midPriorityJobs);
-        appendToLowQueue(manager.lowPriorityJobs);
+        if (StatisticsUtil.enableAutoAnalyzeInternalCatalog()) {
+            // LOG.info("Append column to low priority job map.");
+            appendToLowQueue(manager.lowPriorityJobs);
+        }
     }
 
     protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, 
Map<TableIf, Set<String>> jobsMap) {
         int size = columnQueue.size();
         for (int i = 0; i < size; i++) {
             HighPriorityColumn column = columnQueue.poll();
+            LOG.info("Process column " + column.tblId + "." + column.colName);
             TableIf table = StatisticsUtil.findTable(column.catalogId, 
column.dbId, column.tblId);
             synchronized (jobsMap) {
                 // If job map reach the upper limit, stop putting new jobs.
                 if (!jobsMap.containsKey(table) && jobsMap.size() >= 
JOB_MAP_SIZE) {
+                    LOG.info("Job map full.");
                     break;
                 }
                 if (jobsMap.containsKey(table)) {
                     jobsMap.get(table).add(column.colName);
                 } else {
-                    jobsMap.put(table, Collections.singleton(column.colName));
+                    HashSet<String> columns = new HashSet<>();
+                    columns.add(column.colName);
+                    jobsMap.put(table, columns);
                 }
+                LOG.info("Column " + column.tblId + "." + column.colName + " 
added");
             }
         }
     }
 
-    protected void appendToLowQueue(Map<TableIf, Set<String>>  jobsMap) {
-
+    protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) {
         InternalCatalog catalog = Env.getCurrentInternalCatalog();
         List<Long> sortedDbs = 
catalog.getDbIds().stream().sorted().collect(Collectors.toList());
+        int batchSize = 100;
         for (long dbId : sortedDbs) {
             if (dbId < currentDbId
                     || 
StatisticConstants.SYSTEM_DBS.contains(catalog.getDbNullable(dbId).getFullName()))
 {
@@ -108,11 +119,11 @@ public class StatisticsJobAppender extends MasterDaemon {
             List<Table> tables = db.get().getTables().stream()
                     .sorted((t1, t2) -> (int) (t1.getId() - 
t2.getId())).collect(Collectors.toList());
             for (Table t : tables) {
-                if (t.getId() <= currentTableId) {
+                if (!(t instanceof OlapTable) || t.getId() <= currentTableId) {
                     continue;
                 }
                 synchronized (jobsMap) {
-                    // If job map reach the upper limit, stop putting new jobs.
+                    // If job map reach the upper limit, stop adding new jobs.
                     if (!jobsMap.containsKey(t) && jobsMap.size() >= 
JOB_MAP_SIZE) {
                         return;
                     }
@@ -126,6 +137,9 @@ public class StatisticsJobAppender extends MasterDaemon {
                     }
                 }
                 currentTableId = t.getId();
+                if (--batchSize <= 0) {
+                    return;
+                }
             }
         }
         // All tables have been processed once, reset for the next loop.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 9231c6a2bc7..96ca0aab54c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -126,11 +126,6 @@ public class TableStatsMeta implements Writable {
         return colNameToColStatsMeta.keySet();
     }
 
-    public void reset() {
-        updatedTime = 0;
-        colNameToColStatsMeta.values().forEach(ColStatsMeta::clear);
-    }
-
     public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
         updatedTime = analyzedJob.tblUpdateTime;
         userInjected = analyzedJob.userInject;
@@ -145,34 +140,34 @@ public class TableStatsMeta implements Writable {
         for (String col : cols) {
             ColStatsMeta colStatsMeta = colNameToColStatsMeta.get(col);
             if (colStatsMeta == null) {
-                colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime,
-                        analyzedJob.analysisMethod, analyzedJob.analysisType, 
analyzedJob.jobType, 0));
+                colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime, 
analyzedJob.analysisMethod,
+                        analyzedJob.analysisType, analyzedJob.jobType, 0, 
analyzedJob.rowCount,
+                        analyzedJob.updateRows));
             } else {
                 colStatsMeta.updatedTime = updatedTime;
                 colStatsMeta.analysisType = analyzedJob.analysisType;
                 colStatsMeta.analysisMethod = analyzedJob.analysisMethod;
                 colStatsMeta.jobType = analyzedJob.jobType;
+                colStatsMeta.updatedRows = analyzedJob.updateRows;
+                colStatsMeta.rowCount = analyzedJob.rowCount;
             }
         }
         jobType = analyzedJob.jobType;
         if (tableIf != null) {
             if (tableIf instanceof OlapTable) {
-                rowCount = analyzedJob.emptyJob ? 0 : tableIf.getRowCount();
-            }
-            if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet()
-                    .containsAll(tableIf.getBaseSchema().stream()
-                            .filter(c -> 
!StatisticsUtil.isUnsupportedType(c.getType()))
-                            
.map(Column::getName).collect(Collectors.toSet()))) {
-                updatedRows.set(0);
-                newPartitionLoaded.set(false);
-            }
-            if (tableIf instanceof OlapTable) {
+                rowCount = analyzedJob.rowCount;
                 PartitionInfo partitionInfo = ((OlapTable) 
tableIf).getPartitionInfo();
-                if (partitionInfo != null && 
analyzedJob.colToPartitions.keySet()
+                if (analyzedJob.rowCount != 0 && partitionInfo != null && 
analyzedJob.colToPartitions.keySet()
                         
.containsAll(partitionInfo.getPartitionColumns().stream()
-                            
.map(Column::getName).collect(Collectors.toSet()))) {
+                                
.map(Column::getName).collect(Collectors.toSet()))) {
                     newPartitionLoaded.set(false);
                 }
+                if (analyzedJob.rowCount != 0 && 
analyzedJob.colToPartitions.keySet()
+                        .containsAll(tableIf.getBaseSchema().stream()
+                                .filter(c -> 
!StatisticsUtil.isUnsupportedType(c.getType()))
+                                
.map(Column::getName).collect(Collectors.toSet()))) {
+                    userInjected = false;
+                }
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index f56aa0db607..0a56a11d115 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -898,6 +898,16 @@ public class StatisticsUtil {
         return false;
     }
 
+    public static boolean enableAutoAnalyzeInternalCatalog() {
+        try {
+            return findConfigFromGlobalSessionVar(
+                        
SessionVariable.ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG).enableAutoAnalyzeInternalCatalog;
+        } catch (Exception e) {
+            LOG.warn("Fail to get value of enable auto analyze internal 
catalog, return false by default", e);
+        }
+        return false;
+    }
+
     public static int getInsertMergeCount() {
         try {
             return 
findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index 1bf2041bb4f..cb2637d5cf6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -184,7 +184,7 @@ public class AnalysisJobTest {
             protected void syncLoadStats() {
             }
         };
-        job.writeBuf();
+        job.flushBuffer();
 
         Assertions.assertEquals(0, job.queryFinished.size());
     }
@@ -210,7 +210,7 @@ public class AnalysisJobTest {
         job.buf.add(new ColStatsData());
         job.queryFinished = new HashSet<>();
         job.queryFinished.add(task2);
-        job.writeBuf();
+        job.flushBuffer();
         Assertions.assertEquals(0, job.queryFinished.size());
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index f8a77fe06db..4e8bbfe5aff 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -306,7 +306,7 @@ public class AnalysisManagerTest {
         Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2));
 
         TableStatsMeta stats3 = new TableStatsMeta(0, new AnalysisInfoBuilder()
-                .setColToPartitions(new 
HashMap<>()).setEmptyJob(true).setColName("col1").build(), olapTable);
+                .setColToPartitions(new 
HashMap<>()).setRowCount(0).setColName("col1").build(), olapTable);
         Assertions.assertTrue(olapTable.needReAnalyzeTable(stats3));
 
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to