jackwener commented on code in PR #8859:
URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r857118324


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -18,46 +18,246 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.MasterDaemon;
 
 import com.google.common.collect.Queues;
 
-import java.util.ArrayList;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 
-/*
-Schedule statistics job.
-  1. divide job to multi task
-  2. submit all task to StatisticsTaskScheduler
-Switch job state from pending to scheduling.
+/**
+ * Schedule statistics job.
+ *     1. Divide the job into multiple tasks.
+ *     2. Submit all tasks to StatisticsTaskScheduler.
+ * Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row count is greater than the maximum number of rows a single BE can scan,
+     * we'll divide subtasks by partition. The relevant values (3700000000L and 600000000L) are derived from tests.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr); NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by users.
+     * If all statistics are collected at the same time, the cluster may be overburdened
+     * and normal query services may be affected. Therefore, we put the jobs into this queue
+     * and schedule them one by one, and finally divide each job into several subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
         StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        if (pendingJob != null) {
+            try {
+                if (pendingJob.getTasks().size() == 0) {
+                    divide(pendingJob);
+                }
+                List<StatisticsTask> tasks = pendingJob.getTasks();
+                Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
+                pendingJob.updateJobState(StatisticsJob.JobState.SCHEDULING);
+                pendingJobQueue.remove();
+            } catch (IllegalStateException e) {
+                // an IllegalStateException is thrown if the task queue is full; re-add the tasks next time
+                LOG.info("The statistics task queue is full, schedule the job(id={}) later", pendingJob.getId());
+            } catch (DdlException e) {
+                pendingJobQueue.remove();
+                pendingJob.updateJobState(StatisticsJob.JobState.FAILED);
+                LOG.info("Failed to schedule the statistical job(id={})", 
pendingJob.getId(), e);
+            }
+        }
     }
 
     public void addPendingJob(StatisticsJob statisticsJob) throws IllegalStateException {
         pendingJobQueue.add(statisticsJob);
     }
 
+    /**
+     * Statistics tasks are of the following types:
+     * table:
+     * - row_count: the table row count is critical in estimating cardinality and memory usage of scan nodes.
+     * - data_size: the table size, not applicable to CBO, mainly used to monitor and manage table size.
+     * column:
+     * - num_distinct_value: used to determine the selectivity of an equivalence expression.
+     * - min: the minimum value.
+     * - max: the maximum value.
+     * - num_nulls: the number of nulls.
+     * - avg_col_len: the average length of a column, in bytes, used for memory and network IO evaluation.
+     * - max_col_len: the max length of a column, in bytes, used for memory and network IO evaluation.
+     * <p>
+     * Divide:
+     * - min, max, ndv: these three full indicators are collected by one sub-task.
+     * - max_col_len, avg_col_len: these two sampling indicators are collected by one sub-task.
+     * <p>
+     * If the table row count is greater than the maximum number of rows a single BE can scan,
+     * we'll divide subtasks by partition. The relevant values (3700000000L and 600000000L) are derived from tests.
+     * <p>
+     * Eventually, we will get several subtasks of the following types:
+     *
+     * @throws DdlException DdlException
+     * @see MetaStatisticsTask
+     * @see SampleSQLStatisticsTask
+     * @see SQLStatisticsTask
+     */
+    private void divide(StatisticsJob statisticsJob) throws DdlException {
+        long jobId = statisticsJob.getId();
+        long dbId = statisticsJob.getDbId();
+        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
+        Set<Long> tblIds = statisticsJob.getTblIds();
+        Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
+        List<StatisticsTask> tasks = statisticsJob.getTasks();
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        for (Long tblId : tblIds) {
+            Table tbl = db.getTableOrDdlException(tblId);
+            long rowCount = tbl.getRowCount();
+            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
+            List<String> columnNameList = tableIdToColumnName.get(tblId);
+
+            // step 1: generate data_size task
+            StatsCategoryDesc dataSizeCategory = getTblStatsCategoryDesc(dbId, tblId);
+            StatsGranularityDesc dataSizeGranularity = getTblStatsGranularityDesc(tblId);
+            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
+                    dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
+            tasks.add(dataSizeTask);
+
+            // step 2: generate row_count task
+            KeysType keysType = ((OlapTable) tbl).getKeysType();
+            if (keysType == KeysType.DUP_KEYS) {
+                StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
+                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
+                        rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                tasks.add(metaTask);
+            } else {
+                if (rowCount > backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
+                    // divide subtasks by partition
+                    for (Long partitionId : partitionIds) {
+                        StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                        StatsGranularityDesc rowCountGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
+                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+                                rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                        tasks.add(sqlTask);
+                    }
+                } else {
+                    StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
+                    StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
+                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
+                            rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
+                    tasks.add(sqlTask);
+                }
+            }
+
+            // step 3: generate [min,max,ndv] task
+            if (rowCount > backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
+                for (String columnName : columnNameList) {

Review Comment:
   It's clearer to use `columnNameList.forEach( ..... )`
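
   For illustration, a minimal sketch of what that could look like, assuming the
   loop body builds one min/max/ndv task per column in the same style as the rest
   of this diff; the helper `getColStatsCategoryDesc` and the `StatsType` constant
   names here are assumptions for the sketch, not taken verbatim from the PR:

   ```java
   // Hypothetical rewrite of the per-column loop using forEach.
   // The captured variables (jobId, dbId, tblId, tasks) are effectively final,
   // so they can be referenced from inside the lambda.
   columnNameList.forEach(columnName -> {
       StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
       StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
       SQLStatisticsTask ndvTask = new SQLStatisticsTask(jobId, columnGranularity, columnCategory,
               Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV));
       tasks.add(ndvTask);
   });
   ```

   One caveat: `forEach` takes a `Consumer`, so the lambda body cannot propagate a
   checked exception such as `DdlException` without wrapping it, which may be a
   reason to keep the plain loop here.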


