This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3fd3dfe16f6c2d4289fdc91c96f3426be6a77e8f Author: Calvin Kirs <[email protected]> AuthorDate: Tue Apr 30 10:51:24 2024 +0800 [Feat](Job) Job supports task execution statistics (#34109) * Support statistics * - Fix Failed task not showing up in the task list - Task metadata add jobName - Fix Finished job clear time error - Job metadata add successCount, failedCount, totalTaskCount * add test --- .../org/apache/doris/job/base/AbstractJob.java | 35 +++++- .../doris/job/extensions/insert/InsertJob.java | 140 ++++++++++++--------- .../doris/job/extensions/insert/InsertTask.java | 42 ++++--- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 4 +- .../org/apache/doris/job/manager/JobManager.java | 5 +- .../apache/doris/job/scheduler/JobScheduler.java | 2 +- .../org/apache/doris/job/task/AbstractTask.java | 5 +- .../main/java/org/apache/doris/job/task/Task.java | 2 +- .../doris/tablefunction/MetadataGenerator.java | 2 +- .../suites/job_p0/test_base_insert_job.groovy | 19 ++- 10 files changed, 157 insertions(+), 99 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 8c1d6da3a63..3f595d6daf5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -51,6 +51,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -58,7 +59,11 @@ import java.util.stream.Collectors; @Data @Log4j2 public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable { - + public static final ImmutableList<Column> COMMON_SCHEMA = ImmutableList.of( + new Column("SucceedTaskCount", ScalarType.createStringType()), + new Column("FailedTaskCount", ScalarType.createStringType()), + new Column("CanceledTaskCount", ScalarType.createStringType()) + ); @SerializedName(value = "jid") private Long jobId; @@ -92,6 +97,16 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C @SerializedName(value = "sql") String executeSql; + + @SerializedName(value = "stc") + private AtomicLong succeedTaskCount = new AtomicLong(0); + + @SerializedName(value = "ftc") + private AtomicLong failedTaskCount = new AtomicLong(0); + + @SerializedName(value = "ctc") + private AtomicLong canceledTaskCount = new AtomicLong(0); + public AbstractJob() { } @@ -142,6 +157,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C task.cancel(); } runningTasks = new CopyOnWriteArrayList<>(); + logUpdateOperation(); } private static final ImmutableList<String> TITLE_NAMES = @@ -290,14 +306,18 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C @Override public void onTaskFail(T task) throws JobException { + failedTaskCount.incrementAndGet(); updateJobStatusIfEnd(false); runningTasks.remove(task); + logUpdateOperation(); } @Override public void onTaskSuccess(T task) throws JobException { + succeedTaskCount.incrementAndGet(); updateJobStatusIfEnd(true); runningTasks.remove(task); + logUpdateOperation(); } @@ -309,12 +329,15 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C } switch (executeType) { case ONE_TIME: + updateJobStatus(JobStatus.FINISHED); + this.finishTimeMs = System.currentTimeMillis(); + break; case INSTANT: this.finishTimeMs = System.currentTimeMillis(); if (taskSuccess) { - Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED); + updateJobStatus(JobStatus.FINISHED); } else { - Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.STOPPED); + updateJobStatus(JobStatus.STOPPED); } break; case RECURRING: @@ -322,7 +345,8 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C if (null != timerDefinition.getEndTimeMs() && timerDefinition.getEndTimeMs() < System.currentTimeMillis() + timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval())) { - Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED); + this.finishTimeMs = System.currentTimeMillis(); + updateJobStatus(JobStatus.FINISHED); } break; default: @@ -360,6 +384,9 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C trow.addToColumnValue(new TCell().setStringVal(jobStatus.name())); trow.addToColumnValue(new TCell().setStringVal(executeSql)); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(createTimeMs))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(succeedTaskCount.get()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(failedTaskCount.get()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(canceledTaskCount.get()))); trow.addToColumnValue(new TCell().setStringVal(comment)); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index a9e6bd4fc40..47d52c170b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -30,7 +30,6 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; -import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -67,38 +66,39 @@ import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @EqualsAndHashCode(callSuper = true) @Data -@Slf4j +@Log4j2 public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> implements GsonPostProcessable { - public static final ImmutableList<Column> SCHEMA = ImmutableList.of( - new Column("Id", ScalarType.createStringType()), - new Column("Name", ScalarType.createStringType()), - new Column("Definer", ScalarType.createStringType()), - new Column("ExecuteType", ScalarType.createStringType()), - new Column("RecurringStrategy", ScalarType.createStringType()), - new Column("Status", ScalarType.createStringType()), - new Column("ExecuteSql", ScalarType.createStringType()), - new Column("CreateTime", ScalarType.createStringType()), - new Column("Comment", ScalarType.createStringType())); + public static final ImmutableList<Column> SCHEMA = ImmutableList.<Column>builder() + .add(new Column("Id", ScalarType.createStringType())) + .add(new Column("Name", ScalarType.createStringType())) + .add(new Column("Definer", ScalarType.createStringType())) + .add(new Column("ExecuteType", ScalarType.createStringType())) + .add(new Column("RecurringStrategy", ScalarType.createStringType())) + .add(new Column("Status", ScalarType.createStringType())) + .add(new Column("ExecuteSql", ScalarType.createStringType())) + .add(new Column("CreateTime", ScalarType.createStringType())) + .addAll(COMMON_SCHEMA) + .add(new Column("Comment", ScalarType.createStringType())) + .build(); private static final ShowResultSetMetaData TASK_META_DATA = ShowResultSetMetaData.builder() @@ -126,8 +126,10 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl COLUMN_TO_INDEX = builder.build(); } + //we used insertTaskQueue to store the task info, and we will query the task info from it + @Deprecated @SerializedName("tis") - ConcurrentLinkedQueue<Long> historyTaskIdList; + ConcurrentLinkedQueue<Long> historyTaskIdList = new ConcurrentLinkedQueue<>(); @SerializedName("did") private final long dbId; @SerializedName("ln") @@ -146,7 +148,9 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl private List<InsertIntoTableCommand> plans = new ArrayList<>(); private LoadStatistic loadStatistic = new LoadStatistic(); private Set<Long> finishedTaskIds = new HashSet<>(); - private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>(); + + @SerializedName("tas") + private ConcurrentLinkedQueue<InsertTask> insertTaskQueue = new ConcurrentLinkedQueue<>(); private Map<String, String> properties = new HashMap<>(); private Set<String> tableNames; private AuthorizationInfo authorizationInfo; @@ -164,8 +168,8 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl if (null == plans) { plans = new ArrayList<>(); } - if (null == idToTasks) { - idToTasks = new ConcurrentHashMap<>(); + if (null == insertTaskQueue) { + insertTaskQueue = new ConcurrentLinkedQueue<>(); } if (null == loadStatistic) { loadStatistic = new LoadStatistic(); @@ -182,6 +186,15 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl if (null == historyTaskIdList) { historyTaskIdList = new ConcurrentLinkedQueue<>(); } + if (null == getSucceedTaskCount()) { + setSucceedTaskCount(new AtomicLong(0)); + } + if (null == getFailedTaskCount()) { + setFailedTaskCount(new AtomicLong(0)); + } + if (null == getCanceledTaskCount()) { + setCanceledTaskCount(new AtomicLong(0)); + } } /** @@ -250,9 +263,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl List<InsertTask> newTasks = new ArrayList<>(); if (plans.isEmpty()) { InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser()); - idToTasks.put(task.getTaskId(), task); newTasks.add(task); - recordTask(task.getTaskId()); } else { // use for load stmt for (InsertIntoTableCommand logicalPlan : plans) { @@ -260,28 +271,24 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl throw new IllegalArgumentException("Load plan need label name."); } InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic); - idToTasks.put(task.getTaskId(), task); newTasks.add(task); - recordTask(task.getTaskId()); } } initTasks(newTasks, taskType); + recordTasks(newTasks); return new ArrayList<>(newTasks); } - public void recordTask(long id) { + public void recordTasks(List<InsertTask> tasks) { if (Config.max_persistence_task_count < 1) { return; } - if (CollectionUtils.isEmpty(historyTaskIdList)) { - historyTaskIdList = new ConcurrentLinkedQueue<>(); - historyTaskIdList.add(id); - Env.getCurrentEnv().getEditLog().logUpdateJob(this); - return; - } - historyTaskIdList.add(id); - if (historyTaskIdList.size() >= Config.max_persistence_task_count) { - historyTaskIdList.poll(); + insertTaskQueue.addAll(tasks); + + while (insertTaskQueue.size() > Config.max_persistence_task_count) { + insertTaskQueue.poll(); + //since we have insertTaskQueue, we do not need to store the task id in historyTaskIdList, so we clear it + historyTaskIdList.clear(); } Env.getCurrentEnv().getEditLog().logUpdateJob(this); } @@ -319,35 +326,54 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl @Override public List<InsertTask> queryTasks() { - if (CollectionUtils.isEmpty(historyTaskIdList)) { + if (historyTaskIdList.isEmpty() && insertTaskQueue.isEmpty()) { return new ArrayList<>(); } + //TODO it's will be refactor, we will storage task info in job inner and query from it - List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList); + + // merge task info from insertTaskQueue and historyTaskIdList + List<Long> taskIds = insertTaskQueue.stream().map(InsertTask::getTaskId).collect(Collectors.toList()); + taskIds.addAll(historyTaskIdList); + taskIds.stream().distinct().collect(Collectors.toList()); if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { - Collections.reverse(taskIdList); - return queryLoadTasksByTaskIds(taskIdList); + return queryLoadTasksByTaskIds(taskIds); } - List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList); - if (CollectionUtils.isEmpty(loadJobs)) { - return new ArrayList<>(); + // query from load job + List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds); + + Map<Long, LoadJob> loadJobMap = loadJobs.stream().collect(Collectors.toMap(LoadJob::getId, loadJob -> loadJob)); + List<InsertTask> tasksRsp = new ArrayList<>(); + //read task info from insertTaskQueue + insertTaskQueue.forEach(task -> { + if (task.getJobInfo() == null) { + LoadJob loadJob = loadJobMap.get(task.getTaskId()); + if (loadJob != null) { + task.setJobInfo(loadJob); + } + } + tasksRsp.add(task); + }); + if (CollectionUtils.isEmpty(historyTaskIdList)) { + return tasksRsp; } - List<InsertTask> tasks = new ArrayList<>(); - loadJobs.forEach(loadJob -> { - InsertTask task; - try { - task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser()); - task.setCreateTimeMs(loadJob.getCreateTimestamp()); - } catch (MetaNotFoundException e) { - log.warn("load job not found, job id is {}", loadJob.getId()); + + historyTaskIdList.forEach(historyTaskId -> { + LoadJob loadJob = loadJobMap.get(historyTaskId); + if (null == loadJob) { return; } + InsertTask task = new InsertTask(loadJob.getLabel(), getCurrentDbName(), null, getCreateUser()); task.setJobId(getJobId()); task.setTaskId(loadJob.getId()); task.setJobInfo(loadJob); - tasks.add(task); + task.setJobId(getJobId()); + task.setTaskId(loadJob.getId()); + task.setJobInfo(loadJob); + tasksRsp.add(task); }); - return tasks; + return tasksRsp; + } @@ -355,13 +381,13 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl if (taskIdList.isEmpty()) { return new ArrayList<>(); } - List<InsertTask> tasks = new ArrayList<>(); - taskIdList.forEach(id -> { - if (null != idToTasks.get(id)) { - tasks.add(idToTasks.get(id)); + List<InsertTask> queryTasks = new ArrayList<>(); + insertTaskQueue.forEach(task -> { + if (taskIdList.contains(task.getTaskId())) { + queryTasks.add(task); } }); - return tasks; + return queryTasks; } @Override @@ -462,7 +488,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl // load end time jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs())); // tracking urls - List<String> trackingUrl = idToTasks.values().stream() + List<String> trackingUrl = insertTaskQueue.stream() .map(task -> { if (StringUtils.isNotEmpty(task.getTrackingUrl())) { return task.getTrackingUrl(); @@ -527,7 +553,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, long scannedBytes, boolean isDone) { loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone); - progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 100); + progress = (int) ((double) finishedTaskIds.size() / insertTaskQueue.size() * 100); if (progress == 100) { progress = 99; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 8fe786555ce..6e6f59758b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -37,6 +37,7 @@ import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.gson.annotations.SerializedName; import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; @@ -52,11 +53,12 @@ public class InsertTask extends AbstractTask { public static final ImmutableList<Column> SCHEMA = ImmutableList.of( new Column("TaskId", ScalarType.createStringType()), new Column("JobId", ScalarType.createStringType()), + new Column("JobName", ScalarType.createStringType()), new Column("Label", ScalarType.createStringType()), new Column("Status", ScalarType.createStringType()), new Column("ErrorMsg", ScalarType.createStringType()), - new Column("CreateTimeMs", ScalarType.createStringType()), - new Column("FinishTimeMs", ScalarType.createStringType()), + new Column("CreateTime", ScalarType.createStringType()), + new Column("FinishTime", ScalarType.createStringType()), new Column("TrackingUrl", ScalarType.createStringType()), new Column("LoadStatistic", ScalarType.createStringType()), new Column("User", ScalarType.createStringType())); @@ -64,7 +66,7 @@ public class InsertTask extends AbstractTask { public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; static { - ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder(); + ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>(); for (int i = 0; i < SCHEMA.size(); i++) { builder.put(SCHEMA.get(i).getName().toLowerCase(), i); } @@ -77,6 +79,7 @@ public class InsertTask extends AbstractTask { private ConnectContext ctx; private String sql; private String currentDb; + @SerializedName(value = "uif") private UserIdentity userIdentity; private LoadStatistic loadStatistic; private AtomicBoolean isCanceled = new AtomicBoolean(false); @@ -211,42 +214,51 @@ public class InsertTask extends AbstractTask { } @Override - public TRow getTvfInfo() { + public TRow getTvfInfo(String jobName) { TRow trow = new TRow(); if (jobInfo == null) { // if task not start, load job is null,return pending task show info - return getPendingTaskTVFInfo(); + return getPendingTaskTVFInfo(jobName); } trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); - trow.addToColumnValue(new TCell().setStringVal(labelName)); + trow.addToColumnValue(new TCell().setStringVal(jobName)); + trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name())); // err msg - String errMsg = ""; + String errorMsg = ""; if (failMsg != null) { - errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg(); + errorMsg = failMsg.getMsg(); + } + if (StringUtils.isNotBlank(getErrMsg())) { + errorMsg = getErrMsg(); } - trow.addToColumnValue(new TCell().setStringVal(errMsg)); + trow.addToColumnValue(new TCell().setStringVal(errorMsg)); // create time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); // load end time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs()))); // tracking url trow.addToColumnValue(new TCell().setStringVal(trackingUrl)); - if (null != loadStatistic) { - trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + if (null != jobInfo.getLoadStatistic()) { + trow.addToColumnValue(new TCell().setStringVal(jobInfo.getLoadStatistic().toJson())); } else { trow.addToColumnValue(new TCell().setStringVal("")); } - trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); + if (userIdentity == null) { + trow.addToColumnValue(new TCell().setStringVal("")); + } else { + trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); + } return trow; } // if task not start, load job is null,return pending task show info - private TRow getPendingTaskTVFInfo() { + private TRow getPendingTaskTVFInfo(String jobName) { TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getTaskId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(jobName)); trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); trow.addToColumnValue(new TCell().setStringVal("")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index f67144cd0d3..9e872cf034c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -299,11 +299,11 @@ public class MTMVTask extends AbstractTask { } @Override - public TRow getTvfInfo() { + public TRow getTvfInfo(String jobName) { TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId()))); - trow.addToColumnValue(new TCell().setStringVal(super.getJobName())); + trow.addToColumnValue(new TCell().setStringVal(jobName)); String dbName = ""; String mvName = ""; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index d8a30a968a6..4eb333426c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -29,8 +29,6 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.LogBuilder; -import org.apache.doris.common.util.LogKey; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.common.JobStatus; @@ -267,8 +265,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { **/ public void replayUpdateJob(T job) { jobMap.put(job.getJobId(), job); - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay update scheduler job").build()); + job.logUpdateOperation(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index a104d3895e1..2100511d22b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -196,7 +196,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { } private void clearEndJob(T job) { - if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) { + if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS > System.currentTimeMillis()) { return; } try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 71f6ff1c4f7..7bd2e58f87e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -63,7 +63,7 @@ public abstract class AbstractTask implements Task { } @Override - public void onFail(String msg) throws JobException { + public void onFail() throws JobException { status = TaskStatus.FAILED; if (!isCallable()) { return; @@ -72,12 +72,13 @@ public abstract class AbstractTask implements Task { } @Override - public void onFail() throws JobException { + public void onFail(String errMsg) throws JobException { if (TaskStatus.CANCELED.equals(status)) { return; } status = TaskStatus.FAILED; setFinishTimeMs(System.currentTimeMillis()); + setErrMsg(errMsg); if (!isCallable()) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java index 8e82984c2f0..ee205c55c31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java @@ -70,5 +70,5 @@ public interface Task { * get info for tvf `tasks` * @return TRow */ - TRow getTvfInfo(); + TRow getTvfInfo(String jobName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 50d97e69705..d5b43b6cfff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -809,7 +809,7 @@ public class MetadataGenerator { } List<AbstractTask> tasks = job.queryAllTasks(); for (AbstractTask task : tasks) { - TRow tvfInfo = task.getTvfInfo(); + TRow tvfInfo = task.getTvfInfo(job.getJobName()); if (tvfInfo != null) { dataBatch.add(tvfInfo); } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 8fe7c0e7749..3f5cd5692f9 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -118,13 +118,11 @@ suite("test_base_insert_job") { """ Thread.sleep(2000) - def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ + def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ assert onceJob.size() == 1 - def onceJobId = onceJob.get(0).get(0); - def onceJobSql = onceJob.get(0).get(1); - println onceJobSql - // test cancel task - def datas = sql """select status,taskid from tasks("type"="insert") where jobid= ${onceJobId}""" + //check succeed task count + assert '1' == onceJob.get(0).get(0) + def datas = sql """select status,taskid from tasks("type"="insert") where jobName= '${jobName}'""" println datas assert datas.size() == 1 assert datas.get(0).get(0) == "FINISHED" @@ -154,7 +152,7 @@ suite("test_base_insert_job") { DROP JOB IF EXISTS where jobname = '${jobName}' """ sql """ - CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19',5, 1001); + CREATE JOB ${jobName} ON SCHEDULE every 1 second starts current_timestamp comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19',5, 1001); """ Thread.sleep(2000) @@ -162,15 +160,12 @@ suite("test_base_insert_job") { sql """ PAUSE JOB where jobname = '${jobName}' """ - def job = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' """ - assert job.size() == 1 - def jobId = job.get(0).get(0); - def tasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ + def tasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ sql """ RESUME JOB where jobname = '${jobName}' """ Thread.sleep(2500) - def afterResumeTasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ + def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ println afterResumeTasks assert afterResumeTasks.size() >tasks.size // assert same job name --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
