This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch high-priority-column in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/high-priority-column by this push: new 88ed1980d86 High priority queue and map. (#31509) 88ed1980d86 is described below commit 88ed1980d86731b799963f63eeef35390663f159 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Wed Feb 28 12:01:55 2024 +0800 High priority queue and map. (#31509) --- .../main/java/org/apache/doris/catalog/Env.java | 20 +- .../apache/doris/statistics/AnalysisManager.java | 33 +- .../doris/statistics/StatisticsAutoCollector.java | 202 ++------ .../doris/statistics/StatisticsCollector.java | 4 - .../doris/statistics/StatisticsJobAppender.java | 135 +++++ .../statistics/StatisticsAutoCollectorTest.java | 546 --------------------- 6 files changed, 213 insertions(+), 727 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index d5bcd324d9b..66812579d2e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -243,6 +243,7 @@ import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.StatisticsAutoCollector; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsCleaner; +import org.apache.doris.statistics.StatisticsJobAppender; import org.apache.doris.statistics.query.QueryStats; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -516,6 +517,8 @@ public class Env { private StatisticsAutoCollector statisticsAutoCollector; + private StatisticsJobAppender statisticsJobAppender; + private HiveTransactionMgr hiveTransactionMgr; private TopicPublisherThread topicPublisherThread; @@ -743,6 +746,7 @@ public class Env { this.analysisManager = new AnalysisManager(); this.statisticsCleaner = new StatisticsCleaner(); this.statisticsAutoCollector = new StatisticsAutoCollector(); + this.statisticsJobAppender = new StatisticsJobAppender(); this.globalFunctionMgr = new GlobalFunctionMgr(); this.workloadGroupMgr = new WorkloadGroupMgr(); this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); @@ -1019,13 +1023,6 @@ public class Env { // If not using bdb, we need to notify the FE type transfer manually. notifyNewFETypeTransfer(FrontendNodeType.MASTER); } - if (statisticsCleaner != null) { - statisticsCleaner.start(); - } - if (statisticsAutoCollector != null) { - statisticsAutoCollector.start(); - } - queryCancelWorker.start(); TopicPublisher wgPublisher = new WorkloadGroupPublisher(this); @@ -1663,6 +1660,11 @@ public class Env { binlogGcer.start(); columnIdFlusher.start(); insertOverwriteManager.start(); + + // auto analyze related threads. + statisticsCleaner.start(); + statisticsAutoCollector.start(); + statisticsJobAppender.start(); } // start threads that should running on all FE @@ -5994,6 +5996,10 @@ public class Env { return statisticsAutoCollector; } + public StatisticsJobAppender getStatisticsJobAppender() { + return statisticsJobAppender; + } + public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) { AlterMTMV alter = new AlterMTMV(info.getMvName(), info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO); this.alter.processAlterMTMV(alter, false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index da80a48081b..31928491e3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -85,6 +85,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -108,15 +109,12 @@ public class AnalysisManager implements Writable { private static final Logger LOG = LogManager.getLogger(AnalysisManager.class); - /** - * Mem only. - */ - public final Queue<HighPriorityColumn> predicateColumns = new ArrayBlockingQueue<>(100); - - /** - * Mem only. - */ - public final Queue<HighPriorityColumn> queryColumns = new ArrayBlockingQueue<>(100); + private static final int COLUMN_QUEUE_SIZE = 1000; + public final Queue<HighPriorityColumn> highPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE); + public final Queue<HighPriorityColumn> midPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE); + public final Map<TableIf, Set<String>> highPriorityJobs = new LinkedHashMap<>(); + public final Map<TableIf, Set<String>> midPriorityJobs = new LinkedHashMap<>(); + public final Map<TableIf, Set<String>> lowPriorityJobs = new LinkedHashMap<>(); // Tracking running manually submitted async tasks, keep in mem only protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); @@ -167,11 +165,6 @@ public class AnalysisManager implements Writable { public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws DdlException, AnalysisException { DatabaseIf<TableIf> db = analyzeDBStmt.getDb(); - // Using auto analyzer if user specifies. - if (analyzeDBStmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) { - Env.getCurrentEnv().getStatisticsAutoCollector().analyzeDb(db); - return; - } List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db, analyzeDBStmt.getAnalyzeProperties()); if (!analyzeDBStmt.isSync()) { sendJobId(analysisInfos, proxy); @@ -219,6 +212,11 @@ public class AnalysisManager implements Writable { // Each analyze stmt corresponding to an analysis job. public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException { + // Using auto analyzer if user specifies. + if (stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) { + Env.getCurrentEnv().getStatisticsAutoCollector().processOneJob(stmt.getTable(), stmt.getColumnNames()); + return; + } AnalysisInfo jobInfo = buildAndAssignJob(stmt); if (jobInfo == null) { return; @@ -1103,11 +1101,11 @@ public class AnalysisManager implements Writable { public void updateColumnUsedInPredicate(Set<Slot> slotReferences) { - updateColumn(slotReferences, predicateColumns); + updateColumn(slotReferences, highPriorityColumns); } public void updateQueriedColumn(Collection<Slot> slotReferences) { - updateColumn(slotReferences, queryColumns); + updateColumn(slotReferences, midPriorityColumns); } protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityColumn> queue) { @@ -1117,7 +1115,8 @@ public class AnalysisManager implements Writable { } Optional<Column> optionalColumn = ((SlotReference) s).getColumn(); Optional<TableIf> optionalTable = ((SlotReference) s).getTable(); - if (optionalColumn.isPresent() && optionalTable.isPresent()) { + if (optionalColumn.isPresent() && optionalTable.isPresent() + && !StatisticsUtil.isUnsupportedType(optionalColumn.get().getType())) { TableIf table = optionalTable.get(); DatabaseIf database = table.getDatabase(); if (database != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index dbb7046467a..19b9c69db08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -17,31 +17,24 @@ package org.apache.doris.statistics; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; -import org.apache.hudi.common.util.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.time.LocalTime; import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -59,8 +52,23 @@ public class StatisticsAutoCollector extends StatisticsCollector { @Override protected void collect() { - if (canCollect()) { - analyzeAll(); + while (canCollect()) { + Map.Entry<TableIf, Set<String>> job = getJob(); + if (job == null) { + // No more job to process, break and sleep. + break; + } + try { + TableIf table = job.getKey(); + Set<String> columns = job.getValue() + .stream() + .filter(c -> needAnalyzeColumn(table, c)) + .collect(Collectors.toSet()); + processOneJob(table, columns); + } catch (Exception e) { + LOG.warn("Failed to analyze table {} with columns [{}]", + job.getKey().getName(), job.getValue().stream().collect(Collectors.joining(",")), e); + } } } @@ -69,133 +77,56 @@ public class StatisticsAutoCollector extends StatisticsCollector { && StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId())); } - protected void analyzeAll() { - List<CatalogIf> catalogs = getCatalogsInOrder(); - for (CatalogIf ctl : catalogs) { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - if (!ctl.enableAutoAnalyze()) { - continue; - } - List<DatabaseIf> dbs = getDatabasesInOrder(ctl); - for (DatabaseIf<TableIf> databaseIf : dbs) { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - if (StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) { - continue; - } - try { - analyzeDb(databaseIf); - } catch (Throwable t) { - LOG.warn("Failed to analyze database {}.{}", ctl.getName(), databaseIf.getFullName(), t); - continue; - } - } + protected Map.Entry<TableIf, Set<String>> getJob() { + AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); + Optional<Map.Entry<TableIf, Set<String>>> job = fetchJobFromMap(manager.highPriorityJobs); + if (job.isPresent()) { + return job.get(); } + job = fetchJobFromMap(manager.midPriorityJobs); + if (job.isPresent()) { + return job.get(); + } + job = fetchJobFromMap(manager.lowPriorityJobs); + return job.isPresent() ? job.get() : null; } - public List<CatalogIf> getCatalogsInOrder() { - return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream() - .sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList()); - } - - public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) { - return catalog.getAllDbs().stream() - .sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList()); - } - - public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) { - return db.getTables().stream() - .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); - } - - public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException { - List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf); - for (AnalysisInfo analysisInfo : analysisInfos) { - try { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - analysisInfo = getReAnalyzeRequiredPart(analysisInfo); - if (analysisInfo == null) { - continue; - } - createSystemAnalysisJob(analysisInfo); - } catch (Throwable t) { - analysisInfo.message = t.getMessage(); - LOG.warn("Failed to auto analyze table {}.{}, reason {}", - databaseIf.getFullName(), analysisInfo.tblId, analysisInfo.message, t); - continue; - } + protected Optional<Map.Entry<TableIf, Set<String>>> fetchJobFromMap(Map<TableIf, Set<String>> jobMap) { + synchronized (jobMap) { + Optional<Map.Entry<TableIf, Set<String>>> first = jobMap.entrySet().stream().findFirst(); + first.ifPresent(entry -> jobMap.remove(entry.getKey())); + return first; } } - protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db) { - List<AnalysisInfo> analysisInfos = new ArrayList<>(); - for (TableIf table : getTablesInOrder(db)) { - try { - if (skip(table)) { - continue; - } - createAnalyzeJobForTbl(db, analysisInfos, table); - } catch (Throwable t) { - LOG.warn("Failed to analyze table {}.{}.{}", - db.getCatalog().getName(), db.getFullName(), table.getName(), t); - continue; - } + protected void processOneJob(TableIf table, Set<String> columns) throws DdlException { + Set<String> collect = columns.stream().filter(c -> needAnalyzeColumn(table, c)).collect(Collectors.toSet()); + if (collect.isEmpty()) { + return; } - return analysisInfos; + AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns); + createSystemAnalysisJob(analyzeJob); } - // return true if skip auto analyze this time. - protected boolean skip(TableIf table) { - if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) { - return true; - } - // For now, only support Hive HMS table auto collection. - if (table instanceof HMSExternalTable - && !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { - return true; - } - if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) { - return false; - } - TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); - // means it's never got analyzed or new partition loaded data. - if (tableStats == null || tableStats.newPartitionLoaded.get()) { - return false; - } - if (tableStats.userInjected) { - return true; - } - return System.currentTimeMillis() - - tableStats.updatedTime < StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis(); + protected boolean needAnalyzeColumn(TableIf table, String column) { + //TODO: Calculate column health value. + return true; } - protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db, - List<AnalysisInfo> analysisInfos, TableIf table) { + protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String> columns) { AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes() ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; - AnalysisInfo jobInfo = new AnalysisInfoBuilder() + return new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) - .setCatalogId(db.getCatalog().getId()) - .setDBId(db.getId()) + .setCatalogId(table.getDatabase().getCatalog().getId()) + .setDBId(table.getDatabase().getId()) .setTblId(table.getId()) - .setColName( - table.getSchemaAllIndexes(false).stream() - .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) - .map(Column::getName).collect(Collectors.joining(",")) - ) + .setColName(columns.stream().collect(Collectors.joining(","))) .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS) .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL) .setAnalysisMethod(analysisMethod) .setSampleRows(analysisMethod.equals(AnalysisMethod.SAMPLE) - ? StatisticsUtil.getHugeTableSampleRows() : -1) + ? StatisticsUtil.getHugeTableSampleRows() : -1) .setScheduleType(ScheduleType.AUTOMATIC) .setState(AnalysisState.PENDING) .setTaskIds(new ArrayList<>()) @@ -204,40 +135,5 @@ public class StatisticsAutoCollector extends StatisticsCollector { .setTblUpdateTime(table.getUpdateTime()) .setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0) .build(); - analysisInfos.add(jobInfo); - } - - @VisibleForTesting - protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) { - TableIf table = StatisticsUtil.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId); - // Skip tables that are too wide. - if (table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) { - return null; - } - - AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); - TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); - - Map<String, Set<String>> needRunPartitions = null; - String colNames = jobInfo.colName; - if (table.needReAnalyzeTable(tblStats)) { - needRunPartitions = table.findReAnalyzeNeededPartitions(); - } else if (table instanceof OlapTable && tblStats.newPartitionLoaded.get()) { - OlapTable olapTable = (OlapTable) table; - needRunPartitions = new HashMap<>(); - Set<String> partitionColumnNames = olapTable.getPartitionInfo().getPartitionColumns().stream() - .map(Column::getName).collect(Collectors.toSet()); - colNames = partitionColumnNames.stream().collect(Collectors.joining(",")); - Set<String> partitionNames = olapTable.getAllPartitions().stream() - .map(Partition::getName).collect(Collectors.toSet()); - for (String column : partitionColumnNames) { - needRunPartitions.put(column, partitionNames); - } - } - - if (needRunPartitions == null || needRunPartitions.isEmpty()) { - return null; - } - return new AnalysisInfoBuilder(jobInfo).setColName(colNames).setColToPartitions(needRunPartitions).build(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java index 0985b9b2b95..e26d3170178 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java @@ -61,10 +61,6 @@ public abstract class StatisticsCollector extends MasterDaemon { @VisibleForTesting protected void createSystemAnalysisJob(AnalysisInfo jobInfo) throws DdlException { - if (jobInfo.colToPartitions.isEmpty()) { - // No statistics need to be collected or updated - return; - } Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java new file mode 100644 index 00000000000..73d0d1340ad --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.stream.Collectors; + +public class StatisticsJobAppender extends MasterDaemon { + + private static final Logger LOG = LogManager.getLogger(StatisticsJobAppender.class); + + public static final long INTERVAL = 1000; + public static final int JOB_MAP_SIZE = 1000; + + private long currentDbId; + private long currentTableId; + + public StatisticsJobAppender() { + super("Statistics Job Appender", INTERVAL); + } + + @Override + protected void runAfterCatalogReady() { + if (!StatisticsUtil.enableAutoAnalyze()) { + return; + } + if (!Env.getCurrentEnv().isMaster()) { + return; + } + if (!StatisticsUtil.statsTblAvailable()) { + LOG.info("Stats table not available, skip"); + return; + } + if (Env.isCheckpointThread()) { + return; + } + appendJobs(); + } + + protected void appendJobs() { + AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager(); + appendColumnsToJobs(manager.highPriorityColumns, manager.highPriorityJobs); + appendColumnsToJobs(manager.midPriorityColumns, manager.midPriorityJobs); + appendToLowQueue(manager.lowPriorityJobs); + } + + protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, Map<TableIf, Set<String>> jobsMap) { + int size = columnQueue.size(); + for (int i = 0; i < size; i++) { + HighPriorityColumn column = columnQueue.poll(); + TableIf table = StatisticsUtil.findTable(column.catalogId, column.dbId, column.tblId); + synchronized (jobsMap) { + // If job map reach the upper limit, stop putting new jobs. + if (!jobsMap.containsKey(table) && jobsMap.size() >= JOB_MAP_SIZE) { + break; + } + if (jobsMap.containsKey(table)) { + jobsMap.get(table).add(column.colName); + } else { + jobsMap.put(table, Collections.singleton(column.colName)); + } + } + } + } + + protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) { + + InternalCatalog catalog = Env.getCurrentInternalCatalog(); + List<Long> sortedDbs = catalog.getDbIds().stream().sorted().collect(Collectors.toList()); + for (long dbId : sortedDbs) { + if (dbId < currentDbId + || StatisticConstants.SYSTEM_DBS.contains(catalog.getDbNullable(dbId).getFullName())) { + continue; + } + currentDbId = dbId; + Optional<Database> db = catalog.getDb(dbId); + List<Table> tables = db.get().getTables().stream() + .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); + for (Table t : tables) { + if (t.getId() <= currentTableId) { + continue; + } + synchronized (jobsMap) { + // If job map reach the upper limit, stop putting new jobs. + if (!jobsMap.containsKey(t) && jobsMap.size() >= JOB_MAP_SIZE) { + return; + } + Set<String> columns + = t.getColumns().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .map(c -> c.getName()).collect(Collectors.toSet()); + if (jobsMap.containsKey(t)) { + jobsMap.get(t).addAll(columns); + } else { + jobsMap.put(t, columns); + } + } + currentTableId = t.getId(); + } + } + // All tables have been processed once, reset for the next loop. + currentDbId = 0; + currentTableId = 0; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java deleted file mode 100644 index ddc19959a2d..00000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java +++ /dev/null @@ -1,546 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.EnvFactory; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.Type; -import org.apache.doris.catalog.View; -import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.InternalCatalog; -import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; -import org.apache.doris.statistics.AnalysisInfo.AnalysisType; -import org.apache.doris.statistics.AnalysisInfo.JobType; -import org.apache.doris.statistics.util.StatisticsUtil; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mock; -import mockit.MockUp; -import mockit.Mocked; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.time.LocalTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -public class StatisticsAutoCollectorTest { - - @Test - public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) { - new MockUp<CatalogIf>() { - @Mock - public Collection<DatabaseIf> getAllDbs() { - Database db1 = new Database(1, FeConstants.INTERNAL_DB_NAME); - Database db2 = new Database(2, "anyDB"); - List<DatabaseIf> databaseIfs = new ArrayList<>(); - databaseIfs.add(db1); - databaseIfs.add(db2); - return databaseIfs; - } - }; - new MockUp<StatisticsAutoCollector>() { - @Mock - public List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<TableIf> db) { - return Arrays.asList(analysisInfo, analysisInfo); - } - - int count = 0; - - @Mock - public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) { - return count++ == 0 ? null : jobInfo; - } - - @Mock - public void createSystemAnalysisJob(AnalysisInfo jobInfo) - throws DdlException { - - } - }; - - StatisticsAutoCollector saa = new StatisticsAutoCollector(); - saa.runAfterCatalogReady(); - new Expectations() { - { - try { - saa.createSystemAnalysisJob((AnalysisInfo) any); - times = 1; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - @Test - public void testConstructAnalysisInfo( - @Injectable OlapTable o2, @Injectable View v) { - new MockUp<Database>() { - @Mock - public List<Table> getTables() { - List<Table> tableIfs = new ArrayList<>(); - tableIfs.add(o2); - tableIfs.add(v); - return tableIfs; - } - - @Mock - public String getFullName() { - return "anyDb"; - } - }; - - new MockUp<OlapTable>() { - @Mock - public String getName() { - return "anytable"; - } - - @Mock - public List<Column> getSchemaAllIndexes(boolean full) { - List<Column> columns = new ArrayList<>(); - columns.add(new Column("c1", PrimitiveType.INT)); - columns.add(new Column("c2", PrimitiveType.HLL)); - return columns; - } - }; - StatisticsAutoCollector saa = new StatisticsAutoCollector(); - List<AnalysisInfo> analysisInfoList = saa.constructAnalysisInfo(new Database(1, "anydb")); - Assertions.assertEquals(1, analysisInfoList.size()); - Assertions.assertEquals("c1", analysisInfoList.get(0).colName.split(",")[0]); - } - - @Test - public void testGetReAnalyzeRequiredPart0() { - - TableIf tableIf = new OlapTable(); - - new MockUp<OlapTable>() { - @Mock - protected Map<String, Set<String>> findReAnalyzeNeededPartitions() { - Set<String> partitionNames = new HashSet<>(); - partitionNames.add("p1"); - partitionNames.add("p2"); - Map<String, Set<String>> map = new HashMap<>(); - map.put("col1", partitionNames); - return map; - } - - @Mock - public long getRowCount() { - return 100; - } - - @Mock - public List<Column> getBaseSchema() { - return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT)); - } - }; - - new MockUp<StatisticsUtil>() { - @Mock - public TableIf findTable(long catalogName, long dbName, long tblName) { - return tableIf; - } - }; - AnalysisInfo analysisInfo = new AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL) - .setColToPartitions(new HashMap<>()).setAnalysisType( - AnalysisType.FUNDAMENTALS).setColName("col1").setJobType(JobType.SYSTEM).build(); - new MockUp<AnalysisManager>() { - - int count = 0; - - TableStatsMeta[] tableStatsArr = - new TableStatsMeta[] {new TableStatsMeta(0, analysisInfo, tableIf), - new TableStatsMeta(0, analysisInfo, tableIf), null}; - - { - tableStatsArr[0].updatedRows.addAndGet(100); - tableStatsArr[1].updatedRows.addAndGet(0); - } - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return tableStatsArr[count++]; - } - }; - - new MockUp<StatisticsAutoCollector>() { - @Mock - public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table, - Set<String> needRunPartitions) { - return new AnalysisInfoBuilder().build(); - } - }; - StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector(); - AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder() - .setCatalogId(0) - .setDBId(0) - .setTblId(0).build(); - Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); - // uncomment it when updatedRows gets ready - // Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); - Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); - } - - @Test - public void testSkipWideTable() { - - TableIf tableIf = new OlapTable(); - - new MockUp<OlapTable>() { - @Mock - public List<Column> getBaseSchema() { - return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT)); - } - }; - - new MockUp<StatisticsUtil>() { - int count = 0; - int [] thresholds = {1, 10}; - @Mock - public TableIf findTable(long catalogName, long dbName, long tblName) { - return tableIf; - } - - @Mock - public int getAutoAnalyzeTableWidthThreshold() { - return thresholds[count++]; - } - }; - - new MockUp<OlapTable>() { - @Mock - public Map<String, Set<String>> findReAnalyzeNeededPartitions() { - HashMap<String, Set<String>> ret = Maps.newHashMap(); - ret.put("key1", Sets.newHashSet()); - return ret; - } - }; - - AnalysisInfo analysisInfo = new AnalysisInfoBuilder().build(); - StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector(); - Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo)); - Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo)); - } - - @Test - public void testLoop() { - AtomicBoolean timeChecked = new AtomicBoolean(); - AtomicBoolean switchChecked = new AtomicBoolean(); - new MockUp<StatisticsUtil>() { - - @Mock - public boolean inAnalyzeTime(LocalTime now) { - timeChecked.set(true); - return true; - } - - @Mock - public boolean enableAutoAnalyze() { - switchChecked.set(true); - return true; - } - }; - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - autoCollector.collect(); - Assertions.assertTrue(timeChecked.get() && switchChecked.get()); - - } - - @Test - public void checkAvailableThread() { - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num, - autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize()); - } - - @Test - public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) { - new MockUp<OlapTable>() { - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000; - } - }; - - new MockUp<AnalysisManager>() { - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return stats; - } - }; - // A very huge table has been updated recently, so we should skip it this time - stats.updatedTime = System.currentTimeMillis() - 1000; - stats.newPartitionLoaded = new AtomicBoolean(); - stats.newPartitionLoaded.set(true); - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - // Test new partition loaded data for the first time. Not skip. - Assertions.assertFalse(autoCollector.skip(olapTable)); - stats.newPartitionLoaded.set(false); - // Assertions.assertTrue(autoCollector.skip(olapTable)); - // The update of this huge table is long time ago, so we shouldn't skip it this time - stats.updatedTime = System.currentTimeMillis() - - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000; - Assertions.assertFalse(autoCollector.skip(olapTable)); - new MockUp<AnalysisManager>() { - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return null; - } - }; - // can't find table stats meta, which means this table never get analyzed, so we shouldn't skip it this time - Assertions.assertFalse(autoCollector.skip(olapTable)); - new MockUp<AnalysisManager>() { - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return stats; - } - }; - stats.userInjected = true; - Assertions.assertTrue(autoCollector.skip(olapTable)); - // this is not olap table nor external table, so we should skip it this time - Assertions.assertTrue(autoCollector.skip(anyOtherTable)); - } - - // For small table, use full - @Test - public void testCreateAnalyzeJobForTbl1( - @Injectable OlapTable t1, - @Injectable Database db - ) throws Exception { - new MockUp<Database>() { - - @Mock - public CatalogIf getCatalog() { - return Env.getCurrentInternalCatalog(); - } - - @Mock - public long getId() { - return 0; - } - }; - new MockUp<OlapTable>() { - - int count = 0; - - @Mock - public List<Column> getBaseSchema() { - return Lists.newArrayList(new Column("test", PrimitiveType.INT)); - } - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - 1; - } - - @Mock - public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { - return new OlapAnalysisTask(info); - } - }; - - new MockUp<StatisticsUtil>() { - @Mock - public TableIf findTable(long catalogId, long dbId, long tblId) { - return t1; - } - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - List<AnalysisInfo> jobInfos = new ArrayList<>(); - sac.createAnalyzeJobForTbl(db, jobInfos, t1); - AnalysisInfo jobInfo = jobInfos.get(0); - Map<String, Set<String>> colToPartitions = new HashMap<>(); - colToPartitions.put("test", new HashSet<String>() { - { - add("p1"); - } - }); - jobInfo = new AnalysisInfoBuilder(jobInfo).setColToPartitions(colToPartitions).build(); - Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - Assertions.assertEquals(1, analysisTasks.size()); - for (BaseAnalysisTask task : analysisTasks.values()) { - Assertions.assertNull(task.getTableSample()); - } - } - - // for big table, use sample - @Test - public void testCreateAnalyzeJobForTbl2( - @Injectable OlapTable t1, - @Injectable Database db - ) throws Exception { - new MockUp<Database>() { - - @Mock - public CatalogIf getCatalog() { - return Env.getCurrentInternalCatalog(); - } - - @Mock - public long getId() { - return 0; - } - }; - new MockUp<OlapTable>() { - - int count = 0; - - @Mock - public List<Column> getBaseSchema() { - return Lists.newArrayList(new Column("test", PrimitiveType.INT)); - } - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 2; - } - - @Mock - public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { - return new OlapAnalysisTask(info); - } - }; - - new MockUp<StatisticsUtil>() { - @Mock - public TableIf findTable(long catalogId, long dbId, long tblId) { - return t1; - } - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - List<AnalysisInfo> jobInfos = new ArrayList<>(); - sac.createAnalyzeJobForTbl(db, jobInfos, t1); - AnalysisInfo jobInfo = jobInfos.get(0); - Map<String, Set<String>> colToPartitions = new HashMap<>(); - colToPartitions.put("test", new HashSet<String>() { - { - add("p1"); - } - }); - jobInfo = new AnalysisInfoBuilder(jobInfo).setColToPartitions(colToPartitions).build(); - Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - Assertions.assertEquals(1, analysisTasks.size()); - for (BaseAnalysisTask task : analysisTasks.values()) { - Assertions.assertNotNull(task.getTableSample()); - } - } - - @Test - public void testDisableAuto1() throws Exception { - InternalCatalog catalog1 = EnvFactory.getInstance().createInternalCatalog(); - List<CatalogIf> catalogs = Lists.newArrayList(); - catalogs.add(catalog1); - - new MockUp<StatisticsAutoCollector>() { - @Mock - public List<CatalogIf> getCatalogsInOrder() { - return catalogs; - } - - @Mock - protected boolean canCollect() { - return false; - } - - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - new Expectations(catalog1) {{ - catalog1.enableAutoAnalyze(); - times = 0; - }}; - - sac.analyzeAll(); - } - - @Test - public void testDisableAuto2() throws Exception { - InternalCatalog catalog1 = EnvFactory.getInstance().createInternalCatalog(); - List<CatalogIf> catalogs = Lists.newArrayList(); - catalogs.add(catalog1); - - Database db1 = new Database(); - List<DatabaseIf<? extends TableIf>> dbs = Lists.newArrayList(); - dbs.add(db1); - - new MockUp<StatisticsAutoCollector>() { - int count = 0; - boolean[] canCollectReturn = {true, false}; - @Mock - public List<CatalogIf> getCatalogsInOrder() { - return catalogs; - } - - @Mock - public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) { - return dbs; - } - - @Mock - protected boolean canCollect() { - return canCollectReturn[count++]; - } - - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - new Expectations(catalog1, db1) {{ - catalog1.enableAutoAnalyze(); - result = true; - times = 1; - db1.getFullName(); - times = 0; - }}; - - sac.analyzeAll(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org