This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3fd3dfe16f6c2d4289fdc91c96f3426be6a77e8f
Author: Calvin Kirs <[email protected]>
AuthorDate: Tue Apr 30 10:51:24 2024 +0800

    [Feat](Job) Job supports task execution statistics (#34109)
    
    * Support statistics
    
    * - Fix failed tasks not showing up in the task list
    - Add jobName to task metadata
    - Fix the cleanup-time check for finished jobs
    - Add successCount, failedCount, totalTaskCount to job metadata
    
    * add test
---
 .../org/apache/doris/job/base/AbstractJob.java     |  35 +++++-
 .../doris/job/extensions/insert/InsertJob.java     | 140 ++++++++++++---------
 .../doris/job/extensions/insert/InsertTask.java    |  42 ++++---
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |   4 +-
 .../org/apache/doris/job/manager/JobManager.java   |   5 +-
 .../apache/doris/job/scheduler/JobScheduler.java   |   2 +-
 .../org/apache/doris/job/task/AbstractTask.java    |   5 +-
 .../main/java/org/apache/doris/job/task/Task.java  |   2 +-
 .../doris/tablefunction/MetadataGenerator.java     |   2 +-
 .../suites/job_p0/test_base_insert_job.groovy      |  19 ++-
 10 files changed, 157 insertions(+), 99 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 8c1d6da3a63..3f595d6daf5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -51,6 +51,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -58,7 +59,11 @@ import java.util.stream.Collectors;
 @Data
 @Log4j2
 public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, 
C>, Writable {
-
+    public static final ImmutableList<Column> COMMON_SCHEMA = ImmutableList.of(
+            new Column("SucceedTaskCount", ScalarType.createStringType()),
+            new Column("FailedTaskCount", ScalarType.createStringType()),
+            new Column("CanceledTaskCount", ScalarType.createStringType())
+            );
     @SerializedName(value = "jid")
     private Long jobId;
 
@@ -92,6 +97,16 @@ public abstract class AbstractJob<T extends AbstractTask, C> 
implements Job<T, C
     @SerializedName(value = "sql")
     String executeSql;
 
+
+    @SerializedName(value = "stc")
+    private AtomicLong succeedTaskCount = new AtomicLong(0);
+
+    @SerializedName(value = "ftc")
+    private AtomicLong failedTaskCount = new AtomicLong(0);
+
+    @SerializedName(value = "ctc")
+    private AtomicLong canceledTaskCount = new AtomicLong(0);
+
     public AbstractJob() {
     }
 
@@ -142,6 +157,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
             task.cancel();
         }
         runningTasks = new CopyOnWriteArrayList<>();
+        logUpdateOperation();
     }
 
     private static final ImmutableList<String> TITLE_NAMES =
@@ -290,14 +306,18 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
 
     @Override
     public void onTaskFail(T task) throws JobException {
+        failedTaskCount.incrementAndGet();
         updateJobStatusIfEnd(false);
         runningTasks.remove(task);
+        logUpdateOperation();
     }
 
     @Override
     public void onTaskSuccess(T task) throws JobException {
+        succeedTaskCount.incrementAndGet();
         updateJobStatusIfEnd(true);
         runningTasks.remove(task);
+        logUpdateOperation();
 
     }
 
@@ -309,12 +329,15 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         }
         switch (executeType) {
             case ONE_TIME:
+                updateJobStatus(JobStatus.FINISHED);
+                this.finishTimeMs = System.currentTimeMillis();
+                break;
             case INSTANT:
                 this.finishTimeMs = System.currentTimeMillis();
                 if (taskSuccess) {
-                    
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
+                    updateJobStatus(JobStatus.FINISHED);
                 } else {
-                    
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.STOPPED);
+                    updateJobStatus(JobStatus.STOPPED);
                 }
                 break;
             case RECURRING:
@@ -322,7 +345,8 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
                 if (null != timerDefinition.getEndTimeMs()
                         && timerDefinition.getEndTimeMs() < 
System.currentTimeMillis()
                         + 
timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval())) 
{
-                    
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
+                    this.finishTimeMs = System.currentTimeMillis();
+                    updateJobStatus(JobStatus.FINISHED);
                 }
                 break;
             default:
@@ -360,6 +384,9 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         trow.addToColumnValue(new TCell().setStringVal(jobStatus.name()));
         trow.addToColumnValue(new TCell().setStringVal(executeSql));
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(createTimeMs)));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(succeedTaskCount.get())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(failedTaskCount.get())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(canceledTaskCount.get())));
         trow.addToColumnValue(new TCell().setStringVal(comment));
         return trow;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index a9e6bd4fc40..47d52c170b2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -30,7 +30,6 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
-import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
@@ -67,38 +66,39 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 @EqualsAndHashCode(callSuper = true)
 @Data
-@Slf4j
+@Log4j2
 public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> 
implements GsonPostProcessable {
 
-    public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
-            new Column("Id", ScalarType.createStringType()),
-            new Column("Name", ScalarType.createStringType()),
-            new Column("Definer", ScalarType.createStringType()),
-            new Column("ExecuteType", ScalarType.createStringType()),
-            new Column("RecurringStrategy", ScalarType.createStringType()),
-            new Column("Status", ScalarType.createStringType()),
-            new Column("ExecuteSql", ScalarType.createStringType()),
-            new Column("CreateTime", ScalarType.createStringType()),
-            new Column("Comment", ScalarType.createStringType()));
+    public static final ImmutableList<Column> SCHEMA = 
ImmutableList.<Column>builder()
+            .add(new Column("Id", ScalarType.createStringType()))
+            .add(new Column("Name", ScalarType.createStringType()))
+            .add(new Column("Definer", ScalarType.createStringType()))
+            .add(new Column("ExecuteType", ScalarType.createStringType()))
+            .add(new Column("RecurringStrategy", 
ScalarType.createStringType()))
+            .add(new Column("Status", ScalarType.createStringType()))
+            .add(new Column("ExecuteSql", ScalarType.createStringType()))
+            .add(new Column("CreateTime", ScalarType.createStringType()))
+            .addAll(COMMON_SCHEMA)
+            .add(new Column("Comment", ScalarType.createStringType()))
+            .build();
 
     private static final ShowResultSetMetaData TASK_META_DATA =
             ShowResultSetMetaData.builder()
@@ -126,8 +126,10 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         COLUMN_TO_INDEX = builder.build();
     }
 
+    //we used insertTaskQueue to store the task info, and we will query the 
task info from it
+    @Deprecated
     @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> historyTaskIdList;
+    ConcurrentLinkedQueue<Long> historyTaskIdList = new 
ConcurrentLinkedQueue<>();
     @SerializedName("did")
     private final long dbId;
     @SerializedName("ln")
@@ -146,7 +148,9 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
     private List<InsertIntoTableCommand> plans = new ArrayList<>();
     private LoadStatistic loadStatistic = new LoadStatistic();
     private Set<Long> finishedTaskIds = new HashSet<>();
-    private ConcurrentHashMap<Long, InsertTask> idToTasks = new 
ConcurrentHashMap<>();
+
+    @SerializedName("tas")
+    private ConcurrentLinkedQueue<InsertTask> insertTaskQueue = new 
ConcurrentLinkedQueue<>();
     private Map<String, String> properties = new HashMap<>();
     private Set<String> tableNames;
     private AuthorizationInfo authorizationInfo;
@@ -164,8 +168,8 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         if (null == plans) {
             plans = new ArrayList<>();
         }
-        if (null == idToTasks) {
-            idToTasks = new ConcurrentHashMap<>();
+        if (null == insertTaskQueue) {
+            insertTaskQueue = new ConcurrentLinkedQueue<>();
         }
         if (null == loadStatistic) {
             loadStatistic = new LoadStatistic();
@@ -182,6 +186,15 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         if (null == historyTaskIdList) {
             historyTaskIdList = new ConcurrentLinkedQueue<>();
         }
+        if (null == getSucceedTaskCount()) {
+            setSucceedTaskCount(new AtomicLong(0));
+        }
+        if (null == getFailedTaskCount()) {
+            setFailedTaskCount(new AtomicLong(0));
+        }
+        if (null == getCanceledTaskCount()) {
+            setCanceledTaskCount(new AtomicLong(0));
+        }
     }
 
     /**
@@ -250,9 +263,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         List<InsertTask> newTasks = new ArrayList<>();
         if (plans.isEmpty()) {
             InsertTask task = new InsertTask(labelName, getCurrentDbName(), 
getExecuteSql(), getCreateUser());
-            idToTasks.put(task.getTaskId(), task);
             newTasks.add(task);
-            recordTask(task.getTaskId());
         } else {
             // use for load stmt
             for (InsertIntoTableCommand logicalPlan : plans) {
@@ -260,28 +271,24 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
                     throw new IllegalArgumentException("Load plan need label 
name.");
                 }
                 InsertTask task = new InsertTask(logicalPlan, ctx, 
stmtExecutor, loadStatistic);
-                idToTasks.put(task.getTaskId(), task);
                 newTasks.add(task);
-                recordTask(task.getTaskId());
             }
         }
         initTasks(newTasks, taskType);
+        recordTasks(newTasks);
         return new ArrayList<>(newTasks);
     }
 
-    public void recordTask(long id) {
+    public void recordTasks(List<InsertTask> tasks) {
         if (Config.max_persistence_task_count < 1) {
             return;
         }
-        if (CollectionUtils.isEmpty(historyTaskIdList)) {
-            historyTaskIdList = new ConcurrentLinkedQueue<>();
-            historyTaskIdList.add(id);
-            Env.getCurrentEnv().getEditLog().logUpdateJob(this);
-            return;
-        }
-        historyTaskIdList.add(id);
-        if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
-            historyTaskIdList.poll();
+        insertTaskQueue.addAll(tasks);
+
+        while (insertTaskQueue.size() > Config.max_persistence_task_count) {
+            insertTaskQueue.poll();
+            //since we have insertTaskQueue, we do not need to store the task 
id in historyTaskIdList, so we clear it
+            historyTaskIdList.clear();
         }
         Env.getCurrentEnv().getEditLog().logUpdateJob(this);
     }
@@ -319,35 +326,54 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
 
     @Override
     public List<InsertTask> queryTasks() {
-        if (CollectionUtils.isEmpty(historyTaskIdList)) {
+        if (historyTaskIdList.isEmpty() && insertTaskQueue.isEmpty()) {
             return new ArrayList<>();
         }
+
         //TODO it's will be refactor, we will storage task info in job inner 
and query from it
-        List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
+
+        // merge task info from insertTaskQueue and historyTaskIdList
+        List<Long> taskIds = 
insertTaskQueue.stream().map(InsertTask::getTaskId).collect(Collectors.toList());
+        taskIds.addAll(historyTaskIdList);
+        taskIds.stream().distinct().collect(Collectors.toList());
         if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
-            Collections.reverse(taskIdList);
-            return queryLoadTasksByTaskIds(taskIdList);
+            return queryLoadTasksByTaskIds(taskIds);
         }
-        List<LoadJob> loadJobs = 
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
-        if (CollectionUtils.isEmpty(loadJobs)) {
-            return new ArrayList<>();
+        // query from load job
+        List<LoadJob> loadJobs = 
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
+
+        Map<Long, LoadJob> loadJobMap = 
loadJobs.stream().collect(Collectors.toMap(LoadJob::getId, loadJob -> loadJob));
+        List<InsertTask> tasksRsp = new ArrayList<>();
+        //read task info from insertTaskQueue
+        insertTaskQueue.forEach(task -> {
+            if (task.getJobInfo() == null) {
+                LoadJob loadJob = loadJobMap.get(task.getTaskId());
+                if (loadJob != null) {
+                    task.setJobInfo(loadJob);
+                }
+            }
+            tasksRsp.add(task);
+        });
+        if (CollectionUtils.isEmpty(historyTaskIdList)) {
+            return tasksRsp;
         }
-        List<InsertTask> tasks = new ArrayList<>();
-        loadJobs.forEach(loadJob -> {
-            InsertTask task;
-            try {
-                task = new InsertTask(loadJob.getLabel(), 
loadJob.getDb().getFullName(), null, getCreateUser());
-                task.setCreateTimeMs(loadJob.getCreateTimestamp());
-            } catch (MetaNotFoundException e) {
-                log.warn("load job not found, job id is {}", loadJob.getId());
+
+        historyTaskIdList.forEach(historyTaskId -> {
+            LoadJob loadJob = loadJobMap.get(historyTaskId);
+            if (null == loadJob) {
                 return;
             }
+            InsertTask task = new InsertTask(loadJob.getLabel(), 
getCurrentDbName(), null, getCreateUser());
             task.setJobId(getJobId());
             task.setTaskId(loadJob.getId());
             task.setJobInfo(loadJob);
-            tasks.add(task);
+            task.setJobId(getJobId());
+            task.setTaskId(loadJob.getId());
+            task.setJobInfo(loadJob);
+            tasksRsp.add(task);
         });
-        return tasks;
+        return tasksRsp;
+
 
     }
 
@@ -355,13 +381,13 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         if (taskIdList.isEmpty()) {
             return new ArrayList<>();
         }
-        List<InsertTask> tasks = new ArrayList<>();
-        taskIdList.forEach(id -> {
-            if (null != idToTasks.get(id)) {
-                tasks.add(idToTasks.get(id));
+        List<InsertTask> queryTasks = new ArrayList<>();
+        insertTaskQueue.forEach(task -> {
+            if (taskIdList.contains(task.getTaskId())) {
+                queryTasks.add(task);
             }
         });
-        return tasks;
+        return queryTasks;
     }
 
     @Override
@@ -462,7 +488,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
             // load end time
             jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs()));
             // tracking urls
-            List<String> trackingUrl = idToTasks.values().stream()
+            List<String> trackingUrl = insertTaskQueue.stream()
                     .map(task -> {
                         if (StringUtils.isNotEmpty(task.getTrackingUrl())) {
                             return task.getTrackingUrl();
@@ -527,7 +553,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
     public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId 
fragmentId, long scannedRows,
                                     long scannedBytes, boolean isDone) {
         loadStatistic.updateLoadProgress(beId, loadId, fragmentId, 
scannedRows, scannedBytes, isDone);
-        progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 
100);
+        progress = (int) ((double) finishedTaskIds.size() / 
insertTaskQueue.size() * 100);
         if (progress == 100) {
             progress = 99;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 8fe786555ce..6e6f59758b4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -37,6 +37,7 @@ import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.log4j.Log4j2;
@@ -52,11 +53,12 @@ public class InsertTask extends AbstractTask {
     public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
             new Column("TaskId", ScalarType.createStringType()),
             new Column("JobId", ScalarType.createStringType()),
+            new Column("JobName", ScalarType.createStringType()),
             new Column("Label", ScalarType.createStringType()),
             new Column("Status", ScalarType.createStringType()),
             new Column("ErrorMsg", ScalarType.createStringType()),
-            new Column("CreateTimeMs", ScalarType.createStringType()),
-            new Column("FinishTimeMs", ScalarType.createStringType()),
+            new Column("CreateTime", ScalarType.createStringType()),
+            new Column("FinishTime", ScalarType.createStringType()),
             new Column("TrackingUrl", ScalarType.createStringType()),
             new Column("LoadStatistic", ScalarType.createStringType()),
             new Column("User", ScalarType.createStringType()));
@@ -64,7 +66,7 @@ public class InsertTask extends AbstractTask {
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
     static {
-        ImmutableMap.Builder<String, Integer> builder = new 
ImmutableMap.Builder();
+        ImmutableMap.Builder<String, Integer> builder = new 
ImmutableMap.Builder<>();
         for (int i = 0; i < SCHEMA.size(); i++) {
             builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
         }
@@ -77,6 +79,7 @@ public class InsertTask extends AbstractTask {
     private ConnectContext ctx;
     private String sql;
     private String currentDb;
+    @SerializedName(value = "uif")
     private UserIdentity userIdentity;
     private LoadStatistic loadStatistic;
     private AtomicBoolean isCanceled = new AtomicBoolean(false);
@@ -211,42 +214,51 @@ public class InsertTask extends AbstractTask {
     }
 
     @Override
-    public TRow getTvfInfo() {
+    public TRow getTvfInfo(String jobName) {
         TRow trow = new TRow();
         if (jobInfo == null) {
             // if task not start, load job is null,return pending task show 
info
-            return getPendingTaskTVFInfo();
+            return getPendingTaskTVFInfo(jobName);
         }
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(jobInfo.getId())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getJobId())));
-        trow.addToColumnValue(new TCell().setStringVal(labelName));
+        trow.addToColumnValue(new TCell().setStringVal(jobName));
+        trow.addToColumnValue(new TCell().setStringVal(getJobId() + 
LABEL_SPLITTER + getTaskId()));
         trow.addToColumnValue(new 
TCell().setStringVal(jobInfo.getState().name()));
         // err msg
-        String errMsg = "";
+        String errorMsg = "";
         if (failMsg != null) {
-            errMsg = "type:" + failMsg.getCancelType() + "; msg:" + 
failMsg.getMsg();
+            errorMsg = failMsg.getMsg();
+        }
+        if (StringUtils.isNotBlank(getErrMsg())) {
+            errorMsg = getErrMsg();
         }
-        trow.addToColumnValue(new TCell().setStringVal(errMsg));
+        trow.addToColumnValue(new TCell().setStringVal(errorMsg));
         // create time
-        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp())));
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
         // load end time
-        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp())));
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs())));
         // tracking url
         trow.addToColumnValue(new TCell().setStringVal(trackingUrl));
-        if (null != loadStatistic) {
-            trow.addToColumnValue(new 
TCell().setStringVal(loadStatistic.toJson()));
+        if (null != jobInfo.getLoadStatistic()) {
+            trow.addToColumnValue(new 
TCell().setStringVal(jobInfo.getLoadStatistic().toJson()));
         } else {
             trow.addToColumnValue(new TCell().setStringVal(""));
         }
-        trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
+        if (userIdentity == null) {
+            trow.addToColumnValue(new TCell().setStringVal(""));
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
+        }
         return trow;
     }
 
     // if task not start, load job is null,return pending task show info
-    private TRow getPendingTaskTVFInfo() {
+    private TRow getPendingTaskTVFInfo(String jobName) {
         TRow trow = new TRow();
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getTaskId())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getJobId())));
+        trow.addToColumnValue(new TCell().setStringVal(jobName));
         trow.addToColumnValue(new TCell().setStringVal(getJobId() + 
LABEL_SPLITTER + getTaskId()));
         trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
         trow.addToColumnValue(new TCell().setStringVal(""));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index f67144cd0d3..9e872cf034c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -299,11 +299,11 @@ public class MTMVTask extends AbstractTask {
     }
 
     @Override
-    public TRow getTvfInfo() {
+    public TRow getTvfInfo(String jobName) {
         TRow trow = new TRow();
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(super.getTaskId())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(super.getJobId())));
-        trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
+        trow.addToColumnValue(new TCell().setStringVal(jobName));
         String dbName = "";
         String mvName = "";
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index d8a30a968a6..4eb333426c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -29,8 +29,6 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.PatternMatcherWrapper;
 import org.apache.doris.common.io.Writable;
-import org.apache.doris.common.util.LogBuilder;
-import org.apache.doris.common.util.LogKey;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.common.JobStatus;
@@ -267,8 +265,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
      **/
     public void replayUpdateJob(T job) {
         jobMap.put(job.getJobId(), job);
-        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
-                .add("msg", "replay update scheduler job").build());
+        job.logUpdateOperation();
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index a104d3895e1..2100511d22b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -196,7 +196,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
     }
 
     private void clearEndJob(T job) {
-        if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < 
System.currentTimeMillis()) {
+        if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS > 
System.currentTimeMillis()) {
             return;
         }
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 71f6ff1c4f7..7bd2e58f87e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -63,7 +63,7 @@ public abstract class AbstractTask implements Task {
     }
 
     @Override
-    public void onFail(String msg) throws JobException {
+    public void onFail() throws JobException {
         status = TaskStatus.FAILED;
         if (!isCallable()) {
             return;
@@ -72,12 +72,13 @@ public abstract class AbstractTask implements Task {
     }
 
     @Override
-    public void onFail() throws JobException {
+    public void onFail(String errMsg) throws JobException {
         if (TaskStatus.CANCELED.equals(status)) {
             return;
         }
         status = TaskStatus.FAILED;
         setFinishTimeMs(System.currentTimeMillis());
+        setErrMsg(errMsg);
         if (!isCallable()) {
             return;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
index 8e82984c2f0..ee205c55c31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
@@ -70,5 +70,5 @@ public interface Task {
      * get info for tvf `tasks`
      * @return TRow
      */
-    TRow getTvfInfo();
+    TRow getTvfInfo(String jobName);
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 50d97e69705..d5b43b6cfff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -809,7 +809,7 @@ public class MetadataGenerator {
             }
             List<AbstractTask> tasks = job.queryAllTasks();
             for (AbstractTask task : tasks) {
-                TRow tvfInfo = task.getTvfInfo();
+                TRow tvfInfo = task.getTvfInfo(job.getJobName());
                 if (tvfInfo != null) {
                     dataBatch.add(tvfInfo);
                 }
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index 8fe7c0e7749..3f5cd5692f9 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -118,13 +118,11 @@ suite("test_base_insert_job") {
      """
     
     Thread.sleep(2000)
-    def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert") 
where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
+    def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") 
where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
     assert onceJob.size() == 1
-    def onceJobId = onceJob.get(0).get(0);
-    def onceJobSql = onceJob.get(0).get(1);
-    println onceJobSql
-    // test cancel task
-    def datas = sql """select status,taskid from tasks("type"="insert") where 
jobid= ${onceJobId}"""
+    //check succeed task count
+    assert '1' == onceJob.get(0).get(0)
+    def datas = sql """select status,taskid from tasks("type"="insert") where 
jobName= '${jobName}'"""
     println datas
     assert datas.size() == 1
     assert datas.get(0).get(0) == "FINISHED"
@@ -154,7 +152,7 @@ suite("test_base_insert_job") {
         DROP JOB IF EXISTS where jobname =  '${jobName}'
     """
     sql """
-          CREATE JOB ${jobName}  ON SCHEDULE every 1 second   comment 'test 
for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  values  
('2023-07-19',5, 1001);
+          CREATE JOB ${jobName}  ON SCHEDULE every 1 second starts 
current_timestamp  comment 'test for test&68686781jbjbhj//ncsa' DO insert into 
${tableName}  values  ('2023-07-19',5, 1001);
      """
 
     Thread.sleep(2000)
@@ -162,15 +160,12 @@ suite("test_base_insert_job") {
     sql """
         PAUSE JOB where jobname =  '${jobName}'
     """
-    def job = sql """ select id,ExecuteSql from jobs("type"="insert") where 
Name like '%${jobName}%'  """
-    assert job.size() == 1
-    def jobId = job.get(0).get(0);
-    def tasks = sql """ select status from tasks("type"="insert") where jobid= 
${jobId}  """
+    def tasks = sql """ select status from tasks("type"="insert") where 
JobName= '${jobName}'  """
     sql """
         RESUME JOB where jobname =  '${jobName}'
     """
     Thread.sleep(2500)
-    def afterResumeTasks = sql """ select status from tasks("type"="insert") 
where jobid= ${jobId}  """
+    def afterResumeTasks = sql """ select status from tasks("type"="insert") 
where JobName= '${jobName}'   """
     println afterResumeTasks
     assert afterResumeTasks.size() >tasks.size
     // assert same job name


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to