BePPPower commented on code in PR #21916: URL: https://github.com/apache/doris/pull/21916#discussion_r1273066173
########## fe/fe-core/src/main/cup/sql_parser.cup: ########## @@ -2609,7 +2630,72 @@ resource_desc ::= RESULT = new ResourceDesc(resourceName, properties); :} ; - +create_event_stmt ::= + KW_CREATE KW_EVENT job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY INTEGER_LITERAL:time_interval ident:time_unit opt_event_starts:startsTime opt_event_ends:endsTime opt_comment:comment KW_DO stmt:executeSql + {: + CreateJobStmt stmt = new CreateJobStmt(jobLabel,null,time_interval,time_unit, startsTime, endsTime,comment,executeSql); + RESULT = stmt; + :} + | KW_CREATE KW_EVENT job_label:jobLabel KW_ON KW_SCHEDULER KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql + {: + CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,null,null,null,null,comment,executeSql); + RESULT = stmt; + :} + ; + opt_event_starts ::= + {: + RESULT = null; + :} + | KW_STARTS STRING_LITERAL:startTime + {: + RESULT = startTime; + :} + ; + + opt_event_ends ::= + {: + RESULT = null; + :} + | KW_ENDS STRING_LITERAL:endTime + {: + RESULT = endTime; + :} + ; + event_do ::= + KW_DO STRING_LITERAL:sqlString + {: + RESULT = sqlString; + :} + ; Review Comment: `event_do` does not seem to be used? ########## fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java: ########## @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +/** + * syntax: + * PAUSE EVENT FOR [database.]name + * we can pause a job by jobName + * it's only running job can be paused, and it will be paused immediately + * paused job can be resumed by RESUME EVENT FOR jobName + */ +public class PauseJobStmt extends DdlStmt { + + private final LabelName labelName; + private String db; + + public PauseJobStmt(LabelName labelName) { + this.labelName = labelName; + } + + public boolean isAll() { + return labelName == null; + } + + public String getName() { + return labelName.getLabelName(); + } Review Comment: ```suggestion public String getLabelName() { return labelName.getLabelName(); } ``` In my opinion, it ‘s better to use `getLablename`. There are also in `Resumejobstmt`, `StopJobStmt` ########## fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java: ########## @@ -77,63 +84,189 @@ public AsyncJobManager() { cycleSystemSchedulerTasks(); } - public Long registerJob(Job job) { + public Long registerJob(Job job) throws DdlException { if (!job.checkJobParam()) { - log.warn("registerJob failed, job: {} param is invalid", job); - return null; + throw new DdlException("Job param is invalid, please check time param"); } - if (job.getStartTimestamp() != 0L) { - job.setNextExecuteTimestamp(job.getStartTimestamp() + job.getIntervalMilliSeconds()); - } else { - job.setNextExecuteTimestamp(System.currentTimeMillis() + job.getIntervalMilliSeconds()); + checkIsJobNameUsed(job.getDbName(), job.getJobName(), job.getJobCategory()); + jobMap.putIfAbsent(job.getJobId(), job); + initAndSchedulerJob(job); + Env.getCurrentEnv().getEditLog().logCreateJob(job); + return job.getJobId(); + } + + public void replayCreateJob(Job job) { + jobMap.putIfAbsent(job.getJobId(), job); + initAndSchedulerJob(job); + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay create scheduler job").build()); + } + + /** + * Replay update load job. + **/ + public void replayUpdateJob(Job job) { + jobMap.put(job.getJobId(), job); + if (JobStatus.RUNNING.equals(job.getJobStatus())) { + initAndSchedulerJob(job); } + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay update scheduler job").build()); + } - if (job.getNextExecuteTimestamp() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) { - List<Long> executeTimestamp = findTasksBetweenTime(job, System.currentTimeMillis(), + private void checkIsJobNameUsed(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional<Job> optionalJob = jobMap.values().stream().filter(job -> job.getJobCategory().equals(jobCategory)) + .filter(job -> job.getDbName().equals(dbName)) + .filter(job -> job.getJobName().equals(jobName)).findFirst(); + if (optionalJob.isPresent()) { + throw new DdlException("Name " + jobName + " already used in db " + dbName); + } + } + + private void initAndSchedulerJob(Job job) { + if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + return; + } + + Long currentTimeMs = System.currentTimeMillis(); + Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), + job.getIntervalMs(), job.isCycleJob()); + job.setNextExecuteTimeMs(nextExecuteTimeMs); + if (job.getNextExecuteTimeMs() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) { + List<Long> executeTimestamp = findTasksBetweenTime(job, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp, - job.getNextExecuteTimestamp()); + job.getNextExecuteTimeMs()); if (!executeTimestamp.isEmpty()) { for (Long timestamp : executeTimestamp) { putOneTask(job.getJobId(), timestamp); } } } + } - jobMap.putIfAbsent(job.getJobId(), job); - return job.getJobId(); + private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, boolean isCycleJob) { + if (startTimeMs != 0L && startTimeMs > currentTimeMs) { + return startTimeMs; + } + // if it's cycle job and already delay, next execute time is current time Review Comment: ```suggestion // if it's not cycle job and already delay, next execute time is current time ``` ########## fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java: ########## @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +/** + * syntax: + * PAUSE EVENT FOR [database.]name + * we can pause a job by jobName + * it's only running job can be paused, and it will be paused immediately + * paused job can be resumed by RESUME EVENT FOR jobName + */ +public class PauseJobStmt extends DdlStmt { + + private final LabelName labelName; + private String db; + + public PauseJobStmt(LabelName labelName) { + this.labelName = labelName; + } + + public boolean isAll() { + return labelName == null; + } + + public String getName() { + return labelName.getLabelName(); + } Review Comment: ```suggestion public String getLabelName() { return labelName.getLabelName(); } ``` In my opinion, it ‘s better to use `getLablename`. There are also in `Resumejobstmt`, `StopJobStmt` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org