This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 84ea93652d9 [Fix](Job)Incorrect task query result of insert type 
(#30024)
84ea93652d9 is described below

commit 84ea93652d93ca8e315f31f85fb0645c3e003611
Author: Calvin Kirs <acm_mas...@163.com>
AuthorDate: Mon Jan 22 14:46:40 2024 +0800

    [Fix](Job)Incorrect task query result of insert type (#30024)
    
    - IdToTask is not persisted, so queried tasks are lost after a restart.
    
    - Cancelling a task does not update the metadata after the task is removed 
from the running tasks.
    
    - The TVF displays an error when some fields in the task query result are empty.
    
    - A cyclically scheduled job should not be set to STOPPED when a task fails.
---
 .../org/apache/doris/job/base/AbstractJob.java     | 15 ++++---
 .../doris/job/extensions/insert/InsertJob.java     | 46 ++++++++++++++++------
 .../doris/job/extensions/insert/InsertTask.java    | 30 ++++++++------
 .../org/apache/doris/job/manager/JobManager.java   | 23 +++++++----
 .../apache/doris/job/scheduler/JobScheduler.java   |  6 +--
 .../suites/job_p0/test_base_insert_job.groovy      | 24 +++++++----
 6 files changed, 95 insertions(+), 49 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 2416a6bca5f..091ac158c1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -166,7 +166,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
             throw new JobException("no running task");
         }
         runningTasks.stream().filter(task -> 
task.getTaskId().equals(taskId)).findFirst()
-                .orElseThrow(() -> new JobException("no task id: " + 
taskId)).cancel();
+                .orElseThrow(() -> new JobException("Not found task id: " + 
taskId)).cancel();
         runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
         if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
             updateJobStatus(JobStatus.FINISHED);
@@ -289,19 +289,19 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
 
     @Override
     public void onTaskFail(T task) throws JobException {
-        updateJobStatusIfEnd();
+        updateJobStatusIfEnd(false);
         runningTasks.remove(task);
     }
 
     @Override
     public void onTaskSuccess(T task) throws JobException {
-        updateJobStatusIfEnd();
+        updateJobStatusIfEnd(true);
         runningTasks.remove(task);
 
     }
 
 
-    private void updateJobStatusIfEnd() throws JobException {
+    private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException 
{
         JobExecuteType executeType = getJobConfig().getExecuteType();
         if (executeType.equals(JobExecuteType.MANUAL)) {
             return;
@@ -309,7 +309,12 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         switch (executeType) {
             case ONE_TIME:
             case INSTANT:
-                
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
+                this.finishTimeMs = System.currentTimeMillis();
+                if (taskSuccess) {
+                    
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
+                } else {
+                    
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.STOPPED);
+                }
                 break;
             case RECURRING:
                 TimerDefinition timerDefinition = 
getJobConfig().getTimerDefinition();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index ce918c426f8..15e0c37987f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
@@ -42,6 +43,7 @@ import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.mysql.privilege.Privilege;
@@ -272,14 +274,15 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         }
         if (CollectionUtils.isEmpty(historyTaskIdList)) {
             historyTaskIdList = new ConcurrentLinkedQueue<>();
-            Env.getCurrentEnv().getEditLog().logUpdateJob(this);
             historyTaskIdList.add(id);
+            Env.getCurrentEnv().getEditLog().logUpdateJob(this);
             return;
         }
         historyTaskIdList.add(id);
         if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
             historyTaskIdList.poll();
         }
+        Env.getCurrentEnv().getEditLog().logUpdateJob(this);
     }
 
     @Override
@@ -320,22 +323,44 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         }
         //TODO it's will be refactor, we will storage task info in job inner 
and query from it
         List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
+        if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+            Collections.reverse(taskIdList);
+            return queryLoadTasksByTaskIds(taskIdList);
+        }
+        List<LoadJob> loadJobs = 
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
+        if (CollectionUtils.isEmpty(loadJobs)) {
+            return new ArrayList<>();
+        }
+        List<InsertTask> tasks = new ArrayList<>();
+        loadJobs.forEach(loadJob -> {
+            InsertTask task;
+            try {
+                task = new InsertTask(loadJob.getLabel(), 
loadJob.getDb().getFullName(), null, getCreateUser());
+                task.setCreateTimeMs(loadJob.getCreateTimestamp());
+            } catch (MetaNotFoundException e) {
+                log.warn("load job not found, job id is {}", loadJob.getId());
+                return;
+            }
+            task.setJobId(getJobId());
+            task.setTaskId(loadJob.getId());
+            task.setJobInfo(loadJob);
+            tasks.add(task);
+        });
+        return tasks;
 
-        Collections.reverse(taskIdList);
-        return queryLoadTasksByTaskIds(taskIdList);
     }
 
     public List<InsertTask> queryLoadTasksByTaskIds(List<Long> taskIdList) {
         if (taskIdList.isEmpty()) {
             return new ArrayList<>();
         }
-        List<InsertTask> jobs = new ArrayList<>();
+        List<InsertTask> tasks = new ArrayList<>();
         taskIdList.forEach(id -> {
             if (null != idToTasks.get(id)) {
-                jobs.add(idToTasks.get(id));
+                tasks.add(idToTasks.get(id));
             }
         });
-        return jobs;
+        return tasks;
     }
 
     @Override
@@ -354,14 +379,11 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
     }
 
     @Override
-    public void onTaskFail(InsertTask task) {
-        try {
-            updateJobStatus(JobStatus.STOPPED);
+    public void onTaskFail(InsertTask task) throws JobException {
+        if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
             this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, 
task.getErrMsg());
-        } catch (JobException e) {
-            throw new RuntimeException(e);
         }
-        getRunningTasks().remove(task);
+        super.onTaskFail(task);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index e85e7a1b027..b5d8ea7fc17 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -21,11 +21,11 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
@@ -89,7 +89,7 @@ public class InsertTask extends AbstractTask {
 
     @Getter
     @Setter
-    private InsertJob jobInfo;
+    private LoadJob jobInfo;
     private TaskType taskType = TaskType.PENDING;
     private MergeType mergeType = MergeType.APPEND;
 
@@ -127,7 +127,7 @@ public class InsertTask extends AbstractTask {
     }
 
     public InsertTask(String labelName, InsertIntoTableCommand insertInto,
-                       ConnectContext ctx, StmtExecutor executor, 
LoadStatistic statistic) {
+                      ConnectContext ctx, StmtExecutor executor, LoadStatistic 
statistic) {
         this.labelName = labelName;
         this.command = insertInto;
         this.userIdentity = ctx.getCurrentUserIdentity();
@@ -216,23 +216,27 @@ public class InsertTask extends AbstractTask {
             // if task not start, load job is null,return pending task show 
info
             return getPendingTaskTVFInfo();
         }
-        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(jobInfo.getJobId())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(jobInfo.getId())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getJobId())));
         trow.addToColumnValue(new TCell().setStringVal(labelName));
-        trow.addToColumnValue(new 
TCell().setStringVal(jobInfo.getJobStatus().name()));
+        trow.addToColumnValue(new 
TCell().setStringVal(jobInfo.getState().name()));
         // err msg
-        String errMsg = FeConstants.null_string;
+        String errMsg = "";
         if (failMsg != null) {
             errMsg = "type:" + failMsg.getCancelType() + "; msg:" + 
failMsg.getMsg();
         }
         trow.addToColumnValue(new TCell().setStringVal(errMsg));
         // create time
-        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs())));
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp())));
         // load end time
-        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs())));
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp())));
         // tracking url
         trow.addToColumnValue(new TCell().setStringVal(trackingUrl));
-        trow.addToColumnValue(new 
TCell().setStringVal(loadStatistic.toJson()));
+        if (null != loadStatistic) {
+            trow.addToColumnValue(new 
TCell().setStringVal(loadStatistic.toJson()));
+        } else {
+            trow.addToColumnValue(new TCell().setStringVal(""));
+        }
         trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         return trow;
     }
@@ -244,11 +248,11 @@ public class InsertTask extends AbstractTask {
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getJobId())));
         trow.addToColumnValue(new TCell().setStringVal(getJobId() + 
LABEL_SPLITTER + getTaskId()));
         trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
-        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
-        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         return trow;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index b069fd5eca6..7e8b01ce287 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -137,16 +137,21 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
      * @param ifExists is is true, if job not exist,we will ignore job not 
exist exception, else throw exception
      */
     public void unregisterJob(String jobName, boolean ifExists) throws 
JobException {
-        T dropJob = null;
-        for (T job : jobMap.values()) {
-            if (job.getJobName().equals(jobName)) {
-                dropJob = job;
+        try {
+            T dropJob = null;
+            for (T job : jobMap.values()) {
+                if (job.getJobName().equals(jobName)) {
+                    dropJob = job;
+                }
             }
+            if (dropJob == null && ifExists) {
+                return;
+            }
+            dropJob(dropJob, jobName);
+        } catch (Exception e) {
+            log.error("drop job error, jobName:" + jobName, e);
+            throw new JobException("unregister job error, jobName:" + jobName);
         }
-        if (dropJob == null && ifExists) {
-            return;
-        }
-        dropJob(dropJob, jobName);
     }
 
     private void dropJob(T dropJob, String jobName) throws JobException {
@@ -284,6 +289,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         for (T job : jobMap.values()) {
             if (job.getJobName().equals(jobName)) {
                 job.cancelTaskById(taskId);
+                job.logUpdateOperation();
                 return;
             }
         }
@@ -378,6 +384,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         }
     }
 
+    //todo it's not belong to JobManager
     public void cancelLoadJob(CancelLoadStmt cs)
             throws JobException, AnalysisException, DdlException {
         String dbName = cs.getDbName();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 597e39d96ed..a104d3895e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -184,8 +184,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
         }
         for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
             T job = entry.getValue();
-            if (job.getJobStatus().equals(JobStatus.FINISHED)) {
-                clearFinishedJob(job);
+            if (job.getJobStatus().equals(JobStatus.FINISHED) || 
job.getJobStatus().equals(JobStatus.STOPPED)) {
+                clearEndJob(job);
                 continue;
             }
             if (!job.getJobStatus().equals(JobStatus.RUNNING) && 
!job.getJobConfig().checkIsTimerJob()) {
@@ -195,7 +195,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
         }
     }
 
-    private void clearFinishedJob(T job) {
+    private void clearEndJob(T job) {
         if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < 
System.currentTimeMillis()) {
             return;
         }
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index f4db5907fa2..d9ebb832152 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -71,9 +71,18 @@ suite("test_base_insert_job") {
        CREATE JOB ${jobName}  ON SCHEDULE every 1 second   comment 'test' DO 
insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
     """
     Thread.sleep(2500)
-    def jobs = sql """select * from ${tableName}"""
-    println jobs
-    assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times 
3 records
+    sql """
+        PAUSE JOB where jobname =  '${jobName}'
+    """
+    def tblDatas = sql """select * from ${tableName}"""
+    println tblDatas
+    assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some 
times 3 records
+    def pauseJobId = sql """select id from jobs("type"="insert") where 
Name='${jobName}'"""
+    def taskStatus = sql """select status from tasks("type"="insert") where 
jobid= '${pauseJobId.get(0).get(0)}'"""
+    println taskStatus
+    for (int i = 0; i < taskStatus.size(); i++) {
+        assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0) 
!= "STOPPED"||taskStatus.get(i).get(0) != "STOPPED"
+    }
     sql """
        CREATE JOB ${jobMixedName}  ON SCHEDULE every 1 second  DO insert into 
${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
     """
@@ -132,9 +141,8 @@ suite("test_base_insert_job") {
     sql """cancel  task where jobName='${jobName}' and taskId= ${taskId}"""
     def cancelTask = sql """ select status from tasks("type"="insert") where 
jobid= ${onceJobId}"""
     println cancelTask
-    //check task status
-    assert cancelTask.size() == 1
-    assert cancelTask.get(0).get(0) == "CANCELED"
+    //check task size is 0, cancel task where be deleted
+    assert cancelTask.size() == 0
     // check table data
     def dataCount1 = sql """select count(1) from ${tableName}"""
     assert dataCount1.get(0).get(0) == 0
@@ -161,14 +169,14 @@ suite("test_base_insert_job") {
     assert job.size() == 1
     def jobId = job.get(0).get(0);
     def tasks = sql """ select status from tasks("type"="insert") where jobid= 
${jobId}  """
-    assert tasks.size() == 1
+    assert tasks.size() == 0
     sql """
         RESUME JOB where jobname =  '${jobName}'
     """
     Thread.sleep(2500)
     def resumeTasks = sql """ select status from tasks("type"="insert") where 
jobid= ${jobId}  """
     println resumeTasks
-    assert resumeTasks.size() == 2
+    assert resumeTasks.size() == 1
     // assert same job name
     try {
         sql """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to