wsjz commented on code in PR #26356: URL: https://github.com/apache/doris/pull/26356#discussion_r1427690881
########## fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java: ########## @@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> { new Column("CreateTime", ScalarType.createStringType()), new Column("Comment", ScalarType.createStringType())); + private static final ShowResultSetMetaData TASK_META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("TaskId", ScalarType.createVarchar(20))) + .addColumn(new Column("Label", ScalarType.createVarchar(20))) + .addColumn(new Column("Status", ScalarType.createVarchar(20))) + .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20))) + .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20))) + .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20))) + + .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20))) + .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20))) + .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20))) + .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20))) + .addColumn(new Column("User", ScalarType.createVarchar(20))) + .build(); + public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; static { - ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder(); + ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>(); for (int i = 0; i < SCHEMA.size(); i++) { builder.put(SCHEMA.get(i).getName().toLowerCase(), i); } COLUMN_TO_INDEX = builder.build(); } - @SerializedName(value = "lp") - String labelPrefix; + @SerializedName("taskIdList") + ConcurrentLinkedQueue<Long> taskIdList; + @SerializedName("dbId") + private final long dbId; + @SerializedName("labelName") + private String labelName; + @SerializedName("loadType") + private InsertJob.LoadType loadType; + // 0: the job status is pending + // n/100: n is the number of task which has been finished + // 99: all tasks have been finished + // 100: txn status is visible and 
load has been finished + @SerializedName("progress") + private int progress; + @SerializedName("failMsg") + private FailMsg failMsg; + @SerializedName("plans") + private List<InsertIntoTableCommand> plans; + private LoadStatistic loadStatistic = new LoadStatistic(); + private Set<Long> finishedTaskIds = new HashSet<>(); + private Set<String> tableNames; + private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>(); + private Map<String, String> properties; + private AuthorizationInfo authorizationInfo; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + private ConnectContext ctx; + private StmtExecutor stmtExecutor; + private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>(); + private List<TabletCommitInfo> commitInfos = new ArrayList<>(); + + // max save task num, do we need to config it? + private static final int MAX_SAVE_TASK_NUM = 100; + + /** + * load job type + */ + public enum LoadType { + BULK, + SPARK, + LOCAL_FILE, + UNKNOWN + + } + + public enum Priority { + HIGH(0), + NORMAL(1), + LOW(2); - InsertIntoTableCommand command; + Priority(int value) { + this.value = value; + } - StmtExecutor stmtExecutor; + private final int value; - ConnectContext ctx; + public int getValue() { + return value; + } + } - @SerializedName("tis") - ConcurrentLinkedQueue<Long> taskIdList; + public InsertJob(Long jobId, String jobName, + JobStatus jobStatus, + LabelName labelName, + String comment, + UserIdentity createUser, + JobExecutionConfiguration jobConfig, + Long createTimeMs, + String executeSql) { + super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser, + jobConfig, createTimeMs, executeSql, null); + this.dbId = ConnectContext.get().getCurrentDbId(); + this.labelName = labelName.getLabelName(); + } - // max save task num, do we need to config it? 
- private static final int MAX_SAVE_TASK_NUM = 100; + public InsertJob(ConnectContext ctx, + StmtExecutor executor, + String labelName, + List<InsertIntoTableCommand> plans, + Set<String> sinkTableNames, + Map<String, String> properties, + String comment, + JobExecutionConfiguration jobConfig) { + super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null, + comment, ctx.getCurrentUserIdentity(), jobConfig); + this.ctx = ctx; + this.plans = plans; + this.stmtExecutor = executor; + this.dbId = ctx.getCurrentDbId(); + this.labelName = labelName; + this.tableNames = sinkTableNames; + this.properties = properties; + // TODO: not support other type yet + this.loadType = InsertJob.LoadType.BULK; + } @Override - public List<InsertTask> createTasks(TaskType taskType, Map taskContext) { - //nothing need to do in insert job - InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser()); - task.setJobId(getJobId()); - task.setTaskType(taskType); - task.setTaskId(Env.getCurrentEnv().getNextId()); - ArrayList<InsertTask> tasks = new ArrayList<>(); - tasks.add(task); - super.initTasks(tasks); - addNewTask(task.getTaskId()); - return tasks; + public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) { + if (plans.isEmpty()) { + InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser()); + task.setTaskType(taskType); + task.setJobId(getJobId()); + task.setCreateTimeMs(System.currentTimeMillis()); + task.setStatus(TaskStatus.PENDING); + ArrayList<InsertTask> tasks = new ArrayList<>(); + tasks.add(task); + super.initTasks(tasks); + addNewTask(task.getTaskId()); + return tasks; + } else { + return createBatchTasks(taskType); + } } - public void addNewTask(long id) { + private List<InsertTask> createBatchTasks(TaskType taskType) { Review Comment: This method should be refactored in the future, because it has many conflicts. -- This is an automated message from the Apache Git
Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org