This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch high-priority-column in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7bcec7100f99b1ce4f6aba932ebff51bab83dd5e Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Fri Mar 15 10:14:30 2024 +0800 Refactor. (#32273) --- .../main/java/org/apache/doris/common/Config.java | 2 +- .../java/org/apache/doris/catalog/OlapTable.java | 53 --------------- .../main/java/org/apache/doris/catalog/Table.java | 11 ---- .../java/org/apache/doris/catalog/TableIf.java | 5 -- .../org/apache/doris/datasource/ExternalTable.java | 31 --------- .../apache/doris/statistics/AnalysisManager.java | 76 +++++----------------- .../apache/doris/statistics/BaseAnalysisTask.java | 2 +- .../doris/statistics/ExternalAnalysisTask.java | 4 +- .../doris/statistics/FollowerColumnSender.java | 4 +- .../apache/doris/statistics/OlapAnalysisTask.java | 2 +- .../{HighPriorityColumn.java => QueryColumn.java} | 8 +-- .../doris/statistics/StatisticConstants.java | 2 +- .../doris/statistics/StatisticsAutoCollector.java | 51 ++++++++++++--- .../doris/statistics/StatisticsCollector.java | 75 --------------------- .../doris/statistics/StatisticsJobAppender.java | 62 +++++++++++------- .../doris/statistics/util/StatisticsUtil.java | 54 +++++++-------- .../doris/statistics/AnalysisManagerTest.java | 45 ------------- 17 files changed, 137 insertions(+), 350 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index daa9c8b1d35..414dcd0e999 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1602,7 +1602,7 @@ public class Config extends ConfigBase { "This parameter controls the time interval for automatic collection jobs to check the health of table" + "statistics and trigger automatic collection" }) - public static int auto_check_statistics_in_minutes = 5; + public static int auto_check_statistics_in_minutes = 1; /** * If set to TRUE, the compaction slower replica 
will be skipped when select get queryable replicas diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 463b3b1f19d..4ac54827cee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -59,8 +59,6 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.HistogramTask; import org.apache.doris.statistics.OlapAnalysisTask; -import org.apache.doris.statistics.TableStatsMeta; -import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TColumn; @@ -1267,57 +1265,6 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } } - public boolean needReAnalyzeTable(TableStatsMeta tblStats) { - if (tblStats == null) { - return true; - } - if (!tblStats.analyzeColumns().containsAll(getBaseSchema() - .stream() - .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) - .map(Column::getName) - .collect(Collectors.toSet()))) { - return true; - } - long rowCount = getRowCount(); - if (rowCount > 0 && tblStats.rowCount == 0) { - return true; - } - long updateRows = tblStats.updatedRows.get(); - int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows); - return tblHealth < StatisticsUtil.getTableStatsHealthThreshold(); - } - - @Override - public Map<String, Set<String>> findReAnalyzeNeededPartitions() { - TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId()); - Set<String> allPartitions = getPartitionNames().stream().map(this::getPartition) - .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet()); - if (tableStats == null) { - Map<String, Set<String>> ret = Maps.newHashMap(); - for 
(Column col : getSchemaAllIndexes(false)) { - if (StatisticsUtil.isUnsupportedType(col.getType())) { - continue; - } - ret.put(col.getName(), allPartitions); - } - return ret; - } - Map<String, Set<String>> colToPart = new HashMap<>(); - for (Column col : getSchemaAllIndexes(false)) { - if (StatisticsUtil.isUnsupportedType(col.getType())) { - continue; - } - long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName()); - Set<String> partitions = getPartitionNames().stream() - .map(this::getPartition) - .filter(Partition::hasData) - .filter(partition -> partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName) - .collect(Collectors.toSet()); - colToPart.put(col.getName(), partitions); - } - return colToPart; - } - @Override public long fetchRowCount() { long rowCount = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 35f5b14efc5..619f415719b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -33,7 +33,6 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; -import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.thrift.TTableDescriptor; import com.google.common.base.Preconditions; @@ -642,16 +641,6 @@ public abstract class Table extends MetaObject implements Writable, TableIf { public void analyze(String dbName) {} - @Override - public boolean needReAnalyzeTable(TableStatsMeta tblStats) { - return true; - } - - @Override - public Map<String, Set<String>> findReAnalyzeNeededPartitions() { - return Collections.emptyMap(); - } - @Override public List<Long> getChunkSizes() { throw new NotImplementedException("getChunkSized not implemented"); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 484dd3bb6eb..5e90f5555d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -30,7 +30,6 @@ import org.apache.doris.persist.AlterConstraintLog; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; -import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.thrift.TTableDescriptor; import com.google.common.collect.ImmutableList; @@ -182,10 +181,6 @@ public interface TableIf { Optional<ColumnStatistic> getColumnStatistic(String colName); - boolean needReAnalyzeTable(TableStatsMeta tblStats); - - Map<String, Set<String>> findReAnalyzeNeededPartitions(); - // Get all the chunk sizes of this table. Now, only HMS external table implemented this interface. // For HMS external table, the return result is a list of all the files' size. 
List<Long> getChunkSizes(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 7f82d0d3876..aa7e0385914 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -32,11 +32,8 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; -import org.apache.doris.statistics.TableStatsMeta; -import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.TTableDescriptor; -import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import lombok.Getter; import org.apache.commons.lang3.NotImplementedException; @@ -46,12 +43,9 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; /** * External table represent tables that are not self-managed by Doris. 
@@ -317,31 +311,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { objectCreated = false; } - @Override - public boolean needReAnalyzeTable(TableStatsMeta tblStats) { - if (tblStats == null) { - return true; - } - if (!tblStats.analyzeColumns().containsAll(getBaseSchema() - .stream() - .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) - .map(Column::getName) - .collect(Collectors.toSet()))) { - return true; - } - return System.currentTimeMillis() - - tblStats.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); - } - - @Override - public Map<String, Set<String>> findReAnalyzeNeededPartitions() { - HashSet<String> partitions = Sets.newHashSet(); - // TODO: Find a way to collect external table partitions that need to be analyzed. - partitions.add("Dummy Partition"); - return getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) - .collect(Collectors.toMap(Column::getName, k -> partitions)); - } - @Override public List<Long> getChunkSizes() { throw new NotImplementedException("getChunkSized not implemented"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index ddcdf459e6e..8a30aec9e60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -112,8 +112,8 @@ public class AnalysisManager implements Writable { private static final Logger LOG = LogManager.getLogger(AnalysisManager.class); private static final int COLUMN_QUEUE_SIZE = 1000; - public final Queue<HighPriorityColumn> highPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE); - public final Queue<HighPriorityColumn> midPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE); + public final Queue<QueryColumn> highPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE); + 
public final Queue<QueryColumn> midPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE); public final Map<TableName, Set<String>> highPriorityJobs = new LinkedHashMap<>(); public final Map<TableName, Set<String>> midPriorityJobs = new LinkedHashMap<>(); public final Map<TableName, Set<String>> lowPriorityJobs = new LinkedHashMap<>(); @@ -307,55 +307,10 @@ public class AnalysisManager implements Writable { } } - /** - * Gets the partitions for which statistics are to be collected. First verify that - * there are partitions that have been deleted but have historical statistics(invalid statistics), - * if there are these partitions, we need to delete them to avoid errors in summary table level statistics. - * Then get the partitions for which statistics need to be collected based on collection mode (incremental/full). - * <p> - * note: - * If there is no invalid statistics, it does not need to collect/update - * statistics if the following conditions are met: - * - in full collection mode, the partitioned table does not have partitions - * - in incremental collection mode, partition statistics already exist - * <p> - * TODO Supports incremental collection of statistics from materialized views - */ - private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames, - Set<String> partitionNames, AnalysisType analysisType) throws DdlException { - - Map<String, Set<String>> columnToPartitions = columnNames.stream() - .collect(Collectors.toMap( - columnName -> columnName, - columnName -> new HashSet<>(partitionNames == null ? Collections.emptySet() : partitionNames) - )); - - if (analysisType == AnalysisType.HISTOGRAM) { - // Collecting histograms does not need to support incremental collection, - // and will automatically cover historical statistics - return columnToPartitions; - } - - if (table instanceof HMSExternalTable) { - // TODO Currently, we do not support INCREMENTAL collection for external table. 
- // One reason is external table partition id couldn't convert to a Long value. - // Will solve this problem later. - return columnToPartitions; - } - - if (analysisType == AnalysisType.FUNDAMENTALS) { - Map<String, Set<String>> result = table.findReAnalyzeNeededPartitions(); - result.keySet().retainAll(columnNames); - return result; - } - - return columnToPartitions; - } - // Make sure colName of job has all the column as this AnalyzeStmt specified, no matter whether it will be analyzed // or not. @VisibleForTesting - public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlException { + public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) { AnalysisInfoBuilder infoBuilder = new AnalysisInfoBuilder(); long jobId = Env.getCurrentEnv().getNextId(); TableIf table = stmt.getTable(); @@ -413,9 +368,10 @@ public class AnalysisManager implements Writable { long periodTimeInMs = stmt.getPeriodTimeInMs(); infoBuilder.setPeriodTimeInMs(periodTimeInMs); - - Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, columnNames, - partitionNames, analysisType); + Map<String, Set<String>> colToPartitions = new HashMap<>(); + Set<String> dummyPartition = new HashSet<>(); + dummyPartition.add("dummy partition"); + columnNames.stream().forEach(c -> colToPartitions.put(c, dummyPartition)); infoBuilder.setColToPartitions(colToPartitions); infoBuilder.setTaskIds(Lists.newArrayList()); infoBuilder.setTblUpdateTime(table.getUpdateTime()); @@ -770,6 +726,7 @@ public class AnalysisManager implements Writable { } tableStats.updatedTime = 0; tableStats.userInjected = false; + tableStats.rowCount = table.getRowCount(); } public void invalidateRemoteStats(long catalogId, long dbId, long tableId, @@ -1196,16 +1153,14 @@ public class AnalysisManager implements Writable { public void updateColumnUsedInPredicate(Set<Slot> slotReferences) { - LOG.info("Add slots to high priority queues."); updateColumn(slotReferences, highPriorityColumns); } public void 
updateQueriedColumn(Collection<Slot> slotReferences) { - LOG.info("Add slots to mid priority queues."); updateColumn(slotReferences, midPriorityColumns); } - protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityColumn> queue) { + protected void updateColumn(Collection<Slot> slotReferences, Queue<QueryColumn> queue) { for (Slot s : slotReferences) { if (!(s instanceof SlotReference)) { return; @@ -1219,10 +1174,12 @@ public class AnalysisManager implements Writable { if (database != null) { CatalogIf catalog = database.getCatalog(); if (catalog != null) { - queue.offer(new HighPriorityColumn(catalog.getId(), database.getId(), + queue.offer(new QueryColumn(catalog.getId(), database.getId(), table.getId(), optionalColumn.get().getName())); - LOG.info("Offer column " + table.getName() + "(" + table.getId() + ")." - + optionalColumn.get().getName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Offer column " + table.getName() + "(" + table.getId() + ")." + + optionalColumn.get().getName()); + } } } } @@ -1231,14 +1188,15 @@ public class AnalysisManager implements Writable { public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns, Collection<TQueryColumn> midColumns) { + LOG.info("Received {} high columns and {} mid columns", highColumns.size(), midColumns.size()); for (TQueryColumn c : highColumns) { - if (!highPriorityColumns.offer(new HighPriorityColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId), + if (!highPriorityColumns.offer(new QueryColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId), Long.parseLong(c.tblId), c.colName))) { break; } } for (TQueryColumn c : midColumns) { - if (!midPriorityColumns.offer(new HighPriorityColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId), + if (!midPriorityColumns.offer(new QueryColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId), Long.parseLong(c.tblId), c.colName))) { break; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 68767843507..4e1c3abe944 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -48,7 +48,7 @@ public abstract class BaseAnalysisTask { public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB public static final double LIMIT_FACTOR = 1.2; - protected static final String COLLECT_COL_STATISTICS = + protected static final String FULL_ANALYZE_TEMPLATE = "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, " + " ${catalogId} AS `catalog_id`, " + " ${dbId} AS `db_id`, " diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java index ef1b795bd13..690aa237439 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java @@ -73,6 +73,8 @@ public class ExternalAnalysisTask extends BaseAnalysisTask { */ private void getTableStats() { Map<String, String> params = buildStatsParams(null); + Pair<Double, Long> sampleInfo = getSampleInfo(); + params.put("scaleFactor", String.valueOf(sampleInfo.first)); List<ResultRow> columnResult = StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) .replace(ANALYZE_TABLE_COUNT_TEMPLATE)); @@ -98,7 +100,7 @@ public class ExternalAnalysisTask extends BaseAnalysisTask { if (LOG.isDebugEnabled()) { LOG.debug("Will do full collection for column {}", col.getName()); } - sb.append(COLLECT_COL_STATISTICS); + sb.append(FULL_ANALYZE_TEMPLATE); } else { // Do sample analyze if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java index 0a804152694..51ff9501308 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java @@ -74,14 +74,14 @@ public class FollowerColumnSender extends MasterDaemon { = analysisManager.highPriorityColumns .stream() .filter(c -> StatisticsUtil.needAnalyzeColumn(c)) - .map(HighPriorityColumn::toThrift) + .map(QueryColumn::toThrift) .collect(Collectors.toSet()); Set<TQueryColumn> midPriorityColumns = analysisManager.midPriorityColumns .stream() .filter(c -> StatisticsUtil.needAnalyzeColumn(c)) .filter(c -> !highPriorityColumns.contains(c)) - .map(HighPriorityColumn::toThrift) + .map(QueryColumn::toThrift) .collect(Collectors.toSet()); analysisManager.highPriorityColumns.clear(); analysisManager.midPriorityColumns.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 0cb33be80e0..29c67784577 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -228,7 +228,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask { params.put("tblName", String.valueOf(tbl.getName())); params.put("index", getIndex()); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String collectColStats = stringSubstitutor.replace(COLLECT_COL_STATISTICS); + String collectColStats = stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE); runQuery(collectColStats); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/QueryColumn.java similarity index 88% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java rename to 
fe/fe-core/src/main/java/org/apache/doris/statistics/QueryColumn.java index d619ef82c08..df91ea7f4c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/QueryColumn.java @@ -21,14 +21,14 @@ import org.apache.doris.thrift.TQueryColumn; import java.util.Objects; -public class HighPriorityColumn { +public class QueryColumn { public final long catalogId; public final long dbId; public final long tblId; public final String colName; - public HighPriorityColumn(long catalogId, long dbId, long tblId, String colName) { + public QueryColumn(long catalogId, long dbId, long tblId, String colName) { this.catalogId = catalogId; this.dbId = dbId; this.tblId = tblId; @@ -45,10 +45,10 @@ public class HighPriorityColumn { if (this == other) { return true; } - if (!(other instanceof HighPriorityColumn)) { + if (!(other instanceof QueryColumn)) { return false; } - HighPriorityColumn otherCriticalColumn = (HighPriorityColumn) other; + QueryColumn otherCriticalColumn = (QueryColumn) other; return this.catalogId == otherCriticalColumn.catalogId && this.dbId == otherCriticalColumn.dbId && this.tblId == otherCriticalColumn.tblId diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index 74c7bd7c9db..314cf6648bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -95,7 +95,7 @@ public class StatisticConstants { public static final int ANALYZE_TIMEOUT_IN_SEC = 43200; - public static final int TASK_QUEUE_CAP = 10; + public static final int TASK_QUEUE_CAP = 1; public static final int AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = 100; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index c26e7b05efd..43ee1af2032 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; @@ -31,6 +32,7 @@ import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,18 +47,33 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class StatisticsAutoCollector extends StatisticsCollector { +public class StatisticsAutoCollector extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(StatisticsAutoCollector.class); + protected final AnalysisTaskExecutor analysisTaskExecutor; + public StatisticsAutoCollector() { - super("Automatic Analyzer", - TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes), - new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, - StatisticConstants.TASK_QUEUE_CAP)); + super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes)); + this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, + StatisticConstants.TASK_QUEUE_CAP); } @Override + protected void runAfterCatalogReady() { + if 
(!Env.getCurrentEnv().isMaster()) { + return; + } + if (!StatisticsUtil.statsTblAvailable()) { + LOG.info("Stats table not available, skip"); + return; + } + if (Env.isCheckpointThread()) { + return; + } + collect(); + } + protected void collect() { while (canCollect()) { Pair<Entry<TableName, Set<String>>, JobPriority> job = getJob(); @@ -70,8 +87,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { if (!supportAutoAnalyze(table)) { continue; } - Set<String> columns = job.first.getValue().stream().collect(Collectors.toSet()); - processOneJob(table, columns, job.second); + processOneJob(table, job.first.getValue(), job.second); } catch (Exception e) { LOG.warn("Failed to analyze table {} with columns [{}]", job.first.getKey().getTbl(), job.first.getValue().stream().collect(Collectors.joining(",")), e); @@ -107,13 +123,14 @@ public class StatisticsAutoCollector extends StatisticsCollector { } protected void processOneJob(TableIf table, Set<String> columns, JobPriority priority) throws DdlException { + columns = columns.stream().filter(c -> StatisticsUtil.needAnalyzeColumn(table, c)).collect(Collectors.toSet()); appendPartitionColumns(table, columns); if (columns.isEmpty()) { return; } AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns, priority); - LOG.info("Analyze job : {}", analyzeJob.toString()); - createSystemAnalysisJob(analyzeJob); + LOG.debug("Auto analyze job : {}", analyzeJob.toString()); + executeSystemAnalysisJob(analyzeJob); } protected void appendPartitionColumns(TableIf table, Set<String> columns) { @@ -170,4 +187,20 @@ public class StatisticsAutoCollector extends StatisticsCollector { .setPriority(priority) .build(); } + + // Analysis job created by the system + @VisibleForTesting + protected void executeSystemAnalysisJob(AnalysisInfo jobInfo) + throws DdlException { + Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + 
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); + if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId) + && jobInfo.priority.equals(JobPriority.LOW)) { + analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false); + } + Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values()); + Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks); + analysisTasks.values().forEach(analysisTaskExecutor::submitTask); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java deleted file mode 100644 index e26d3170178..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.util.MasterDaemon; -import org.apache.doris.statistics.util.StatisticsUtil; - -import org.apache.hudi.common.util.VisibleForTesting; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.HashMap; -import java.util.Map; - -public abstract class StatisticsCollector extends MasterDaemon { - - private static final Logger LOG = LogManager.getLogger(StatisticsCollector.class); - - protected final AnalysisTaskExecutor analysisTaskExecutor; - - public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) { - super(name, intervalMs); - this.analysisTaskExecutor = analysisTaskExecutor; - } - - @Override - protected void runAfterCatalogReady() { - if (!Env.getCurrentEnv().isMaster()) { - return; - } - if (!StatisticsUtil.statsTblAvailable()) { - LOG.info("Stats table not available, skip"); - return; - } - if (Env.isCheckpointThread()) { - return; - } - collect(); - } - - protected abstract void collect(); - - // Analysis job created by the system - @VisibleForTesting - protected void createSystemAnalysisJob(AnalysisInfo jobInfo) - throws DdlException { - Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) { - analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false); - } - Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values()); - Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks); - analysisTasks.values().forEach(analysisTaskExecutor::submitTask); - } - -} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java index 9e07c65e2fe..336171d8858 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class StatisticsJobAppender extends MasterDaemon { @@ -44,9 +45,12 @@ public class StatisticsJobAppender extends MasterDaemon { public static final long INTERVAL = 1000; public static final int JOB_MAP_SIZE = 1000; + public static final int TABLE_BATCH_SIZE = 100; - private long currentDbId; - private long currentTableId; + private long currentDbId = 0; + private long currentTableId = 0; + private long lastRoundFinishTime = 0; + private long lowJobIntervalMs = TimeUnit.MINUTES.toMillis(1); public StatisticsJobAppender() { super("Statistics Job Appender", INTERVAL); @@ -60,10 +64,6 @@ public class StatisticsJobAppender extends MasterDaemon { if (!Env.getCurrentEnv().isMaster()) { return; } - if (!StatisticsUtil.statsTblAvailable()) { - LOG.info("Stats table not available, skip"); - return; - } if (Env.isCheckpointThread()) { return; } @@ -72,31 +72,28 @@ public class StatisticsJobAppender extends MasterDaemon { protected void appendJobs() { AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager(); - // LOG.info("Append column to high priority job map."); appendColumnsToJobs(manager.highPriorityColumns, manager.highPriorityJobs); - // LOG.info("Append column to mid priority job map."); appendColumnsToJobs(manager.midPriorityColumns, manager.midPriorityJobs); if (StatisticsUtil.enableAutoAnalyzeInternalCatalog()) { - // LOG.info("Append column to low priority job map."); - 
appendToLowQueue(manager.lowPriorityJobs); + appendToLowJobs(manager.lowPriorityJobs); } } - protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, Map<TableName, Set<String>> jobsMap) { + protected void appendColumnsToJobs(Queue<QueryColumn> columnQueue, Map<TableName, Set<String>> jobsMap) { int size = columnQueue.size(); + int processed = 0; for (int i = 0; i < size; i++) { - HighPriorityColumn column = columnQueue.poll(); + QueryColumn column = columnQueue.poll(); if (!StatisticsUtil.needAnalyzeColumn(column)) { continue; } - LOG.info("Process column " + column.tblId + "." + column.colName); TableIf table = StatisticsUtil.findTable(column.catalogId, column.dbId, column.tblId); TableName tableName = new TableName(table.getDatabase().getCatalog().getName(), table.getDatabase().getFullName(), table.getName()); synchronized (jobsMap) { // If job map reach the upper limit, stop putting new jobs. if (!jobsMap.containsKey(tableName) && jobsMap.size() >= JOB_MAP_SIZE) { - LOG.info("Job map full."); + LOG.info("High or mid job map full."); break; } if (jobsMap.containsKey(tableName)) { @@ -106,15 +103,21 @@ public class StatisticsJobAppender extends MasterDaemon { columns.add(column.colName); jobsMap.put(tableName, columns); } - LOG.info("Column " + column.tblId + "." 
+ column.colName + " added"); } + processed++; + } + if (size > 0 && LOG.isDebugEnabled()) { + LOG.debug("{} of {} columns append to jobs", processed, size); } } - protected void appendToLowQueue(Map<TableName, Set<String>> jobsMap) { + protected void appendToLowJobs(Map<TableName, Set<String>> jobsMap) { + if (System.currentTimeMillis() - lastRoundFinishTime < lowJobIntervalMs) { + return; + } InternalCatalog catalog = Env.getCurrentInternalCatalog(); List<Long> sortedDbs = catalog.getDbIds().stream().sorted().collect(Collectors.toList()); - int batchSize = 100; + int processed = 0; for (long dbId : sortedDbs) { if (dbId < currentDbId || StatisticConstants.SYSTEM_DBS.contains(catalog.getDbNullable(dbId).getFullName())) { @@ -128,31 +131,40 @@ public class StatisticsJobAppender extends MasterDaemon { if (!(t instanceof OlapTable) || t.getId() <= currentTableId) { continue; } - TableName tableName = new TableName(t.getDatabase().getCatalog().getName(), - t.getDatabase().getFullName(), t.getName()); + OlapTable olapTable = (OlapTable) t; + Set<String> columns = olapTable.getColumns().stream() + .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .filter(c -> StatisticsUtil.needAnalyzeColumn(olapTable, c.getName())) + .map(c -> c.getName()).collect(Collectors.toSet()); + if (columns.isEmpty()) { + continue; + } + TableName tableName = new TableName(olapTable.getDatabase().getCatalog().getName(), + olapTable.getDatabase().getFullName(), olapTable.getName()); synchronized (jobsMap) { // If job map reach the upper limit, stop adding new jobs. 
if (!jobsMap.containsKey(tableName) && jobsMap.size() >= JOB_MAP_SIZE) { + LOG.info("Low job map full."); return; } - Set<String> columns = t.getColumns().stream() - .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) - .filter(c -> StatisticsUtil.needAnalyzeColumn(t, c.getName())) - .map(c -> c.getName()).collect(Collectors.toSet()); if (jobsMap.containsKey(tableName)) { jobsMap.get(tableName).addAll(columns); } else { jobsMap.put(tableName, columns); } } - currentTableId = t.getId(); - if (--batchSize <= 0) { + currentTableId = olapTable.getId(); + if (++processed > TABLE_BATCH_SIZE) { return; } } } // All tables have been processed once, reset for the next loop. + if (LOG.isDebugEnabled()) { + LOG.debug("All low priority internal tables are appended once."); + } currentDbId = 0; currentTableId = 0; + lastRoundFinishTime = System.currentTimeMillis(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 3ce8e7966af..20e98564776 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -75,8 +75,8 @@ import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.ColStatsMeta; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.ColumnStatisticBuilder; -import org.apache.doris.statistics.HighPriorityColumn; import org.apache.doris.statistics.Histogram; +import org.apache.doris.statistics.QueryColumn; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.TableStatsMeta; @@ -1044,57 +1044,58 @@ public class StatisticsUtil { return true; } - // TODO: Need refactor, hard to understand now. 
public static boolean needAnalyzeColumn(TableIf table, String column) { AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId()); + // Table never been analyzed, need analyze. if (tableStatsStatus == null) { return true; } + // User injected column stats, don't do auto analyze, avoid overwrite user injected stats. if (tableStatsStatus.userInjected) { return false; } ColStatsMeta columnStatsMeta = tableStatsStatus.findColumnStatsMeta(column); + // Column never been analyzed, need analyze. if (columnStatsMeta == null) { return true; } if (table instanceof OlapTable) { - long currentUpdatedRows = tableStatsStatus.updatedRows.get(); - long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows; - if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) { - return true; - } - if (lastAnalyzeUpdateRows > currentUpdatedRows) { - // Shouldn't happen. Just in case. + OlapTable olapTable = (OlapTable) table; + // 0. Check new partition first time loaded flag. + if (olapTable.isPartitionColumn(column) && tableStatsStatus.newPartitionLoaded.get()) { return true; } - OlapTable olapTable = (OlapTable) table; + // 1. Check row count. + // TODO: One corner case. Last analyze row count is 0, but actually it's not 0 because isEmptyTable waiting. long currentRowCount = olapTable.getRowCount(); long lastAnalyzeRowCount = columnStatsMeta.rowCount; - if (tableStatsStatus.newPartitionLoaded.get() && olapTable.isPartitionColumn(column)) { + // 1.1 Empty table -> non-empty table. Need analyze. + if (currentRowCount != 0 && lastAnalyzeRowCount == 0) { return true; } - if (lastAnalyzeRowCount == 0 && currentRowCount > 0) { + // 1.2 Non-empty table -> empty table. Need analyze. + if (currentRowCount == 0 && lastAnalyzeRowCount != 0) { return true; } - if (currentUpdatedRows == lastAnalyzeUpdateRows) { + // 1.3 Table is still empty. No need to analyze. lastAnalyzeRowCount == 0 is always true here. 
+ if (currentRowCount == 0) { return false; } - double healthValue = ((double) (currentUpdatedRows - lastAnalyzeUpdateRows) - / (double) currentUpdatedRows) * 100.0; - LOG.info("Column " + column + " update rows health value is " + healthValue); - if (healthValue < StatisticsUtil.getTableStatsHealthThreshold()) { + // 1.4 If row count changed more than the threshold, need analyze. + // lastAnalyzeRowCount == 0 is always false here. + double changeRate = + ((double) Math.abs(currentRowCount - lastAnalyzeRowCount) / lastAnalyzeRowCount) * 100.0; + if (changeRate > StatisticsUtil.getTableStatsHealthThreshold()) { return true; } - if (currentRowCount == 0 && lastAnalyzeRowCount != 0) { - return true; - } - if (currentRowCount == 0 && lastAnalyzeRowCount == 0) { - return false; - } - healthValue = ((double) (currentRowCount - lastAnalyzeRowCount) / (double) currentRowCount) * 100.0; - return healthValue < StatisticsUtil.getTableStatsHealthThreshold(); + // 2. Check update rows. + long currentUpdatedRows = tableStatsStatus.updatedRows.get(); + long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows; + changeRate = ((double) Math.abs(currentUpdatedRows - lastAnalyzeUpdateRows) / lastAnalyzeRowCount) * 100.0; + return changeRate > StatisticsUtil.getTableStatsHealthThreshold(); } else { + // Now, we only support Hive external table auto analyze. if (!(table instanceof HMSExternalTable)) { return false; } @@ -1102,12 +1103,13 @@ public class StatisticsUtil { if (!hmsTable.getDlaType().equals(DLAType.HIVE)) { return false; } + // External is hard to calculate change rate, use time interval to control analyze frequency. 
return System.currentTimeMillis() - tableStatsStatus.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); } } - public static boolean needAnalyzeColumn(HighPriorityColumn column) { + public static boolean needAnalyzeColumn(QueryColumn column) { if (column == null) { return false; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index 4e8bbfe5aff..53cb0807b80 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -22,9 +22,6 @@ import org.apache.doris.analysis.AnalyzeTblStmt; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.ShowAnalyzeStmt; import org.apache.doris.analysis.TableName; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; @@ -33,7 +30,6 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import mockit.Expectations; import mockit.Injectable; import mockit.Mock; @@ -270,47 +266,6 @@ public class AnalysisManagerTest { }; } - @Test - public void testReAnalyze() { - new MockUp<OlapTable>() { - - int count = 0; - int[] rowCount = new int[]{100, 100, 200, 200, 1, 1}; - - final Column c = new Column("col1", PrimitiveType.INT); - @Mock - public long getRowCount() { - return rowCount[count++]; - } - - @Mock - public List<Column> getBaseSchema() { - return Lists.newArrayList(c); - } - - @Mock - public List<Column> getColumns() { return Lists.newArrayList(c); } - - }; - OlapTable 
olapTable = new OlapTable(); - TableStatsMeta stats1 = new TableStatsMeta( - 50, new AnalysisInfoBuilder().setColToPartitions(new HashMap<>()) - .setColName("col1").build(), olapTable); - stats1.updatedRows.addAndGet(50); - - Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1)); - TableStatsMeta stats2 = new TableStatsMeta( - 190, new AnalysisInfoBuilder() - .setColToPartitions(new HashMap<>()).setColName("col1").build(), olapTable); - stats2.updatedRows.addAndGet(20); - Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2)); - - TableStatsMeta stats3 = new TableStatsMeta(0, new AnalysisInfoBuilder() - .setColToPartitions(new HashMap<>()).setRowCount(0).setColName("col1").build(), olapTable); - Assertions.assertTrue(olapTable.needReAnalyzeTable(stats3)); - - } - @Test public void testRecordLimit1() { Config.analyze_record_limit = 2; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org