This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 447a3e9eb64 [Fix](Job)Fix some issues in the Insert job. (#44543) 447a3e9eb64 is described below commit 447a3e9eb64ca44f31d0037ed05c5bdf80b2d4f2 Author: Calvin Kirs <guoqi...@selectdb.com> AuthorDate: Tue Nov 26 11:34:37 2024 +0800 [Fix](Job)Fix some issues in the Insert job. (#44543) ### What problem does this PR solve? - The job does not account for tasks in the Canceled state. - When a job is canceled, its status is marked as FAILED, and a NullPointerException (NPE) occurs because resources have already been released. ``` java.lang.NullPointerException: Cannot invoke "org.apache.doris.qe.ConnectContext.getState()" because "this.ctx" is null at org.apache.doris.job.extensions.insert.InsertTask.run(InsertTask.java:200) ~[classes/:?] at org.apache.doris.job.task.AbstractTask.runTask(AbstractTask.java:167) ~[classes/:?] at org.apache.doris.job.executor.DefaultTaskExecutorHandler.onEvent(DefaultTaskExecutorHandler.java:50) ~[classes/:?] at org.apache.doris.job.executor.DefaultTaskExecutorHandler.onEvent(DefaultTaskExecutorHandler.java:33) ~[classes/:?] at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) ~[disruptor-3.4.4.jar:?] at java.lang.Thread.run(Thread.java:833) ~[?:?] ``` - Resuming a job (RESUME JOB) does not immediately schedule the job.
--- .../src/main/java/org/apache/doris/job/base/AbstractJob.java | 8 +++++--- .../org/apache/doris/job/extensions/insert/InsertTask.java | 3 +++ .../main/java/org/apache/doris/job/manager/JobManager.java | 3 +++ .../src/main/java/org/apache/doris/job/task/AbstractTask.java | 3 +++ regression-test/suites/job_p0/test_base_insert_job.groovy | 11 ++++++++++- 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 62ac0c4d59d..906b86494fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -155,6 +155,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C } for (T task : runningTasks) { task.cancel(); + canceledTaskCount.incrementAndGet(); } runningTasks = new CopyOnWriteArrayList<>(); logUpdateOperation(); @@ -185,6 +186,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst() .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(); runningTasks.removeIf(task -> task.getTaskId().equals(taskId)); + canceledTaskCount.incrementAndGet(); if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { updateJobStatus(JobStatus.FINISHED); } @@ -418,13 +420,13 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C /** * Generates a common error message when the execution queue is full. * - * @param taskId The ID of the task. - * @param queueConfigName The name of the queue configuration. + * @param taskId The ID of the task. + * @param queueConfigName The name of the queue configuration. * @param executeThreadConfigName The name of the execution thread configuration. * @return A formatted error message. 
*/ protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName, - String executeThreadConfigName) { + String executeThreadConfigName) { return String.format("Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the queue size is full, " + "you can increase the queue size by setting the property " + "%s in the fe.conf file or increase the value of " diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index c997ebcd30e..23a367d5d6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -209,6 +209,9 @@ public class InsertTask extends AbstractTask { @Override public void onFail() throws JobException { + if (isCanceled.get()) { + return; + } isFinished.set(true); super.onFail(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 39646bab18f..47a3a0c5c19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -189,6 +189,9 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { public void alterJobStatus(Long jobId, JobStatus status) throws JobException { checkJobExist(jobId); jobMap.get(jobId).updateJobStatus(status); + if (status.equals(JobStatus.RUNNING)) { + jobScheduler.scheduleOneJob(jobMap.get(jobId)); + } jobMap.get(jobId).logUpdateOperation(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index f78446aaf85..8a230c0bd38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -167,6 +167,9 @@ public abstract class AbstractTask implements Task { run(); onSuccess(); } catch (Exception e) { + if (TaskStatus.CANCELED.equals(status)) { + return; + } this.errMsg = e.getMessage(); onFail(); log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e); diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 19f4422d64f..8a0bb34ca43 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -219,9 +219,11 @@ suite("test_base_insert_job") { RESUME JOB where jobname = '${jobName}' """ println(tasks.size()) + // test resume job success Awaitility.await("resume-job-test").atMost(60, SECONDS).until({ def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ println "resume tasks :" + afterResumeTasks + //resume tasks size should be greater than before pause afterResumeTasks.size() > tasks.size() }) @@ -247,7 +249,6 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - println e.getMessage() assert e.getMessage().contains("startTimeMs must be greater than current time") } // assert end time less than start time @@ -281,6 +282,14 @@ suite("test_base_insert_job") { } catch (Exception e) { assert e.getMessage().contains("Invalid interval time unit: years") } + // assert interval time unit is -1 + try { + sql """ + CREATE JOB test_error_starts ON SCHEDULE every -1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + } catch (Exception e) { + assert e.getMessage().contains("expecting INTEGER_VALUE") + } // test keyword as job 
name sql """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org