morningman commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427583758


##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1899,6 +1899,14 @@ public class Config extends ConfigBase {
     @ConfField(masterOnly = true)
     public static boolean enable_hms_events_incremental_sync = false;
 
+    /**
+     * If set to true, doris will try to parse the ddl of a hive view and try to execute the query
+     * otherwise it will throw an AnalysisException.
+     */
+    @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)

Review Comment:
   add `description` field
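   For example (a sketch only: the exact shape of the `description` attribute and the field name below are assumptions, since the diff truncates the annotated field):

   ```java
   @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL, description = {
           "If set to true, Doris will try to parse the DDL of a Hive view and execute the query; "
                   + "otherwise it will throw an AnalysisException."})
   public static boolean enable_query_hive_views = false; // hypothetical name
   ```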



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -54,35 +54,73 @@
 @Log4j2
 public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {
 
-    @SerializedName(value = "jid")
+    @SerializedName(value = "jobId")
     private Long jobId;
 
-    @SerializedName(value = "jn")
+    @SerializedName(value = "jobName")
     private String jobName;
 
-    @SerializedName(value = "js")
+    @SerializedName(value = "jobStatus")
     private JobStatus jobStatus;
 
-    @SerializedName(value = "cdb")
+    @SerializedName(value = "currentDbName")
     private String currentDbName;
 
-    @SerializedName(value = "c")
+    @SerializedName(value = "comment")
     private String comment;
 
-    @SerializedName(value = "cu")
+    @SerializedName(value = "createUser")
     private UserIdentity createUser;
 
-    @SerializedName(value = "jc")
+    @SerializedName(value = "jobConfig")
     private JobExecutionConfiguration jobConfig;
 
-    @SerializedName(value = "ctms")
-    private Long createTimeMs;
+    @SerializedName(value = "createTimeMs")
+    private Long createTimeMs = -1L;
+
+    @SerializedName(value = "startTimeMs")
+    private Long startTimeMs = -1L;
+
+    @SerializedName(value = "finishTimeMs")
+    private Long finishTimeMs = -1L;
 
     @SerializedName(value = "sql")
     String executeSql;
 
-    @SerializedName(value = "ftm")
-    private long finishTimeMs;
+    public AbstractJob() {}
+
+    public AbstractJob(Long id) {
+        setJobId(id);
+    }
+
+    public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,

Review Comment:
   Add a comment to explain the difference between these two constructors.
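   For example (javadoc wording illustrative only):

   ```java
   /** Creates a stub that only carries an id, e.g. for replay or lookup paths. */
   public AbstractJob(Long id) {
       setJobId(id);
   }

   /** Creates a fully initialized job; all metadata is supplied by the caller. */
   // ... the full constructor (parameter list elided in the diff above) goes here
   ```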



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
+                    .build();
+
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
     static {
-        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
         for (int i = 0; i < SCHEMA.size(); i++) {
             builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
         }
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
+    @SerializedName("dbId")
+    private final long dbId;
+    @SerializedName("labelName")
+    private String labelName;
+    @SerializedName("loadType")
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    @SerializedName("progress")
+    private int progress;
+    @SerializedName("failMsg")
+    private FailMsg failMsg;
+    @SerializedName("plans")
+    private List<InsertIntoTableCommand> plans;
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    private Set<Long> finishedTaskIds = new HashSet<>();
+    private Set<String> tableNames;
+    private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+    private Map<String, String> properties;
+    private AuthorizationInfo authorizationInfo;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private ConnectContext ctx;
+    private StmtExecutor stmtExecutor;
+    private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+    private List<TabletCommitInfo> commitInfos = new ArrayList<>();
+
+    // max save task num, do we need to config it?
+    private static final int MAX_SAVE_TASK_NUM = 100;
+
+    /**
+     * load job type
+     */
+    public enum LoadType {
+        BULK,
+        SPARK,
+        LOCAL_FILE,
+        UNKNOWN
+
+    }
+
+    public enum Priority {
+        HIGH(0),
+        NORMAL(1),
+        LOW(2);
 
-    InsertIntoTableCommand command;
+        Priority(int value) {
+            this.value = value;
+        }
 
-    StmtExecutor stmtExecutor;
+        private final int value;
 
-    ConnectContext ctx;
+        public int getValue() {
+            return value;
+        }
+    }
 
-    @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> taskIdList;
+    public InsertJob(Long jobId, String jobName,
+                     JobStatus jobStatus,
+                     LabelName labelName,
+                     String comment,
+                     UserIdentity createUser,
+                     JobExecutionConfiguration jobConfig,
+                     Long createTimeMs,
+                     String executeSql) {
+        super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser,
+                jobConfig, createTimeMs, executeSql, null);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+        this.labelName = labelName.getLabelName();
+    }
 
-    // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 100;
+    public InsertJob(ConnectContext ctx,
+                      StmtExecutor executor,
+                      String labelName,
+                      List<InsertIntoTableCommand> plans,
+                      Set<String> sinkTableNames,
+                      Map<String, String> properties,
+                      String comment,
+                      JobExecutionConfiguration jobConfig) {
+        super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,

Review Comment:
   The job id is generated in two different ways: in the first constructor it is passed in from outside, while in the second constructor it is generated inside.
   We should unify them.
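   For example, a sketch of one option, mirroring the first constructor but always generating the id inside (replay paths that must reuse a persisted id would need the opposite convention):

   ```java
   public InsertJob(String jobName, JobStatus jobStatus, LabelName labelName,
                    String comment, UserIdentity createUser, JobExecutionConfiguration jobConfig,
                    Long createTimeMs, String executeSql) {
       // the id is generated in exactly one place, never passed in by callers
       super(Env.getCurrentEnv().getNextId(), jobName, jobStatus, labelName.getDbName(),
               comment, createUser, jobConfig, createTimeMs, executeSql, null);
       this.dbId = ConnectContext.get().getCurrentDbId();
       this.labelName = labelName.getLabelName();
   }
   ```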



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java:
##########
@@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {
 
     /**
      * Cancels all running tasks of this job.
-     *
      * @throws JobException If cancelling a running task fails.
      */
     void cancelAllTasks() throws JobException;
 
+    /**
+     * register job
+     * @throws JobException If register job failed.
+     */
+    void onRegister() throws JobException;
+
+    /**
+     * register job failed
+     * @throws JobException If failed.
+     */
+    void onRegisterFailed() throws JobException;
+
+    /**
+     * relay create job
+     * @throws JobException  If replay create failed.
+     */
+    void onReplayCreate() throws JobException;
+
+    /**
+     * relay finished or cancelled job

Review Comment:
   ```suggestion
        * replay finished or cancelled job
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -236,4 +290,135 @@ public T getJob(Long jobId) {
         return jobMap.get(jobId);
     }
 
+
+    /**
+     * get load info by db
+     *
+     * @param dbId          db id
+     * @param dbName        db name
+     * @param labelValue    label name
+     * @param accurateMatch accurate match
+     * @param jobState      state
+     * @return load infos
+     * @throws AnalysisException ex
+     */
+    public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName,
+                                                      String labelValue,
+                                                      boolean accurateMatch,
+                                                      JobState jobState) throws AnalysisException {
+        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<>();
+        if (!Env.getCurrentEnv().getLabelProcessor().existLabelJobs(dbId)) {
+            return loadJobInfos;
+        }
+        readLock();
+        try {
+            List<InsertJob> loadJobList = Env.getCurrentEnv().getLabelProcessor()
+                    .filterLabelJobs(dbId, labelValue, accurateMatch);
+            // check state
+            for (InsertJob loadJob : loadJobList) {
+                try {
+                    if (jobState != null && !validState(jobState, loadJob)) {
+                        continue;
+                    }
+                    // add load job info, convert String list to Comparable list
+                    loadJobInfos.add(new ArrayList<>(loadJob.getShowInfo()));
+                } catch (RuntimeException e) {
+                    // ignore this load job
+                    log.warn("get load job info failed. job id: {}", loadJob.getJobId(), e);
+                }
+            }
+            return loadJobInfos;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    private static boolean validState(JobState jobState, InsertJob loadJob) {
+        JobStatus status = loadJob.getJobStatus();
+        switch (status) {
+            case RUNNING:
+                return jobState == JobState.PENDING || jobState == JobState.ETL
+                        || jobState == JobState.LOADING || jobState == JobState.COMMITTED;
+            case STOPPED:
+                return jobState == JobState.CANCELLED;
+            case FINISHED:
+                return jobState == JobState.FINISHED;
+            default:
+                return false;
+        }
+    }
+
+    public void cancelLoadJob(CancelLoadStmt cs)
+            throws JobException, AnalysisException, DdlException {
+        String dbName = cs.getDbName();
+        String label = cs.getLabel();
+        String state = cs.getState();
+        CompoundPredicate.Operator operator = cs.getOperator();
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+        // List of load jobs waiting to be cancelled
+        List<InsertJob> uncompletedLoadJob;
+        readLock();
+        try {
+            List<InsertJob> loadJobs = Env.getCurrentEnv().getLabelProcessor().getLabelJobs(db);
+            List<InsertJob> matchLoadJobs = Lists.newArrayList();
+            addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs);
+            if (matchLoadJobs.isEmpty()) {
+                throw new JobException("Load job does not exist");
+            }
+            // check state here
+            uncompletedLoadJob =

Review Comment:
   ```suggestion
               unfinishedLoadJob =
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    /**
+     * getLabelJob
+     * @param db db
+     * @return label jobs
+     * @throws JobException e
+     */
+    public List<InsertJob> getLabelJobs(Database db) throws JobException {

Review Comment:
   The method names `getLabelJobs` and `addLabelJob` sound strange.
   How about just `getJobs` and `addJob`?



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -303,7 +348,19 @@ public ShowResultSetMetaData getJobMetaData() {
         return builder.build();
     }
 
-    private static long getNextId() {

Review Comment:
   Why change the way the task id is generated?



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -54,35 +54,73 @@
 @Log4j2
 public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {
 
-    @SerializedName(value = "jid")
+    @SerializedName(value = "jobId")

Review Comment:
   Do not change it.
   We suggest using abbreviations in the future, to minimize the edit log size.
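   The reason: the whole job is serialized to JSON via `Text.writeString(out, GsonUtils.GSON.toJson(this))` in `write()`, so every `@SerializedName` value is repeated verbatim in each edit log entry. Keeping the short names directly shrinks that payload:

   ```java
   @SerializedName(value = "jid") // instead of "jobId"
   private Long jobId;

   @SerializedName(value = "jn") // instead of "jobName"
   private String jobName;
   ```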



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr {
+    private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+    private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    // lock for export job
+    // lock is private and must use after db lock
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    private JobManager<InsertJob, ?> getJobManager() {
+        return Env.getCurrentEnv().getJobManager();
+    }
+
+    /**
+     * add load job and add tasks
+     * @param loadJob job
+     */
+    public void addLoadJob(InsertJob loadJob) throws DdlException {
+        writeLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
+            if (labelToLoadJobs != null && labelToLoadJobs.containsKey(loadJob.getLabel())) {
+                throw new LabelAlreadyUsedException(loadJob.getLabel());
+            }
+            unprotectAddJob(loadJob);
+        } catch (LabelAlreadyUsedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void unprotectAddJob(InsertJob job) throws DdlException {
+        loadIdToJob.put(job.getJobId(), job);
+        try {
+            getJobManager().registerJob(job);
+            if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+                dbIdToLabelToLoadJobs.put(job.getDbId(), new ConcurrentHashMap<>());
+            }
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+            if (!labelToLoadJobs.containsKey(job.getLabel())) {
+                labelToLoadJobs.put(job.getLabel(), new ArrayList<>());
+            }
+            labelToLoadJobs.get(job.getLabel()).add(job);
+        } catch (org.apache.doris.job.exception.JobException e) {
+            throw new DdlException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * replay load job
+     * @param replayLoadLog load log
+     * @throws DdlException ex
+     */
+    public void replayLoadJob(ReplayLoadLog replayLoadLog) throws DdlException {
+        writeLock();
+        try {
+            if (replayLoadLog instanceof ReplayLoadLog.ReplayCreateLoadLog) {
+                InsertJob loadJob = new InsertJob((ReplayLoadLog.ReplayCreateLoadLog) replayLoadLog);
+                JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
+                jobConfig.setExecuteType(JobExecuteType.INSTANT);
+                loadJob.setJobConfig(jobConfig);
+                addLoadJob(loadJob);
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getJobId()).add("msg", "replay create load job")
+                        .build());
+            } else if (replayLoadLog instanceof ReplayLoadLog.ReplayEndLoadLog) {
+                InsertJob job = loadIdToJob.get(replayLoadLog.getId());
+                if (job == null) {
+                    // This should not happen.
+                    // Last time I found that when user submit a job with already used label, an END_LOAD_JOB edit log
+                    // will be written but the job is not added to 'idToLoadJob', so this job here we got will be null.
+                    // And this bug has been fixed.
+                    // Just add a log here to observe.
+                    LOG.warn("job does not exist when replaying end load job 
edit log: {}", replayLoadLog);
+                    return;
+                }
+                job.unprotectReadEndOperation((ReplayLoadLog.ReplayEndLoadLog) replayLoadLog);
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, replayLoadLog.getId()).add("operation", replayLoadLog)
+                        .add("msg", "replay end load job").build());
+            } else {
+                throw new DdlException("Unsupported replay job type. ");
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    //    public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+    //                                  long scannedBytes, boolean isDone) {
+    //        LoadJobExecutor job = loadIdToJob.get(jobId);
+    //        if (job != null) {
+    //            job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+    //        }
+    //    }
+
+    /**
+     * cancel job
+     *
+     * @param dbName   dbName
+     * @param label    job label
+     * @param state    job state
+     * @param operator filter operator, like or equals
+     */
+    public void cancelLoadJob(String dbName, String label, String state, CompoundPredicate.Operator operator)
+            throws JobException, AnalysisException, DdlException {
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+        // List of load jobs waiting to be cancelled
+        List<InsertJob> uncompletedLoadJob;
+        readLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new JobException("Load job does not exist");
+            }
+            List<InsertJob> matchLoadJobs = Lists.newArrayList();
+            addNeedCancelLoadJob(label, state, operator,
+                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                    matchLoadJobs);
+            if (matchLoadJobs.isEmpty()) {
+                throw new JobException("Load job does not exist");
+            }
+            // check state here
+            uncompletedLoadJob =
+                    matchLoadJobs.stream().filter(InsertJob::isRunning)
+                            .collect(Collectors.toList());
+            if (uncompletedLoadJob.isEmpty()) {
+                throw new JobException("There is no uncompleted job");
+            }
+        } finally {
+            readUnlock();
+        }
+        for (InsertJob loadJob : uncompletedLoadJob) {
+            try {
+                loadJob.cancelJob();
+            } catch (JobException e) {
+                LOG.warn("Fail to cancel job, its label: {}", 
loadJob.getLabel());
+            }
+        }
+    }
+
+    private static void addNeedCancelLoadJob(String label, String state,

Review Comment:
   ```suggestion
       private void addNeedCancelLoadJob(String label, String state,
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
+                    .build();
+
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
     static {
-        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
         for (int i = 0; i < SCHEMA.size(); i++) {
             builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
         }
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
+    @SerializedName("dbId")
+    private final long dbId;
+    @SerializedName("labelName")
+    private String labelName;
+    @SerializedName("loadType")
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    @SerializedName("progress")

Review Comment:
   Use an abbreviation as the serialized name.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", 
ScalarType.createVarchar(20)))

Review Comment:
   Use String as the column type, for all columns. Varchar(20) is not long enough.
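   A sketch of the suggested change, reusing the `createStringType()` already used for the `SCHEMA` columns in this file:

   ```java
   private static final ShowResultSetMetaData TASK_META_DATA =
           ShowResultSetMetaData.builder()
                   .addColumn(new Column("TaskId", ScalarType.createStringType()))
                   .addColumn(new Column("Label", ScalarType.createStringType()))
                   // ... remaining columns switch to createStringType() the same way
                   .build();
   ```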



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -125,23 +273,30 @@ public void cancelTaskById(long taskId) throws JobException {
     }
 
     @Override
-    public boolean isReadyForScheduling(Map taskContext) {
-        return CollectionUtils.isEmpty(getRunningTasks());
+    public void cancelAllTasks() throws JobException {
+        writeLock();

Review Comment:
   What does this lock protect?



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java:
##########
@@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {
 
     /**
      * Cancels all running tasks of this job.
-     *
      * @throws JobException If cancelling a running task fails.
      */
     void cancelAllTasks() throws JobException;
 
+    /**
+     * register job
+     * @throws JobException If register job failed.
+     */
+    void onRegister() throws JobException;
+
+    /**
+     * register job failed
+     * @throws JobException If failed.
+     */
+    void onRegisterFailed() throws JobException;
+
+    /**
+     * relay create job

Review Comment:
   ```suggestion
        * replay create job
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
     }
 
     private String labelName;
-
     private InsertIntoTableCommand command;
-
     private StmtExecutor stmtExecutor;
-
     private ConnectContext ctx;
-
     private String sql;
-
     private String currentDb;
-
     private UserIdentity userIdentity;
-
+    private LoadStatistic loadStatistic;
     private AtomicBoolean isCanceled = new AtomicBoolean(false);
-
     private AtomicBoolean isFinished = new AtomicBoolean(false);
-
     private static final String LABEL_SPLITTER = "_";
 
+    private FailMsg failMsg;
+    @Getter
+    private String trackingUrl;
 
     @Getter
     @Setter
-    private LoadJob loadJob;
+    private InsertJob jobInfo;
+    private TaskType taskType = TaskType.PENDING;
+    private MergeType mergeType = MergeType.APPEND;
+
+    /**
+     * task merge type
+     */
+    enum MergeType {
+        MERGE,
+        APPEND,
+        DELETE
+    }
+
+    /**
+     * task type
+     */
+    enum TaskType {
+        UNKNOWN, // this is only for ISSUE #2354
+        PENDING,
+        LOADING,
+        FINISHED,
+        FAILED,
+        CANCELLED
+    }
+
+    public InsertTask(InsertIntoTableCommand insertInto,
+                      ConnectContext ctx, StmtExecutor executor, LoadStatistic 
statistic) {
+        this(null, insertInto, ctx, executor, statistic);
+    }
+
+    public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
+        this.labelName = labelName;
+        this.sql = sql;
+        this.currentDb = currentDb;
+        this.userIdentity = userIdentity;
+        setTaskId(Env.getCurrentEnv().getNextId());
+    }
 
+    public InsertTask(String labelName, InsertIntoTableCommand insertInto,
+                       ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+        this.labelName = labelName;

Review Comment:
   Label is not used in `InsertTask`.
   So I think we can merge this constructor with the first constructor.
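   A sketch of the merged constructor (the field assignments are assumptions based on the fields declared above, since the original body is truncated in the diff):

   ```java
   public InsertTask(InsertIntoTableCommand insertInto, ConnectContext ctx,
                     StmtExecutor executor, LoadStatistic statistic) {
       this.command = insertInto;
       this.ctx = ctx;
       this.stmtExecutor = executor;
       this.loadStatistic = statistic;
       setTaskId(Env.getCurrentEnv().getNextId());
   }
   ```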



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -203,27 +351,273 @@ public void onTaskSuccess(InsertTask task) throws JobException {
 
     @Override
     public List<String> getShowInfo() {
-        return super.getCommonShowInfo();
+        readLock();
+        try {
+            // check auth
+            checkAuth("SHOW LOAD");
+            List<String> jobInfo = Lists.newArrayList();
+            // jobId
+            jobInfo.add(getJobId().toString());
+            // label
+            jobInfo.add(getLabelName());
+            // state
+            jobInfo.add(getJobStatus().name());
+
+            // progress
+            String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(getJobId()));
+            switch (getJobStatus()) {
+                case RUNNING:
+                    if (isPending()) {
+                        jobInfo.add("ETL:0%; LOAD:0%");
+                    } else {
+                        jobInfo.add("ETL:100%; LOAD:" + progress + "%");
+                    }
+                    break;
+                case FINISHED:
+                    jobInfo.add("ETL:100%; LOAD:100%");
+                    break;
+                case STOPPED:
+                default:
+                    jobInfo.add("ETL:N/A; LOAD:N/A");
+                    break;
+            }
+            // type
+            jobInfo.add(loadType.name());
+
+            // etl info
+            if (loadStatistic.getCounters().size() == 0) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadStatistic.getCounters()));
+            }
+
+            // task info
+            jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + 
getTimeout()
+                    + "; max_filter_ratio:" + getMaxFilterRatio() + "; 
priority:" + getPriority());
+            // error msg
+            if (failMsg == null) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + 
failMsg.getMsg());
+            }
+
+            // create time
+            jobInfo.add(TimeUtils.longToTimeString(getCreateTimeMs()));
+            // etl start time
+            jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+            // etl end time
+            jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+            // load start time
+            jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+            // load end time
+            jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs()));
+            // tracking urls
+            List<String> trackingUrl = idToTasks.values().stream()
+                    .map(InsertTask::getTrackingUrl)
+                    .collect(Collectors.toList());
+            if (trackingUrl.isEmpty()) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add(trackingUrl.toString());
+            }
+            // job details
+            jobInfo.add(loadStatistic.toJson());
+            // transaction id
+            jobInfo.add(String.valueOf(0));
+            // error tablets
+            jobInfo.add(errorTabletsToJson());
+            // user, some load job may not have user info
+            if (getCreateUser() == null || getCreateUser().getQualifiedUser() == null) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add(getCreateUser().getQualifiedUser());
+            }
+            // comment
+            jobInfo.add(getComment());
+            return jobInfo;
+        } catch (DdlException e) {
+            throw new RuntimeException(e);
+        } finally {
+            readUnlock();
+        }
+    }
+
+    private String getPriority() {
+        return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name());
+    }
+
+    public double getMaxFilterRatio() {
+        return Double.parseDouble(properties.getOrDefault(LoadStmt.MAX_FILTER_RATIO_PROPERTY, "0.0"));
+    }
+
+    public long getTimeout() {
+        if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
+            return Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY));
+        }
+        return Config.broker_load_default_timeout_second;
+    }
+
+
+    public static InsertJob readFields(DataInput in) throws IOException {
+        String jsonJob = Text.readString(in);
+        InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
+        job.setRunningTasks(new ArrayList<>());
+        return job;
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
     }
 
-    private static final ShowResultSetMetaData TASK_META_DATA =
-            ShowResultSetMetaData.builder()
-                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+    public String errorTabletsToJson() {
+        Map<Long, String> map = new HashMap<>();
+        errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load)
+                .forEach(p -> map.put(p.getTabletId(), p.getMsg()));
+        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+        return gson.toJson(map);
+    }
 
-                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
-                    .build();
+    public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+                                    long scannedBytes, boolean isDone) {
+        loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+        progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 100);
+        if (progress == 100) {
+            progress = 99;
+        }
+    }
+
+    private void checkAuth(String command) throws DdlException {
+        if (authorizationInfo == null) {
+            // use the old method to check priv
+            checkAuthWithoutAuthInfo(command);
+            return;
+        }
+        if (!Env.getCurrentEnv().getAccessManager().checkPrivByAuthInfo(ConnectContext.get(), authorizationInfo,
+                PrivPredicate.LOAD)) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+                    Privilege.LOAD_PRIV);
+        }
+    }
+
+    /**
+     * This method is compatible with old load job without authorization info
+     * If db or table name could not be found by id, it will throw the NOT_EXISTS_ERROR
+     *
+     * @throws DdlException
+     */
+    private void checkAuthWithoutAuthInfo(String command) throws DdlException {
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+        // check auth
+        if (tableNames.isEmpty()) {
+            // forward compatibility
+            if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), db.getFullName(),
+                    PrivPredicate.LOAD)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+                        Privilege.LOAD_PRIV);
+            }
+        } else {
+            for (String tblName : tableNames) {
+                if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), db.getFullName(),
+                        tblName, PrivPredicate.LOAD)) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
+                            command,
+                            ConnectContext.get().getQualifiedUser(),
+                            ConnectContext.get().getRemoteIP(), db.getFullName() + ": " + tblName);
+                }
+            }
+        }
+    }
+
+    public void unprotectReadEndOperation(InsertJob replayLog) {
+        setJobStatus(replayLog.getJobStatus());
+        progress = replayLog.getProgress();
+        setStartTimeMs(replayLog.getStartTimeMs());
+        setFinishTimeMs(replayLog.getFinishTimeMs());
+        failMsg = replayLog.getFailMsg();
+    }
+
+    public String getResourceName() {
+        // TODO: get tvf param from tvf relation
+        return LoadType.BULK.name();
+    }
+
+    public boolean isRunning() {
+        return getJobStatus() != JobStatus.FINISHED;
+    }
+
+    public boolean isPending() {
+        return getJobStatus() != JobStatus.FINISHED;
+    }
+
+    public boolean isCommitted() {

Review Comment:
   remove the unused methods `isCommitted()` and `isFinished()`



##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -26,57 +34,105 @@
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
 import org.apache.doris.job.scheduler.JobScheduler;
 import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.loadv2.JobState;
 
+import com.google.common.collect.Lists;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 @Log4j2
 public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
 
-
     private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32);
 
-    private JobScheduler jobScheduler;
+    private JobScheduler<T, C> jobScheduler;
+
+    // lock for job
+    // lock is private and must use after db lock
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
 
     public void start() {
-        jobScheduler = new JobScheduler(jobMap);
+        jobScheduler = new JobScheduler<T, C>(jobMap);
         jobScheduler.start();
     }
 
+
+    /**
+     * get running job
+     *
+     * @param jobId id
+     * @return running job
+     */
+    public T getJob(long jobId) {
+        return jobMap.get(jobId);
+    }
+
     public void registerJob(T job) throws JobException {
-        job.checkJobParams();
-        checkJobNameExist(job.getJobName());
-        if (jobMap.get(job.getJobId()) != null) {
-            throw new JobException("job id exist, jobId:" + job.getJobId());
+        writeLock();
+        try {
+            job.onRegister();
+            job.checkJobParams();
+            checkJobNameExist(job.getJobName());
+            if (jobMap.get(job.getJobId()) != null) {
+                throw new JobException("job id exist, jobId:" + 
job.getJobId());
+            }
+            jobMap.put(job.getJobId(), job);
+            //check its need to scheduler
+            jobScheduler.scheduleOneJob(job);
+            job.logCreateOperation();
+        } catch (JobException e) {
+            // job.onRegisterFailed();

Review Comment:
   Why is this commented out?



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
+                    .build();
+
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
     static {
-        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
         for (int i = 0; i < SCHEMA.size(); i++) {
             builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
         }
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
+    @SerializedName("dbId")
+    private final long dbId;
+    @SerializedName("labelName")
+    private String labelName;
+    @SerializedName("loadType")
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    @SerializedName("progress")
+    private int progress;
+    @SerializedName("failMsg")
+    private FailMsg failMsg;
+    @SerializedName("plans")
+    private List<InsertIntoTableCommand> plans;
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    private Set<Long> finishedTaskIds = new HashSet<>();
+    private Set<String> tableNames;
+    private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+    private Map<String, String> properties;
+    private AuthorizationInfo authorizationInfo;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private ConnectContext ctx;
+    private StmtExecutor stmtExecutor;
+    private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+    private List<TabletCommitInfo> commitInfos = new ArrayList<>();
+
+    // max save task num, do we need to config it?
+    private static final int MAX_SAVE_TASK_NUM = 100;
+
+    /**
+     * load job type
+     */
+    public enum LoadType {
+        BULK,
+        SPARK,
+        LOCAL_FILE,
+        UNKNOWN
+
+    }
+
+    public enum Priority {
+        HIGH(0),
+        NORMAL(1),
+        LOW(2);
 
-    InsertIntoTableCommand command;
+        Priority(int value) {
+            this.value = value;
+        }
 
-    StmtExecutor stmtExecutor;
+        private final int value;
 
-    ConnectContext ctx;
+        public int getValue() {
+            return value;
+        }
+    }
 
-    @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> taskIdList;
+    public InsertJob(Long jobId, String jobName,
+                     JobStatus jobStatus,
+                     LabelName labelName,
+                     String comment,
+                     UserIdentity createUser,
+                     JobExecutionConfiguration jobConfig,
+                     Long createTimeMs,
+                     String executeSql) {
+        super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser,
+                jobConfig, createTimeMs, executeSql, null);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+        this.labelName = labelName.getLabelName();
+    }
 
-    // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 100;
+    public InsertJob(ConnectContext ctx,
+                      StmtExecutor executor,
+                      String labelName,
+                      List<InsertIntoTableCommand> plans,
+                      Set<String> sinkTableNames,
+                      Map<String, String> properties,
+                      String comment,
+                      JobExecutionConfiguration jobConfig) {
+        super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+                comment, ctx.getCurrentUserIdentity(), jobConfig);
+        this.ctx = ctx;
+        this.plans = plans;
+        this.stmtExecutor = executor;
+        this.dbId = ctx.getCurrentDbId();
+        this.labelName = labelName;
+        this.tableNames = sinkTableNames;
+        this.properties = properties;
+        // TODO: not support other type yet
+        this.loadType = InsertJob.LoadType.BULK;
+    }
 
     @Override
-    public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
-        //nothing need to do in insert job
-        InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
-        task.setJobId(getJobId());
-        task.setTaskType(taskType);
-        task.setTaskId(Env.getCurrentEnv().getNextId());
-        ArrayList<InsertTask> tasks = new ArrayList<>();
-        tasks.add(task);
-        super.initTasks(tasks);
-        addNewTask(task.getTaskId());
-        return tasks;
+    public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
+        if (plans.isEmpty()) {
+            InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());
+            task.setCreateTimeMs(System.currentTimeMillis());
+            task.setStatus(TaskStatus.PENDING);
+            ArrayList<InsertTask> tasks = new ArrayList<>();
+            tasks.add(task);
+            super.initTasks(tasks);
+            addNewTask(task.getTaskId());
+            return tasks;
+        } else {
+            return createBatchTasks(taskType);
+        }
     }
 
-    public void addNewTask(long id) {
+    private List<InsertTask> createBatchTasks(TaskType taskType) {

Review Comment:
   We should unify `createBatchTasks()` and `createTasks()`.
   For example, why is there `idToTasks.put(task.getTaskId(), task);` in `createBatchTasks()` but not in `createTasks()`?
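   For instance, a shared helper could make both paths register a task identically (helper name hypothetical):

   ```java
   private void registerTask(InsertTask task, TaskType taskType) {
       task.setJobId(getJobId());
       task.setTaskType(taskType);
       task.setCreateTimeMs(System.currentTimeMillis());
       task.setStatus(TaskStatus.PENDING);
       idToTasks.put(task.getTaskId(), task); // today this only happens in createBatchTasks()
       addNewTask(task.getTaskId());
   }
   ```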



##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -95,7 +151,7 @@ public void unregisterJob(String jobName) throws JobException {
     public void alterJobStatus(Long jobId, JobStatus status) throws JobException {

Review Comment:
   ```suggestion
       private void alterJobStatus(Long jobId, JobStatus status) throws JobException {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();

Review Comment:
   This map should be protected by a lock.
   Not for concurrency issues, but to make compound check-then-modify operations atomic.
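   A sketch of the point: `ConcurrentHashMap` only makes individual calls safe, so a check-then-add across the nested maps needs an outer lock to be atomic (same `ReentrantReadWriteLock` pattern already used in `LoadMgr`; the method name follows the `addJob` suggestion in an earlier comment):

   ```java
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

   public void addJob(Database db, InsertJob job) throws LabelAlreadyUsedException {
       lock.writeLock().lock();
       try {
           List<InsertJob> jobs = dbIdToLabelToLoadJobs
                   .computeIfAbsent(db.getId(), k -> new ConcurrentHashMap<>())
                   .computeIfAbsent(job.getLabelName(), k -> new ArrayList<>());
           if (!jobs.isEmpty()) {
               throw new LabelAlreadyUsedException(job.getLabelName());
           }
           jobs.add(job); // the label check and the add now happen atomically
       } finally {
           lock.writeLock().unlock();
       }
   }
   ```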



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
     }
 
     private String labelName;

Review Comment:
   This field is unused



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -130,22 +168,14 @@ protected TUniqueId generateQueryId(String taskIdString) {
         return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
     }
 
-    public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
-        this.labelName = labelName;
-        this.sql = sql;
-        this.currentDb = currentDb;
-        this.userIdentity = userIdentity;
-
-    }
-
     @Override
     public void run() throws JobException {
         try {
             if (isCanceled.get()) {
                 log.info("task has been canceled, task id is {}", getTaskId());
                 return;
             }
-            command.run(ctx, stmtExecutor);
+            command.runWithStatistic(ctx, stmtExecutor, loadStatistic);

Review Comment:
   `Statistic` usually means the statistics used for the CBO.
   Better to change the name.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    /**
+     * getLabelJob
+     * @param db db
+     * @return label jobs
+     * @throws JobException e
+     */
+    public List<InsertJob> getLabelJobs(Database db) throws JobException {

Review Comment:
   Same issue for `filterLabelJobs`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
