morrySnow commented on code in PR #13883:
URL: https://github.com/apache/doris/pull/13883#discussion_r1011494623


##########
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java:
##########
@@ -710,7 +710,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException {
                     throw e;
                 } catch (Exception e) {
                     if (parsedStmt instanceof LogicalPlanAdapter) {
-                        throw new NereidsException(new AnalysisException("Unexpected exception: " + e.getMessage()));
+                        throw new NereidsException(new AnalysisException("Unexpected exception: ", e));

Review Comment:
   ```suggestion
                        throw new NereidsException(new AnalysisException("Unexpected exception: " + e.getMessage(), e));
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatsDeriveResult.java:
##########
@@ -48,10 +48,7 @@ public StatsDeriveResult(double rowCount) {
 
     public StatsDeriveResult(StatsDeriveResult another) {
         this.rowCount = another.rowCount;
-        slotIdToColumnStats = new HashMap<>();
-        for (Entry<Id, ColumnStatistic> entry : another.slotIdToColumnStats.entrySet()) {
-            slotIdToColumnStats.put(entry.getKey(), entry.getValue().copy());
-        }
+        slotIdToColumnStats = new HashMap<>(another.slotIdToColumnStats);

Review Comment:
   Why remove the copy on the values?
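
   (If ColumnStatistic is now immutable, sharing the values is safe and the shallow copy is fine; otherwise the old deep copy is still needed. A minimal sketch of the two options, reusing the names from this hunk:)

   ```java
   // Option A (shallow copy): only safe if ColumnStatistic instances are immutable.
   slotIdToColumnStats = new HashMap<>(another.slotIdToColumnStats);

   // Option B (deep copy, the previous behavior): required if ColumnStatistic is mutable.
   slotIdToColumnStats = new HashMap<>();
   for (Entry<Id, ColumnStatistic> entry : another.slotIdToColumnStats.entrySet()) {
       slotIdToColumnStats.put(entry.getKey(), entry.getValue().copy());
   }
   ```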



##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1829,4 +1829,44 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long max_backend_heartbeat_failure_tolerance_count = 1;
+    @ConfField(mutable = false)
+    public static int statistic_table_bucket_count = 7;

Review Comment:
   why 7?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -5197,4 +5216,53 @@ public int getFollowerCount() {
         }
         return count;
     }
+
+    public AnalysisJobScheduler getAnalysisJobScheduler() {
+        return analysisJobScheduler;
+    }
+
+    // TODO:
+    //  1. handle partition level analysis statement properly
+    //  2. support sample job
+    //  3. support period job
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+        String catalogName = analyzeStmt.getCatalogName();
+        String db = analyzeStmt.getDBName();
+        String tbl = analyzeStmt.getTblName();
+        List<String> colNames = analyzeStmt.getOptColumnNames();
+        String persistAnalysisJobSQLTemplate = "INSERT INTO " + 
StatisticConstants.STATISTIC_DB_NAME + "."
+                + StatisticConstants.ANALYSIS_JOB_TABLE + " VALUES(${jobId}, 
'${catalogName}', '${dbName}',"
+                + "'${tblName}','${colName}', '${jobType}', '${analysisType}', 
'${message}', '${lastExecTimeInMs}',"
+                + "'${state}', '${scheduleType}')";

Review Comment:
   Move it into StatisticUtil as a static class member.
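
   (Something like the sketch below; the constant name is only a suggestion, and the template text is the one from this hunk:)

   ```java
   // In StatisticUtil: keep the INSERT template as a shared constant instead of a local string.
   public static final String PERSIST_ANALYSIS_JOB_SQL_TEMPLATE =
           "INSERT INTO " + StatisticConstants.STATISTIC_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE
           + " VALUES(${jobId}, '${catalogName}', '${dbName}', '${tblName}', '${colName}', '${jobType}',"
           + " '${analysisType}', '${message}', '${lastExecTimeInMs}', '${state}', '${scheduleType}')";
   ```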



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -5197,4 +5216,53 @@ public int getFollowerCount() {
         }
         return count;
     }
+
+    public AnalysisJobScheduler getAnalysisJobScheduler() {
+        return analysisJobScheduler;
+    }
+
+    // TODO:
+    //  1. handle partition level analysis statement properly
+    //  2. support sample job
+    //  3. support period job
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+        String catalogName = analyzeStmt.getCatalogName();
+        String db = analyzeStmt.getDBName();
+        String tbl = analyzeStmt.getTblName();
+        List<String> colNames = analyzeStmt.getOptColumnNames();
+        String persistAnalysisJobSQLTemplate = "INSERT INTO " + 
StatisticConstants.STATISTIC_DB_NAME + "."
+                + StatisticConstants.ANALYSIS_JOB_TABLE + " VALUES(${jobId}, 
'${catalogName}', '${dbName}',"
+                + "'${tblName}','${colName}', '${jobType}', '${analysisType}', 
'${message}', '${lastExecTimeInMs}',"
+                + "'${state}', '${scheduleType}')";
+        if (colNames != null) {
+            for (String colName : colNames) {
+                AnalysisJobInfo analysisJobInfo = new 
AnalysisJobInfo(Env.getCurrentEnv().getNextId(), catalogName, db,
+                        tbl, colName, AnalysisJobInfo.JobType.MANUAL, 
ScheduleType.ONCE);
+                analysisJobInfo.analysisType = AnalysisType.FULL;
+                Map<String, String> params = new HashMap<>();
+                params.put("jobId", String.valueOf(analysisJobInfo.jobId));
+                params.put("catalogName", analysisJobInfo.catalogName);
+                params.put("dbName", analysisJobInfo.dbName);
+                params.put("tblName", analysisJobInfo.tblName);
+                params.put("colName", analysisJobInfo.colName);
+                params.put("jobType", analysisJobInfo.jobType.toString());
+                params.put("analysisType", 
analysisJobInfo.analysisType.toString());
+                params.put("message", "");
+                params.put("lastExecTimeInMs", "0");
+                params.put("state", JobState.PENDING.toString());
+                params.put("scheduleType", 
analysisJobInfo.scheduleType.toString());
+                try {
+                    StatisticUtil.execUpdate(
+                            new 
StringSubstitutor(params).replace(persistAnalysisJobSQLTemplate));
+                } catch (Exception e) {
+                    LOG.warn("Failed to persite job for column: {}", colName, 
e);
+                }
+                
Env.getCurrentEnv().getAnalysisJobScheduler().schedule(analysisJobInfo);
+            }
+        }
+    }
+
+    public StatisticsCache getStatisticsCache() {
+        return statisticsCache;

Review Comment:
   put getters together



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java:
##########
@@ -399,7 +399,19 @@ public class SchemaTable extends Table {
                                     .column("CREATION_TIME", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("OLDEST_WRITE_TIMESTAMP", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("NEWEST_WRITE_TIMESTAMP", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .build()))
+            .put("analysis_jobs", new 
SchemaTable(SystemIdGenerator.getNextId(), "analysis_jobs", TableType.SCHEMA,
+                            builder().column("job_id", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("catalog_name", 
ScalarType.createVarchar(1024))
+                                    .column("db_name", 
ScalarType.createVarchar(1024))
+                                    .column("tbl_name", 
ScalarType.createVarchar(1024))

Review Comment:
   Do we need to add an index id column to this table?



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -320,6 +320,10 @@ public List<Long> getDbIds() {
         return Lists.newArrayList(idToDb.keySet());
     }
 
+    public List<Database> getDbs() {
+        return Lists.newArrayList(idToDb.values());
+    }
+

Review Comment:
   Do we need to add this to CatalogIf as an interface method?
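
   (A sketch of what that could look like; CatalogIf's real generic signature may differ:)

   ```java
   public interface CatalogIf<T extends DatabaseIf> {
       // ... existing methods ...

       // Proposed addition mirroring InternalCatalog.getDbs(), so callers need not downcast.
       List<T> getDbs();
   }
   ```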



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -5197,4 +5216,53 @@ public int getFollowerCount() {
         }
         return count;
     }
+
+    public AnalysisJobScheduler getAnalysisJobScheduler() {
+        return analysisJobScheduler;
+    }
+
+    // TODO:
+    //  1. handle partition level analysis statement properly
+    //  2. support sample job
+    //  3. support period job
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+        String catalogName = analyzeStmt.getCatalogName();
+        String db = analyzeStmt.getDBName();
+        String tbl = analyzeStmt.getTblName();
+        List<String> colNames = analyzeStmt.getOptColumnNames();
+        String persistAnalysisJobSQLTemplate = "INSERT INTO " + 
StatisticConstants.STATISTIC_DB_NAME + "."
+                + StatisticConstants.ANALYSIS_JOB_TABLE + " VALUES(${jobId}, 
'${catalogName}', '${dbName}',"
+                + "'${tblName}','${colName}', '${jobType}', '${analysisType}', 
'${message}', '${lastExecTimeInMs}',"
+                + "'${state}', '${scheduleType}')";
+        if (colNames != null) {
+            for (String colName : colNames) {
+                AnalysisJobInfo analysisJobInfo = new 
AnalysisJobInfo(Env.getCurrentEnv().getNextId(), catalogName, db,
+                        tbl, colName, AnalysisJobInfo.JobType.MANUAL, 
ScheduleType.ONCE);
+                analysisJobInfo.analysisType = AnalysisType.FULL;
+                Map<String, String> params = new HashMap<>();
+                params.put("jobId", String.valueOf(analysisJobInfo.jobId));
+                params.put("catalogName", analysisJobInfo.catalogName);
+                params.put("dbName", analysisJobInfo.dbName);
+                params.put("tblName", analysisJobInfo.tblName);
+                params.put("colName", analysisJobInfo.colName);
+                params.put("jobType", analysisJobInfo.jobType.toString());
+                params.put("analysisType", 
analysisJobInfo.analysisType.toString());
+                params.put("message", "");
+                params.put("lastExecTimeInMs", "0");
+                params.put("state", JobState.PENDING.toString());
+                params.put("scheduleType", 
analysisJobInfo.scheduleType.toString());
+                try {
+                    StatisticUtil.execUpdate(

Review Comment:
   ```suggestion
                       StatisticsUtil.execUpdate(
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1829,4 +1829,44 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long max_backend_heartbeat_failure_tolerance_count = 1;
+    @ConfField(mutable = false)
+    public static int statistic_table_bucket_count = 7;
+
+    @ConfField
+    public static long statistics_max_mem_per_query_in_bytes = 2L * 1024 * 1024 * 1024;
+
+    @ConfField
+    public static int statistic_parallel_exec_instance_num = 1;
+
+    @ConfField
+    public static int statistics_simultaneously_running_job_num = 10;
+
+    @ConfField
+    public static int statistic_internal_table_replica_num = 1;
+
+    @ConfField
+    public static int statistic_clean_interval_in_hours = 24 * 2;
+
+    @ConfField
+    public static int statistics_stale_statistics_fetch_size = 1000;
+
+    @ConfField
+    public static int statistics_outdated_record_detector_running_interval_in_minutes = 5;
+
+    @ConfField
+    public static int statistics_records_outdated_time_in_ms = 2 * 24 * 3600 * 1000;
+
+    @ConfField
+    public static int statistics_job_execution_timeout_in_min = 5;
+
+    @ConfField
+    public static int statistics_table_creation_retry_interval_in_seconds = 5;
+
+    @ConfField
+    public static int statistics_cache_max_size = 100000;
+
+    @ConfField
+    public static int statistics_cache_valid_duration_in_hours = 24 * 2;
+
+    public static int statistics_cache_refresh_interval = 24 * 2;

Review Comment:
   forget @ConfField ?
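
   (Presumably it was meant to be annotated like the entries above it:)

   ```java
   @ConfField
   public static int statistics_cache_refresh_interval = 24 * 2;
   ```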



##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1829,4 +1829,44 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long max_backend_heartbeat_failure_tolerance_count = 1;
+    @ConfField(mutable = false)
+    public static int statistic_table_bucket_count = 7;
+
+    @ConfField
+    public static long statistics_max_mem_per_query_in_bytes = 2L * 1024 * 1024 * 1024;
+
+    @ConfField
+    public static int statistic_parallel_exec_instance_num = 1;
+
+    @ConfField
+    public static int statistics_simultaneously_running_job_num = 10;
+
+    @ConfField
+    public static int statistic_internal_table_replica_num = 1;
+
+    @ConfField
+    public static int statistic_clean_interval_in_hours = 24 * 2;

Review Comment:
   Why do some config names have no 's' (statistic_ vs. statistics_)?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -4421,7 +4439,8 @@ public void changeCatalog(ConnectContext ctx, String catalogName) throws DdlExce
     // Change current database of this session.
     public void changeDb(ConnectContext ctx, String qualifiedDb) throws DdlException {
         if (!auth.checkDbPriv(ctx, qualifiedDb, PrivPredicate.SHOW)) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, ctx.getQualifiedUser(), qualifiedDb);
+            ErrorReport.reportDdlException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, ctx.getQualifiedUser(),
+                    qualifiedDb);

Review Comment:
   revert this change



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java:
##########
@@ -38,94 +38,99 @@
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.util.Utils;
-import org.apache.doris.statistics.ColumnStat;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.StatsDeriveResult;
 
 import com.google.common.base.Preconditions;
 
 /**
  * Used to estimate for expressions that not producing boolean value.
  */
-public class ExpressionEstimation extends ExpressionVisitor<ColumnStat, 
StatsDeriveResult> {
+public class ExpressionEstimation extends ExpressionVisitor<ColumnStatistic, 
StatsDeriveResult> {
 
     private static ExpressionEstimation INSTANCE = new ExpressionEstimation();
 
     /**
      * returned columnStat is newly created or a copy of stats
      */
-    public static ColumnStat estimate(Expression expression, StatsDeriveResult 
stats) {
+    public static ColumnStatistic estimate(Expression expression, 
StatsDeriveResult stats) {
         return INSTANCE.visit(expression, stats);
     }
 
     @Override
-    public ColumnStat visit(Expression expr, StatsDeriveResult context) {
+    public ColumnStatistic visit(Expression expr, StatsDeriveResult context) {
         return expr.accept(this, context);
     }
 
     //TODO: case-when need to re-implemented
     @Override
-    public ColumnStat visitCaseWhen(CaseWhen caseWhen, StatsDeriveResult 
context) {
-        ColumnStat columnStat = new ColumnStat();
+    public ColumnStatistic visitCaseWhen(CaseWhen caseWhen, StatsDeriveResult 
context) {
+        ColumnStatisticBuilder columnStat = new ColumnStatisticBuilder();
         columnStat.setNdv(caseWhen.getWhenClauses().size() + 1);
-        columnStat.setSelectivity(1.0);
         columnStat.setMinValue(0);
         columnStat.setMaxValue(Double.MAX_VALUE);
         columnStat.setAvgSizeByte(8);
         columnStat.setNumNulls(0);
-        columnStat.setMaxSizeByte(8);
-        return columnStat;
+        return columnStat.createColumnStatistic();
     }
 
-    public ColumnStat visitCast(Cast cast, StatsDeriveResult context) {
+    public ColumnStatistic visitCast(Cast cast, StatsDeriveResult context) {
         return cast.child().accept(this, context);
     }
 
     @Override
-    public ColumnStat visitLiteral(Literal literal, StatsDeriveResult context) 
{
-        if 
(ColumnStat.MAX_MIN_UNSUPPORTED_TYPE.contains(literal.getDataType().toCatalogDataType()))
 {
-            return ColumnStat.UNKNOWN;
+    public ColumnStatistic visitLiteral(Literal literal, StatsDeriveResult 
context) {
+        if 
(ColumnStatistic.MAX_MIN_UNSUPPORTED_TYPE.contains(literal.getDataType().toCatalogDataType()))
 {
+            return ColumnStatistic.DEFAULT;
         }
         double literalVal = literal.getDouble();
-        ColumnStat columnStat = new ColumnStat();
+        ColumnStatisticBuilder columnStat = new ColumnStatisticBuilder();

Review Comment:
   ```suggestion
           ColumnStatisticBuilder builder = new ColumnStatisticBuilder();
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -435,6 +446,10 @@ public class Env {
 
     private MTMVJobManager mtmvJobManager;
 
+    private AnalysisJobScheduler analysisJobScheduler;
+
+    private StatisticsCache statisticsCache;

Review Comment:
   final
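
   (Roughly like this, assuming both objects are created unconditionally when Env is constructed and have no-arg constructors, which is an assumption:)

   ```java
   private final AnalysisJobScheduler analysisJobScheduler = new AnalysisJobScheduler();

   private final StatisticsCache statisticsCache = new StatisticsCache();
   ```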



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java:
##########
@@ -38,94 +38,99 @@
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.util.Utils;
-import org.apache.doris.statistics.ColumnStat;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.StatsDeriveResult;
 
 import com.google.common.base.Preconditions;
 
 /**
  * Used to estimate for expressions that not producing boolean value.
  */
-public class ExpressionEstimation extends ExpressionVisitor<ColumnStat, 
StatsDeriveResult> {
+public class ExpressionEstimation extends ExpressionVisitor<ColumnStatistic, 
StatsDeriveResult> {
 
     private static ExpressionEstimation INSTANCE = new ExpressionEstimation();
 
     /**
      * returned columnStat is newly created or a copy of stats
      */
-    public static ColumnStat estimate(Expression expression, StatsDeriveResult 
stats) {
+    public static ColumnStatistic estimate(Expression expression, 
StatsDeriveResult stats) {
         return INSTANCE.visit(expression, stats);
     }
 
     @Override
-    public ColumnStat visit(Expression expr, StatsDeriveResult context) {
+    public ColumnStatistic visit(Expression expr, StatsDeriveResult context) {
         return expr.accept(this, context);
     }
 
     //TODO: case-when need to re-implemented
     @Override
-    public ColumnStat visitCaseWhen(CaseWhen caseWhen, StatsDeriveResult 
context) {
-        ColumnStat columnStat = new ColumnStat();
+    public ColumnStatistic visitCaseWhen(CaseWhen caseWhen, StatsDeriveResult 
context) {
+        ColumnStatisticBuilder columnStat = new ColumnStatisticBuilder();
         columnStat.setNdv(caseWhen.getWhenClauses().size() + 1);
-        columnStat.setSelectivity(1.0);
         columnStat.setMinValue(0);
         columnStat.setMaxValue(Double.MAX_VALUE);
         columnStat.setAvgSizeByte(8);
         columnStat.setNumNulls(0);
-        columnStat.setMaxSizeByte(8);
-        return columnStat;
+        return columnStat.createColumnStatistic();
     }
 
-    public ColumnStat visitCast(Cast cast, StatsDeriveResult context) {
+    public ColumnStatistic visitCast(Cast cast, StatsDeriveResult context) {
         return cast.child().accept(this, context);
     }
 
     @Override
-    public ColumnStat visitLiteral(Literal literal, StatsDeriveResult context) 
{
-        if 
(ColumnStat.MAX_MIN_UNSUPPORTED_TYPE.contains(literal.getDataType().toCatalogDataType()))
 {
-            return ColumnStat.UNKNOWN;
+    public ColumnStatistic visitLiteral(Literal literal, StatsDeriveResult 
context) {
+        if 
(ColumnStatistic.MAX_MIN_UNSUPPORTED_TYPE.contains(literal.getDataType().toCatalogDataType()))
 {
+            return ColumnStatistic.DEFAULT;
         }
         double literalVal = literal.getDouble();
-        ColumnStat columnStat = new ColumnStat();
+        ColumnStatisticBuilder columnStat = new ColumnStatisticBuilder();
         columnStat.setMaxValue(literalVal);
         columnStat.setMinValue(literalVal);
         columnStat.setNdv(1);
         columnStat.setNumNulls(1);
         columnStat.setAvgSizeByte(1);
-        return columnStat;
+        return columnStat.createColumnStatistic();

Review Comment:
   ```suggestion
           return builder.build();
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java:
##########
@@ -153,82 +159,98 @@ public ColumnStat visitBinaryArithmetic(BinaryArithmetic 
binaryArithmetic, Stats
                             Math.max(leftMin / noneZeroDivisor(rightMin), 
leftMin / noneZeroDivisor(rightMax)),
                             leftMax / noneZeroDivisor(rightMin)),
                     leftMax / noneZeroDivisor(rightMax));
-            return new ColumnStat(ndv, leftColStats.getAvgSizeByte(), 
leftColStats.getMaxSizeByte(),
-                    numNulls, min, max);
+            return new 
ColumnStatisticBuilder().setCount(count).setNdv(ndv).setAvgSizeByte(leftColStats.avgSizeByte)
+                    
.setNumNulls(numNulls).setDataSize(binaryArithmetic.getDataType().width()).setMinValue(min)
+                    
.setMaxValue(max).setSelectivity(1.0).setMaxExpr(null).setMinExpr(null).createColumnStatistic();
         }
-        return ColumnStat.UNKNOWN;
+        return ColumnStatistic.UNKNOWN;
     }
 
     private double noneZeroDivisor(double d) {
         return d == 0.0 ? 1.0 : d;
     }
 
     @Override
-    public ColumnStat visitMin(Min min, StatsDeriveResult context) {
+    public ColumnStatistic visitMin(Min min, StatsDeriveResult context) {
         Expression child = min.child();
-        ColumnStat columnStat = child.accept(this, context);
-        if (columnStat == ColumnStat.UNKNOWN) {
-            return ColumnStat.UNKNOWN;
+        ColumnStatistic columnStat = child.accept(this, context);
+        if (columnStat == ColumnStatistic.UNKNOWN) {
+            return ColumnStatistic.UNKNOWN;
         }
-        return new ColumnStat(1, min.child().getDataType().width(),
-                min.child().getDataType().width(), 1, 
columnStat.getMinValue(), columnStat.getMinValue());
+        double width = min.child().getDataType().width();
+        return new 
ColumnStatisticBuilder().setCount(1).setNdv(1).setAvgSizeByte(width).setNumNulls(width)
+                
.setDataSize(1).setMinValue(columnStat.minValue).setMaxValue(columnStat.minValue).setSelectivity(1.0)

Review Comment:
   dataSize should not be 1
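
   (One possible fix, sketched only; the exact sizing formula is up to the authors. It just replaces the hard-coded 1 with the type width already computed above:)

   ```java
   double width = min.child().getDataType().width();
   return new ColumnStatisticBuilder().setCount(1).setNdv(1).setAvgSizeByte(width).setNumNulls(width)
           .setDataSize(width) // instead of setDataSize(1)
           .setMinValue(columnStat.minValue).setMaxValue(columnStat.minValue).setSelectivity(1.0)
           .createColumnStatistic();
   ```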



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java:
##########
@@ -153,82 +159,98 @@ public ColumnStat visitBinaryArithmetic(BinaryArithmetic 
binaryArithmetic, Stats
                             Math.max(leftMin / noneZeroDivisor(rightMin), 
leftMin / noneZeroDivisor(rightMax)),
                             leftMax / noneZeroDivisor(rightMin)),
                     leftMax / noneZeroDivisor(rightMax));
-            return new ColumnStat(ndv, leftColStats.getAvgSizeByte(), 
leftColStats.getMaxSizeByte(),
-                    numNulls, min, max);
+            return new 
ColumnStatisticBuilder().setCount(count).setNdv(ndv).setAvgSizeByte(leftColStats.avgSizeByte)
+                    
.setNumNulls(numNulls).setDataSize(binaryArithmetic.getDataType().width()).setMinValue(min)
+                    
.setMaxValue(max).setSelectivity(1.0).setMaxExpr(null).setMinExpr(null).createColumnStatistic();
         }
-        return ColumnStat.UNKNOWN;
+        return ColumnStatistic.UNKNOWN;
     }
 
     private double noneZeroDivisor(double d) {
         return d == 0.0 ? 1.0 : d;
     }
 
     @Override
-    public ColumnStat visitMin(Min min, StatsDeriveResult context) {
+    public ColumnStatistic visitMin(Min min, StatsDeriveResult context) {
         Expression child = min.child();
-        ColumnStat columnStat = child.accept(this, context);
-        if (columnStat == ColumnStat.UNKNOWN) {
-            return ColumnStat.UNKNOWN;
+        ColumnStatistic columnStat = child.accept(this, context);
+        if (columnStat == ColumnStatistic.UNKNOWN) {
+            return ColumnStatistic.UNKNOWN;
         }
-        return new ColumnStat(1, min.child().getDataType().width(),
-                min.child().getDataType().width(), 1, 
columnStat.getMinValue(), columnStat.getMinValue());
+        double width = min.child().getDataType().width();
+        return new 
ColumnStatisticBuilder().setCount(1).setNdv(1).setAvgSizeByte(width).setNumNulls(width)

Review Comment:
   Setting count and ndv to 1 is not correct. @englefly, do we change it later?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/FilterEstimation.java:
##########
@@ -108,30 +121,31 @@ public StatsDeriveResult 
visitComparisonPredicate(ComparisonPredicate cp, Estima
         boolean isNot = (context != null) && context.isNot;
         Expression left = cp.left();
         Expression right = cp.right();
-        ColumnStat statsForLeft = ExpressionEstimation.estimate(left, 
inputStats);
-        ColumnStat statsForRight = ExpressionEstimation.estimate(right, 
inputStats);
-
+        ColumnStatistic statsForLeft = ExpressionEstimation.estimate(left, 
inputStats);
+        ColumnStatistic statsForRight = ExpressionEstimation.estimate(right, 
inputStats);
+        ColumnStatisticBuilder leftBuilder = new 
ColumnStatisticBuilder(statsForLeft);
         double selectivity;
         if (!(left instanceof Literal) && !(right instanceof Literal)) {
             selectivity = calculateWhenBothChildIsColumn(cp, statsForLeft, 
statsForRight);
         } else {
             // For literal, it's max min is same value.
             selectivity = updateLeftStatsWhenRightChildIsLiteral(cp,
-                    statsForLeft,
-                    statsForRight.getMaxValue(),
+                    leftBuilder,
+                    statsForRight.maxValue,
                     isNot);
         }
         StatsDeriveResult outputStats = new StatsDeriveResult(inputStats);
         //TODO: we take the assumption that func(A) and A have the same stats.
-        outputStats.updateBySelectivity(selectivity, cp.getInputSlots());
+
         if (left.getInputSlots().size() == 1) {
             Slot leftSlot = left.getInputSlots().iterator().next();
-            outputStats.updateColumnStatsForSlot(leftSlot, statsForLeft);
+            outputStats.addColumnStats(leftSlot.getExprId(), 
leftBuilder.createColumnStatistic());
         }
-        return outputStats;
+        return outputStats.updateBySelectivity(selectivity,
+                
cp.getInputSlots().stream().map(Slot::getExprId).collect(Collectors.toSet()));
     }
 
-    private double updateLessThan(ColumnStat statsForLeft, double val,
+    private double updateLessThan(ColumnStatisticBuilder statsForLeft, double 
val,

Review Comment:
   Where do we call the builder's build()?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java:
##########
@@ -270,26 +273,25 @@ private StatsDeriveResult computeFilter(Filter filter) {
     //       2. Consider the influence of runtime filter
     //       3. Get NDV and column data size from StatisticManger, 
StatisticManager doesn't support it now.
     private StatsDeriveResult computeScan(Scan scan) {
-        TableStats tableStats = Utils.execWithReturnVal(() ->
-                // TODO: tmp mock the table stats, after we support the table 
stats, we should remove this mock.
-                mockRowCountInStatistic(scan)
-        );
-        Map<Slot, ColumnStat> slotToColumnStats = new HashMap<>();
         Set<SlotReference> slotSet = 
scan.getOutput().stream().filter(SlotReference.class::isInstance)
                 .map(s -> (SlotReference) s).collect(Collectors.toSet());
+        Map<Id, ColumnStatistic> columnStatisticMap = new HashMap<>();
+        Table table = scan.getTable();
+        double rowCount = Double.NaN;
         for (SlotReference slotReference : slotSet) {
             String colName = slotReference.getName();
             if (colName == null) {
                 throw new RuntimeException("Column name of SlotReference 
shouldn't be null here");
             }
-            ColumnStat columnStats = 
tableStats.getColumnStatsOrDefault(colName);
-            slotToColumnStats.put(slotReference, columnStats);
+            ColumnStatistic statistic =
+                    
Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(table.getId(), 
colName);
+            if (statistic == ColumnStatistic.UNKNOWN) {
+                statistic = ColumnStatistic.DEFAULT;
+            }
+            rowCount = statistic.count;

Review Comment:
   If we have no statistics, use mockRowCountInStatistic to get the real row count.
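
   (A rough sketch of the fallback; it assumes the removed mockRowCountInStatistic/TableStats path exposes a row-count getter:)

   ```java
   ColumnStatistic statistic =
           Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(table.getId(), colName);
   if (statistic == ColumnStatistic.UNKNOWN) {
       statistic = ColumnStatistic.DEFAULT;
       // No collected statistics for this column: fall back to the real (mocked) row count
       // instead of trusting statistic.count.
       rowCount = Utils.execWithReturnVal(() -> mockRowCountInStatistic(scan)).getRowCount();
   } else {
       rowCount = statistic.count;
   }
   ```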



##########
fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java:
##########
@@ -251,7 +251,19 @@ private void handleQuery() {
                 
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
                 .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
                 .setSqlHash(ctx.getSqlHash());
-
+        // StmtExecutor stmtExecutor = new StmtExecutor(ctx, originStmt);
+        // try {
+        //     UUID uuid = UUID.randomUUID();
+        //     TUniqueId queryId = new 
TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+        //     ctx.setQueryId(queryId);
+        //     List<TResultBatch> resultBatches = 
stmtExecutor.executeInternalSQL();
+        //     System.out.println(resultBatches);
+        // } catch (Exception e) {
+        //     LOG.warn("Fuck ", e);
+        // }
+        // if (true) {
+        //     return;
+        // }

Review Comment:
   remove it



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/DropAnalysisJobLog.java:
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class DropAnalysisJobLog implements Writable {

Review Comment:
   Put the xxxLog classes into the persist dir.



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -714,3 +714,15 @@ service FrontendService {
 
     AgentService.TGetStoragePolicyResult refreshStoragePolicy()
 }
+
+struct TGetAnalysisJob {
+     1 : optional i64 jobId
+     2 : optional string catalogName
+     3 : optional string dbName
+     4 : optional string tblName
+     5 : optional string colName
+     6 : optional string type
+     7 : optional string message
+     8 : optional i32 lastExecTimeInMs

Review Comment:
   ```suggestion
        8 : optional i32 lastExecTimeMs
   ```



##########
gensrc/thrift/Descriptors.thrift:
##########
@@ -316,3 +317,17 @@ struct TDescriptorTable {
   // all table descriptors referenced by tupleDescriptors
   3: optional list<TTableDescriptor> tableDescriptors;
 }
+
+struct TAnalysisJob {
+   1 : optional i64 jobId
+   2 : optional string catalogName
+   3 : optional string dbName
+   4 : optional string tblName
+   5 : optional string colName
+   6 : optional string jobType
+   7 : optional string analysisType
+   8 : optional string message
+   9 : optional i32 lastExecTimeInMs

Review Comment:
   ```suggestion
      9 : optional i32 lastExecTimeMs
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheKey.java:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import java.util.Objects;
+
+public class StatisticsCacheKey {
+
+    public final long tableId;

Review Comment:
   Add a comment to explain that this may be an index id?
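
   (For example, something along these lines; the wording is only a sketch:)

   ```java
   /**
    * Id of the object the cached statistics belong to. Usually a table id, but it
    * may also be an index id (e.g. for a materialized index), so the field name is
    * narrower than its actual meaning.
    */
   public final long tableId;
   ```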



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java:
##########
@@ -38,94 +38,99 @@
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.util.Utils;
-import org.apache.doris.statistics.ColumnStat;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.StatsDeriveResult;
 
 import com.google.common.base.Preconditions;
 
 /**
  * Used to estimate for expressions that not producing boolean value.
  */
-public class ExpressionEstimation extends ExpressionVisitor<ColumnStat, 
StatsDeriveResult> {
+public class ExpressionEstimation extends ExpressionVisitor<ColumnStatistic, 
StatsDeriveResult> {
 
     private static ExpressionEstimation INSTANCE = new ExpressionEstimation();
 
     /**
      * returned columnStat is newly created or a copy of stats
      */
-    public static ColumnStat estimate(Expression expression, StatsDeriveResult 
stats) {
+    public static ColumnStatistic estimate(Expression expression, 
StatsDeriveResult stats) {
         return INSTANCE.visit(expression, stats);
     }
 
     @Override
-    public ColumnStat visit(Expression expr, StatsDeriveResult context) {
+    public ColumnStatistic visit(Expression expr, StatsDeriveResult context) {
         return expr.accept(this, context);
     }
 
     //TODO: case-when need to re-implemented
     @Override
-    public ColumnStat visitCaseWhen(CaseWhen caseWhen, StatsDeriveResult 
context) {
-        ColumnStat columnStat = new ColumnStat();
+    public ColumnStatistic visitCaseWhen(CaseWhen caseWhen, StatsDeriveResult 
context) {
+        ColumnStatisticBuilder columnStat = new ColumnStatisticBuilder();
         columnStat.setNdv(caseWhen.getWhenClauses().size() + 1);
-        columnStat.setSelectivity(1.0);
         columnStat.setMinValue(0);
         columnStat.setMaxValue(Double.MAX_VALUE);
         columnStat.setAvgSizeByte(8);
         columnStat.setNumNulls(0);
-        columnStat.setMaxSizeByte(8);
-        return columnStat;
+        return columnStat.createColumnStatistic();
     }
 
-    public ColumnStat visitCast(Cast cast, StatsDeriveResult context) {
+    public ColumnStatistic visitCast(Cast cast, StatsDeriveResult context) {
         return cast.child().accept(this, context);
     }
 
     @Override
-    public ColumnStat visitLiteral(Literal literal, StatsDeriveResult context) 
{
-        if 
(ColumnStat.MAX_MIN_UNSUPPORTED_TYPE.contains(literal.getDataType().toCatalogDataType()))
 {
-            return ColumnStat.UNKNOWN;
+    public ColumnStatistic visitLiteral(Literal literal, StatsDeriveResult 
context) {
+        if 
(ColumnStatistic.MAX_MIN_UNSUPPORTED_TYPE.contains(literal.getDataType().toCatalogDataType()))
 {
+            return ColumnStatistic.DEFAULT;
         }
         double literalVal = literal.getDouble();
-        ColumnStat columnStat = new ColumnStat();
+        ColumnStatisticBuilder columnStat = new ColumnStatisticBuilder();
         columnStat.setMaxValue(literalVal);
         columnStat.setMinValue(literalVal);
         columnStat.setNdv(1);
         columnStat.setNumNulls(1);
         columnStat.setAvgSizeByte(1);
-        return columnStat;
+        return columnStat.createColumnStatistic();
+
     }
 
     @Override
-    public ColumnStat visitSlotReference(SlotReference slotReference, 
StatsDeriveResult context) {
-        ColumnStat columnStat = context.getColumnStatsBySlot(slotReference);
+    public ColumnStatistic visitSlotReference(SlotReference slotReference, 
StatsDeriveResult context) {
+        ColumnStatistic columnStat = 
context.getColumnStatsBySlotId(slotReference.getExprId());
         Preconditions.checkState(columnStat != null);
         return columnStat.copy();
     }
 
     @Override
-    public ColumnStat visitBinaryArithmetic(BinaryArithmetic binaryArithmetic, 
StatsDeriveResult context) {
-        ColumnStat leftColStats = binaryArithmetic.left().accept(this, 
context);
-        ColumnStat rightColStats = binaryArithmetic.right().accept(this, 
context);
-        double leftNdv = leftColStats.getNdv();
-        double rightNdv = rightColStats.getNdv();
+    public ColumnStatistic visitBinaryArithmetic(BinaryArithmetic 
binaryArithmetic, StatsDeriveResult context) {
+        ColumnStatistic leftColStats = binaryArithmetic.left().accept(this, 
context);
+        ColumnStatistic rightColStats = binaryArithmetic.right().accept(this, 
context);
+        double leftNdv = leftColStats.ndv;
+        double rightNdv = rightColStats.ndv;
         double ndv = Math.max(leftNdv, rightNdv);
-        double leftNullCount = leftColStats.getNumNulls();
-        double rightNullCount = rightColStats.getNumNulls();
+        double leftNullCount = leftColStats.numNulls;
+        double rightNullCount = rightColStats.numNulls;
         double rowCount = context.getRowCount();
         double numNulls = context.getRowCount()
                 * (1 - (1 - (leftNullCount / rowCount) * (1 - rightNullCount / 
rowCount)));
-        double leftMax = leftColStats.getMaxValue();
-        double rightMax = rightColStats.getMaxValue();
-        double leftMin = leftColStats.getMinValue();
-        double rightMin = rightColStats.getMinValue();
-
+        double leftMax = leftColStats.maxValue;
+        double rightMax = rightColStats.maxValue;
+        double leftMin = leftColStats.minValue;
+        double rightMin = rightColStats.minValue;
+        double count = Math.max(leftColStats.count, rightColStats.count);

Review Comment:
   What is count? Is it rowCount? What's the difference from the original rowCount?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java:
##########
@@ -255,8 +255,8 @@ public int hashCode() {
         return Objects.hash(children, plan);
     }
 
-    public StatsDeriveResult getCopyOfChildStats(int idx) {
-        return child(idx).getStatistics().copy();
+    public StatsDeriveResult childStatistics(int idx) {
+        return child(idx).getStatistics();

Review Comment:
   Is statistics immutable? Otherwise removing the copy is unsafe.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/JoinEstimation.java:
##########
@@ -311,14 +311,24 @@ private static long getSemiJoinRowCount(StatsDeriveResult 
leftStats, StatsDerive
             }
             rowCount = leftStats.getRowCount();
         }
-        Map<Slot, ColumnStat> leftSlotToColStats = 
leftStats.getSlotToColumnStats();
-        Map<Slot, ColumnStat> rightSlotToColStats = 
rightStats.getSlotToColumnStats();
+        Map<Id, ColumnStatistic> leftSlotToColStats = 
leftStats.getSlotIdToColumnStats();
+        Map<Id, ColumnStatistic> rightSlotToColStats = 
rightStats.getSlotIdToColumnStats();
         double minSelectivity = 1.0;
         for (Expression hashConjunct : hashConjuncts) {
             // TODO: since we have no column stats here. just use a fix ratio 
to compute the row count.
-            double lhsNdv = 
leftSlotToColStats.get(removeCast(hashConjunct.child(0))).getNdv();
+            Expression leftChild = (SlotReference) 
removeCast(hashConjunct.child(0));

Review Comment:
   We don't need removeCast anymore.
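
   (i.e. if casts are guaranteed to be stripped, or never present, at this point, this could simply be:)

   ```java
   SlotReference leftChild = (SlotReference) hashConjunct.child(0);
   ```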



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -5197,4 +5216,53 @@ public int getFollowerCount() {
         }
         return count;
     }
+
+    public AnalysisJobScheduler getAnalysisJobScheduler() {
+        return analysisJobScheduler;
+    }
+
+    // TODO:
+    //  1. handle partition level analysis statement properly
+    //  2. support sample job
+    //  3. support period job
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+        String catalogName = analyzeStmt.getCatalogName();
+        String db = analyzeStmt.getDBName();
+        String tbl = analyzeStmt.getTblName();
+        List<String> colNames = analyzeStmt.getOptColumnNames();
+        String persistAnalysisJobSQLTemplate = "INSERT INTO " + 
StatisticConstants.STATISTIC_DB_NAME + "."
+                + StatisticConstants.ANALYSIS_JOB_TABLE + " VALUES(${jobId}, 
'${catalogName}', '${dbName}',"
+                + "'${tblName}','${colName}', '${jobType}', '${analysisType}', 
'${message}', '${lastExecTimeInMs}',"
+                + "'${state}', '${scheduleType}')";
+        if (colNames != null) {
+            for (String colName : colNames) {
+                AnalysisJobInfo analysisJobInfo = new 
AnalysisJobInfo(Env.getCurrentEnv().getNextId(), catalogName, db,
+                        tbl, colName, AnalysisJobInfo.JobType.MANUAL, 
ScheduleType.ONCE);
+                analysisJobInfo.analysisType = AnalysisType.FULL;
+                Map<String, String> params = new HashMap<>();
+                params.put("jobId", String.valueOf(analysisJobInfo.jobId));
+                params.put("catalogName", analysisJobInfo.catalogName);
+                params.put("dbName", analysisJobInfo.dbName);
+                params.put("tblName", analysisJobInfo.tblName);
+                params.put("colName", analysisJobInfo.colName);
+                params.put("jobType", analysisJobInfo.jobType.toString());
+                params.put("analysisType", 
analysisJobInfo.analysisType.toString());
+                params.put("message", "");
+                params.put("lastExecTimeInMs", "0");
+                params.put("state", JobState.PENDING.toString());
+                params.put("scheduleType", 
analysisJobInfo.scheduleType.toString());

Review Comment:
   I think you should refactor the execUpdate method to hide details such as the param names and persistAnalysisJobSQLTemplate.
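
   (For example, a hypothetical helper in StatisticsUtil; the name and signature are only a sketch, and it assumes the SQL template is kept there as a shared constant:)

   ```java
   // Hypothetical: StatisticsUtil.persistAnalysisJob(...). Env.createAnalysisJob would then
   // only call this inside its try block.
   public static void persistAnalysisJob(AnalysisJobInfo info) throws Exception {
       Map<String, String> params = new HashMap<>();
       params.put("jobId", String.valueOf(info.jobId));
       params.put("catalogName", info.catalogName);
       params.put("dbName", info.dbName);
       params.put("tblName", info.tblName);
       params.put("colName", info.colName);
       params.put("jobType", info.jobType.toString());
       params.put("analysisType", info.analysisType.toString());
       params.put("message", "");
       params.put("lastExecTimeInMs", "0");
       params.put("state", JobState.PENDING.toString());
       params.put("scheduleType", info.scheduleType.toString());
       execUpdate(new StringSubstitutor(params).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE));
   }
   ```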



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java:
##########
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.persist.AnalysisJobScheduler;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.AnalysisJobInfo.JobState;
+
+import org.apache.commons.text.StringSubstitutor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AnalysisJob {
+
+    private final AnalysisJobScheduler analysisJobScheduler;
+
+    private final AnalysisJobInfo info;
+
+    private CatalogIf catalog;
+
+    private Database db;
+
+    private Table tbl;
+
+    private Column col;
+
+    private StmtExecutor stmtExecutor;
+
+    public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, 
AnalysisJobInfo info) {
+        this.analysisJobScheduler = analysisJobScheduler;
+        this.info = info;
+        init(info);
+    }
+
+    private void init(AnalysisJobInfo info) {
+        catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
+        if (catalog == null) {
+            analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
+                    String.format("Catalog with name: %s not exists", 
info.dbName), System.currentTimeMillis());
+            return;
+        }
+        db = 
Env.getCurrentEnv().getInternalCatalog().getDb(info.dbName).orElse(null);
+        if (db == null) {
+            analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
+                    String.format("DB with name %s not exists", info.dbName), 
System.currentTimeMillis());
+            return;
+        }
+        tbl = db.getTable(info.tblName).orElse(null);
+        if (tbl == null) {
+            analysisJobScheduler.updateJobStatus(
+                    info.jobId, JobState.FAILED,
+                    String.format("Table with name %s not exists", 
info.tblName), System.currentTimeMillis());
+        }
+        col = tbl.getColumn(info.colName);
+        if (col == null) {
+            analysisJobScheduler.updateJobStatus(
+                    info.jobId, JobState.FAILED, String.format("Column with 
name %s not exists", info.tblName),
+                    System.currentTimeMillis());
+        }
+    }
+
+    private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " SELECT "
+            + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, "
+            + "${catalogId} AS catalog_id, "
+            + "${dbId} AS db_id, "
+            + "${tblId} AS tbl_id, "
+            + "'${colId}' AS col_id, "
+            + "${partId} AS part_id, "
+            + "COUNT(1) AS row_count, "
+            + "NDV(${colName}) AS ndv, "
+            + "SUM(CASE WHEN ${colName} IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
+            + "MIN(${colName}) AS min, "
+            + "MAX(${colName}) AS max, "
+            + "${dataSizeFunction} AS data_size, "
+            + "NOW()"
+            + "FROM `${dbName}`.`${tblName}` "
+            + "PARTITION ${partName}";
+
+    private static final String ANALYZE_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + "    SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, 
row_count, "
+            + "        ndv, null_count, min, max, data_size, update_time\n"

Review Comment:
   ```suggestion
            + "        `ndv`, `null_count`, `min`, `max`, `data_size`, `update_time`\n"
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java:
##########
@@ -38,94 +38,99 @@
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.util.Utils;
-import org.apache.doris.statistics.ColumnStat;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.StatsDeriveResult;
 
 import com.google.common.base.Preconditions;
 
 /**
  * Used to estimate for expressions that not producing boolean value.
  */
-public class ExpressionEstimation extends ExpressionVisitor<ColumnStat, 
StatsDeriveResult> {
+public class ExpressionEstimation extends ExpressionVisitor<ColumnStatistic, 
StatsDeriveResult> {
 
     private static ExpressionEstimation INSTANCE = new ExpressionEstimation();
 
     /**
      * returned columnStat is newly created or a copy of stats
      */
-    public static ColumnStat estimate(Expression expression, StatsDeriveResult 
stats) {
+    public static ColumnStatistic estimate(Expression expression, 
StatsDeriveResult stats) {
         return INSTANCE.visit(expression, stats);
     }
 
     @Override
-    public ColumnStat visit(Expression expr, StatsDeriveResult context) {
+    public ColumnStatistic visit(Expression expr, StatsDeriveResult context) {
         return expr.accept(this, context);
     }
 
     //TODO: case-when need to re-implemented
     @Override
-    public ColumnStat visitCaseWhen(CaseWhen caseWhen, StatsDeriveResult 
context) {
-        ColumnStat columnStat = new ColumnStat();
+    public ColumnStatistic visitCaseWhen(CaseWhen caseWhen, StatsDeriveResult 
context) {
+        ColumnStatisticBuilder columnStat = new ColumnStatisticBuilder();
         columnStat.setNdv(caseWhen.getWhenClauses().size() + 1);
-        columnStat.setSelectivity(1.0);
         columnStat.setMinValue(0);
         columnStat.setMaxValue(Double.MAX_VALUE);
         columnStat.setAvgSizeByte(8);
         columnStat.setNumNulls(0);
-        columnStat.setMaxSizeByte(8);
-        return columnStat;
+        return columnStat.createColumnStatistic();
     }
 
-    public ColumnStat visitCast(Cast cast, StatsDeriveResult context) {
+    public ColumnStatistic visitCast(Cast cast, StatsDeriveResult context) {
         return cast.child().accept(this, context);
     }
 
     @Override
-    public ColumnStat visitLiteral(Literal literal, StatsDeriveResult context) 
{
-        if 
(ColumnStat.MAX_MIN_UNSUPPORTED_TYPE.contains(literal.getDataType().toCatalogDataType()))
 {
-            return ColumnStat.UNKNOWN;
+    public ColumnStatistic visitLiteral(Literal literal, StatsDeriveResult 
context) {
+        if 
(ColumnStatistic.MAX_MIN_UNSUPPORTED_TYPE.contains(literal.getDataType().toCatalogDataType()))
 {
+            return ColumnStatistic.DEFAULT;
         }
         double literalVal = literal.getDouble();
-        ColumnStat columnStat = new ColumnStat();
+        ColumnStatisticBuilder columnStat = new ColumnStatisticBuilder();
         columnStat.setMaxValue(literalVal);
         columnStat.setMinValue(literalVal);
         columnStat.setNdv(1);
         columnStat.setNumNulls(1);
         columnStat.setAvgSizeByte(1);
-        return columnStat;
+        return columnStat.createColumnStatistic();
+
     }
 
     @Override
-    public ColumnStat visitSlotReference(SlotReference slotReference, 
StatsDeriveResult context) {
-        ColumnStat columnStat = context.getColumnStatsBySlot(slotReference);
+    public ColumnStatistic visitSlotReference(SlotReference slotReference, 
StatsDeriveResult context) {
+        ColumnStatistic columnStat = 
context.getColumnStatsBySlotId(slotReference.getExprId());
         Preconditions.checkState(columnStat != null);
         return columnStat.copy();
     }
 
     @Override
-    public ColumnStat visitBinaryArithmetic(BinaryArithmetic binaryArithmetic, 
StatsDeriveResult context) {
-        ColumnStat leftColStats = binaryArithmetic.left().accept(this, 
context);
-        ColumnStat rightColStats = binaryArithmetic.right().accept(this, 
context);
-        double leftNdv = leftColStats.getNdv();
-        double rightNdv = rightColStats.getNdv();
+    public ColumnStatistic visitBinaryArithmetic(BinaryArithmetic 
binaryArithmetic, StatsDeriveResult context) {
+        ColumnStatistic leftColStats = binaryArithmetic.left().accept(this, 
context);
+        ColumnStatistic rightColStats = binaryArithmetic.right().accept(this, 
context);

Review Comment:
   ```suggestion
           ColumnStatistic leftColStatistic = binaryArithmetic.left().accept(this, context);
           ColumnStatistic rightColStatistic = binaryArithmetic.right().accept(this, context);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java:
##########
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.persist.AnalysisJobScheduler;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.AnalysisJobInfo.JobState;
+
+import org.apache.commons.text.StringSubstitutor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AnalysisJob {
+
+    private final AnalysisJobScheduler analysisJobScheduler;
+
+    private final AnalysisJobInfo info;
+
+    private CatalogIf catalog;
+
+    private Database db;
+
+    private Table tbl;
+
+    private Column col;
+
+    private StmtExecutor stmtExecutor;
+
+    public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, 
AnalysisJobInfo info) {
+        this.analysisJobScheduler = analysisJobScheduler;
+        this.info = info;
+        init(info);
+    }
+
+    private void init(AnalysisJobInfo info) {
+        catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
+        if (catalog == null) {
+            analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
+                    String.format("Catalog with name: %s not exists", 
info.dbName), System.currentTimeMillis());
+            return;
+        }
+        db = 
Env.getCurrentEnv().getInternalCatalog().getDb(info.dbName).orElse(null);
+        if (db == null) {
+            analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
+                    String.format("DB with name %s not exists", info.dbName), 
System.currentTimeMillis());
+            return;
+        }
+        tbl = db.getTable(info.tblName).orElse(null);
+        if (tbl == null) {
+            analysisJobScheduler.updateJobStatus(
+                    info.jobId, JobState.FAILED,
+                    String.format("Table with name %s not exists", 
info.tblName), System.currentTimeMillis());
+        }
+        col = tbl.getColumn(info.colName);
+        if (col == null) {
+            analysisJobScheduler.updateJobStatus(
+                    info.jobId, JobState.FAILED, String.format("Column with 
name %s not exists", info.tblName),
+                    System.currentTimeMillis());
+        }
+    }
+
+    private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " SELECT "
+            + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, "
+            + "${catalogId} AS catalog_id, "
+            + "${dbId} AS db_id, "
+            + "${tblId} AS tbl_id, "
+            + "'${colId}' AS col_id, "
+            + "${partId} AS part_id, "
+            + "COUNT(1) AS row_count, "
+            + "NDV(${colName}) AS ndv, "
+            + "SUM(CASE WHEN ${colName} IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
+            + "MIN(${colName}) AS min, "
+            + "MAX(${colName}) AS max, "
+            + "${dataSizeFunction} AS data_size, "
+            + "NOW()"
+            + "FROM `${dbName}`.`${tblName}` "
+            + "PARTITION ${partName}";
+
+    private static final String ANALYZE_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + "    SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, 
row_count, "
+            + "        ndv, null_count, min, max, data_size, update_time\n"
+            + "    FROM \n"
+            + "     (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, "
+            + "         ${catalogId} AS catalog_id, "
+            + "         ${dbId} AS db_id, "
+            + "         ${tblId} AS tbl_id, "
+            + "         '${colId}' AS col_id, "
+            + "         NULL AS part_id, "
+            + "         SUM(count) AS row_count, \n"
+            + "         SUM(null_count) AS null_count, "
+            + "         MIN(CAST(min AS ${type})) AS min, "
+            + "         MAX(CAST(max AS ${type})) AS max, "
+            + "         SUM(data_size_in_bytes) AS data_size, "
+            + "         NOW() AS update_time\n"
+            + "     FROM ${internalDB}.${columnStatTbl}"
+            + "     WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND 
"
+            + "     ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
+            + "      ${internalDB}.${columnStatTbl}.col_id='${colId}'"
+            + "     ) t1, \n"
+            + "     (SELECT NDV(${colName}) AS ndv FROM 
`${dbName}`.`${tblName}`) t2\n";
+
+    public String getDataSizeFunction() {

Review Comment:
   make this method private
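   For example, only the declaration on the commented line needs to change:
   ```suggestion
       private String getDataSizeFunction() {
   ```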



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java:
##########
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.persist.AnalysisJobScheduler;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.AnalysisJobInfo.JobState;
+
+import org.apache.commons.text.StringSubstitutor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AnalysisJob {
+
+    private final AnalysisJobScheduler analysisJobScheduler;
+
+    private final AnalysisJobInfo info;
+
+    private CatalogIf catalog;
+
+    private Database db;
+
+    private Table tbl;
+
+    private Column col;
+
+    private StmtExecutor stmtExecutor;
+
+    public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
+        this.analysisJobScheduler = analysisJobScheduler;
+        this.info = info;
+        init(info);
+    }
+
+    private void init(AnalysisJobInfo info) {
+        catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
+        if (catalog == null) {
+            analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
+                    String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis());
+            return;
+        }
+        db = Env.getCurrentEnv().getInternalCatalog().getDb(info.dbName).orElse(null);
+        if (db == null) {
+            analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
+                    String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis());
+            return;
+        }
+        tbl = db.getTable(info.tblName).orElse(null);
+        if (tbl == null) {
+            analysisJobScheduler.updateJobStatus(
+                    info.jobId, JobState.FAILED,
+                    String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis());
+        }
+        col = tbl.getColumn(info.colName);
+        if (col == null) {
+            analysisJobScheduler.updateJobStatus(
+                    info.jobId, JobState.FAILED, String.format("Column with name %s not exists", info.tblName),
+                    System.currentTimeMillis());
+        }
+    }
+
+    private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " SELECT "
+            + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, "
+            + "${catalogId} AS catalog_id, "
+            + "${dbId} AS db_id, "
+            + "${tblId} AS tbl_id, "
+            + "'${colId}' AS col_id, "
+            + "${partId} AS part_id, "
+            + "COUNT(1) AS row_count, "
+            + "NDV(${colName}) AS ndv, "
+            + "SUM(CASE WHEN ${colName} IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
+            + "MIN(${colName}) AS min, "
+            + "MAX(${colName}) AS max, "
+            + "${dataSizeFunction} AS data_size, "
+            + "NOW()"
+            + "FROM `${dbName}`.`${tblName}` "
+            + "PARTITION ${partName}";
+
+    private static final String ANALYZE_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + "    SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, 
row_count, "
+            + "        ndv, null_count, min, max, data_size, update_time\n"
+            + "    FROM \n"
+            + "     (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, "
+            + "         ${catalogId} AS catalog_id, "
+            + "         ${dbId} AS db_id, "
+            + "         ${tblId} AS tbl_id, "
+            + "         '${colId}' AS col_id, "
+            + "         NULL AS part_id, "
+            + "         SUM(count) AS row_count, \n"
+            + "         SUM(null_count) AS null_count, "
+            + "         MIN(CAST(min AS ${type})) AS min, "
+            + "         MAX(CAST(max AS ${type})) AS max, "
+            + "         SUM(data_size_in_bytes) AS data_size, "
+            + "         NOW() AS update_time\n"
+            + "     FROM ${internalDB}.${columnStatTbl}"
+            + "     WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND 
"
+            + "     ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
+            + "      ${internalDB}.${columnStatTbl}.col_id='${colId}'"
+            + "     ) t1, \n"
+            + "     (SELECT NDV(${colName}) AS ndv FROM 
`${dbName}`.`${tblName}`) t2\n";
+
+    public String getDataSizeFunction() {
+        if (col.getType().isStringType()) {
+            return "SUM(LENGTH(${colName}))";
+        }
+        return "COUNT(1) * " + col.getType().getSlotSize();
+    }
+
+    public void execute() throws Exception {
+        tbl.readLock();
+        Map<String, String> params = new HashMap<>();
+        params.put("internalDB", StatisticConstants.STATISTIC_DB_NAME);
+        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+        params.put("catalogId", String.valueOf(catalog.getId()));
+        params.put("dbId", String.valueOf(db.getId()));
+        params.put("tblId", String.valueOf(tbl.getId()));
+        params.put("colId", String.valueOf(info.colName));
+        params.put("dataSizeFunction", getDataSizeFunction());
+        params.put("dbName", info.dbName);
+        params.put("colName", String.valueOf(info.colName));
+        params.put("tblName", String.valueOf(info.tblName));
+        List<String> partitionAnalysisSQLs = new ArrayList<>();
+        try {
+            tbl.readLock();
+            Set<String> partNames = tbl.getPartitionNames();
+            for (String partName : partNames) {
+                Partition part = tbl.getPartition(partName);
+                if (part == null) {
+                    continue;
+                }
+                params.put("partId", 
String.valueOf(tbl.getPartition(partName).getId()));
+                params.put("partName", String.valueOf(partName));
+                StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
+                
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
+            }
+        } finally {
+            tbl.readUnlock();
+        }
+        for (String sql : partitionAnalysisSQLs) {
+            ConnectContext connectContext = StatisticUtil.buildConnectContext();
+            this.stmtExecutor = new StmtExecutor(connectContext, sql);
+            this.stmtExecutor.execute();
+        }
+        params.remove("partId");
+        params.put("type", col.getType().toString());
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
+        ConnectContext connectContext = StatisticUtil.buildConnectContext();
+        this.stmtExecutor = new StmtExecutor(connectContext, sql);
+        this.stmtExecutor.execute();

Review Comment:
   why not use the executeSql function in StatisticUtil?
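   Roughly like this (just a sketch; the exact signature of StatisticUtil.executeSql is assumed here, not copied from this PR):
   ```java
   // Sketch: let the shared helper own the ConnectContext/StmtExecutor plumbing
   // instead of rebuilding both for every statement inside AnalysisJob.execute().
   for (String sql : partitionAnalysisSQLs) {
       StatisticUtil.executeSql(sql); // assumed helper, as named in the comment above
   }
   params.remove("partId");
   params.put("type", col.getType().toString());
   StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
   StatisticUtil.executeSql(stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE));
   ```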



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java:
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
+import org.apache.doris.persist.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisJobInfo.JobState;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Comparator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class AnalysisJobExecutor extends Thread {
+
+    private static final Logger LOG = LogManager.getLogger(AnalysisJobExecutor.class);
+
+    private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool(
+            Config.statistics_simultaneously_running_job_num,
+            Config.statistics_simultaneously_running_job_num, 0,
+            TimeUnit.DAYS, new LinkedBlockingQueue<>(),
+            new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
+            "Analysis Job Executor", true);
+
+    private final AnalysisJobScheduler jobScheduler;
+
+    private final Counter blockingCounter = new Counter(Config.statistics_simultaneously_running_job_num);

Review Comment:
   rename this counter to something more descriptive
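   e.g. (the new name is only an illustration):
   ```java
   // Illustrative rename only; this counter appears to cap how many analysis jobs run at once.
   private final Counter runningJobCounter = new Counter(Config.statistics_simultaneously_running_job_num);
   ```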



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticCacheKey.java:
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+public class StatisticCacheKey {

Review Comment:
   delete it



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticKeeper.java:
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.common.util.MasterDaemon;
+
+public class StatisticKeeper extends MasterDaemon {

Review Comment:
   remove it



##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java:
##########
@@ -537,7 +538,7 @@ public void computeStats(Analyzer analyzer) throws UserException {
         // update statsDeriveResult for real statistics
         // After statistics collection is complete, remove the logic
         if (analyzer.safeIsEnableJoinReorderBasedCost()) {
-            statsDeriveResult.setRowCount(cardinality);
+            statsDeriveResult = new StatsDeriveResult(cardinality, statsDeriveResult.getSlotIdToColumnStats());

Review Comment:
   we could add a withXXX function to StatsDeriveResult
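   Something along these lines (a sketch; the method name withRowCount is only an illustration):
   ```java
   // In StatsDeriveResult (sketch): a copy-with helper so call sites do not
   // have to spell out the copy constructor themselves.
   public StatsDeriveResult withRowCount(double rowCount) {
       return new StatsDeriveResult(rowCount, getSlotIdToColumnStats());
   }

   // The call site in OlapScanNode.computeStats then becomes:
   statsDeriveResult = statsDeriveResult.withRowCount(cardinality);
   ```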



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -5069,4 +5088,53 @@ public int getFollowerCount() {
         }
         return count;
     }
+
+    public AnalysisJobScheduler getAnalysisJobScheduler() {
+        return analysisJobScheduler;
+    }
+
+    // TODO:
+    //  1. handle partition level analysis statement properly
+    //  2. support sample job
+    //  3. support period job
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {

Review Comment:
   I think this function should not be in Env. Putting it into AnalysisJobScheduler is better than just putting it here.
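   i.e. roughly (a sketch of the suggested placement, not the final API):
   ```java
   // In AnalysisJobScheduler (sketch): own the creation/persist/schedule logic.
   public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
       // build the AnalysisJobInfo from the statement, persist it, and schedule it here
   }

   // Env (sketch): keep only a thin delegation, if it needs to stay at all.
   public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
       analysisJobScheduler.createAnalysisJob(analyzeStmt);
   }
   ```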



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

