This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 2932887cb5a5da63b2498b4bb0f97513d05f5aa9
Author: aloyszhang <lofterzh...@gmail.com>
AuthorDate: Thu May 16 16:37:56 2024 +0800

    Support scheudle infomartion management
---
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   3 +
 .../manager/common/enums/OperationTarget.java      |   4 +-
 .../manager/common/enums/ScheduleStatus.java       |  29 +++++
 .../inlong/manager/dao/entity/ScheduleEntity.java  |  41 +++++++
 .../manager/dao/mapper/ScheduleEntityMapper.java   |  16 +++
 .../resources/mappers/ScheduleEntityMapper.xml     | 120 +++++++++++++++++++++
 .../inlong/manager/pojo/schedule/ScheduleInfo.java |  57 ++++++++++
 .../manager/pojo/schedule/ScheduleInfoRequest.java |  56 ++++++++++
 .../manager/service/schedule/ScheduleService.java  |  45 ++++++++
 .../service/schedule/ScheduleServiceImpl.java      |  96 +++++++++++++++++
 .../manager-web/sql/apache_inlong_manager.sql      |  38 +++++++
 .../web/controller/InLongSchedulerController.java  |  65 +++++++++++
 12 files changed, 569 insertions(+), 1 deletion(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 98a7f50bd7..ce3bf9d015 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -125,6 +125,9 @@ public enum ErrorCodeEnum {
     MQ_TYPE_IS_NULL(1600, "MQ type is null"),
     MQ_TYPE_NOT_SUPPORT(1601, "MQ type '%s' not support"),
 
+    SCHEDULE_NOT_FOUND(1700, "Schedule info not found"),
+    SCHEDULE_DUPLICATE(1701, "Schedule info already exist"),
+
     WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
     WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no 
operation authority"),
     WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"),
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
index e6c866a0e1..93d4e1f92c 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
@@ -44,6 +44,8 @@ public enum OperationTarget {
 
     INLONG_ROLE,
 
-    TENANT_ROLE
+    TENANT_ROLE,
+
+    SCHEDULE
 
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
new file mode 100644
index 0000000000..818c8277a2
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
@@ -0,0 +1,29 @@
+package org.apache.inlong.manager.common.enums;
+
+import lombok.Getter;
+
+@Getter
+public enum ScheduleStatus {
+
+    NEW(100, "new"),
+    DELETED(40, "deleted");
+
+    private final Integer code;
+    private final String description;
+
+    ScheduleStatus(Integer code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+
+    public static ScheduleStatus forCode(int code) {
+        for (ScheduleStatus status : values()) {
+            if (status.getCode() == code) {
+                return status;
+            }
+        }
+        throw new IllegalStateException(String.format("Illegal code=%s for 
ScheduleStatus", code));
+    }
+
+}
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
new file mode 100644
index 0000000000..5502e8c57d
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
@@ -0,0 +1,41 @@
+package org.apache.inlong.manager.dao.entity;
+
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Data;
+
+@Data
+public class ScheduleEntity implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private Integer id;
+    // inLong group id
+    private String groupId;
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    private Integer scheduleType;
+    // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
+    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    private String scheduleUnit;
+    private Integer scheduleInterval;
+    // schedule start time, long type timestamp
+    private Long startTime;
+    // schedule end time, long type timestamp
+    private Long endTime;
+    // delay time to start task, in minutes
+    private Integer delayTime;
+    // if task depend on itself
+    private Integer selfDepend;
+    private Integer taskParallelism;
+    // expression of crontab, used when scheduleType is crontab
+    private String crontabExpression;
+
+    private Integer status;
+    private Integer previousStatus;
+    private Integer isDeleted;
+    private String creator;
+    private String modifier;
+    private Date createTime;
+    private Date modifyTime;
+    private Integer version;
+
+}
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
new file mode 100644
index 0000000000..74eea2a737
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
@@ -0,0 +1,16 @@
+package org.apache.inlong.manager.dao.mapper;
+
+import java.util.List;
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface ScheduleEntityMapper {
+    int insert(ScheduleEntity scheduleEntity);
+
+    ScheduleEntity selectByPrimaryKey(Long id);
+
+    ScheduleEntity selectByGroupId(String groupId);
+
+    int updateByIdSelective(ScheduleEntity scheduleEntity);
+}
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
new file mode 100644
index 0000000000..c5c24dadd7
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper">
+    <resultMap id="BaseResultMap" 
type="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="group_id" jdbcType="VARCHAR" property="groupId"/>
+        <result column="schedule_type" jdbcType="INTEGER" 
property="scheduleType"/>
+        <result column="schedule_unit" jdbcType="VARCHAR" 
property="scheduleUnit"/>
+        <result column="schedule_interval" jdbcType="INTEGER" 
property="scheduleInterval"/>
+        <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
+        <result column="end_time" jdbcType="TIMESTAMP" property="endTime"/>
+        <result column="delay_time" jdbcType="INTEGER" property="delayTime"/>
+        <result column="self_depend" jdbcType="INTEGER" property="selfDepend"/>
+        <result column="task_parallelism" jdbcType="INTEGER" 
property="taskParallelism"/>
+        <result column="crontab_expression" jdbcType="VARCHAR" 
property="crontabExpression"/>
+
+        <result column="status" jdbcType="INTEGER" property="status"/>
+        <result column="previous_status" jdbcType="INTEGER" 
property="previousStatus"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+        <result column="creator" jdbcType="VARCHAR" property="creator"/>
+        <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+        <result column="create_time" jdbcType="TIMESTAMP" 
property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime"/>
+        <result column="version" jdbcType="INTEGER" property="version"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, group_id, schedule_type, schedule_unit, schedule_interval, 
start_time,
+          end_time, delay_time, self_depend, task_parallelism, 
crontab_expression,
+           status, previous_status, is_deleted, creator, modifier, 
create_time, modify_time, version
+    </sql>
+
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+        insert into schedule_config (id, group_id, schedule_type, 
schedule_unit,
+                                     schedule_interval, start_time, end_time, 
delay_time,
+                                     self_depend, task_parallelism, 
crontab_expression,
+                                     status, previous_status, is_deleted, 
creator, modifier,
+                                     create_time, modify_time, version)
+        values (#{id, jdbcType=INTEGER}, #{group_id, jdbcType=VARCHAR},
+                #{schedule_type, jdbcType=VARCHAR}, #{schedule_unit, 
jdbcType=VARCHAR},
+                #{schedule_interval, jdbcType=INTEGER}, #{start_time, 
jdbcType=TIMESTAMP},
+                #{end_time, jdbcType=TIMESTAMP}, #{delay_time, 
jdbcType=INTEGER},
+                #{self_depend, jdbcType=INTEGER}, #{task_parallelism, 
jdbcType=INTEGER},
+                #{crontab_expression, jdbcType=VARCHAR}, 
#{status,jdbcType=INTEGER},
+                #{previousStatus,jdbcType=INTEGER}, 
#{creator,jdbcType=VARCHAR},
+                #{modifier,jdbcType=VARCHAR})
+    </insert>
+
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" 
resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from schedule_config
+        where id = #{id,jdbcType=INTEGER}
+    </select>
+
+    <select id="selectByGroupId" parameterType="java.lang.String" 
resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from schedule_config
+        where group_id = #{group_id,jdbcType=VARCHAR}
+    </select>
+
+    <update id="updateByIdSelective" 
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+        update schedule_config
+        <set>
+            <if test="group_id != null">
+                group_id = #{group_id, jdbcType=VARCHAR},
+            </if>
+            <if test="schedule_type != null">
+                schedule_type = #{schedule_type, jdbcType=INTEGER},
+            </if>
+            <if test="schedule_unit !=null">
+                schedule_unit = #{schedule_unit, jdbcType=VARCHAR},
+            </if>
+            <if test="schedule_interval != null">
+                schedule_interval = #{schedule_interval, jdbcType=INTEGER},
+            </if>
+            <if test="start_time != null">
+                start_time = #{start_time, jdbcType=TIMESTAMP},
+            </if>
+            <if test="end_time != null">
+                end_time = #{end_time, jdbcType=TIMESTAMP},
+            </if>
+            <if test="delay_time != null">
+                delay_time = #{delay_time, jdbcType=INTEGER},
+            </if>
+            <if test="self_depend != null">
+                self_depend = #{self_depend, jdbcType=INTEGER},
+            </if>
+            <if test="task_parallelism != null">
+                task_parallelism = #{task_parallelism, jdbcType=INTEGER},
+            </if>
+            <if test="crontab_expression != null">
+                crontab_expression = #{crontab_expression, jdbcType=VARCHAR},
+            </if>
+        </set>
+        <where>
+            id = #{id, jdbcType=INTEGER}
+        </where>
+    </update>
+
+</mapper>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
new file mode 100644
index 0000000000..6a8e47c703
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -0,0 +1,57 @@
+package org.apache.inlong.manager.pojo.schedule;
+
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Schedule response")
+public class ScheduleInfo {
+
+    @ApiModelProperty(value = "Primary key")
+    @NotNull(groups = UpdateValidation.class)
+    private Integer id;
+
+    @ApiModelProperty("Inlong Group ID")
+    @NotNull
+    private String groupId;
+
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    @ApiModelProperty("Schedule type")
+    private Integer scheduleType;
+
+    // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
+    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    @ApiModelProperty("TimeUnit for schedule interval")
+    private String intervalTimeunit;
+
+    @ApiModelProperty("Schedule interval")
+    private Integer interval;
+
+    @ApiModelProperty("Start time")
+    private Long startTime;
+
+    @ApiModelProperty("End time")
+    private Long endTime;
+
+    @ApiModelProperty("Delay time")
+    private Integer delayTime;
+
+    @ApiModelProperty("Self depend")
+    private Integer selfDepend;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer taskParallelism;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer crontabExpression;
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
new file mode 100644
index 0000000000..7fcc848282
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -0,0 +1,56 @@
+package org.apache.inlong.manager.pojo.schedule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+@Data
+@ApiModel("Schedule request")
+public class ScheduleInfoRequest {
+
+    @ApiModelProperty(value = "Primary key")
+    @NotNull(groups = UpdateValidation.class)
+    private Integer id;
+
+    @ApiModelProperty("Inlong Group ID")
+    @NotNull
+    private String groupId;
+
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    @ApiModelProperty("Schedule type")
+    private Integer scheduleType;
+
+    // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
+    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    @ApiModelProperty("TimeUnit for schedule interval")
+    private String intervalTimeunit;
+
+    @ApiModelProperty("Schedule interval")
+    private Integer interval;
+
+    @ApiModelProperty("Start time")
+    private Long startTime;
+
+    @ApiModelProperty("End time")
+    private Long endTime;
+
+    @ApiModelProperty("Delay time")
+    private Integer delayTime;
+
+    @ApiModelProperty("Self depend")
+    private Integer selfDepend;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer taskParallelism;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer crontabExpression;
+
+}
+
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
new file mode 100644
index 0000000000..61ea32ae8a
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
@@ -0,0 +1,45 @@
+package org.apache.inlong.manager.service.schedule;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+public interface ScheduleService {
+
+    /**
+     * Save schedule info.
+     *
+     * @param scheduleInfo schedule request need to save
+     * @param operator name of operator
+     * @return schedule info id in backend storage
+     */
+    int save(@Valid @NotNull(message = "schedule request cannot be null") 
ScheduleInfoRequest scheduleInfo,
+            String operator);
+
+    /**
+     * Query whether schedule info exists for specified inlong group
+     *
+     * @param groupId the group id to be queried
+     * @return does it exist
+     */
+    Boolean exist(String groupId);
+
+    /**
+     * Get schedule info based on inlong group id
+     *
+     * @param groupId inlong group id
+     * @return detail of inlong group
+     */
+    ScheduleInfo get(String groupId);
+
+    /**
+     * Modify schedule information
+     *
+     * @param scheduleInfo schedule request that needs to be modified
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean update(@Valid @NotNull(message = "schedule request cannot be 
null") ScheduleInfoRequest scheduleInfo,
+            String operator);
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
new file mode 100644
index 0000000000..fe5b6f2f86
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
@@ -0,0 +1,96 @@
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.ScheduleStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ScheduleServiceImpl implements ScheduleService {
+
+    private static Logger LOGGER = 
LoggerFactory.getLogger(ScheduleServiceImpl.class);
+
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private ScheduleEntityMapper scheduleEntityMapper;
+
+    @Override
+    public int save(ScheduleInfoRequest scheduleInfo, String operator) {
+        LOGGER.debug("begin to save schedule info, scheduleInfo: {}, operator: 
{}", scheduleInfo, operator);
+        Preconditions.expectNotNull(scheduleInfo, "schedule info request can't 
be null");
+
+        String groupId = scheduleInfo.getGroupId();
+        checkGroupExist(groupId);
+        if (scheduleEntityMapper.selectByGroupId(groupId) != null) {
+            LOGGER.error("schedule info for group : {} already exists", 
groupId);
+            throw new BusinessException(ErrorCodeEnum.SCHEDULE_DUPLICATE);
+        }
+
+        ScheduleEntity scheduleEntity = 
CommonBeanUtils.copyProperties(scheduleInfo, ScheduleEntity::new);
+        scheduleEntity.setStatus(ScheduleStatus.NEW.getCode());
+        scheduleEntity.setCreator(operator);
+        scheduleEntity.setModifier(operator);
+        scheduleEntityMapper.insert(scheduleEntity);
+
+        return 0;
+    }
+
+    @Override
+    public Boolean exist(String groupId) {
+        checkGroupExist(groupId);
+        return scheduleEntityMapper.selectByGroupId(groupId) != null;
+    }
+
+    @Override
+    public ScheduleInfo get(String groupId) {
+        LOGGER.debug("begin to get schedule info by groupId={}", groupId);
+        ScheduleEntity entity = getScheduleEntity(groupId);
+        return CommonBeanUtils.copyProperties(entity, ScheduleInfo::new);
+    }
+
+    @Override
+    public Boolean update(ScheduleInfoRequest request, String operator) {
+        LOGGER.debug("begin to update schedule info={}", request);
+        String groupId = request.getGroupId();
+        ScheduleEntity entity = getScheduleEntity(groupId);
+        CommonBeanUtils.copyProperties(request, entity, true);
+        entity.setModifier(operator);
+        LOGGER.info("success to update schedule info for groupId={}", groupId);
+        return true;
+    }
+
+
+    /**
+     * Check whether InLongGroup exists, throw BusinessException with 
ErrorCodeEnum.GROUP_NOT_FOUND if check failed.
+     * */
+    private void checkGroupExist(String groupId) {
+        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+        if (entity == null) {
+            LOGGER.error("inlong group not found by groupId={}", groupId);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+    }
+
+    private ScheduleEntity getScheduleEntity(String groupId) {
+        checkGroupExist(groupId);
+        ScheduleEntity entity = scheduleEntityMapper.selectByGroupId(groupId);
+        if (entity == null) {
+            LOGGER.error("schedule info for group : {} not found", groupId);
+            throw new BusinessException(ErrorCodeEnum.SCHEDULE_NOT_FOUND);
+        }
+        return entity;
+    }
+}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 41e55c5206..386f99e696 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1021,4 +1021,42 @@ CREATE TABLE IF NOT EXISTS `cluster_config`
     DEFAULT CHARSET = utf8mb4 COMMENT = 'cluster_config';
 -- ----------------------------
 
+-- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+        <result column="schedule_unit" jdbcType="VARCHAR" 
property="scheduleUnit"/>
+        <result column="schedule_interval" jdbcType="INTEGER" 
property="scheduleInterval"/>
+        <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
+        <result column="end_time" jdbcType="TIMESTAMP" property="endTime"/>
+        <result column="delay_time" jdbcType="INTEGER" property="delayTime"/>
+        <result column="self_depend" jdbcType="INTEGER" property="selfDepend"/>
+        <result column="task_parallelism" jdbcType="INTEGER" 
property="taskParallelism"/>
+        <result column="crontab_expression" jdbcType="VARCHAR" 
property="crontabExpression"/>
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+    `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `group_id`               varchar(256) NOT NULL COMMENT 'Inlong group id, 
undeleted ones cannot be repeated',
+    `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
+    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule 
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_interval`      int(11)               COMMENT 'Schedule interval',
+    `start_time`             timestamp             COMMENT 'Start time for 
schedule',
+    `end_time`               timestamp             COMMENT 'Start time for 
schedule',
+    `delay_time`             int(11)               COMMENT 'Delay time in 
minutes to schedule',
+    `self_depend`            int(11)               COMMENT 'Self depend info',
+    `task_parallelism`       int(11)               COMMENT 'Task parallelism',
+    `crontab_expression`     varchar(256)          COMMENT 'Crontab expression 
if schedule type is crontab',
+    `status`                 int(4)                DEFAULT '100' COMMENT 
'Schedule status',
+    `previous_status`        int(4)                DEFAULT '100' COMMENT 
'Previous schedule status',
+    `is_deleted`             int(11)               DEFAULT '0' COMMENT 
'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`                varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`               varchar(64)           DEFAULT NULL COMMENT 
'Modifier name',
+    `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`                int(11)      NOT NULL DEFAULT '1' COMMENT 
'Version number, which will be incremented by 1 after modification'
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
+    ) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
+
 SET FOREIGN_KEY_CHECKS = 1;
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
new file mode 100644
index 0000000000..b3554d9a57
--- /dev/null
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
@@ -0,0 +1,65 @@
+package org.apache.inlong.manager.web.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.enums.OperationTarget;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.schedule.ScheduleService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Schedule-API")
+public class InLongSchedulerController {
+
+    @Autowired
+    private ScheduleService scheduleService;
+
+    @RequestMapping(value = "/schedule/save", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.SCHEDULE)
+    @ApiOperation(value = "Save schedule info")
+    public Response<Integer> save(@RequestBody ScheduleInfoRequest request) {
+        int result = scheduleService.save(request, 
LoginUserUtils.getLoginUser().getName());
+        return Response.success(result);
+    }
+
+    @RequestMapping(value = "/schedule/exist/{groupId}", method = 
RequestMethod.GET)
+    @ApiOperation(value = "Is the schedule info exists for inlong group")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, 
required = true)
+    })
+    public Response<Boolean> exist(@PathVariable String groupId) {
+        return Response.success(scheduleService.exist(groupId));
+    }
+
+    @RequestMapping(value = "/schedule/get", method = RequestMethod.GET)
+    @ApiOperation(value = "Get schedule info for inlong group")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, 
required = true)
+    })
+    public Response<ScheduleInfo> get(@RequestParam String groupId, 
@RequestParam String streamId) {
+        return Response.success(scheduleService.get(groupId));
+    }
+
+    @RequestMapping(value = "/schedule/update", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.SCHEDULE)
+    @ApiOperation(value = "Update schedule info")
+    public Response<Boolean> update(@Validated(UpdateValidation.class) 
@RequestBody ScheduleInfoRequest request) {
+        return Response.success(scheduleService.update(request, 
LoginUserUtils.getLoginUser().getName()));
+    }
+}

Reply via email to