This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 290a2ac3d53 [improvement](statistics)Analyze all columns when partition first loaded. (#38601)
290a2ac3d53 is described below

commit 290a2ac3d539698259469188b5d55cceb13de915
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Jul 31 23:47:46 2024 +0800

    [improvement](statistics)Analyze all columns when partition first loaded. (#38601)

    backport: https://github.com/apache/doris/pull/38540
---
 .../apache/doris/analysis/AnalyzeProperties.java   |   2 +
 .../java/org/apache/doris/catalog/OlapTable.java   |   4 +
 .../apache/doris/statistics/AnalysisManager.java   |  16 +-
 .../doris/statistics/StatisticsAutoCollector.java  |  15 --
 .../apache/doris/statistics/TableStatsMeta.java    |  18 +-
 .../doris/statistics/AnalysisManagerTest.java      |   8 +-
 .../suites/statistics/analyze_stats.groovy         | 238 ++++++++++++---------
 7 files changed, 171 insertions(+), 130 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java
index 4b5f161d2be..11197cfe62e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeProperties.java
@@ -44,6 +44,7 @@ public class AnalyzeProperties {
     public static final String PROPERTY_PERIOD_SECONDS = "period.seconds";
     public static final String PROPERTY_FORCE_FULL = "force.full";
     public static final String PROPERTY_PARTITION_COLUMN_FROM_SQL = "partition.column.from.sql";
+    public static final String PROPERTY_USE_AUTO_ANALYZER = "use.auto.analyzer";
 
     public static final AnalyzeProperties DEFAULT_PROP = new AnalyzeProperties(new HashMap<String, String>() {
         {
@@ -72,6 +73,7 @@ public class AnalyzeProperties {
             .add(PROPERTY_PERIOD_CRON)
             .add(PROPERTY_FORCE_FULL)
             .add(PROPERTY_PARTITION_COLUMN_FROM_SQL)
+            .add(PROPERTY_USE_AUTO_ANALYZER)
             .build();
 
     public AnalyzeProperties(Map<String, String> properties) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 1b77bf25056..3b4d22f0e5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1205,6 +1205,10 @@ public class OlapTable extends Table {
                 .collect(Collectors.toSet()))) {
             return true;
         }
+        // Check new partition first loaded.
+        if (tblStats.newPartitionLoaded != null && tblStats.newPartitionLoaded.get()) {
+            return true;
+        }
         // 1 Check row count.
         long currentRowCount = getRowCount();
         long lastAnalyzeRowCount = tblStats.rowCount;
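The OlapTable hunk above is the trigger side of the change: once any partition receives its first load, needReAnalyzeTable reports the table as needing re-analysis even when the overall row-count change is small. A rough, self-contained sketch of that decision order, using illustrative stand-in types and a made-up change-ratio threshold rather than the actual Doris classes:

    // Simplified sketch of the re-analyze decision added above. The real
    // OlapTable.needReAnalyzeTable also checks column/schema coverage and the
    // configured health threshold; names here are illustrative only.
    import java.util.concurrent.atomic.AtomicBoolean;

    public class ReanalyzeCheckSketch {

        static class TableStats {
            AtomicBoolean newPartitionLoaded = new AtomicBoolean(false);
            long rowCount;     // row count recorded by the last analyze
            long updatedRows;  // rows written since the last analyze
        }

        // Hypothetical stand-in for the table-stats health threshold.
        static final double CHANGE_RATIO_THRESHOLD = 0.1;

        static boolean needReAnalyze(TableStats stats, long currentRowCount) {
            // A partition got its first load: report the table as stale regardless
            // of how small the row-count change is, so all columns get re-analyzed.
            if (stats.newPartitionLoaded != null && stats.newPartitionLoaded.get()) {
                return true;
            }
            // Otherwise fall back to the usual "enough rows changed" heuristic.
            long changed = Math.abs(currentRowCount - stats.rowCount) + stats.updatedRows;
            return stats.rowCount == 0 || (double) changed / stats.rowCount > CHANGE_RATIO_THRESHOLD;
        }

        public static void main(String[] args) {
            TableStats stats = new TableStats();
            stats.rowCount = 1_000_000;
            stats.newPartitionLoaded.set(true);
            // One row landed in a brand-new partition: still re-analyze.
            System.out.println(needReAnalyze(stats, 1_000_001)); // true
        }
    }

The real method keeps its existing row-count and health checks after the new short-circuit, which is why the hunk only inserts the flag check ahead of them.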
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 36cc57ee381..6dd67283774 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -208,7 +208,21 @@ public class AnalysisManager implements Writable {
     }
 
     // Each analyze stmt corresponding to an analysis job.
-    public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException {
+    public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException, AnalysisException {
+        // Using auto analyzer if user specifies.
+        if (stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
+            StatisticsAutoCollector autoCollector = Env.getCurrentEnv().getStatisticsAutoCollector();
+            if (autoCollector.skip(stmt.getTable())) {
+                return;
+            }
+            List<AnalysisInfo> jobs = new ArrayList<>();
+            autoCollector.createAnalyzeJobForTbl(stmt.getDb(), jobs, stmt.getTable());
+            AnalysisInfo job = autoCollector.getReAnalyzeRequiredPart(jobs.get(0));
+            if (job != null) {
+                Env.getCurrentEnv().getStatisticsAutoCollector().createSystemAnalysisJob(job);
+            }
+            return;
+        }
         AnalysisInfo jobInfo = buildAndAssignJob(stmt);
         if (jobInfo == null) {
             return;
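The new branch in createAnalysisJob lets a manually issued ANALYZE statement be routed through the auto analyzer by passing the new use.auto.analyzer property, which is how the regression test below drives it. A sketch of how a client might exercise this over Doris's MySQL-compatible port (host, port, credentials, and the partition_test table are placeholders, and a MySQL JDBC driver is assumed to be on the classpath):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class UseAutoAnalyzerExample {
        public static void main(String[] args) throws Exception {
            // Placeholder FE address and database; adjust for a real cluster.
            String url = "jdbc:mysql://127.0.0.1:9030/test";
            try (Connection conn = DriverManager.getConnection(url, "root", "");
                 Statement stmt = conn.createStatement()) {
                // Route this ANALYZE through the auto analyzer instead of a plain manual job.
                stmt.execute("ANALYZE TABLE partition_test PROPERTIES(\"use.auto.analyzer\"=\"true\")");
                // The job runs asynchronously, so poll the table stats to watch the
                // new-partition flag clear once the analyze finishes.
                try (ResultSet rs = stmt.executeQuery("SHOW TABLE STATS partition_test")) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1));
                    }
                }
            }
        }
    }

Because the auto path reuses StatisticsAutoCollector, a table that the collector's skip() check rejects simply returns without creating a job, as the hunk above shows.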
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 85913f5fd48..a04f428aa66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -32,14 +32,12 @@ import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
-import com.google.common.collect.Sets;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.time.LocalTime;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -224,19 +222,6 @@ public class StatisticsAutoCollector extends StatisticsCollector {
         String colNames = jobInfo.colName;
         if (table.needReAnalyzeTable(tblStats)) {
             needRunPartitions = table.findReAnalyzeNeededPartitions();
-        } else if (table instanceof OlapTable && tblStats.newPartitionLoaded.get()) {
-            OlapTable olapTable = (OlapTable) table;
-            needRunPartitions = new HashMap<>();
-            Set<String> partitionColumnNames = olapTable.getPartitionInfo().getPartitionColumns().stream()
-                    .map(Column::getName).collect(Collectors.toSet());
-            colNames = partitionColumnNames.stream().collect(Collectors.joining(","));
-            Set<String> partitions = Sets.newHashSet();
-            // No need to filter unchanged partitions, because it may bring unexpected behavior.
-            // Use dummy partition to skip it.
-            partitions.add("Dummy Partition");
-            for (String column : partitionColumnNames) {
-                needRunPartitions.put(column, partitions);
-            }
         }
 
         if (needRunPartitions == null || needRunPartitions.isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 07580df3607..3eceab0db90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -19,12 +19,12 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
@@ -166,21 +166,16 @@ public class TableStatsMeta implements Writable, GsonPostProcessable {
             indexesRowCount.putAll(analyzedJob.indexesRowCount);
             clearStaleIndexRowCount((OlapTable) tableIf);
         }
-        if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet()
+        if (analyzedJob.emptyJob && AnalysisMethod.SAMPLE.equals(analyzedJob.analysisMethod)) {
+            return;
+        }
+        if (analyzedJob.colToPartitions.keySet()
                 .containsAll(tableIf.getBaseSchema().stream()
                         .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
                         .map(Column::getName).collect(Collectors.toSet()))) {
             updatedRows.set(0);
             newPartitionLoaded.set(false);
         }
-        if (tableIf instanceof OlapTable) {
-            PartitionInfo partitionInfo = ((OlapTable) tableIf).getPartitionInfo();
-            if (partitionInfo != null && analyzedJob.colToPartitions.keySet()
-                    .containsAll(partitionInfo.getPartitionColumns().stream()
-                            .map(Column::getName).collect(Collectors.toSet()))) {
-                newPartitionLoaded.set(false);
-            }
-        }
     }
 }
 
@@ -189,6 +184,9 @@ public class TableStatsMeta implements Writable, GsonPostProcessable {
         if (indexesRowCount == null) {
             indexesRowCount = new ConcurrentHashMap<>();
         }
+        if (newPartitionLoaded == null) {
+            newPartitionLoaded = new AtomicBoolean(false);
+        }
     }
 
     public long getRowCount(long indexId) {
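Taken together, the hunks above give newPartitionLoaded a simple lifecycle: the first load into a partition sets it, OlapTable.needReAnalyzeTable surfaces it, and TableStatsMeta clears it only once an analyze job has covered every supported column, so a partial, single-column analyze leaves it set (the regression test below asserts exactly that). A toy sketch of the lifecycle with stand-in types rather than the real classes:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class NewPartitionFlagLifecycle {

        static class TableStatsMetaSketch {
            final AtomicBoolean newPartitionLoaded = new AtomicBoolean(false);
            final Set<String> supportedColumns = new HashSet<>();

            // Data landed in a partition that had never been loaded before.
            void onFirstLoadIntoPartition() {
                newPartitionLoaded.set(true);
            }

            // Mirrors the updated logic above: only a job that covered every
            // supported column clears the flag.
            void onAnalyzeFinished(Set<String> analyzedColumns) {
                if (analyzedColumns.containsAll(supportedColumns)) {
                    newPartitionLoaded.set(false);
                }
            }

            boolean needReAnalyze() {
                return newPartitionLoaded.get();
            }
        }

        public static void main(String[] args) {
            TableStatsMetaSketch meta = new TableStatsMetaSketch();
            meta.supportedColumns.add("id");
            meta.supportedColumns.add("name");
            meta.supportedColumns.add("comment");

            meta.onFirstLoadIntoPartition();
            System.out.println(meta.needReAnalyze());          // true

            meta.onAnalyzeFinished(Set.of("id"));              // partial analyze keeps the flag
            System.out.println(meta.needReAnalyze());          // true

            meta.onAnalyzeFinished(meta.supportedColumns);     // full analyze clears it
            System.out.println(meta.needReAnalyze());          // false
        }
    }

The gsonPostProcess addition at the end of the TableStatsMeta hunk defaults the flag to false when it is missing, so metadata persisted by older versions deserializes with a sane default.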
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index ebe9cdf93b6..4993884f02a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -296,12 +296,18 @@ public class AnalysisManagerTest {
         };
 
         OlapTable olapTable = new OlapTable();
+        TableStatsMeta stats0 = new TableStatsMeta(
+                50, new AnalysisInfoBuilder().setColToPartitions(new HashMap<>())
+                .setColName("col1").setRowCount(100).build(), olapTable);
+        stats0.newPartitionLoaded.set(true);
+        Assertions.assertTrue(olapTable.needReAnalyzeTable(stats0));
+
         TableStatsMeta stats1 = new TableStatsMeta(
                 50, new AnalysisInfoBuilder().setColToPartitions(new HashMap<>())
                 .setColName("col1").setRowCount(100).build(), olapTable);
         stats1.updatedRows.addAndGet(70);
-
         Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
+
         TableStatsMeta stats2 = new TableStatsMeta(
                 190, new AnalysisInfoBuilder()
                 .setColToPartitions(new HashMap<>()).setColName("col1").setRowCount(200).build(), olapTable);
diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy
index be032a359c9..f0e31e4d7c2 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -121,56 +121,144 @@ suite("test_analyze") {
         SET forbid_unknown_col_stats=true;
     """
 
-//    sql """
-//        SELECT * FROM ${tbl};
-//    """
-
     sql """
        DROP STATS ${tbl}(analyzetestlimitedk3)
    """
-    def exception = null
-
-//    try {
-//        sql """
-//            SELECT * FROM ${tbl};
-//        """
-//    } catch (Exception e) {
-//        exception = e
-//    }
-//
-//    assert exception != null
-//
-//    exception = null
 
    sql """
        ANALYZE TABLE ${tbl} WITH SYNC
    """
-//    sql """
-//        SELECT * FROM ${tbl};
-//    """
-
    sql """
        DROP STATS ${tbl}
    """
-//    try {
-//        sql """
-//            SELECT * FROM ${tbl};
-//        """
-//    } catch (Exception e) {
-//        exception = e
-//    }
+
+    // Test partition load data for the first time.
+    sql """
+    CREATE TABLE `partition_test` (
+        `id` INT NOT NULL,
+        `name` VARCHAR(25) NOT NULL,
+        `comment` VARCHAR(152) NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`id`)
+    COMMENT 'OLAP'
+    PARTITION BY RANGE(`id`)
+    (PARTITION p1 VALUES [("0"), ("100")),
+    PARTITION p2 VALUES [("100"), ("200")),
+    PARTITION p3 VALUES [("200"), ("300")))
+    DISTRIBUTED BY HASH(`id`) BUCKETS 1
+    PROPERTIES (
+        "replication_num" = "1");
+    """
+
+    sql """analyze table partition_test with sync"""
+    sql """insert into partition_test values (1, '1', '1')"""
+    def partition_result = sql """show table stats partition_test"""
+    assertEquals(partition_result[0][6], "true")
+    assertEquals(partition_result[0][0], "1")
+    sql """analyze table partition_test with sync"""
+    partition_result = sql """show table stats partition_test"""
+    assertEquals(partition_result[0][6], "false")
+    sql """insert into partition_test values (101, '1', '1')"""
+    partition_result = sql """show table stats partition_test"""
+    assertEquals(partition_result[0][6], "true")
+    sql """analyze table partition_test(id) with sync"""
+    partition_result = sql """show table stats partition_test"""
+    assertEquals(partition_result[0][6], "true")
+    sql """analyze table partition_test with sync"""
+    partition_result = sql """show table stats partition_test"""
+    assertEquals(partition_result[0][6], "false")
+    sql """insert into partition_test values (102, '1', '1')"""
+    partition_result = sql """show table stats partition_test"""
+    assertEquals(partition_result[0][6], "false")
+    sql """insert into partition_test values (2, '2', '2')"""
+    sql """insert into partition_test values (3, '3', '3')"""
+    sql """insert into partition_test values (4, '4', '4')"""
+    sql """insert into partition_test values (5, '5', '5')"""
+    sql """insert into partition_test values (6, '6', '6')"""
+    sql """insert into partition_test values (7, '7', '7')"""
+    sql """insert into partition_test values (8, '8', '8')"""
+    sql """insert into partition_test values (9, '9', '9')"""
+    sql """insert into partition_test values (10, '10', '10')"""
+    sql """insert into partition_test values (103, '1', '1')"""
+    sql """insert into partition_test values (104, '1', '1')"""
+    sql """insert into partition_test values (105, '1', '1')"""
+    partition_result = sql """show table stats partition_test"""
+    assertEquals(partition_result[0][6], "false")
+    sql """analyze table partition_test with sync"""
+    sql """insert into partition_test values (201, '1', '1')"""
+    partition_result = sql """show table stats partition_test"""
+    assertEquals(partition_result[0][6], "true")
+    partition_result = sql """show column stats partition_test(id)"""
+    assertEquals("id", partition_result[0][0])
+    assertEquals("15.0", partition_result[0][2])
+    partition_result = sql """show column stats partition_test(name)"""
+    assertEquals("name", partition_result[0][0])
+    assertEquals("15.0", partition_result[0][2])
+    partition_result = sql """show column stats partition_test(comment)"""
+    assertEquals("comment", partition_result[0][0])
+    assertEquals("15.0", partition_result[0][2])
+
+    // Test sample agg table value column
+    sql """
+    CREATE TABLE `agg_table_test` (
+        `id` BIGINT NOT NULL,
+        `name` VARCHAR(10) REPLACE NULL
+    ) ENGINE=OLAP
+    AGGREGATE KEY(`id`)
+    COMMENT 'OLAP'
+    DISTRIBUTED BY HASH(`id`) BUCKETS 32
+    PROPERTIES (
+        "replication_num" = "1"
+    );
+    """
+    sql """insert into agg_table_test values (1,'name1'), (2, 'name2')"""
+    sql """analyze table agg_table_test with sample rows 100 with sync"""
+    def agg_result = sql """show column stats agg_table_test (name)"""
+    assertEquals(agg_result[0][7], "N/A")
+    assertEquals(agg_result[0][8], "N/A")
+
+    // Continue test partition load data for the first time.
+    def reported = false;
+    for (int i = 0; i < 10; i++) {
+        def data_result = sql """show data from partition_test"""
+        logger.info("show data from partition_test: " + data_result)
+        if (data_result[0][4] == '16') {
+            reported = true;
+            break;
+        }
+        sleep(1000)
+    }
+    if (!reported) {
+        logger.info("partition_test row count is not reported.")
+    } else {
+        sql """analyze table partition_test PROPERTIES("use.auto.analyzer"="true")"""
+        for (int i = 0; i < 10; i++) {
+            def auto_analyze_result = sql """show auto analyze partition_test"""
+            logger.info("show auto analyze partition_test result : " + auto_analyze_result)
+            if (auto_analyze_result[0][9] == 'FINISHED') {
+                logger.info("Auto analyze finished.")
+                auto_analyze_result = sql """show table stats partition_test"""
+                assertEquals(auto_analyze_result[0][6], "false")
+                auto_analyze_result = sql """show column stats partition_test(id)"""
+                assertEquals("id", auto_analyze_result[0][0])
+                assertEquals("16.0", auto_analyze_result[0][2])
+                auto_analyze_result = sql """show column stats partition_test(name)"""
+                assertEquals("name", auto_analyze_result[0][0])
+                assertEquals("16.0", auto_analyze_result[0][2])
+                auto_analyze_result = sql """show column stats partition_test(comment)"""
+                assertEquals("comment", auto_analyze_result[0][0])
+                assertEquals("16.0", auto_analyze_result[0][2])
+                break
+            }
+            sleep(1000)
+        }
+    }
 
     def a_result_1 = sql """
         ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 10
     """
 
-    def a_result_2 = sql """
-        ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 5
-    """
-
     def a_result_3 = sql """
         ANALYZE DATABASE ${db} WITH SAMPLE PERCENT 5
     """
@@ -2560,62 +2648,6 @@ PARTITION `p599` VALUES IN (599)
     sql """drop stats col1100 """
     sql """DROP TABLE IF EXISTS col1100"""
 
-    // Test partititon load data for the first time.
-    sql """
-    CREATE TABLE `partition_test` (
-        `id` INT NOT NULL,
-        `name` VARCHAR(25) NOT NULL,
-        `comment` VARCHAR(152) NULL
-    ) ENGINE=OLAP
-    DUPLICATE KEY(`id`)
-    COMMENT 'OLAP'
-    PARTITION BY RANGE(`id`)
-    (PARTITION p1 VALUES [("0"), ("100")),
-    PARTITION p2 VALUES [("100"), ("200")),
-    PARTITION p3 VALUES [("200"), ("300")))
-    DISTRIBUTED BY HASH(`id`) BUCKETS 1
-    PROPERTIES (
-        "replication_num" = "1");
-    """
-
-    sql """analyze table partition_test with sync"""
-    sql """insert into partition_test values (1, '1', '1')"""
-    def partition_result = sql """show table stats partition_test"""
-    assertEquals(partition_result[0][6], "true")
-    assertEquals(partition_result[0][0], "1")
-    sql """analyze table partition_test with sync"""
-    partition_result = sql """show table stats partition_test"""
-    assertEquals(partition_result[0][6], "false")
-    sql """insert into partition_test values (101, '1', '1')"""
-    partition_result = sql """show table stats partition_test"""
-    assertEquals(partition_result[0][6], "true")
-    sql """analyze table partition_test(id) with sync"""
-    partition_result = sql """show table stats partition_test"""
-    assertEquals(partition_result[0][6], "false")
-    sql """insert into partition_test values (102, '1', '1')"""
-    partition_result = sql """show table stats partition_test"""
-    assertEquals(partition_result[0][6], "false")
-
-    // Test sample agg table value column
-    sql """
-    CREATE TABLE `agg_table_test` (
-        `id` BIGINT NOT NULL,
-        `name` VARCHAR(10) REPLACE NULL
-    ) ENGINE=OLAP
-    AGGREGATE KEY(`id`)
-    COMMENT 'OLAP'
-    DISTRIBUTED BY HASH(`id`) BUCKETS 32
-    PROPERTIES (
-        "replication_num" = "1"
-    );
-    """
-    sql """insert into agg_table_test values (1,'name1'), (2, 'name2')"""
-    Thread.sleep(1000 * 60)
-    sql """analyze table agg_table_test with sample rows 100 with sync"""
-    def agg_result = sql """show column stats agg_table_test (name)"""
-    assertEquals(agg_result[0][7], "N/A")
-    assertEquals(agg_result[0][8], "N/A")
-
@@ -2628,19 +2660,19 @@ PARTITION `p599` VALUES IN (599)
     // Test sample string type min max
     sql """
         CREATE TABLE `string_min_max` (
             `id` BIGINT NOT NULL,
             `name` VARCHAR(10) NOT NULL
         ) ENGINE=OLAP
         DUPLICATE KEY(`id`)
         COMMENT 'OLAP'
         DISTRIBUTED BY HASH(`id`) BUCKETS 32
         PROPERTIES (
             "replication_num" = "1"
         );
-     """
-     sql """insert into string_min_max values (1,'name1'), (2, 'name2')"""
-     sql """set forbid_unknown_col_stats=false"""
-     explain {
-         sql("select min(name), max(name) from string_min_max")
-         contains "pushAggOp=NONE"
-     }
-     sql """set enable_pushdown_string_minmax = true"""
-     explain {
-         sql("select min(name), max(name) from string_min_max")
-         contains "pushAggOp=MINMAX"
-     }
-     sql """set forbid_unknown_col_stats=true"""
+    """
+    sql """insert into string_min_max values (1,'name1'), (2, 'name2')"""
+    sql """set forbid_unknown_col_stats=false"""
+    explain {
+        sql("select min(name), max(name) from string_min_max")
+        contains "pushAggOp=NONE"
+    }
+    sql """set enable_pushdown_string_minmax = true"""
+    explain {
+        sql("select min(name), max(name) from string_min_max")
+        contains "pushAggOp=MINMAX"
+    }
+    sql """set forbid_unknown_col_stats=true"""
 
     // Test alter
     sql """
@@ -2840,7 +2872,7 @@ PARTITION `p599` VALUES IN (599)
             "replication_allocation" = "tag.location.default: 1"
         );
     """
-
+    sql """analyze table part with sync;"""
     sql """Insert into part values (1, 1), (10001, 10001);"""
     sql """analyze table part with sync;"""

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org