This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 447a3e9eb64 [Fix](Job)Fix some issues in the Insert job. (#44543)
447a3e9eb64 is described below

commit 447a3e9eb64ca44f31d0037ed05c5bdf80b2d4f2
Author: Calvin Kirs <guoqi...@selectdb.com>
AuthorDate: Tue Nov 26 11:34:37 2024 +0800

    [Fix](Job)Fix some issues in the Insert job. (#44543)
    
    ### What problem does this PR solve?
    - The job does not count tasks that end in the CANCELED state (its canceled-task counter is never incremented).
    - When a job is canceled, its canceled tasks are subsequently marked as FAILED,
    and a NullPointerException (NPE) is thrown because the task's resources (its
    ConnectContext) have already been released:
    ```
    java.lang.NullPointerException: Cannot invoke "org.apache.doris.qe.ConnectContext.getState()" because "this.ctx" is null
            at org.apache.doris.job.extensions.insert.InsertTask.run(InsertTask.java:200) ~[classes/:?]
            at org.apache.doris.job.task.AbstractTask.runTask(AbstractTask.java:167) ~[classes/:?]
            at org.apache.doris.job.executor.DefaultTaskExecutorHandler.onEvent(DefaultTaskExecutorHandler.java:50) ~[classes/:?]
            at org.apache.doris.job.executor.DefaultTaskExecutorHandler.onEvent(DefaultTaskExecutorHandler.java:33) ~[classes/:?]
            at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) ~[disruptor-3.4.4.jar:?]
            at java.lang.Thread.run(Thread.java:833) ~[?:?]
    ```
    
    - `RESUME JOB` does not schedule the job immediately after it is resumed.
---
 .../src/main/java/org/apache/doris/job/base/AbstractJob.java  |  8 +++++---
 .../org/apache/doris/job/extensions/insert/InsertTask.java    |  3 +++
 .../main/java/org/apache/doris/job/manager/JobManager.java    |  3 +++
 .../src/main/java/org/apache/doris/job/task/AbstractTask.java |  3 +++
 regression-test/suites/job_p0/test_base_insert_job.groovy     | 11 ++++++++++-
 5 files changed, 24 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 62ac0c4d59d..906b86494fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -155,6 +155,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         }
         for (T task : runningTasks) {
             task.cancel();
+            canceledTaskCount.incrementAndGet();
         }
         runningTasks = new CopyOnWriteArrayList<>();
         logUpdateOperation();
@@ -185,6 +186,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         runningTasks.stream().filter(task -> 
task.getTaskId().equals(taskId)).findFirst()
                 .orElseThrow(() -> new JobException("Not found task id: " + 
taskId)).cancel();
         runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
+        canceledTaskCount.incrementAndGet();
         if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
             updateJobStatus(JobStatus.FINISHED);
         }
@@ -418,13 +420,13 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
     /**
      * Generates a common error message when the execution queue is full.
      *
-     * @param taskId                The ID of the task.
-     * @param queueConfigName       The name of the queue configuration.
+     * @param taskId                  The ID of the task.
+     * @param queueConfigName         The name of the queue configuration.
      * @param executeThreadConfigName The name of the execution thread 
configuration.
      * @return A formatted error message.
      */
     protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String 
queueConfigName,
-                                                            String 
executeThreadConfigName) {
+                                                         String 
executeThreadConfigName) {
         return String.format("Dispatch task failed, jobId: %d, jobName: %s, 
taskId: %d, the queue size is full, "
                         + "you can increase the queue size by setting the 
property "
                         + "%s in the fe.conf file or increase the value of "
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index c997ebcd30e..23a367d5d6e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -209,6 +209,9 @@ public class InsertTask extends AbstractTask {
 
     @Override
     public void onFail() throws JobException {
+        if (isCanceled.get()) {
+            return;
+        }
         isFinished.set(true);
         super.onFail();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 39646bab18f..47a3a0c5c19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -189,6 +189,9 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
     public void alterJobStatus(Long jobId, JobStatus status) throws 
JobException {
         checkJobExist(jobId);
         jobMap.get(jobId).updateJobStatus(status);
+        if (status.equals(JobStatus.RUNNING)) {
+            jobScheduler.scheduleOneJob(jobMap.get(jobId));
+        }
         jobMap.get(jobId).logUpdateOperation();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index f78446aaf85..8a230c0bd38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -167,6 +167,9 @@ public abstract class AbstractTask implements Task {
             run();
             onSuccess();
         } catch (Exception e) {
+            if (TaskStatus.CANCELED.equals(status)) {
+                return;
+            }
             this.errMsg = e.getMessage();
             onFail();
             log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index 19f4422d64f..8a0bb34ca43 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -219,9 +219,11 @@ suite("test_base_insert_job") {
         RESUME JOB where jobname =  '${jobName}'
     """
     println(tasks.size())
+    // test resume job success
     Awaitility.await("resume-job-test").atMost(60, SECONDS).until({
         def afterResumeTasks = sql """ select status from 
tasks("type"="insert") where JobName= '${jobName}'   """
         println "resume tasks :" + afterResumeTasks
+        //resume tasks size should be greater than before pause
         afterResumeTasks.size() > tasks.size()
     })
 
@@ -247,7 +249,6 @@ suite("test_base_insert_job") {
             CREATE JOB ${jobName}  ON SCHEDULE at '2023-11-13 14:18:07'   
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
         """
     } catch (Exception e) {
-        println e.getMessage()
         assert e.getMessage().contains("startTimeMs must be greater than 
current time")
     }
     // assert end time less than start time
@@ -281,6 +282,14 @@ suite("test_base_insert_job") {
     } catch (Exception e) {
         assert e.getMessage().contains("Invalid interval time unit: years")
     }
+    // assert interval time unit is -1
+    try {
+        sql """
+            CREATE JOB test_error_starts  ON SCHEDULE every -1 second    
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+        """
+    } catch (Exception e) {
+        assert e.getMessage().contains("expecting INTEGER_VALUE")
+    }
 
     // test keyword as job name
     sql """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to