This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 84ea93652d9 [Fix](Job)Incorrect task query result of insert type (#30024) 84ea93652d9 is described below commit 84ea93652d93ca8e315f31f85fb0645c3e003611 Author: Calvin Kirs <acm_mas...@163.com> AuthorDate: Mon Jan 22 14:46:40 2024 +0800 [Fix](Job)Incorrect task query result of insert type (#30024) - IdToTask is not persisted, so queried tasks are lost after a restart. - Cancelling a task does not update the job metadata after the task is removed from the running tasks. - The TVF displays an error when some fields in the task query result are empty. - A cycle-scheduled (recurring) job should not be set to STOPPED when a task fails. --- .../org/apache/doris/job/base/AbstractJob.java | 15 ++++--- .../doris/job/extensions/insert/InsertJob.java | 46 ++++++++++++++++------ .../doris/job/extensions/insert/InsertTask.java | 30 ++++++++------ .../org/apache/doris/job/manager/JobManager.java | 23 +++++++---- .../apache/doris/job/scheduler/JobScheduler.java | 6 +-- .../suites/job_p0/test_base_insert_job.groovy | 24 +++++++---- 6 files changed, 95 insertions(+), 49 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 2416a6bca5f..091ac158c1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -166,7 +166,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C throw new JobException("no running task"); } runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst() - .orElseThrow(() -> new JobException("no task id: " + taskId)).cancel(); + .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(); runningTasks.removeIf(task -> task.getTaskId().equals(taskId)); if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { 
updateJobStatus(JobStatus.FINISHED); @@ -289,19 +289,19 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C @Override public void onTaskFail(T task) throws JobException { - updateJobStatusIfEnd(); + updateJobStatusIfEnd(false); runningTasks.remove(task); } @Override public void onTaskSuccess(T task) throws JobException { - updateJobStatusIfEnd(); + updateJobStatusIfEnd(true); runningTasks.remove(task); } - private void updateJobStatusIfEnd() throws JobException { + private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException { JobExecuteType executeType = getJobConfig().getExecuteType(); if (executeType.equals(JobExecuteType.MANUAL)) { return; @@ -309,7 +309,12 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C switch (executeType) { case ONE_TIME: case INSTANT: - Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED); + this.finishTimeMs = System.currentTimeMillis(); + if (taskSuccess) { + Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED); + } else { + Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.STOPPED); + } break; case RECURRING: TimerDefinition timerDefinition = getJobConfig().getTimerDefinition(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index ce918c426f8..15e0c37987f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -30,6 +30,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import 
org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -42,6 +43,7 @@ import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.mysql.privilege.Privilege; @@ -272,14 +274,15 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl } if (CollectionUtils.isEmpty(historyTaskIdList)) { historyTaskIdList = new ConcurrentLinkedQueue<>(); - Env.getCurrentEnv().getEditLog().logUpdateJob(this); historyTaskIdList.add(id); + Env.getCurrentEnv().getEditLog().logUpdateJob(this); return; } historyTaskIdList.add(id); if (historyTaskIdList.size() >= Config.max_persistence_task_count) { historyTaskIdList.poll(); } + Env.getCurrentEnv().getEditLog().logUpdateJob(this); } @Override @@ -320,22 +323,44 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl } //TODO it's will be refactor, we will storage task info in job inner and query from it List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList); + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + Collections.reverse(taskIdList); + return queryLoadTasksByTaskIds(taskIdList); + } + List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList); + if (CollectionUtils.isEmpty(loadJobs)) { + return new ArrayList<>(); + } + List<InsertTask> tasks = new ArrayList<>(); + loadJobs.forEach(loadJob -> { + InsertTask task; + try { + task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser()); + task.setCreateTimeMs(loadJob.getCreateTimestamp()); + } catch (MetaNotFoundException e) { + log.warn("load job not found, job id is {}", loadJob.getId()); + return; + } + 
task.setJobId(getJobId()); + task.setTaskId(loadJob.getId()); + task.setJobInfo(loadJob); + tasks.add(task); + }); + return tasks; - Collections.reverse(taskIdList); - return queryLoadTasksByTaskIds(taskIdList); } public List<InsertTask> queryLoadTasksByTaskIds(List<Long> taskIdList) { if (taskIdList.isEmpty()) { return new ArrayList<>(); } - List<InsertTask> jobs = new ArrayList<>(); + List<InsertTask> tasks = new ArrayList<>(); taskIdList.forEach(id -> { if (null != idToTasks.get(id)) { - jobs.add(idToTasks.get(id)); + tasks.add(idToTasks.get(id)); } }); - return jobs; + return tasks; } @Override @@ -354,14 +379,11 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl } @Override - public void onTaskFail(InsertTask task) { - try { - updateJobStatus(JobStatus.STOPPED); + public void onTaskFail(InsertTask task) throws JobException { + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg()); - } catch (JobException e) { - throw new RuntimeException(e); } - getRunningTasks().remove(task); + super.onTaskFail(task); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index e85e7a1b027..b5d8ea7fc17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -21,11 +21,11 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.load.FailMsg; +import 
org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; @@ -89,7 +89,7 @@ public class InsertTask extends AbstractTask { @Getter @Setter - private InsertJob jobInfo; + private LoadJob jobInfo; private TaskType taskType = TaskType.PENDING; private MergeType mergeType = MergeType.APPEND; @@ -127,7 +127,7 @@ public class InsertTask extends AbstractTask { } public InsertTask(String labelName, InsertIntoTableCommand insertInto, - ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) { + ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) { this.labelName = labelName; this.command = insertInto; this.userIdentity = ctx.getCurrentUserIdentity(); @@ -216,23 +216,27 @@ public class InsertTask extends AbstractTask { // if task not start, load job is null,return pending task show info return getPendingTaskTVFInfo(); } - trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); trow.addToColumnValue(new TCell().setStringVal(labelName)); - trow.addToColumnValue(new TCell().setStringVal(jobInfo.getJobStatus().name())); + trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name())); // err msg - String errMsg = FeConstants.null_string; + String errMsg = ""; if (failMsg != null) { errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg(); } trow.addToColumnValue(new TCell().setStringVal(errMsg)); // create time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp()))); // load end time - trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp()))); // tracking url trow.addToColumnValue(new TCell().setStringVal(trackingUrl)); - trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + if (null != loadStatistic) { + trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + } else { + trow.addToColumnValue(new TCell().setStringVal("")); + } trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); return trow; } @@ -244,11 +248,11 @@ public class InsertTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index b069fd5eca6..7e8b01ce287 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -137,16 
+137,21 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { * @param ifExists is is true, if job not exist,we will ignore job not exist exception, else throw exception */ public void unregisterJob(String jobName, boolean ifExists) throws JobException { - T dropJob = null; - for (T job : jobMap.values()) { - if (job.getJobName().equals(jobName)) { - dropJob = job; + try { + T dropJob = null; + for (T job : jobMap.values()) { + if (job.getJobName().equals(jobName)) { + dropJob = job; + } } + if (dropJob == null && ifExists) { + return; + } + dropJob(dropJob, jobName); + } catch (Exception e) { + log.error("drop job error, jobName:" + jobName, e); + throw new JobException("unregister job error, jobName:" + jobName); } - if (dropJob == null && ifExists) { - return; - } - dropJob(dropJob, jobName); } private void dropJob(T dropJob, String jobName) throws JobException { @@ -284,6 +289,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { for (T job : jobMap.values()) { if (job.getJobName().equals(jobName)) { job.cancelTaskById(taskId); + job.logUpdateOperation(); return; } } @@ -378,6 +384,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { } } + //todo it's not belong to JobManager public void cancelLoadJob(CancelLoadStmt cs) throws JobException, AnalysisException, DdlException { String dbName = cs.getDbName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 597e39d96ed..a104d3895e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -184,8 +184,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { } for (Map.Entry<Long, T> entry : jobMap.entrySet()) { T job = entry.getValue(); - if 
(job.getJobStatus().equals(JobStatus.FINISHED)) { - clearFinishedJob(job); + if (job.getJobStatus().equals(JobStatus.FINISHED) || job.getJobStatus().equals(JobStatus.STOPPED)) { + clearEndJob(job); continue; } if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { @@ -195,7 +195,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { } } - private void clearFinishedJob(T job) { + private void clearEndJob(T job) { if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) { return; } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index f4db5907fa2..d9ebb832152 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -71,9 +71,18 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ Thread.sleep(2500) - def jobs = sql """select * from ${tableName}""" - println jobs - assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records + sql """ + PAUSE JOB where jobname = '${jobName}' + """ + def tblDatas = sql """select * from ${tableName}""" + println tblDatas + assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some times 3 records + def pauseJobId = sql """select id from jobs("type"="insert") where Name='${jobName}'""" + def taskStatus = sql """select status from tasks("type"="insert") where jobid= '${pauseJobId.get(0).get(0)}'""" + println taskStatus + for (int i = 0; i < taskStatus.size(); i++) { + assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0) != "STOPPED"||taskStatus.get(i).get(0) != "STOPPED" + } sql """ CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, 
type, user_id) values ('2023-03-18','1','12213'); """ @@ -132,9 +141,8 @@ suite("test_base_insert_job") { sql """cancel task where jobName='${jobName}' and taskId= ${taskId}""" def cancelTask = sql """ select status from tasks("type"="insert") where jobid= ${onceJobId}""" println cancelTask - //check task status - assert cancelTask.size() == 1 - assert cancelTask.get(0).get(0) == "CANCELED" + //check task size is 0, cancel task where be deleted + assert cancelTask.size() == 0 // check table data def dataCount1 = sql """select count(1) from ${tableName}""" assert dataCount1.get(0).get(0) == 0 @@ -161,14 +169,14 @@ suite("test_base_insert_job") { assert job.size() == 1 def jobId = job.get(0).get(0); def tasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ - assert tasks.size() == 1 + assert tasks.size() == 0 sql """ RESUME JOB where jobname = '${jobName}' """ Thread.sleep(2500) def resumeTasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ println resumeTasks - assert resumeTasks.size() == 2 + assert resumeTasks.size() == 1 // assert same job name try { sql """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org