This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new b8770ee1d88 [fix](statistics)Fix write all 0 to column stats bug when new partition created. (#38394) b8770ee1d88 is described below commit b8770ee1d88577ce454ed0fe05d3b770b955ddf8 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Fri Jul 26 23:03:35 2024 +0800 [fix](statistics)Fix write all 0 to column stats bug when new partition created. (#38394) Fix write all 0 to column stats bug when new partition created. --- .../java/org/apache/doris/catalog/OlapTable.java | 57 ++++++++++++---------- .../org/apache/doris/statistics/AnalysisInfo.java | 7 ++- .../doris/statistics/AnalysisInfoBuilder.java | 9 +++- .../apache/doris/statistics/AnalysisManager.java | 8 ++- .../apache/doris/statistics/OlapAnalysisTask.java | 18 ++++--- .../doris/statistics/StatisticsAutoCollector.java | 12 +++-- .../apache/doris/statistics/TableStatsMeta.java | 2 +- .../doris/statistics/AnalysisManagerTest.java | 25 ++++++++-- .../doris/statistics/TableStatsMetaTest.java | 10 +--- .../suites/statistics/analyze_stats.groovy | 27 ++++++++++ 10 files changed, 119 insertions(+), 56 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index d357b9f93d6..099935e387a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1197,44 +1197,49 @@ public class OlapTable extends Table { .collect(Collectors.toSet()))) { return true; } - long rowCount = getRowCount(); - if (rowCount > 0 && tblStats.rowCount == 0) { + // 1 Check row count. + long currentRowCount = getRowCount(); + long lastAnalyzeRowCount = tblStats.rowCount; + // 1.1 Empty table -> non-empty table. Need analyze. + if (currentRowCount != 0 && lastAnalyzeRowCount == 0) { return true; } + // 1.2 Non-empty table -> empty table. Need analyze; + if (currentRowCount == 0 && lastAnalyzeRowCount != 0) { + return true; + } + // 1.3 Table is still empty. Not need to analyze. lastAnalyzeRowCount == 0 is always true here. + if (currentRowCount == 0) { + return false; + } + // 1.4 If row count changed more than the threshold, need analyze. + // lastAnalyzeRowCount == 0 is always false here. + double changeRate = + ((double) Math.abs(currentRowCount - lastAnalyzeRowCount) / lastAnalyzeRowCount) * 100.0; + if (changeRate > (100 - StatisticsUtil.getTableStatsHealthThreshold())) { + return true; + } + + // 2. Check update rows. long updateRows = tblStats.updatedRows.get(); - int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows); + int tblHealth = StatisticsUtil.getTableHealth(currentRowCount, updateRows); return tblHealth < StatisticsUtil.getTableStatsHealthThreshold(); } @Override public Map<String, Set<String>> findReAnalyzeNeededPartitions() { - TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId()); - Set<String> allPartitions = getPartitionNames().stream().map(this::getPartition) - .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet()); - if (tableStats == null) { - Map<String, Set<String>> ret = Maps.newHashMap(); - for (Column col : getSchemaAllIndexes(false)) { - if (StatisticsUtil.isUnsupportedType(col.getType())) { - continue; - } - ret.put(col.getName(), allPartitions); - } - return ret; - } - Map<String, Set<String>> colToPart = new HashMap<>(); + Set<String> partitions = Sets.newHashSet(); + // No need to filter unchanged partitions, because it may bring unexpected behavior. + // Use dummy partition to skip it. + partitions.add("Dummy Partition"); + Map<String, Set<String>> colToParts = new HashMap<>(); for (Column col : getSchemaAllIndexes(false)) { if (StatisticsUtil.isUnsupportedType(col.getType())) { continue; } - long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName()); - Set<String> partitions = getPartitionNames().stream() - .map(this::getPartition) - .filter(Partition::hasData) - .filter(partition -> partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName) - .collect(Collectors.toSet()); - colToPart.put(col.getName(), partitions); - } - return colToPart; + colToParts.put(col.getName(), partitions); + } + return colToParts; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index c707107e0e0..461528b151f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -190,6 +190,9 @@ public class AnalysisInfo implements Writable { @SerializedName("emptyJob") public final boolean emptyJob; + + @SerializedName("rowCount") + public final long rowCount; /** * * Used to store the newest partition version of tbl when creating this job. @@ -206,7 +209,8 @@ public class AnalysisInfo implements Writable { long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition, boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull, - boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject) { + boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject, + long rowCount) { this.jobId = jobId; this.taskId = taskId; this.taskIds = taskIds; @@ -244,6 +248,7 @@ public class AnalysisInfo implements Writable { this.tblUpdateTime = tblUpdateTime; this.emptyJob = emptyJob; this.userInject = userInject; + this.rowCount = rowCount; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java index 22f3d22b3ce..ddef30ee4de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java @@ -64,6 +64,7 @@ public class AnalysisInfoBuilder { private long tblUpdateTime; private boolean emptyJob; private boolean userInject; + private long rowCount; public AnalysisInfoBuilder() { } @@ -103,6 +104,7 @@ public class AnalysisInfoBuilder { tblUpdateTime = info.tblUpdateTime; emptyJob = info.emptyJob; userInject = info.userInject; + rowCount = info.rowCount; } public AnalysisInfoBuilder setJobId(long jobId) { @@ -275,12 +277,17 @@ public class AnalysisInfoBuilder { return this; } + public AnalysisInfoBuilder setRowCount(long rowCount) { + this.rowCount = rowCount; + return this; + } + public AnalysisInfo build() { return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType, externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount, - cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject); + cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 347b1c6b047..36cc57ee381 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -66,6 +66,7 @@ import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; @@ -309,10 +310,13 @@ public class AnalysisManager implements Writable { private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames, Set<String> partitionNames, AnalysisType analysisType) throws DdlException { + Set<String> dummyPartitions = Sets.newHashSet(); + // validateAndGetPartitions is to be deprecated, for now, use dummy partition for empty partitions. + dummyPartitions.add("Dummy Partition"); Map<String, Set<String>> columnToPartitions = columnNames.stream() .collect(Collectors.toMap( columnName -> columnName, - columnName -> new HashSet<>(partitionNames == null ? Collections.emptySet() : partitionNames) + columnName -> new HashSet<>(partitionNames == null ? dummyPartitions : partitionNames) )); if (analysisType == AnalysisType.HISTOGRAM) { @@ -405,6 +409,8 @@ public class AnalysisManager implements Writable { infoBuilder.setTaskIds(Lists.newArrayList()); infoBuilder.setTblUpdateTime(table.getUpdateTime()); infoBuilder.setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0); + long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount(); + infoBuilder.setRowCount(rowCount); return infoBuilder.build(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 073000d9eb2..1eec0ee93f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -19,6 +19,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.CreateMaterializedViewStmt; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndexMeta; @@ -37,7 +38,6 @@ import org.apache.commons.text.StringSubstitutor; import java.security.SecureRandom; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -69,15 +69,17 @@ public class OlapAnalysisTask extends BaseAnalysisTask { return; } Set<String> partitionNames = info.colToPartitions.get(info.colName); - if (StatisticsUtil.isEmptyTable(tbl, info.analysisMethod) - || partitionNames == null || partitionNames.isEmpty()) { - if (partitionNames == null) { - LOG.warn("Table {}.{}.{}, partitionNames for column {} is null. ColToPartitions:[{}]", - info.catalogId, info.dbId, info.tblId, info.colName, info.colToPartitions); - } + if (partitionNames == null || partitionNames.isEmpty()) { + LOG.warn("Table {}.{}.{}, partitionNames for column {} is null or empty. ColToPartitions:[{}]", + info.catalogId, info.dbId, info.tblId, info.colName, info.colToPartitions); + throw new RuntimeException(); + } + if (info.rowCount == 0 && tableSample != null) { StatsId statsId = new StatsId(concatColumnStatsId(), info.catalogId, info.dbId, info.tblId, info.indexId, info.colName, null); - job.appendBuf(this, Arrays.asList(new ColStatsData(statsId))); + ColStatsData colStatsData = new ColStatsData(statsId); + Env.getCurrentEnv().getStatisticsCache().syncColStats(colStatsData); + job.appendBuf(this, Collections.singletonList(colStatsData)); return; } if (tableSample != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 4dd8960b6fb..85913f5fd48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.Config; @@ -33,6 +32,7 @@ import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; +import com.google.common.collect.Sets; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -181,6 +181,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { List<AnalysisInfo> analysisInfos, TableIf table) { AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes() ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; + long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount(); AnalysisInfo jobInfo = new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) .setCatalogId(db.getCatalog().getId()) @@ -203,6 +204,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { .setJobType(JobType.SYSTEM) .setTblUpdateTime(table.getUpdateTime()) .setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0) + .setRowCount(rowCount) .build(); analysisInfos.add(jobInfo); } @@ -228,10 +230,12 @@ public class StatisticsAutoCollector extends StatisticsCollector { Set<String> partitionColumnNames = olapTable.getPartitionInfo().getPartitionColumns().stream() .map(Column::getName).collect(Collectors.toSet()); colNames = partitionColumnNames.stream().collect(Collectors.joining(",")); - Set<String> partitionNames = olapTable.getAllPartitions().stream() - .map(Partition::getName).collect(Collectors.toSet()); + Set<String> partitions = Sets.newHashSet(); + // No need to filter unchanged partitions, because it may bring unexpected behavior. + // Use dummy partition to skip it. + partitions.add("Dummy Partition"); for (String column : partitionColumnNames) { - needRunPartitions.put(column, partitionNames); + needRunPartitions.put(column, partitions); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 9231c6a2bc7..4a37106cb6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -157,7 +157,7 @@ public class TableStatsMeta implements Writable { jobType = analyzedJob.jobType; if (tableIf != null) { if (tableIf instanceof OlapTable) { - rowCount = analyzedJob.emptyJob ? 0 : tableIf.getRowCount(); + rowCount = analyzedJob.rowCount; } if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet() .containsAll(tableIf.getBaseSchema().stream() diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index dc8fb70bc05..ebe9cdf93b6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -276,7 +276,7 @@ public class AnalysisManagerTest { new MockUp<OlapTable>() { int count = 0; - int[] rowCount = new int[]{100, 100, 200, 200, 1, 1}; + int[] rowCount = new int[]{100, 200, 1, 0, 0, 100}; final Column c = new Column("col1", PrimitiveType.INT); @Mock @@ -298,20 +298,35 @@ public class AnalysisManagerTest { OlapTable olapTable = new OlapTable(); TableStatsMeta stats1 = new TableStatsMeta( 50, new AnalysisInfoBuilder().setColToPartitions(new HashMap<>()) - .setColName("col1").build(), olapTable); - stats1.updatedRows.addAndGet(50); + .setColName("col1").setRowCount(100).build(), olapTable); + stats1.updatedRows.addAndGet(70); Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1)); TableStatsMeta stats2 = new TableStatsMeta( 190, new AnalysisInfoBuilder() - .setColToPartitions(new HashMap<>()).setColName("col1").build(), olapTable); + .setColToPartitions(new HashMap<>()).setColName("col1").setRowCount(200).build(), olapTable); stats2.updatedRows.addAndGet(20); Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2)); TableStatsMeta stats3 = new TableStatsMeta(0, new AnalysisInfoBuilder() - .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1").build(), olapTable); + .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1") + .setRowCount(0).build(), olapTable); Assertions.assertTrue(olapTable.needReAnalyzeTable(stats3)); + TableStatsMeta stats4 = new TableStatsMeta(0, new AnalysisInfoBuilder() + .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1") + .setRowCount(1).build(), olapTable); + Assertions.assertTrue(olapTable.needReAnalyzeTable(stats4)); + + TableStatsMeta stats5 = new TableStatsMeta(0, new AnalysisInfoBuilder() + .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1") + .setRowCount(0).build(), olapTable); + Assertions.assertFalse(olapTable.needReAnalyzeTable(stats5)); + + TableStatsMeta stats6 = new TableStatsMeta(0, new AnalysisInfoBuilder() + .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1") + .setRowCount(30).build(), olapTable); + Assertions.assertTrue(olapTable.needReAnalyzeTable(stats6)); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/TableStatsMetaTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/TableStatsMetaTest.java index b5e73ba09da..7532884f400 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/TableStatsMetaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/TableStatsMetaTest.java @@ -19,8 +19,6 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.OlapTable; -import mockit.Mock; -import mockit.MockUp; import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -31,15 +29,9 @@ class TableStatsMetaTest { @Test void update(@Mocked OlapTable table) { - new MockUp<OlapTable>() { - @Mock - public long getRowCount() { - return 4; - } - }; TableStatsMeta tableStatsMeta = new TableStatsMeta(); AnalysisInfo jobInfo = new AnalysisInfoBuilder().setColToPartitions(new HashMap<>()) - .setColName("col1").build(); + .setColName("col1").setRowCount(4).build(); tableStatsMeta.update(jobInfo, table); Assertions.assertEquals(4, tableStatsMeta.rowCount); } diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index d2ef7b14ff7..be032a359c9 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -2825,6 +2825,33 @@ PARTITION `p599` VALUES IN (599) assertEquals("521779.0", alter_result[0][5]) assertEquals("7.142863009760572", alter_result[0][6]) + // Test analyze after new empty partition created. + sql """CREATE TABLE `part` ( + `id` INT NULL, + `colint` INT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + PARTITION BY RANGE(`id`) + (PARTITION p1 VALUES [("-2147483648"), ("10000")), + PARTITION p2 VALUES [("10000"), ("20000"))) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """analyze table part with sync;""" + sql """Insert into part values (1, 1), (10001, 10001);""" + sql """analyze table part with sync;""" + sleep(1000) + sql """alter table part add partition p3 VALUES [("20000"), ("30000"));""" + sql """analyze table part with sync;""" + sql """analyze table part with sync;""" + def new_part_result = sql """show column stats part(id)""" + assertEquals("2.0", new_part_result[0][2]) + new_part_result = sql """show column stats part(colint)""" + assertEquals("2.0", new_part_result[0][2]) sql """DROP DATABASE IF EXISTS trigger""" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org