This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e23bb788e7f [improvement](statistics)Make partition batch size configurable and limit total rows to scan in partition analyze. (#36410) e23bb788e7f is described below commit e23bb788e7fa27f1fc59ef64c1a8b2110872f0e1 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Wed Jun 19 10:13:24 2024 +0800 [improvement](statistics)Make partition batch size configurable and limit total rows to scan in partition analyze. (#36410) 1. Make the number of partitions to collect in one batch configurable 2. Add a limit to the total row count while doing partition analyzing in one batch. --- .../src/main/java/org/apache/doris/qe/GlobalVariable.java | 8 ++++++++ .../src/main/java/org/apache/doris/qe/SessionVariable.java | 1 - .../java/org/apache/doris/statistics/BaseAnalysisTask.java | 14 +++++++------- .../org/apache/doris/statistics/util/StatisticsUtil.java | 5 +++++ 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java index 14d8c35ff72..4790cd2172b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java @@ -63,6 +63,8 @@ public final class GlobalVariable { public static final String DEFAULT_USING_META_CACHE_FOR_EXTERNAL_CATALOG = "default_using_meta_cache_for_external_catalog"; + public static final String PARTITION_ANALYZE_BATCH_SIZE = "partition_analyze_batch_size"; + @VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY) public static String versionComment = "Doris version " + Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH; @@ -155,6 +157,12 @@ public final class GlobalVariable { "Only for compatibility with MySQL ecosystem, no practical meaning"}) public static boolean super_read_only = true; + @VariableMgr.VarAttr(name = PARTITION_ANALYZE_BATCH_SIZE, flag = VariableMgr.GLOBAL, + description = { + "批量收集分区信息的分区数", + "Number of partitions to collect in one batch."}) + public static int partitionAnalyzeBatchSize = 10; + // Don't allow creating instance. private GlobalVariable() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 309611c6ab4..b4125810c24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -520,7 +520,6 @@ public class SessionVariable implements Serializable, Writable { public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = "huge_table_lower_bound_size_in_bytes"; public static final String HUGE_PARTITION_LOWER_BOUND_ROWS = "huge_partition_lower_bound_rows"; - // for spill to disk public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold"; public static final String EXTERNAL_AGG_BYTES_THRESHOLD = "external_agg_bytes_threshold"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 5091ce53a20..920d528317d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -61,7 +61,6 @@ public abstract class BaseAnalysisTask { public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB public static final double LIMIT_FACTOR = 1.2; - public static final int PARTITION_BATCH_SIZE = 100; protected static final String FULL_ANALYZE_TEMPLATE = "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, " @@ -367,6 +366,7 @@ public abstract class BaseAnalysisTask { List<String> sqls = Lists.newArrayList(); Set<String> partNames = Sets.newHashSet(); int count = 0; + long batchRowCount = 0; AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); TableStatsMeta tableStatsStatus = analysisManager.findTableStatsStatus(tbl.getId()); String idxName = info.indexId == -1 ? tbl.getName() : ((OlapTable) tbl).getIndexNameById(info.indexId); @@ -374,22 +374,24 @@ public abstract class BaseAnalysisTask { ? null : tableStatsStatus.findColumnStatsMeta(idxName, col.getName()); boolean hasHughPartition = false; long hugePartitionThreshold = StatisticsUtil.getHugePartitionLowerBoundRows(); + int partitionBatchSize = StatisticsUtil.getPartitionAnalyzeBatchSize(); // Find jobInfo for this task. AnalysisInfo jobInfo = analysisManager.findJobInfo(job.getJobInfo().jobId); // For sync job, get jobInfo from job.jobInfo. boolean isSync = jobInfo == null; jobInfo = isSync ? job.jobInfo : jobInfo; - StatisticsCache cache = Env.getCurrentEnv().getStatisticsCache(); for (String part : partitionNames) { // External table partition is null. Partition partition = tbl.getPartition(part); if (partition != null) { // For huge partition, skip analyze it. - if (partition.getBaseIndex().getRowCount() > hugePartitionThreshold) { + long partitionRowCount = partition.getBaseIndex().getRowCount(); + if (partitionRowCount > hugePartitionThreshold) { hasHughPartition = true; LOG.info("Partition {} in table {} is too large, skip it.", part, tbl.getName()); continue; } + batchRowCount += partitionRowCount; // For cluster upgrade compatible (older version metadata doesn't have partition update rows map) // and insert before first analyze, set partition update rows to 0. jobInfo.partitionUpdateRows.putIfAbsent(partition.getId(), 0L); @@ -413,12 +415,9 @@ public abstract class BaseAnalysisTask { params.put("partitionInfo", getPartitionInfo(part)); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); sqls.add(stringSubstitutor.replace(PARTITION_ANALYZE_TEMPLATE)); - // TODO: invalidate remote FE's cache. - cache.invalidatePartitionColumnStatsCache( - info.catalogId, info.dbId, info.tblId, info.indexId, part, col.getName()); count++; partNames.add(part); - if (count == PARTITION_BATCH_SIZE) { + if (count == partitionBatchSize || batchRowCount > hugePartitionThreshold) { String sql = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME + Joiner.on(" UNION ALL ").join(sqls); runInsert(sql); @@ -427,6 +426,7 @@ public abstract class BaseAnalysisTask { partNames.clear(); sqls.clear(); count = 0; + batchRowCount = 0; } } if (count > 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 646d0235b5c..f6041a6a767 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -63,6 +63,7 @@ import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; import org.apache.doris.qe.AuditLogHelper; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; @@ -877,6 +878,10 @@ public class StatisticsUtil { return StatisticConstants.HUGE_PARTITION_LOWER_BOUND_ROWS; } + public static int getPartitionAnalyzeBatchSize() { + return GlobalVariable.partitionAnalyzeBatchSize; + } + public static long getHugeTableAutoAnalyzeIntervalInMillis() { try { return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org