This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 2932887cb5a5da63b2498b4bb0f97513d05f5aa9 Author: aloyszhang <lofterzh...@gmail.com> AuthorDate: Thu May 16 16:37:56 2024 +0800 Support scheudle infomartion management --- .../inlong/manager/common/enums/ErrorCodeEnum.java | 3 + .../manager/common/enums/OperationTarget.java | 4 +- .../manager/common/enums/ScheduleStatus.java | 29 +++++ .../inlong/manager/dao/entity/ScheduleEntity.java | 41 +++++++ .../manager/dao/mapper/ScheduleEntityMapper.java | 16 +++ .../resources/mappers/ScheduleEntityMapper.xml | 120 +++++++++++++++++++++ .../inlong/manager/pojo/schedule/ScheduleInfo.java | 57 ++++++++++ .../manager/pojo/schedule/ScheduleInfoRequest.java | 56 ++++++++++ .../manager/service/schedule/ScheduleService.java | 45 ++++++++ .../service/schedule/ScheduleServiceImpl.java | 96 +++++++++++++++++ .../manager-web/sql/apache_inlong_manager.sql | 38 +++++++ .../web/controller/InLongSchedulerController.java | 65 +++++++++++ 12 files changed, 569 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index 98a7f50bd7..ce3bf9d015 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -125,6 +125,9 @@ public enum ErrorCodeEnum { MQ_TYPE_IS_NULL(1600, "MQ type is null"), MQ_TYPE_NOT_SUPPORT(1601, "MQ type '%s' not support"), + SCHEDULE_NOT_FOUND(1700, "Schedule info not found"), + SCHEDULE_DUPLICATE(1701, "Schedule info already exist"), + WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"), WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no operation authority"), WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"), diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java index e6c866a0e1..93d4e1f92c 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java @@ -44,6 +44,8 @@ public enum OperationTarget { INLONG_ROLE, - TENANT_ROLE + TENANT_ROLE, + + SCHEDULE } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java new file mode 100644 index 0000000000..818c8277a2 --- /dev/null +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java @@ -0,0 +1,29 @@ +package org.apache.inlong.manager.common.enums; + +import lombok.Getter; + +@Getter +public enum ScheduleStatus { + + NEW(100, "new"), + DELETED(40, "deleted"); + + private final Integer code; + private final String description; + + ScheduleStatus(Integer code, String description) { + this.code = code; + this.description = description; + } + + + public static ScheduleStatus forCode(int code) { + for (ScheduleStatus status : values()) { + if (status.getCode() == code) { + return status; + } + } + throw new IllegalStateException(String.format("Illegal code=%s for ScheduleStatus", code)); + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java new file mode 100644 index 0000000000..5502e8c57d --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java @@ -0,0 +1,41 @@ +package org.apache.inlong.manager.dao.entity; + +import java.io.Serializable; +import java.util.Date; +import lombok.Data; + +@Data +public class ScheduleEntity implements Serializable { + private static final long serialVersionUID = 1L; + + private Integer id; + // inLong group id + private String groupId; + // schedule type, support [normal, crontab], 0 for normal and 1 for crontab + private Integer scheduleType; + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] + // M=month, W=week, D=day, H=hour, M=minute, O=oneway + private String scheduleUnit; + private Integer scheduleInterval; + // schedule start time, long type timestamp + private Long startTime; + // schedule end time, long type timestamp + private Long endTime; + // delay time to start task, in minutes + private Integer delayTime; + // if task depend on itself + private Integer selfDepend; + private Integer taskParallelism; + // expression of crontab, used when scheduleType is crontab + private String crontabExpression; + + private Integer status; + private Integer previousStatus; + private Integer isDeleted; + private String creator; + private String modifier; + private Date createTime; + private Date modifyTime; + private Integer version; + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java new file mode 100644 index 0000000000..74eea2a737 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java @@ -0,0 +1,16 @@ +package org.apache.inlong.manager.dao.mapper; + +import java.util.List; +import org.apache.inlong.manager.dao.entity.ScheduleEntity; +import org.springframework.stereotype.Repository; + +@Repository +public interface ScheduleEntityMapper { + int insert(ScheduleEntity scheduleEntity); + + ScheduleEntity selectByPrimaryKey(Long id); + + ScheduleEntity selectByGroupId(String groupId); + + int updateByIdSelective(ScheduleEntity scheduleEntity); +} diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml new file mode 100644 index 0000000000..c5c24dadd7 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> +<mapper namespace="org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper"> + <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ScheduleEntity"> + <id column="id" jdbcType="INTEGER" property="id"/> + <result column="group_id" jdbcType="VARCHAR" property="groupId"/> + <result column="schedule_type" jdbcType="INTEGER" property="scheduleType"/> + <result column="schedule_unit" jdbcType="VARCHAR" property="scheduleUnit"/> + <result column="schedule_interval" jdbcType="INTEGER" property="scheduleInterval"/> + <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/> + <result column="end_time" jdbcType="TIMESTAMP" property="endTime"/> + <result column="delay_time" jdbcType="INTEGER" property="delayTime"/> + <result column="self_depend" jdbcType="INTEGER" property="selfDepend"/> + <result column="task_parallelism" jdbcType="INTEGER" property="taskParallelism"/> + <result column="crontab_expression" jdbcType="VARCHAR" property="crontabExpression"/> + + <result column="status" jdbcType="INTEGER" property="status"/> + <result column="previous_status" jdbcType="INTEGER" property="previousStatus"/> + <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/> + <result column="creator" jdbcType="VARCHAR" property="creator"/> + <result column="modifier" jdbcType="VARCHAR" property="modifier"/> + <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/> + <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/> + <result column="version" jdbcType="INTEGER" property="version"/> + </resultMap> + + <sql id="Base_Column_List"> + id, group_id, schedule_type, schedule_unit, schedule_interval, start_time, + end_time, delay_time, self_depend, task_parallelism, crontab_expression, + status, previous_status, is_deleted, creator, modifier, create_time, modify_time, version + </sql> + + <insert id="insert" useGeneratedKeys="true" keyProperty="id" + parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity"> + insert into schedule_config (id, group_id, schedule_type, schedule_unit, + schedule_interval, start_time, end_time, delay_time, + self_depend, task_parallelism, crontab_expression, + status, previous_status, is_deleted, creator, modifier, + create_time, modify_time, version) + values (#{id, jdbcType=INTEGER}, #{group_id, jdbcType=VARCHAR}, + #{schedule_type, jdbcType=VARCHAR}, #{schedule_unit, jdbcType=VARCHAR}, + #{schedule_interval, jdbcType=INTEGER}, #{start_time, jdbcType=TIMESTAMP}, + #{end_time, jdbcType=TIMESTAMP}, #{delay_time, jdbcType=INTEGER}, + #{self_depend, jdbcType=INTEGER}, #{task_parallelism, jdbcType=INTEGER}, + #{crontab_expression, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, + #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, + #{modifier,jdbcType=VARCHAR}) + </insert> + + <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap"> + select + <include refid="Base_Column_List"/> + from schedule_config + where id = #{id,jdbcType=INTEGER} + </select> + + <select id="selectByGroupId" parameterType="java.lang.String" resultMap="BaseResultMap"> + select + <include refid="Base_Column_List"/> + from schedule_config + where group_id = #{group_id,jdbcType=VARCHAR} + </select> + + <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity"> + update schedule_config + <set> + <if test="group_id != null"> + group_id = #{group_id, jdbcType=VARCHAR}, + </if> + <if test="schedule_type != null"> + schedule_type = #{schedule_type, jdbcType=INTEGER}, + </if> + <if test="schedule_unit !=null"> + schedule_unit = #{schedule_unit, jdbcType=VARCHAR}, + </if> + <if test="schedule_interval != null"> + schedule_interval = #{schedule_interval, jdbcType=INTEGER}, + </if> + <if test="start_time != null"> + start_time = #{start_time, jdbcType=TIMESTAMP}, + </if> + <if test="end_time != null"> + end_time = #{end_time, jdbcType=TIMESTAMP}, + </if> + <if test="delay_time != null"> + delay_time = #{delay_time, jdbcType=INTEGER}, + </if> + <if test="self_depend != null"> + self_depend = #{self_depend, jdbcType=INTEGER}, + </if> + <if test="task_parallelism != null"> + task_parallelism = #{task_parallelism, jdbcType=INTEGER}, + </if> + <if test="crontab_expression != null"> + crontab_expression = #{crontab_expression, jdbcType=VARCHAR}, + </if> + </set> + <where> + id = #{id, jdbcType=INTEGER} + </where> + </update> + +</mapper> \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java new file mode 100644 index 0000000000..6a8e47c703 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java @@ -0,0 +1,57 @@ +package org.apache.inlong.manager.pojo.schedule; + + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.validation.UpdateValidation; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Schedule response") +public class ScheduleInfo { + + @ApiModelProperty(value = "Primary key") + @NotNull(groups = UpdateValidation.class) + private Integer id; + + @ApiModelProperty("Inlong Group ID") + @NotNull + private String groupId; + + // schedule type, support [normal, crontab], 0 for normal and 1 for crontab + @ApiModelProperty("Schedule type") + private Integer scheduleType; + + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] + // M=month, W=week, D=day, H=hour, M=minute, O=oneway + @ApiModelProperty("TimeUnit for schedule interval") + private String intervalTimeunit; + + @ApiModelProperty("Schedule interval") + private Integer interval; + + @ApiModelProperty("Start time") + private Long startTime; + + @ApiModelProperty("End time") + private Long endTime; + + @ApiModelProperty("Delay time") + private Integer delayTime; + + @ApiModelProperty("Self depend") + private Integer selfDepend; + + @ApiModelProperty("Schedule task parallelism") + private Integer taskParallelism; + + @ApiModelProperty("Schedule task parallelism") + private Integer crontabExpression; +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java new file mode 100644 index 0000000000..7fcc848282 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java @@ -0,0 +1,56 @@ +package org.apache.inlong.manager.pojo.schedule; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.validation.UpdateValidation; + +@Data +@ApiModel("Schedule request") +public class ScheduleInfoRequest { + + @ApiModelProperty(value = "Primary key") + @NotNull(groups = UpdateValidation.class) + private Integer id; + + @ApiModelProperty("Inlong Group ID") + @NotNull + private String groupId; + + // schedule type, support [normal, crontab], 0 for normal and 1 for crontab + @ApiModelProperty("Schedule type") + private Integer scheduleType; + + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] + // M=month, W=week, D=day, H=hour, M=minute, O=oneway + @ApiModelProperty("TimeUnit for schedule interval") + private String intervalTimeunit; + + @ApiModelProperty("Schedule interval") + private Integer interval; + + @ApiModelProperty("Start time") + private Long startTime; + + @ApiModelProperty("End time") + private Long endTime; + + @ApiModelProperty("Delay time") + private Integer delayTime; + + @ApiModelProperty("Self depend") + private Integer selfDepend; + + @ApiModelProperty("Schedule task parallelism") + private Integer taskParallelism; + + @ApiModelProperty("Schedule task parallelism") + private Integer crontabExpression; + +} + diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java new file mode 100644 index 0000000000..61ea32ae8a --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java @@ -0,0 +1,45 @@ +package org.apache.inlong.manager.service.schedule; + +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; + +public interface ScheduleService { + + /** + * Save schedule info. + * + * @param scheduleInfo schedule request need to save + * @param operator name of operator + * @return schedule info id in backend storage + */ + int save(@Valid @NotNull(message = "schedule request cannot be null") ScheduleInfoRequest scheduleInfo, + String operator); + + /** + * Query whether schedule info exists for specified inlong group + * + * @param groupId the group id to be queried + * @return does it exist + */ + Boolean exist(String groupId); + + /** + * Get schedule info based on inlong group id + * + * @param groupId inlong group id + * @return detail of inlong group + */ + ScheduleInfo get(String groupId); + + /** + * Modify schedule information + * + * @param scheduleInfo schedule request that needs to be modified + * @param operator name of operator + * @return whether succeed + */ + Boolean update(@Valid @NotNull(message = "schedule request cannot be null") ScheduleInfoRequest scheduleInfo, + String operator); +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java new file mode 100644 index 0000000000..fe5b6f2f86 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java @@ -0,0 +1,96 @@ +package org.apache.inlong.manager.service.schedule; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.enums.ScheduleStatus; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.InlongGroupEntity; +import org.apache.inlong.manager.dao.entity.ScheduleEntity; +import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; +import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class ScheduleServiceImpl implements ScheduleService { + + private static Logger LOGGER = LoggerFactory.getLogger(ScheduleServiceImpl.class); + + @Autowired + private InlongGroupEntityMapper groupMapper; + @Autowired + private ScheduleEntityMapper scheduleEntityMapper; + + @Override + public int save(ScheduleInfoRequest scheduleInfo, String operator) { + LOGGER.debug("begin to save schedule info, scheduleInfo: {}, operator: {}", scheduleInfo, operator); + Preconditions.expectNotNull(scheduleInfo, "schedule info request can't be null"); + + String groupId = scheduleInfo.getGroupId(); + checkGroupExist(groupId); + if (scheduleEntityMapper.selectByGroupId(groupId) != null) { + LOGGER.error("schedule info for group : {} already exists", groupId); + throw new BusinessException(ErrorCodeEnum.SCHEDULE_DUPLICATE); + } + + ScheduleEntity scheduleEntity = CommonBeanUtils.copyProperties(scheduleInfo, ScheduleEntity::new); + scheduleEntity.setStatus(ScheduleStatus.NEW.getCode()); + scheduleEntity.setCreator(operator); + scheduleEntity.setModifier(operator); + scheduleEntityMapper.insert(scheduleEntity); + + return 0; + } + + @Override + public Boolean exist(String groupId) { + checkGroupExist(groupId); + return scheduleEntityMapper.selectByGroupId(groupId) != null; + } + + @Override + public ScheduleInfo get(String groupId) { + LOGGER.debug("begin to get schedule info by groupId={}", groupId); + ScheduleEntity entity = getScheduleEntity(groupId); + return CommonBeanUtils.copyProperties(entity, ScheduleInfo::new); + } + + @Override + public Boolean update(ScheduleInfoRequest request, String operator) { + LOGGER.debug("begin to update schedule info={}", request); + String groupId = request.getGroupId(); + ScheduleEntity entity = getScheduleEntity(groupId); + CommonBeanUtils.copyProperties(request, entity, true); + entity.setModifier(operator); + LOGGER.info("success to update schedule info for groupId={}", groupId); + return true; + } + + + /** + * Check whether InLongGroup exists, throw BusinessException with ErrorCodeEnum.GROUP_NOT_FOUND if check failed. + * */ + private void checkGroupExist(String groupId) { + Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY); + InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + if (entity == null) { + LOGGER.error("inlong group not found by groupId={}", groupId); + throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND); + } + } + + private ScheduleEntity getScheduleEntity(String groupId) { + checkGroupExist(groupId); + ScheduleEntity entity = scheduleEntityMapper.selectByGroupId(groupId); + if (entity == null) { + LOGGER.error("schedule info for group : {} not found", groupId); + throw new BusinessException(ErrorCodeEnum.SCHEDULE_NOT_FOUND); + } + return entity; + } +} diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 41e55c5206..386f99e696 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -1021,4 +1021,42 @@ CREATE TABLE IF NOT EXISTS `cluster_config` DEFAULT CHARSET = utf8mb4 COMMENT = 'cluster_config'; -- ---------------------------- +-- ---------------------------- +-- Table structure for schedule_config +-- ---------------------------- + <result column="schedule_unit" jdbcType="VARCHAR" property="scheduleUnit"/> + <result column="schedule_interval" jdbcType="INTEGER" property="scheduleInterval"/> + <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/> + <result column="end_time" jdbcType="TIMESTAMP" property="endTime"/> + <result column="delay_time" jdbcType="INTEGER" property="delayTime"/> + <result column="self_depend" jdbcType="INTEGER" property="selfDepend"/> + <result column="task_parallelism" jdbcType="INTEGER" property="taskParallelism"/> + <result column="crontab_expression" jdbcType="VARCHAR" property="crontabExpression"/> +CREATE TABLE IF NOT EXISTS `schedule_config` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated', + `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab', + `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway', + `schedule_interval` int(11) COMMENT 'Schedule interval', + `start_time` timestamp COMMENT 'Start time for schedule', + `end_time` timestamp COMMENT 'Start time for schedule', + `delay_time` int(11) COMMENT 'Delay time in minutes to schedule', + `self_depend` int(11) COMMENT 'Self depend info', + `task_parallelism` int(11) COMMENT 'Task parallelism', + `crontab_expression` varchar(256) COMMENT 'Crontab expression if schedule type is crontab', + `status` int(4) DEFAULT '100' COMMENT 'Schedule status', + `previous_status` int(4) DEFAULT '100' COMMENT 'Previous schedule status', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', + `creator` varchar(64) NOT NULL COMMENT 'Creator name', + `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', + `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification' + PRIMARY KEY (`id`), + UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`), + ) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config'; +-- ---------------------------- + SET FOREIGN_KEY_CHECKS = 1; diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java new file mode 100644 index 0000000000..b3554d9a57 --- /dev/null +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java @@ -0,0 +1,65 @@ +package org.apache.inlong.manager.web.controller; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import org.apache.inlong.manager.common.enums.OperationTarget; +import org.apache.inlong.manager.common.enums.OperationType; +import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; +import org.apache.inlong.manager.pojo.user.LoginUserUtils; +import org.apache.inlong.manager.service.operationlog.OperationLog; +import org.apache.inlong.manager.service.schedule.ScheduleService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api") +@Api(tags = "Inlong-Schedule-API") +public class InLongSchedulerController { + + @Autowired + private ScheduleService scheduleService; + + @RequestMapping(value = "/schedule/save", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SCHEDULE) + @ApiOperation(value = "Save schedule info") + public Response<Integer> save(@RequestBody ScheduleInfoRequest request) { + int result = scheduleService.save(request, LoginUserUtils.getLoginUser().getName()); + return Response.success(result); + } + + @RequestMapping(value = "/schedule/exist/{groupId}", method = RequestMethod.GET) + @ApiOperation(value = "Is the schedule info exists for inlong group") + @ApiImplicitParams({ + @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true) + }) + public Response<Boolean> exist(@PathVariable String groupId) { + return Response.success(scheduleService.exist(groupId)); + } + + @RequestMapping(value = "/schedule/get", method = RequestMethod.GET) + @ApiOperation(value = "Get schedule info for inlong group") + @ApiImplicitParams({ + @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true) + }) + public Response<ScheduleInfo> get(@RequestParam String groupId, @RequestParam String streamId) { + return Response.success(scheduleService.get(groupId)); + } + + @RequestMapping(value = "/schedule/update", method = RequestMethod.POST) + @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SCHEDULE) + @ApiOperation(value = "Update schedule info") + public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody ScheduleInfoRequest request) { + return Response.success(scheduleService.update(request, LoginUserUtils.getLoginUser().getName())); + } +}