This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/high-priority-column by this 
push:
     new 88ed1980d86 High priority queue and map. (#31509)
88ed1980d86 is described below

commit 88ed1980d86731b799963f63eeef35390663f159
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Feb 28 12:01:55 2024 +0800

    High priority queue and map. (#31509)
---
 .../main/java/org/apache/doris/catalog/Env.java    |  20 +-
 .../apache/doris/statistics/AnalysisManager.java   |  33 +-
 .../doris/statistics/StatisticsAutoCollector.java  | 202 ++------
 .../doris/statistics/StatisticsCollector.java      |   4 -
 .../doris/statistics/StatisticsJobAppender.java    | 135 +++++
 .../statistics/StatisticsAutoCollectorTest.java    | 546 ---------------------
 6 files changed, 213 insertions(+), 727 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index d5bcd324d9b..66812579d2e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -243,6 +243,7 @@ import org.apache.doris.statistics.AnalysisManager;
 import org.apache.doris.statistics.StatisticsAutoCollector;
 import org.apache.doris.statistics.StatisticsCache;
 import org.apache.doris.statistics.StatisticsCleaner;
+import org.apache.doris.statistics.StatisticsJobAppender;
 import org.apache.doris.statistics.query.QueryStats;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
@@ -516,6 +517,8 @@ public class Env {
 
     private StatisticsAutoCollector statisticsAutoCollector;
 
+    private StatisticsJobAppender statisticsJobAppender;
+
     private HiveTransactionMgr hiveTransactionMgr;
 
     private TopicPublisherThread topicPublisherThread;
@@ -743,6 +746,7 @@ public class Env {
         this.analysisManager = new AnalysisManager();
         this.statisticsCleaner = new StatisticsCleaner();
         this.statisticsAutoCollector = new StatisticsAutoCollector();
+        this.statisticsJobAppender = new StatisticsJobAppender();
         this.globalFunctionMgr = new GlobalFunctionMgr();
         this.workloadGroupMgr = new WorkloadGroupMgr();
         this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
@@ -1019,13 +1023,6 @@ public class Env {
             // If not using bdb, we need to notify the FE type transfer 
manually.
             notifyNewFETypeTransfer(FrontendNodeType.MASTER);
         }
-        if (statisticsCleaner != null) {
-            statisticsCleaner.start();
-        }
-        if (statisticsAutoCollector != null) {
-            statisticsAutoCollector.start();
-        }
-
         queryCancelWorker.start();
 
         TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
@@ -1663,6 +1660,11 @@ public class Env {
         binlogGcer.start();
         columnIdFlusher.start();
         insertOverwriteManager.start();
+
+        // auto analyze related threads.
+        statisticsCleaner.start();
+        statisticsAutoCollector.start();
+        statisticsJobAppender.start();
     }
 
     // start threads that should running on all FE
@@ -5994,6 +5996,10 @@ public class Env {
         return statisticsAutoCollector;
     }
 
+    public StatisticsJobAppender getStatisticsJobAppender() {
+        return statisticsJobAppender;
+    }
+
     public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) {
         AlterMTMV alter = new AlterMTMV(info.getMvName(), 
info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO);
         this.alter.processAlterMTMV(alter, false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index da80a48081b..31928491e3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -85,6 +85,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -108,15 +109,12 @@ public class AnalysisManager implements Writable {
 
     private static final Logger LOG = 
LogManager.getLogger(AnalysisManager.class);
 
-    /**
-     * Mem only.
-     */
-    public final Queue<HighPriorityColumn> predicateColumns = new 
ArrayBlockingQueue<>(100);
-
-    /**
-     * Mem only.
-     */
-    public final Queue<HighPriorityColumn> queryColumns = new 
ArrayBlockingQueue<>(100);
+    private static final int COLUMN_QUEUE_SIZE = 1000;
+    public final Queue<HighPriorityColumn> highPriorityColumns = new 
ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
+    public final Queue<HighPriorityColumn> midPriorityColumns = new 
ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
+    public final Map<TableIf, Set<String>> highPriorityJobs = new 
LinkedHashMap<>();
+    public final Map<TableIf, Set<String>> midPriorityJobs = new 
LinkedHashMap<>();
+    public final Map<TableIf, Set<String>> lowPriorityJobs = new 
LinkedHashMap<>();
 
     // Tracking running manually submitted async tasks, keep in mem only
     protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> 
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
@@ -167,11 +165,6 @@ public class AnalysisManager implements Writable {
 
     public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) 
throws DdlException, AnalysisException {
         DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
-        // Using auto analyzer if user specifies.
-        if 
(analyzeDBStmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer"))
 {
-            Env.getCurrentEnv().getStatisticsAutoCollector().analyzeDb(db);
-            return;
-        }
         List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db, 
analyzeDBStmt.getAnalyzeProperties());
         if (!analyzeDBStmt.isSync()) {
             sendJobId(analysisInfos, proxy);
@@ -219,6 +212,11 @@ public class AnalysisManager implements Writable {
 
     // Each analyze stmt corresponding to an analysis job.
     public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws 
DdlException {
+        // Using auto analyzer if user specifies.
+        if 
(stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
+            
Env.getCurrentEnv().getStatisticsAutoCollector().processOneJob(stmt.getTable(), 
stmt.getColumnNames());
+            return;
+        }
         AnalysisInfo jobInfo = buildAndAssignJob(stmt);
         if (jobInfo == null) {
             return;
@@ -1103,11 +1101,11 @@ public class AnalysisManager implements Writable {
 
 
     public void updateColumnUsedInPredicate(Set<Slot> slotReferences) {
-        updateColumn(slotReferences, predicateColumns);
+        updateColumn(slotReferences, highPriorityColumns);
     }
 
     public void updateQueriedColumn(Collection<Slot> slotReferences) {
-        updateColumn(slotReferences, queryColumns);
+        updateColumn(slotReferences, midPriorityColumns);
     }
 
     protected void updateColumn(Collection<Slot> slotReferences, 
Queue<HighPriorityColumn> queue) {
@@ -1117,7 +1115,8 @@ public class AnalysisManager implements Writable {
             }
             Optional<Column> optionalColumn = ((SlotReference) s).getColumn();
             Optional<TableIf> optionalTable = ((SlotReference) s).getTable();
-            if (optionalColumn.isPresent() && optionalTable.isPresent()) {
+            if (optionalColumn.isPresent() && optionalTable.isPresent()
+                    && 
!StatisticsUtil.isUnsupportedType(optionalColumn.get().getType())) {
                 TableIf table = optionalTable.get();
                 DatabaseIf database = table.getDatabase();
                 if (database != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index dbb7046467a..19b9c69db08 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -17,31 +17,24 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
-import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.time.LocalTime;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -59,8 +52,23 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
 
     @Override
     protected void collect() {
-        if (canCollect()) {
-            analyzeAll();
+        while (canCollect()) {
+            Map.Entry<TableIf, Set<String>> job = getJob();
+            if (job == null) {
+                // No more job to process, break and sleep.
+                break;
+            }
+            try {
+                TableIf table = job.getKey();
+                Set<String> columns = job.getValue()
+                        .stream()
+                        .filter(c -> needAnalyzeColumn(table, c))
+                        .collect(Collectors.toSet());
+                processOneJob(table, columns);
+            } catch (Exception e) {
+                LOG.warn("Failed to analyze table {} with columns [{}]",
+                        job.getKey().getName(), 
job.getValue().stream().collect(Collectors.joining(",")), e);
+            }
         }
     }
 
@@ -69,133 +77,56 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
             && 
StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
     }
 
-    protected void analyzeAll() {
-        List<CatalogIf> catalogs = getCatalogsInOrder();
-        for (CatalogIf ctl : catalogs) {
-            if (!canCollect()) {
-                analysisTaskExecutor.clear();
-                break;
-            }
-            if (!ctl.enableAutoAnalyze()) {
-                continue;
-            }
-            List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
-            for (DatabaseIf<TableIf> databaseIf : dbs) {
-                if (!canCollect()) {
-                    analysisTaskExecutor.clear();
-                    break;
-                }
-                if 
(StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
-                    continue;
-                }
-                try {
-                    analyzeDb(databaseIf);
-                } catch (Throwable t) {
-                    LOG.warn("Failed to analyze database {}.{}", 
ctl.getName(), databaseIf.getFullName(), t);
-                    continue;
-                }
-            }
+    protected Map.Entry<TableIf, Set<String>> getJob() {
+        AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+        Optional<Map.Entry<TableIf, Set<String>>> job = 
fetchJobFromMap(manager.highPriorityJobs);
+        if (job.isPresent()) {
+            return job.get();
         }
+        job = fetchJobFromMap(manager.midPriorityJobs);
+        if (job.isPresent()) {
+            return job.get();
+        }
+        job = fetchJobFromMap(manager.lowPriorityJobs);
+        return job.isPresent() ? job.get() : null;
     }
 
-    public List<CatalogIf> getCatalogsInOrder() {
-        return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
-            .sorted((c1, c2) -> (int) (c1.getId() - 
c2.getId())).collect(Collectors.toList());
-    }
-
-    public List<DatabaseIf<? extends TableIf>> 
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
-        return catalog.getAllDbs().stream()
-            .sorted((d1, d2) -> (int) (d1.getId() - 
d2.getId())).collect(Collectors.toList());
-    }
-
-    public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
-        return db.getTables().stream()
-            .sorted((t1, t2) -> (int) (t1.getId() - 
t2.getId())).collect(Collectors.toList());
-    }
-
-    public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
-        List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
-        for (AnalysisInfo analysisInfo : analysisInfos) {
-            try {
-                if (!canCollect()) {
-                    analysisTaskExecutor.clear();
-                    break;
-                }
-                analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
-                if (analysisInfo == null) {
-                    continue;
-                }
-                createSystemAnalysisJob(analysisInfo);
-            } catch (Throwable t) {
-                analysisInfo.message = t.getMessage();
-                LOG.warn("Failed to auto analyze table {}.{}, reason {}",
-                        databaseIf.getFullName(), analysisInfo.tblId, 
analysisInfo.message, t);
-                continue;
-            }
+    protected Optional<Map.Entry<TableIf, Set<String>>> 
fetchJobFromMap(Map<TableIf, Set<String>> jobMap) {
+        synchronized (jobMap) {
+            Optional<Map.Entry<TableIf, Set<String>>> first = 
jobMap.entrySet().stream().findFirst();
+            first.ifPresent(entry -> jobMap.remove(entry.getKey()));
+            return first;
         }
     }
 
-    protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends 
TableIf> db) {
-        List<AnalysisInfo> analysisInfos = new ArrayList<>();
-        for (TableIf table : getTablesInOrder(db)) {
-            try {
-                if (skip(table)) {
-                    continue;
-                }
-                createAnalyzeJobForTbl(db, analysisInfos, table);
-            } catch (Throwable t) {
-                LOG.warn("Failed to analyze table {}.{}.{}",
-                        db.getCatalog().getName(), db.getFullName(), 
table.getName(), t);
-                continue;
-            }
+    protected void processOneJob(TableIf table, Set<String> columns) throws 
DdlException {
+        Set<String> collect = columns.stream().filter(c -> 
needAnalyzeColumn(table, c)).collect(Collectors.toSet());
+        if (collect.isEmpty()) {
+            return;
         }
-        return analysisInfos;
+        AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns);
+        createSystemAnalysisJob(analyzeJob);
     }
 
-    // return true if skip auto analyze this time.
-    protected boolean skip(TableIf table) {
-        if (!(table instanceof OlapTable || table instanceof 
HMSExternalTable)) {
-            return true;
-        }
-        // For now, only support Hive HMS table auto collection.
-        if (table instanceof HMSExternalTable
-                && !((HMSExternalTable) 
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
-            return true;
-        }
-        if (table.getDataSize(true) < 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
-            return false;
-        }
-        TableStatsMeta tableStats = 
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
-        // means it's never got analyzed or new partition loaded data.
-        if (tableStats == null || tableStats.newPartitionLoaded.get()) {
-            return false;
-        }
-        if (tableStats.userInjected) {
-            return true;
-        }
-        return System.currentTimeMillis()
-                - tableStats.updatedTime < 
StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
+    protected boolean needAnalyzeColumn(TableIf table, String column) {
+        //TODO: Calculate column health value.
+        return true;
     }
 
-    protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
-            List<AnalysisInfo> analysisInfos, TableIf table) {
+    protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String> 
columns) {
         AnalysisMethod analysisMethod = table.getDataSize(true) >= 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
                 ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
-        AnalysisInfo jobInfo = new AnalysisInfoBuilder()
+        return new AnalysisInfoBuilder()
                 .setJobId(Env.getCurrentEnv().getNextId())
-                .setCatalogId(db.getCatalog().getId())
-                .setDBId(db.getId())
+                .setCatalogId(table.getDatabase().getCatalog().getId())
+                .setDBId(table.getDatabase().getId())
                 .setTblId(table.getId())
-                .setColName(
-                        table.getSchemaAllIndexes(false).stream()
-                            .filter(c -> 
!StatisticsUtil.isUnsupportedType(c.getType()))
-                            
.map(Column::getName).collect(Collectors.joining(","))
-                )
+                .setColName(columns.stream().collect(Collectors.joining(",")))
                 .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
                 .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
                 .setAnalysisMethod(analysisMethod)
                 .setSampleRows(analysisMethod.equals(AnalysisMethod.SAMPLE)
-                        ? StatisticsUtil.getHugeTableSampleRows() : -1)
+                    ? StatisticsUtil.getHugeTableSampleRows() : -1)
                 .setScheduleType(ScheduleType.AUTOMATIC)
                 .setState(AnalysisState.PENDING)
                 .setTaskIds(new ArrayList<>())
@@ -204,40 +135,5 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
                 .setTblUpdateTime(table.getUpdateTime())
                 .setEmptyJob(table instanceof OlapTable && table.getRowCount() 
== 0)
                 .build();
-        analysisInfos.add(jobInfo);
-    }
-
-    @VisibleForTesting
-    protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
-        TableIf table = StatisticsUtil.findTable(jobInfo.catalogId, 
jobInfo.dbId, jobInfo.tblId);
-        // Skip tables that are too wide.
-        if (table.getBaseSchema().size() > 
StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) {
-            return null;
-        }
-
-        AnalysisManager analysisManager = 
Env.getServingEnv().getAnalysisManager();
-        TableStatsMeta tblStats = 
analysisManager.findTableStatsStatus(table.getId());
-
-        Map<String, Set<String>> needRunPartitions = null;
-        String colNames = jobInfo.colName;
-        if (table.needReAnalyzeTable(tblStats)) {
-            needRunPartitions = table.findReAnalyzeNeededPartitions();
-        } else if (table instanceof OlapTable && 
tblStats.newPartitionLoaded.get()) {
-            OlapTable olapTable = (OlapTable) table;
-            needRunPartitions = new HashMap<>();
-            Set<String> partitionColumnNames = 
olapTable.getPartitionInfo().getPartitionColumns().stream()
-                    .map(Column::getName).collect(Collectors.toSet());
-            colNames = 
partitionColumnNames.stream().collect(Collectors.joining(","));
-            Set<String> partitionNames = olapTable.getAllPartitions().stream()
-                    .map(Partition::getName).collect(Collectors.toSet());
-            for (String column : partitionColumnNames) {
-                needRunPartitions.put(column, partitionNames);
-            }
-        }
-
-        if (needRunPartitions == null || needRunPartitions.isEmpty()) {
-            return null;
-        }
-        return new 
AnalysisInfoBuilder(jobInfo).setColName(colNames).setColToPartitions(needRunPartitions).build();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 0985b9b2b95..e26d3170178 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -61,10 +61,6 @@ public abstract class StatisticsCollector extends 
MasterDaemon {
     @VisibleForTesting
     protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
             throws DdlException {
-        if (jobInfo.colToPartitions.isEmpty()) {
-            // No statistics need to be collected or updated
-            return;
-        }
         Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
         AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
         analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, 
false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
new file mode 100644
index 00000000000..73d0d1340ad
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StatisticsJobAppender extends MasterDaemon {
+
+    private static final Logger LOG = 
LogManager.getLogger(StatisticsJobAppender.class);
+
+    public static final long INTERVAL = 1000;
+    public static final int JOB_MAP_SIZE = 1000;
+
+    private long currentDbId;
+    private long currentTableId;
+
+    public StatisticsJobAppender() {
+        super("Statistics Job Appender", INTERVAL);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!StatisticsUtil.enableAutoAnalyze()) {
+            return;
+        }
+        if (!Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+        if (!StatisticsUtil.statsTblAvailable()) {
+            LOG.info("Stats table not available, skip");
+            return;
+        }
+        if (Env.isCheckpointThread()) {
+            return;
+        }
+        appendJobs();
+    }
+
+    protected void appendJobs() {
+        AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager();
+        appendColumnsToJobs(manager.highPriorityColumns, 
manager.highPriorityJobs);
+        appendColumnsToJobs(manager.midPriorityColumns, 
manager.midPriorityJobs);
+        appendToLowQueue(manager.lowPriorityJobs);
+    }
+
+    protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, 
Map<TableIf, Set<String>> jobsMap) {
+        int size = columnQueue.size();
+        for (int i = 0; i < size; i++) {
+            HighPriorityColumn column = columnQueue.poll();
+            TableIf table = StatisticsUtil.findTable(column.catalogId, 
column.dbId, column.tblId);
+            synchronized (jobsMap) {
+                // If job map reach the upper limit, stop putting new jobs.
+                if (!jobsMap.containsKey(table) && jobsMap.size() >= 
JOB_MAP_SIZE) {
+                    break;
+                }
+                if (jobsMap.containsKey(table)) {
+                    jobsMap.get(table).add(column.colName);
+                } else {
+                    jobsMap.put(table, Collections.singleton(column.colName));
+                }
+            }
+        }
+    }
+
+    protected void appendToLowQueue(Map<TableIf, Set<String>>  jobsMap) {
+
+        InternalCatalog catalog = Env.getCurrentInternalCatalog();
+        List<Long> sortedDbs = 
catalog.getDbIds().stream().sorted().collect(Collectors.toList());
+        for (long dbId : sortedDbs) {
+            if (dbId < currentDbId
+                    || 
StatisticConstants.SYSTEM_DBS.contains(catalog.getDbNullable(dbId).getFullName()))
 {
+                continue;
+            }
+            currentDbId = dbId;
+            Optional<Database> db = catalog.getDb(dbId);
+            List<Table> tables = db.get().getTables().stream()
+                    .sorted((t1, t2) -> (int) (t1.getId() - 
t2.getId())).collect(Collectors.toList());
+            for (Table t : tables) {
+                if (t.getId() <= currentTableId) {
+                    continue;
+                }
+                synchronized (jobsMap) {
+                    // If job map reach the upper limit, stop putting new jobs.
+                    if (!jobsMap.containsKey(t) && jobsMap.size() >= 
JOB_MAP_SIZE) {
+                        return;
+                    }
+                    Set<String> columns
+                            = t.getColumns().stream().filter(c -> 
!StatisticsUtil.isUnsupportedType(c.getType()))
+                            .map(c -> c.getName()).collect(Collectors.toSet());
+                    if (jobsMap.containsKey(t)) {
+                        jobsMap.get(t).addAll(columns);
+                    } else {
+                        jobsMap.put(t, columns);
+                    }
+                }
+                currentTableId = t.getId();
+            }
+        }
+        // All tables have been processed once, reset for the next loop.
+        currentDbId = 0;
+        currentTableId = 0;
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
deleted file mode 100644
index ddc19959a2d..00000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
+++ /dev/null
@@ -1,546 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DatabaseIf;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.Type;
-import org.apache.doris.catalog.View;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
-import org.apache.doris.statistics.AnalysisInfo.JobType;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class StatisticsAutoCollectorTest {
-
-    @Test
-    public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) {
-        new MockUp<CatalogIf>() {
-            @Mock
-            public Collection<DatabaseIf> getAllDbs() {
-                Database db1 = new Database(1, FeConstants.INTERNAL_DB_NAME);
-                Database db2 = new Database(2, "anyDB");
-                List<DatabaseIf> databaseIfs = new ArrayList<>();
-                databaseIfs.add(db1);
-                databaseIfs.add(db2);
-                return databaseIfs;
-            }
-        };
-        new MockUp<StatisticsAutoCollector>() {
-            @Mock
-            public List<AnalysisInfo> 
constructAnalysisInfo(DatabaseIf<TableIf> db) {
-                return Arrays.asList(analysisInfo, analysisInfo);
-            }
-
-            int count = 0;
-
-            @Mock
-            public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) 
{
-                return count++ == 0 ? null : jobInfo;
-            }
-
-            @Mock
-            public void createSystemAnalysisJob(AnalysisInfo jobInfo)
-                    throws DdlException {
-
-            }
-        };
-
-        StatisticsAutoCollector saa = new StatisticsAutoCollector();
-        saa.runAfterCatalogReady();
-        new Expectations() {
-            {
-                try {
-                    saa.createSystemAnalysisJob((AnalysisInfo) any);
-                    times = 1;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    }
-
-    @Test
-    public void testConstructAnalysisInfo(
-            @Injectable OlapTable o2, @Injectable View v) {
-        new MockUp<Database>() {
-            @Mock
-            public List<Table> getTables() {
-                List<Table> tableIfs = new ArrayList<>();
-                tableIfs.add(o2);
-                tableIfs.add(v);
-                return tableIfs;
-            }
-
-            @Mock
-            public String getFullName() {
-                return "anyDb";
-            }
-        };
-
-        new MockUp<OlapTable>() {
-            @Mock
-            public String getName() {
-                return "anytable";
-            }
-
-            @Mock
-            public List<Column> getSchemaAllIndexes(boolean full) {
-                List<Column> columns = new ArrayList<>();
-                columns.add(new Column("c1", PrimitiveType.INT));
-                columns.add(new Column("c2", PrimitiveType.HLL));
-                return columns;
-            }
-        };
-        StatisticsAutoCollector saa = new StatisticsAutoCollector();
-        List<AnalysisInfo> analysisInfoList = saa.constructAnalysisInfo(new 
Database(1, "anydb"));
-        Assertions.assertEquals(1, analysisInfoList.size());
-        Assertions.assertEquals("c1", 
analysisInfoList.get(0).colName.split(",")[0]);
-    }
-
-    @Test
-    public void testGetReAnalyzeRequiredPart0() {
-
-        TableIf tableIf = new OlapTable();
-
-        new MockUp<OlapTable>() {
-            @Mock
-            protected Map<String, Set<String>> findReAnalyzeNeededPartitions() 
{
-                Set<String> partitionNames = new HashSet<>();
-                partitionNames.add("p1");
-                partitionNames.add("p2");
-                Map<String, Set<String>> map = new HashMap<>();
-                map.put("col1", partitionNames);
-                return map;
-            }
-
-            @Mock
-            public long getRowCount() {
-                return 100;
-            }
-
-            @Mock
-            public List<Column> getBaseSchema() {
-                return Lists.newArrayList(new Column("col1", Type.INT), new 
Column("col2", Type.INT));
-            }
-        };
-
-        new MockUp<StatisticsUtil>() {
-            @Mock
-            public TableIf findTable(long catalogName, long dbName, long 
tblName) {
-                return tableIf;
-            }
-        };
-        AnalysisInfo analysisInfo = new 
AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL)
-                .setColToPartitions(new HashMap<>()).setAnalysisType(
-                
AnalysisType.FUNDAMENTALS).setColName("col1").setJobType(JobType.SYSTEM).build();
-        new MockUp<AnalysisManager>() {
-
-            int count = 0;
-
-            TableStatsMeta[] tableStatsArr =
-                    new TableStatsMeta[] {new TableStatsMeta(0, analysisInfo, 
tableIf),
-                            new TableStatsMeta(0, analysisInfo, tableIf), 
null};
-
-            {
-                tableStatsArr[0].updatedRows.addAndGet(100);
-                tableStatsArr[1].updatedRows.addAndGet(0);
-            }
-
-            @Mock
-            public TableStatsMeta findTableStatsStatus(long tblId) {
-                return tableStatsArr[count++];
-            }
-        };
-
-        new MockUp<StatisticsAutoCollector>() {
-            @Mock
-            public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, 
TableIf table,
-                    Set<String> needRunPartitions) {
-                return new AnalysisInfoBuilder().build();
-            }
-        };
-        StatisticsAutoCollector statisticsAutoCollector = new 
StatisticsAutoCollector();
-        AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder()
-                .setCatalogId(0)
-                .setDBId(0)
-                .setTblId(0).build();
-        
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
-        // uncomment it when updatedRows gets ready
-        // 
Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
-        
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
-    }
-
-    @Test
-    public void testSkipWideTable() {
-
-        TableIf tableIf = new OlapTable();
-
-        new MockUp<OlapTable>() {
-            @Mock
-            public List<Column> getBaseSchema() {
-                return Lists.newArrayList(new Column("col1", Type.INT), new 
Column("col2", Type.INT));
-            }
-        };
-
-        new MockUp<StatisticsUtil>() {
-            int count = 0;
-            int [] thresholds = {1, 10};
-            @Mock
-            public TableIf findTable(long catalogName, long dbName, long 
tblName) {
-                return tableIf;
-            }
-
-            @Mock
-            public int getAutoAnalyzeTableWidthThreshold() {
-                return thresholds[count++];
-            }
-        };
-
-        new MockUp<OlapTable>() {
-            @Mock
-            public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
-                HashMap<String, Set<String>> ret = Maps.newHashMap();
-                ret.put("key1", Sets.newHashSet());
-                return ret;
-            }
-        };
-
-        AnalysisInfo analysisInfo = new AnalysisInfoBuilder().build();
-        StatisticsAutoCollector statisticsAutoCollector = new 
StatisticsAutoCollector();
-        
Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo));
-        
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo));
-    }
-
-    @Test
-    public void testLoop() {
-        AtomicBoolean timeChecked = new AtomicBoolean();
-        AtomicBoolean switchChecked = new AtomicBoolean();
-        new MockUp<StatisticsUtil>() {
-
-            @Mock
-            public boolean inAnalyzeTime(LocalTime now) {
-                timeChecked.set(true);
-                return true;
-            }
-
-            @Mock
-            public boolean enableAutoAnalyze() {
-                switchChecked.set(true);
-                return true;
-            }
-        };
-        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-        autoCollector.collect();
-        Assertions.assertTrue(timeChecked.get() && switchChecked.get());
-
-    }
-
-    @Test
-    public void checkAvailableThread() {
-        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-        
Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num,
-                
autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
-    }
-
-    @Test
-    public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta 
stats, @Mocked TableIf anyOtherTable) {
-        new MockUp<OlapTable>() {
-
-            @Mock
-            public long getDataSize(boolean singleReplica) {
-                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 
+ 1000000000;
-            }
-        };
-
-        new MockUp<AnalysisManager>() {
-
-            @Mock
-            public TableStatsMeta findTableStatsStatus(long tblId) {
-                return stats;
-            }
-        };
-        // A very huge table has been updated recently, so we should skip it 
this time
-        stats.updatedTime = System.currentTimeMillis() - 1000;
-        stats.newPartitionLoaded = new AtomicBoolean();
-        stats.newPartitionLoaded.set(true);
-        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-        // Test new partition loaded data for the first time. Not skip.
-        Assertions.assertFalse(autoCollector.skip(olapTable));
-        stats.newPartitionLoaded.set(false);
-        // Assertions.assertTrue(autoCollector.skip(olapTable));
-        // The update of this huge table is long time ago, so we shouldn't 
skip it this time
-        stats.updatedTime = System.currentTimeMillis()
-                - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 
10000;
-        Assertions.assertFalse(autoCollector.skip(olapTable));
-        new MockUp<AnalysisManager>() {
-
-            @Mock
-            public TableStatsMeta findTableStatsStatus(long tblId) {
-                return null;
-            }
-        };
-        // can't find table stats meta, which means this table never get 
analyzed,  so we shouldn't skip it this time
-        Assertions.assertFalse(autoCollector.skip(olapTable));
-        new MockUp<AnalysisManager>() {
-
-            @Mock
-            public TableStatsMeta findTableStatsStatus(long tblId) {
-                return stats;
-            }
-        };
-        stats.userInjected = true;
-        Assertions.assertTrue(autoCollector.skip(olapTable));
-        // this is not olap table nor external table, so we should skip it 
this time
-        Assertions.assertTrue(autoCollector.skip(anyOtherTable));
-    }
-
-    // For small table, use full
-    @Test
-    public void testCreateAnalyzeJobForTbl1(
-            @Injectable OlapTable t1,
-            @Injectable Database db
-    ) throws Exception {
-        new MockUp<Database>() {
-
-            @Mock
-            public CatalogIf getCatalog() {
-                return Env.getCurrentInternalCatalog();
-            }
-
-            @Mock
-            public long getId() {
-                return 0;
-            }
-        };
-        new MockUp<OlapTable>() {
-
-            int count = 0;
-
-            @Mock
-            public List<Column> getBaseSchema() {
-                return Lists.newArrayList(new Column("test", 
PrimitiveType.INT));
-            }
-
-            @Mock
-            public long getDataSize(boolean singleReplica) {
-                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - 1;
-            }
-
-            @Mock
-            public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
-                return new OlapAnalysisTask(info);
-            }
-        };
-
-        new MockUp<StatisticsUtil>() {
-            @Mock
-            public TableIf findTable(long catalogId, long dbId, long tblId) {
-                return t1;
-            }
-        };
-
-        StatisticsAutoCollector sac = new StatisticsAutoCollector();
-        List<AnalysisInfo> jobInfos = new ArrayList<>();
-        sac.createAnalyzeJobForTbl(db, jobInfos, t1);
-        AnalysisInfo jobInfo = jobInfos.get(0);
-        Map<String, Set<String>> colToPartitions = new HashMap<>();
-        colToPartitions.put("test", new HashSet<String>() {
-            {
-                add("p1");
-            }
-        });
-        jobInfo = new 
AnalysisInfoBuilder(jobInfo).setColToPartitions(colToPartitions).build();
-        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
-        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
-        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, 
false);
-        Assertions.assertEquals(1, analysisTasks.size());
-        for (BaseAnalysisTask task : analysisTasks.values()) {
-            Assertions.assertNull(task.getTableSample());
-        }
-    }
-
-    // for big table, use sample
-    @Test
-    public void testCreateAnalyzeJobForTbl2(
-            @Injectable OlapTable t1,
-            @Injectable Database db
-    ) throws Exception {
-        new MockUp<Database>() {
-
-            @Mock
-            public CatalogIf getCatalog() {
-                return Env.getCurrentInternalCatalog();
-            }
-
-            @Mock
-            public long getId() {
-                return 0;
-            }
-        };
-        new MockUp<OlapTable>() {
-
-            int count = 0;
-
-            @Mock
-            public List<Column> getBaseSchema() {
-                return Lists.newArrayList(new Column("test", 
PrimitiveType.INT));
-            }
-
-            @Mock
-            public long getDataSize(boolean singleReplica) {
-                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 2;
-            }
-
-            @Mock
-            public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
-                return new OlapAnalysisTask(info);
-            }
-        };
-
-        new MockUp<StatisticsUtil>() {
-            @Mock
-            public TableIf findTable(long catalogId, long dbId, long tblId) {
-                return t1;
-            }
-        };
-
-        StatisticsAutoCollector sac = new StatisticsAutoCollector();
-        List<AnalysisInfo> jobInfos = new ArrayList<>();
-        sac.createAnalyzeJobForTbl(db, jobInfos, t1);
-        AnalysisInfo jobInfo = jobInfos.get(0);
-        Map<String, Set<String>> colToPartitions = new HashMap<>();
-        colToPartitions.put("test", new HashSet<String>() {
-            {
-                add("p1");
-            }
-        });
-        jobInfo = new 
AnalysisInfoBuilder(jobInfo).setColToPartitions(colToPartitions).build();
-        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
-        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
-        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, 
false);
-        Assertions.assertEquals(1, analysisTasks.size());
-        for (BaseAnalysisTask task : analysisTasks.values()) {
-            Assertions.assertNotNull(task.getTableSample());
-        }
-    }
-
-    @Test
-    public void testDisableAuto1() throws Exception {
-        InternalCatalog catalog1 = 
EnvFactory.getInstance().createInternalCatalog();
-        List<CatalogIf> catalogs = Lists.newArrayList();
-        catalogs.add(catalog1);
-
-        new MockUp<StatisticsAutoCollector>() {
-            @Mock
-            public List<CatalogIf> getCatalogsInOrder() {
-                return catalogs;
-            }
-
-            @Mock
-            protected boolean canCollect() {
-                return false;
-            }
-
-        };
-
-        StatisticsAutoCollector sac = new StatisticsAutoCollector();
-        new Expectations(catalog1) {{
-                catalog1.enableAutoAnalyze();
-                times = 0;
-            }};
-
-        sac.analyzeAll();
-    }
-
-    @Test
-    public void testDisableAuto2() throws Exception {
-        InternalCatalog catalog1 = 
EnvFactory.getInstance().createInternalCatalog();
-        List<CatalogIf> catalogs = Lists.newArrayList();
-        catalogs.add(catalog1);
-
-        Database db1 = new Database();
-        List<DatabaseIf<? extends TableIf>> dbs = Lists.newArrayList();
-        dbs.add(db1);
-
-        new MockUp<StatisticsAutoCollector>() {
-            int count = 0;
-            boolean[] canCollectReturn = {true, false};
-            @Mock
-            public List<CatalogIf> getCatalogsInOrder() {
-                return catalogs;
-            }
-
-            @Mock
-            public List<DatabaseIf<? extends TableIf>> 
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
-                return dbs;
-            }
-
-                @Mock
-            protected boolean canCollect() {
-                return canCollectReturn[count++];
-            }
-
-        };
-
-        StatisticsAutoCollector sac = new StatisticsAutoCollector();
-        new Expectations(catalog1, db1) {{
-                catalog1.enableAutoAnalyze();
-                result = true;
-                times = 1;
-                db1.getFullName();
-                times = 0;
-            }};
-
-        sac.analyzeAll();
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to