weizhengte commented on code in PR #8859: URL: https://github.com/apache/incubator-doris/pull/8859#discussion_r855989344
########## fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java: ########## @@ -18,62 +18,248 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AnalyzeStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.glassfish.jersey.internal.guava.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.clearspring.analytics.util.Lists; - -/* -Used to store statistics job info, -including job status, progress, etc. +/*** + * Used to store statistics job info, + * including job status, progress, etc. */ public class StatisticsJob { + private static final Logger LOG = LogManager.getLogger(StatisticsJob.class); public enum JobState { PENDING, SCHEDULING, RUNNING, FINISHED, + FAILED, CANCELLED } - private long id = -1; + protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + private final long id = Catalog.getCurrentCatalog().getNextId(); + + /** + * to be collected database stats. + */ + private final long dbId; + + /** + * to be collected table stats. + */ + private final Set<Long> tblIds; + + /** + * to be collected column stats. + */ + private final Map<Long, List<String>> tableIdToColumnName; + + private final Map<String, String> properties; + + /** + * to be executed tasks. + */ + private final List<StatisticsTask> tasks = Lists.newArrayList(); + private JobState jobState = JobState.PENDING; - // optional - // to be collected table stats - private List<Long> tableId = Lists.newArrayList(); - // to be collected column stats - private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap(); - private Map<String, String> properties; - // end + private final List<String> errorMsgs = Lists.newArrayList(); + + private final long createTime = System.currentTimeMillis(); + private long startTime = -1L; + private long finishTime = -1L; + private int progress = 0; + + public StatisticsJob(Long dbId, + Set<Long> tblIds, + Map<Long, List<String>> tableIdToColumnName, + Map<String, String> properties) { + this.dbId = dbId; + this.tblIds = tblIds; + this.tableIdToColumnName = tableIdToColumnName; + this.properties = properties == null ? Maps.newHashMap() : properties; + } + + public void readLock() { + lock.readLock().lock(); + } + + public void readUnlock() { + lock.readLock().unlock(); + } - private List<StatisticsTask> taskList = Lists.newArrayList(); + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } public long getId() { - return id; + return this.id; + } + + public long getDbId() { + return this.dbId; + } + + public Set<Long> getTblIds() { + return this.tblIds; + } + + public Map<Long, List<String>> getTableIdToColumnName() { + return this.tableIdToColumnName; } - /* - AnalyzeStmt: Analyze t1(c1), t2 - StatisticsJob: - tableId [t1, t2] - tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]> - */ - public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) { - // TODO - return new StatisticsJob(); + public Map<String, String> getProperties() { + return this.properties; + } + + public List<StatisticsTask> getTasks() { + return this.tasks; + } + + public List<String> getErrorMsgs() { + return this.errorMsgs; + } + + public JobState getJobState() { + return this.jobState; + } + + public long getCreateTime() { + return this.createTime; + } + + public long getStartTime() { + return this.startTime; + } + + public long getFinishTime() { + return this.finishTime; + } + + public int getProgress() { + return this.progress; + } + + public void updateJobState(JobState newState) throws IllegalStateException { + writeLock(); + + try { + // PENDING -> SCHEDULING/FAILED/CANCELLED + if (jobState == JobState.PENDING) { + if (newState == JobState.SCHEDULING) { + this.jobState = newState; + } else if (newState == JobState.FAILED) { + this.jobState = JobState.FAILED; + LOG.info("Statistics job(id={}) is failed", id); + } else if (newState == JobState.CANCELLED) { + this.jobState = newState; + LOG.info("Statistics job(id={}) is cancelled.", id); + } else { + LOG.info("Invalid job state transition from PENDING to " + newState); + throw new IllegalStateException("Invalid job state transition from PENDING to " + newState); + } + return; + } + + // SCHEDULING -> RUNNING/FAILED/CANCELLED + if (jobState == JobState.SCHEDULING) { + if (newState == JobState.RUNNING) { + this.jobState = newState; + // job start running, set start time + this.startTime = System.currentTimeMillis(); + } else if (newState == JobState.FAILED) { + this.jobState = newState; + LOG.info("Statistics job(id={}) is failed", id); + } else if (newState == JobState.CANCELLED) { + this.jobState = newState; + LOG.info("Statistics job(id={}) is cancelled.", id); + } else { + LOG.info("Invalid job state transition from SCHEDULING to " + newState); + throw new IllegalStateException("Invalid job state transition from SCHEDULING to " + newState); + } + return; + } + + // RUNNING -> FINISHED/FAILED/CANCELLED + if (jobState == JobState.RUNNING) { + if (newState == JobState.FINISHED) { + // set finish time + this.finishTime = System.currentTimeMillis(); + this.jobState = newState; + } else if (newState == JobState.FAILED) { + this.jobState = newState; + LOG.info("Statistics job(id={}) is failed", id); + } else if (newState == JobState.CANCELLED) { + this.jobState = newState; + LOG.info("Statistics job(id={}) is cancelled.", id); + } else { + LOG.info("Invalid job state transition from RUNNING to " + newState); + throw new IllegalStateException("Invalid job state transition from RUNNING to " + newState); + } + return; + } + + // unsupported transition + LOG.info("Invalid job(id={}) state transition from {} to {} ", id, jobState, newState); + throw new IllegalStateException("Invalid job state transition from " + jobState + " to " + newState); + } finally { + writeUnlock(); + } Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org