This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6fc7aa15ad515f2119f6e399d5cf0398ab395554 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Tue Mar 5 16:19:08 2024 +0800 Support column level health value. (#31794) --- .../apache/doris/analysis/ShowColumnStatsStmt.java | 4 + .../apache/doris/datasource/InternalCatalog.java | 6 +- .../java/org/apache/doris/qe/SessionVariable.java | 7 ++ .../org/apache/doris/statistics/AnalysisInfo.java | 17 ++-- .../doris/statistics/AnalysisInfoBuilder.java | 17 ++-- .../org/apache/doris/statistics/AnalysisJob.java | 8 +- .../apache/doris/statistics/AnalysisManager.java | 13 ++- .../org/apache/doris/statistics/ColStatsMeta.java | 16 ++-- .../doris/statistics/StatisticsAutoCollector.java | 95 ++++++++++++++++++++-- .../doris/statistics/StatisticsJobAppender.java | 28 +++++-- .../apache/doris/statistics/TableStatsMeta.java | 33 ++++---- .../doris/statistics/util/StatisticsUtil.java | 10 +++ .../apache/doris/statistics/AnalysisJobTest.java | 4 +- .../doris/statistics/AnalysisManagerTest.java | 2 +- 14 files changed, 197 insertions(+), 63 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java index 37be76b20df..749bfa7d360 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java @@ -61,6 +61,8 @@ public class ShowColumnStatsStmt extends ShowStmt { .add("trigger") .add("query_times") .add("updated_time") + .add("update_rows") + .add("last_analyze_row_count") .build(); private final TableName tableName; @@ -160,6 +162,8 @@ public class ShowColumnStatsStmt extends ShowStmt { row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.jobType)); row.add(String.valueOf(colStatsMeta == null ? 
"N/A" : colStatsMeta.queriedTimes)); row.add(String.valueOf(p.second.updatedTime)); + row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.updatedRows)); + row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.rowCount)); result.add(row); }); return new ShowResultSet(getMetaData(), result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 42a350163f6..d6bbfeeba06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3149,7 +3149,6 @@ public class InternalCatalog implements CatalogIf<Database> { rowsToTruncate += partition.getBaseIndex().getRowCount(); } } else { - rowsToTruncate = olapTable.getRowCount(); for (Partition partition : olapTable.getPartitions()) { // If need absolutely correct, should check running txn here. // But if the txn is in prepare state, cann't known which partitions had load data. @@ -3158,6 +3157,7 @@ public class InternalCatalog implements CatalogIf<Database> { } origPartitions.put(partition.getName(), partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); + rowsToTruncate += partition.getBaseIndex().getRowCount(); } } // if table currently has no partitions, this sql like empty command and do nothing, should return directly. @@ -3318,10 +3318,8 @@ public class InternalCatalog implements CatalogIf<Database> { if (truncateEntireTable) { // Drop the whole table stats after truncate the entire table Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable); - } else { - // Update the updated rows in table stats after truncate some partitions. 
- Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords); } + Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords); LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8bfd0d7f1a4..4b8b89ff6a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -471,6 +471,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_AUTO_ANALYZE = "enable_auto_analyze"; + public static final String ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG = "enable_auto_analyze_internal_catalog"; + public static final String AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = "auto_analyze_table_width_threshold"; public static final String FASTER_FLOAT_CONVERT = "faster_float_convert"; @@ -1504,6 +1506,11 @@ public class SessionVariable implements Serializable, Writable { flag = VariableMgr.GLOBAL) public boolean enableAutoAnalyze = true; + @VariableMgr.VarAttr(name = ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG, + description = {"临时参数,收否自动收集所有内表", "Temp variable, enable to auto collect all OlapTable."}, + flag = VariableMgr.GLOBAL) + public boolean enableAutoAnalyzeInternalCatalog = false; + @VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD, description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集", "Maximum table width to enable auto analyze, " diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index c707107e0e0..0c5047a53c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -188,8 +188,11 @@ public class 
AnalysisInfo implements Writable { @SerializedName("endTime") public long endTime; - @SerializedName("emptyJob") - public final boolean emptyJob; + @SerializedName("rowCount") + public final long rowCount; + + @SerializedName("updateRows") + public final long updateRows; /** * * Used to store the newest partition version of tbl when creating this job. @@ -206,7 +209,8 @@ public class AnalysisInfo implements Writable { long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition, boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull, - boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject) { + boolean usingSqlForPartitionColumn, long tblUpdateTime, long rowCount, boolean userInject, + long updateRows) { this.jobId = jobId; this.taskId = taskId; this.taskIds = taskIds; @@ -242,8 +246,9 @@ public class AnalysisInfo implements Writable { this.forceFull = forceFull; this.usingSqlForPartitionColumn = usingSqlForPartitionColumn; this.tblUpdateTime = tblUpdateTime; - this.emptyJob = emptyJob; + this.rowCount = rowCount; this.userInject = userInject; + this.updateRows = updateRows; } @Override @@ -285,7 +290,9 @@ public class AnalysisInfo implements Writable { } sj.add("forceFull: " + forceFull); sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn); - sj.add("emptyJob: " + emptyJob); + sj.add("rowCount: " + rowCount); + sj.add("userInject: " + userInject); + sj.add("updateRows: " + updateRows); return sj.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java index 22f3d22b3ce..527d503fd52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java @@ -62,8 +62,9 @@ public class AnalysisInfoBuilder { private boolean forceFull; private boolean usingSqlForPartitionColumn; private long tblUpdateTime; - private boolean emptyJob; + private long rowCount; private boolean userInject; + private long updateRows; public AnalysisInfoBuilder() { } @@ -101,8 +102,9 @@ public class AnalysisInfoBuilder { forceFull = info.forceFull; usingSqlForPartitionColumn = info.usingSqlForPartitionColumn; tblUpdateTime = info.tblUpdateTime; - emptyJob = info.emptyJob; + rowCount = info.rowCount; userInject = info.userInject; + updateRows = info.updateRows; } public AnalysisInfoBuilder setJobId(long jobId) { @@ -265,8 +267,8 @@ public class AnalysisInfoBuilder { return this; } - public AnalysisInfoBuilder setEmptyJob(boolean emptyJob) { - this.emptyJob = emptyJob; + public AnalysisInfoBuilder setRowCount(long rowCount) { + this.rowCount = rowCount; return this; } @@ -275,12 +277,17 @@ public class AnalysisInfoBuilder { return this; } + public AnalysisInfoBuilder setUpdateRows(long updateRows) { + this.updateRows = updateRows; + return this; + } + public AnalysisInfo build() { return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType, externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount, - cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject); + cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, rowCount, userInject, updateRows); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index f52764bd6c9..600d95a6278 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -84,14 +84,12 @@ public class AnalysisJob { protected void markOneTaskDone() { if (queryingTask.isEmpty()) { try { - writeBuf(); - updateTaskState(AnalysisState.FINISHED, "Cost time in sec: " - + (System.currentTimeMillis() - start) / 1000); + flushBuffer(); } finally { deregisterJob(); } } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) { - writeBuf(); + flushBuffer(); } } @@ -115,7 +113,7 @@ public class AnalysisJob { } } - protected void writeBuf() { + protected void flushBuffer() { if (killed) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index ab883c79f8f..bac84996dd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -416,7 +416,10 @@ public class AnalysisManager implements Writable { infoBuilder.setColToPartitions(colToPartitions); infoBuilder.setTaskIds(Lists.newArrayList()); infoBuilder.setTblUpdateTime(table.getUpdateTime()); - infoBuilder.setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0); + long rowCount = table.getRowCount(); + infoBuilder.setRowCount(rowCount); + TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId()); + infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get()); return infoBuilder.build(); } @@ -572,7 +575,7 @@ public class AnalysisManager implements Writable { } TableStatsMeta tableStats = findTableStatsStatus(tbl.getId()); if (tableStats == null) { - updateTableStatsStatus(new TableStatsMeta(jobInfo.emptyJob ? 
0 : tbl.getRowCount(), jobInfo, tbl)); + updateTableStatsStatus(new TableStatsMeta(jobInfo.rowCount, jobInfo, tbl)); } else { tableStats.update(jobInfo, tbl); logCreateTableStats(tableStats); @@ -807,7 +810,7 @@ public class AnalysisManager implements Writable { analysisInfo.dbId, analysisInfo.tblId); return table.createAnalysisTask(analysisInfo); } catch (Throwable t) { - LOG.warn("Failed to find table", t); + LOG.warn("Failed to create task.", t); throw new DdlException("Failed to create task", t); } } @@ -1156,10 +1159,12 @@ public class AnalysisManager implements Writable { public void updateColumnUsedInPredicate(Set<Slot> slotReferences) { + LOG.info("Add slots to high priority queues."); updateColumn(slotReferences, highPriorityColumns); } public void updateQueriedColumn(Collection<Slot> slotReferences) { + LOG.info("Add slots to mid priority queues."); updateColumn(slotReferences, midPriorityColumns); } @@ -1179,6 +1184,8 @@ public class AnalysisManager implements Writable { if (catalog != null) { queue.offer(new HighPriorityColumn(catalog.getId(), database.getId(), table.getId(), optionalColumn.get().getName())); + LOG.info("Offer column " + table.getName() + "(" + table.getId() + ")." 
+ + optionalColumn.get().getName()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java index 445641b2505..7e317d67bd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java @@ -43,16 +43,20 @@ public class ColStatsMeta { @SerializedName("trigger") public JobType jobType; - public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, - AnalysisType analysisType, JobType jobType, long queriedTimes) { + @SerializedName("updatedRows") + public long updatedRows; + + @SerializedName("rowCount") + public long rowCount; + + public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, AnalysisType analysisType, JobType jobType, + long queriedTimes, long rowCount, long updatedRows) { this.updatedTime = updatedTime; this.analysisMethod = analysisMethod; this.analysisType = analysisType; this.jobType = jobType; this.queriedTimes.addAndGet(queriedTimes); - } - - public void clear() { - updatedTime = 0; + this.updatedRows = updatedRows; + this.rowCount = rowCount; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 19b9c69db08..c498881bfbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -23,6 +23,8 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import 
org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; @@ -33,6 +35,8 @@ import org.apache.logging.log4j.Logger; import java.time.LocalTime; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -60,9 +64,16 @@ public class StatisticsAutoCollector extends StatisticsCollector { } try { TableIf table = job.getKey(); + if (!supportAutoAnalyze(table)) { + continue; + } Set<String> columns = job.getValue() .stream() - .filter(c -> needAnalyzeColumn(table, c)) + .filter(c -> { + boolean needAnalyzeColumn = needAnalyzeColumn(table, c); + LOG.info("Need analyze column " + c + " ? " + needAnalyzeColumn); + return needAnalyzeColumn; + }) .collect(Collectors.toSet()); processOneJob(table, columns); } catch (Exception e) { @@ -100,22 +111,92 @@ public class StatisticsAutoCollector extends StatisticsCollector { } protected void processOneJob(TableIf table, Set<String> columns) throws DdlException { - Set<String> collect = columns.stream().filter(c -> needAnalyzeColumn(table, c)).collect(Collectors.toSet()); - if (collect.isEmpty()) { + appendPartitionColumns(table, columns); + if (columns.isEmpty()) { return; } AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns); + LOG.info("Analyze job : {}", analyzeJob.toString()); createSystemAnalysisJob(analyzeJob); } + protected void appendPartitionColumns(TableIf table, Set<String> columns) { + if (!(table instanceof OlapTable)) { + return; + } + AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); + TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId()); + if (tableStatsStatus != null && tableStatsStatus.newPartitionLoaded.get()) { + OlapTable olapTable = (OlapTable) table; + columns.addAll(olapTable.getPartitionNames()); + } + } + protected boolean needAnalyzeColumn(TableIf table, String column) { - //TODO: Calculate column health 
value. - return true; + AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); + TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId()); + if (tableStatsStatus == null) { + return true; + } + if (tableStatsStatus.userInjected) { + return false; + } + ColStatsMeta columnStatsMeta = tableStatsStatus.findColumnStatsMeta(column); + if (columnStatsMeta == null) { + return true; + } + if (table instanceof OlapTable) { + long currentUpdatedRows = tableStatsStatus.updatedRows.get(); + long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows; + if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) { + return true; + } + OlapTable olapTable = (OlapTable) table; + if (tableStatsStatus.newPartitionLoaded.get() && olapTable.isPartitionColumn(column)) { + return true; + } + if (columnStatsMeta.rowCount == 0 && olapTable.getRowCount() > 0) { + return true; + } + if (currentUpdatedRows == lastAnalyzeUpdateRows) { + return false; + } + double healthValue = ((double) (currentUpdatedRows - lastAnalyzeUpdateRows) + / (double) currentUpdatedRows) * 100.0; + LOG.info("Column " + column + " health value is " + healthValue); + return healthValue < StatisticsUtil.getTableStatsHealthThreshold(); + } else { + if (!(table instanceof HMSExternalTable)) { + return false; + } + HMSExternalTable hmsTable = (HMSExternalTable) table; + if (!hmsTable.getDlaType().equals(DLAType.HIVE)) { + return false; + } + return System.currentTimeMillis() + - tableStatsStatus.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); + } + } + + protected boolean supportAutoAnalyze(TableIf tableIf) { + if (tableIf == null) { + return false; + } + return tableIf instanceof OlapTable + || tableIf instanceof HMSExternalTable + && ((HMSExternalTable) tableIf).getDlaType().equals(HMSExternalTable.DLAType.HIVE); } protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String> columns) { AnalysisMethod analysisMethod = table.getDataSize(true) >= 
StatisticsUtil.getHugeTableLowerBoundSizeInBytes() ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; + AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); + TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId()); + long rowCount = table.getRowCount(); + Map<String, Set<String>> colToPartitions = new HashMap<>(); + Set<String> dummyPartition = new HashSet<>(); + dummyPartition.add("dummy partition"); + columns.stream().forEach(c -> colToPartitions.put(c, dummyPartition)); return new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) .setCatalogId(table.getDatabase().getCatalog().getId()) @@ -133,7 +214,9 @@ public class StatisticsAutoCollector extends StatisticsCollector { .setLastExecTimeInMs(System.currentTimeMillis()) .setJobType(JobType.SYSTEM) .setTblUpdateTime(table.getUpdateTime()) - .setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0) + .setRowCount(rowCount) + .setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get()) + .setColToPartitions(colToPartitions) .build(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java index 73d0d1340ad..71bb71d3cda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java @@ -19,6 +19,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.util.MasterDaemon; @@ -28,7 +29,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collections; +import java.util.HashSet; import 
java.util.List; import java.util.Map; import java.util.Optional; @@ -70,34 +71,44 @@ public class StatisticsJobAppender extends MasterDaemon { protected void appendJobs() { AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager(); + // LOG.info("Append column to high priority job map."); appendColumnsToJobs(manager.highPriorityColumns, manager.highPriorityJobs); + // LOG.info("Append column to mid priority job map."); appendColumnsToJobs(manager.midPriorityColumns, manager.midPriorityJobs); - appendToLowQueue(manager.lowPriorityJobs); + if (StatisticsUtil.enableAutoAnalyzeInternalCatalog()) { + // LOG.info("Append column to low priority job map."); + appendToLowQueue(manager.lowPriorityJobs); + } } protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, Map<TableIf, Set<String>> jobsMap) { int size = columnQueue.size(); for (int i = 0; i < size; i++) { HighPriorityColumn column = columnQueue.poll(); + LOG.info("Process column " + column.tblId + "." + column.colName); TableIf table = StatisticsUtil.findTable(column.catalogId, column.dbId, column.tblId); synchronized (jobsMap) { // If job map reach the upper limit, stop putting new jobs. if (!jobsMap.containsKey(table) && jobsMap.size() >= JOB_MAP_SIZE) { + LOG.info("Job map full."); break; } if (jobsMap.containsKey(table)) { jobsMap.get(table).add(column.colName); } else { - jobsMap.put(table, Collections.singleton(column.colName)); + HashSet<String> columns = new HashSet<>(); + columns.add(column.colName); + jobsMap.put(table, columns); } + LOG.info("Column " + column.tblId + "." 
+ column.colName + " added"); } } } - protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) { - + protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) { InternalCatalog catalog = Env.getCurrentInternalCatalog(); List<Long> sortedDbs = catalog.getDbIds().stream().sorted().collect(Collectors.toList()); + int batchSize = 100; for (long dbId : sortedDbs) { if (dbId < currentDbId || StatisticConstants.SYSTEM_DBS.contains(catalog.getDbNullable(dbId).getFullName())) { @@ -108,11 +119,11 @@ public class StatisticsJobAppender extends MasterDaemon { List<Table> tables = db.get().getTables().stream() .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); for (Table t : tables) { - if (t.getId() <= currentTableId) { + if (!(t instanceof OlapTable) || t.getId() <= currentTableId) { continue; } synchronized (jobsMap) { - // If job map reach the upper limit, stop putting new jobs. + // If job map reach the upper limit, stop adding new jobs. if (!jobsMap.containsKey(t) && jobsMap.size() >= JOB_MAP_SIZE) { return; } @@ -126,6 +137,9 @@ public class StatisticsJobAppender extends MasterDaemon { } } currentTableId = t.getId(); + if (--batchSize <= 0) { + return; + } } } // All tables have been processed once, reset for the next loop. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 9231c6a2bc7..96ca0aab54c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -126,11 +126,6 @@ public class TableStatsMeta implements Writable { return colNameToColStatsMeta.keySet(); } - public void reset() { - updatedTime = 0; - colNameToColStatsMeta.values().forEach(ColStatsMeta::clear); - } - public void update(AnalysisInfo analyzedJob, TableIf tableIf) { updatedTime = analyzedJob.tblUpdateTime; userInjected = analyzedJob.userInject; @@ -145,34 +140,34 @@ public class TableStatsMeta implements Writable { for (String col : cols) { ColStatsMeta colStatsMeta = colNameToColStatsMeta.get(col); if (colStatsMeta == null) { - colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime, - analyzedJob.analysisMethod, analyzedJob.analysisType, analyzedJob.jobType, 0)); + colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime, analyzedJob.analysisMethod, + analyzedJob.analysisType, analyzedJob.jobType, 0, analyzedJob.rowCount, + analyzedJob.updateRows)); } else { colStatsMeta.updatedTime = updatedTime; colStatsMeta.analysisType = analyzedJob.analysisType; colStatsMeta.analysisMethod = analyzedJob.analysisMethod; colStatsMeta.jobType = analyzedJob.jobType; + colStatsMeta.updatedRows = analyzedJob.updateRows; + colStatsMeta.rowCount = analyzedJob.rowCount; } } jobType = analyzedJob.jobType; if (tableIf != null) { if (tableIf instanceof OlapTable) { - rowCount = analyzedJob.emptyJob ? 
0 : tableIf.getRowCount(); - } - if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet() - .containsAll(tableIf.getBaseSchema().stream() - .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) - .map(Column::getName).collect(Collectors.toSet()))) { - updatedRows.set(0); - newPartitionLoaded.set(false); - } - if (tableIf instanceof OlapTable) { + rowCount = analyzedJob.rowCount; PartitionInfo partitionInfo = ((OlapTable) tableIf).getPartitionInfo(); - if (partitionInfo != null && analyzedJob.colToPartitions.keySet() + if (analyzedJob.rowCount != 0 && partitionInfo != null && analyzedJob.colToPartitions.keySet() .containsAll(partitionInfo.getPartitionColumns().stream() - .map(Column::getName).collect(Collectors.toSet()))) { + .map(Column::getName).collect(Collectors.toSet()))) { newPartitionLoaded.set(false); } + if (analyzedJob.rowCount != 0 && analyzedJob.colToPartitions.keySet() + .containsAll(tableIf.getBaseSchema().stream() + .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .map(Column::getName).collect(Collectors.toSet()))) { + userInjected = false; + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index f56aa0db607..0a56a11d115 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -898,6 +898,16 @@ public class StatisticsUtil { return false; } + public static boolean enableAutoAnalyzeInternalCatalog() { + try { + return findConfigFromGlobalSessionVar( + SessionVariable.ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG).enableAutoAnalyzeInternalCatalog; + } catch (Exception e) { + LOG.warn("Fail to get value of enable auto analyze internal catalog, return false by default", e); + } + return false; + } + public static int getInsertMergeCount() { try { return 
findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT) diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index 1bf2041bb4f..cb2637d5cf6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -184,7 +184,7 @@ public class AnalysisJobTest { protected void syncLoadStats() { } }; - job.writeBuf(); + job.flushBuffer(); Assertions.assertEquals(0, job.queryFinished.size()); } @@ -210,7 +210,7 @@ public class AnalysisJobTest { job.buf.add(new ColStatsData()); job.queryFinished = new HashSet<>(); job.queryFinished.add(task2); - job.writeBuf(); + job.flushBuffer(); Assertions.assertEquals(0, job.queryFinished.size()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index f8a77fe06db..4e8bbfe5aff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -306,7 +306,7 @@ public class AnalysisManagerTest { Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2)); TableStatsMeta stats3 = new TableStatsMeta(0, new AnalysisInfoBuilder() - .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1").build(), olapTable); + .setColToPartitions(new HashMap<>()).setRowCount(0).setColName("col1").build(), olapTable); Assertions.assertTrue(olapTable.needReAnalyzeTable(stats3)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org