This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 8bd3a20cb49f9dc3ddc83c0bcb769f95e793e4b6
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Thu May 23 18:18:31 2024 +0800

    [INLONG-10247][Manager]  Support schedule information management for 
offline sync (#10254)
---
 .../client/api/inner/client/ClientFactory.java     |   2 +
 .../api/inner/client/InLongScheduleClient.java     |  67 +++++++++
 .../client/api/service/InLongScheduleApi.java      |  49 +++++++
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   3 +
 .../manager/common/enums/OperationTarget.java      |   4 +-
 .../{OperationTarget.java => ScheduleStatus.java}  |  54 ++++---
 .../inlong/manager/dao/entity/ScheduleEntity.java  |  61 ++++++++
 .../manager/dao/mapper/ScheduleEntityMapper.java}  |  36 ++---
 .../resources/mappers/ScheduleEntityMapper.xml     | 144 +++++++++++++++++++
 .../manager/dao/mapper/ScheduleEntityTest.java     | 118 ++++++++++++++++
 .../inlong/manager/pojo/schedule/ScheduleInfo.java |  82 +++++++++++
 .../manager/pojo/schedule/ScheduleInfoRequest.java |  76 ++++++++++
 .../manager/service/schedule/ScheduleService.java  |  71 ++++++++++
 .../service/schedule/ScheduleServiceImpl.java      | 155 +++++++++++++++++++++
 .../main/resources/h2/apache_inlong_manager.sql    |  31 +++++
 .../manager-web/sql/apache_inlong_manager.sql      |  28 ++++
 inlong-manager/manager-web/sql/changes-1.13.0.sql  |  30 ++++
 .../web/controller/InLongSchedulerController.java  |  88 ++++++++++++
 18 files changed, 1043 insertions(+), 56 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
index 3c67f1fa71..01c9fb6473 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
@@ -55,6 +55,7 @@ public class ClientFactory {
     private final AuditClient auditClient;
     private final InlongTenantClient inlongTenantClient;
     private final InlongTenantRoleClient inlongTenantRoleClient;
+    private final InLongScheduleClient inLongScheduleClient;
 
     public ClientFactory(ClientConfiguration configuration) {
         groupClient = new InlongGroupClient(configuration);
@@ -74,5 +75,6 @@ public class ClientFactory {
         auditClient = new AuditClient(configuration);
         inlongTenantClient = new InlongTenantClient(configuration);
         inlongTenantRoleClient = new InlongTenantRoleClient(configuration);
+        inLongScheduleClient = new InLongScheduleClient(configuration);
     }
 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java
new file mode 100644
index 0000000000..86638dbae8
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.InLongScheduleApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+public class InLongScheduleClient {
+
+    private InLongScheduleApi scheduleApi;
+
+    public InLongScheduleClient(ClientConfiguration clientConfiguration) {
+        scheduleApi = 
ClientUtils.createRetrofit(clientConfiguration).create(InLongScheduleApi.class);
+    }
+
+    public Integer createScheduleInfo(ScheduleInfoRequest request) {
+        Response<Integer> response = 
ClientUtils.executeHttpCall(scheduleApi.createSchedule(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    public Boolean scheduleInfoExist(String groupId) {
+        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        Response<Boolean> response = 
ClientUtils.executeHttpCall(scheduleApi.exist(groupId));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    public Boolean updateScheduleInfo(ScheduleInfoRequest request) {
+        Response<Boolean> response = 
ClientUtils.executeHttpCall(scheduleApi.update(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    public ScheduleInfo getScheduleInfo(String groupId) {
+        Response<ScheduleInfo> response = 
ClientUtils.executeHttpCall(scheduleApi.get(groupId));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    public Boolean deleteScheduleInfo(String groupId) {
+        Response<Boolean> response = 
ClientUtils.executeHttpCall(scheduleApi.delete(groupId));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java
new file mode 100644
index 0000000000..0da7be3d6f
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.service;
+
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+import retrofit2.Call;
+import retrofit2.http.Body;
+import retrofit2.http.DELETE;
+import retrofit2.http.GET;
+import retrofit2.http.POST;
+import retrofit2.http.Path;
+import retrofit2.http.Query;
+
+public interface InLongScheduleApi {
+
+    @POST("schedule/save")
+    Call<Response<Integer>> createSchedule(@Body ScheduleInfoRequest request);
+
+    @GET("schedule/exist/{groupId}")
+    Call<Response<Boolean>> exist(@Path("groupId") String groupId);
+
+    @POST("schedule/update")
+    Call<Response<Boolean>> update(@Body ScheduleInfoRequest request);
+
+    @GET("schedule/get")
+    Call<Response<ScheduleInfo>> get(@Query("groupId") String groupId);
+
+    @DELETE("schedule/delete/{groupId}")
+    Call<Response<Boolean>> delete(@Path("groupId") String groupId);
+
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index ed7549b699..6a8f9b4699 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -125,6 +125,9 @@ public enum ErrorCodeEnum {
     MQ_TYPE_IS_NULL(1600, "MQ type is null"),
     MQ_TYPE_NOT_SUPPORT(1601, "MQ type '%s' not support"),
 
+    SCHEDULE_NOT_FOUND(1700, "Schedule info not found"),
+    SCHEDULE_DUPLICATE(1701, "Schedule info already exist"),
+
     WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
     WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no 
operation authority"),
     WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"),
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
index d92fb31b33..cb9e1638be 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
@@ -46,6 +46,8 @@ public enum OperationTarget {
 
     TENANT_ROLE,
 
-    TEMPLATE
+    TEMPLATE,
+
+    SCHEDULE
 
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
similarity index 58%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
copy to 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
index d92fb31b33..cb256a491c 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
@@ -17,35 +17,29 @@
 
 package org.apache.inlong.manager.common.enums;
 
-/**
- * Operation target
- */
-public enum OperationTarget {
-
-    TENANT,
-
-    GROUP,
-
-    STREAM,
-
-    SOURCE,
-
-    SINK,
-
-    CONSUME,
-
-    WORKFLOW,
-
-    NODE,
-
-    CLUSTER,
-
-    TRANSFORM,
-
-    INLONG_ROLE,
-
-    TENANT_ROLE,
-
-    TEMPLATE
+import lombok.Getter;
+
+@Getter
+public enum ScheduleStatus {
+
+    NEW(100, "new"),
+    DELETED(40, "deleted");
+
+    private final Integer code;
+    private final String description;
+
+    ScheduleStatus(Integer code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    public static ScheduleStatus forCode(int code) {
+        for (ScheduleStatus status : values()) {
+            if (status.getCode() == code) {
+                return status;
+            }
+        }
+        throw new IllegalStateException(String.format("Illegal code=%s for 
ScheduleStatus", code));
+    }
 
 }
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
new file mode 100644
index 0000000000..75237343cb
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.entity;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.util.Date;
+
+@Data
+public class ScheduleEntity implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Integer id;
+    // inLong group id
+    private String inlongGroupId;
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    private Integer scheduleType;
+    // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
+    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    private String scheduleUnit;
+    private Integer scheduleInterval;
+    // schedule start time, long type timestamp
+    private Timestamp startTime;
+    // schedule end time, long type timestamp
+    private Timestamp endTime;
+    // delay time to start task, in minutes
+    private Integer delayTime;
+    // if task depend on itself
+    private Integer selfDepend;
+    private Integer taskParallelism;
+    // expression of crontab, used when scheduleType is crontab
+    private String crontabExpression;
+
+    private Integer status;
+    private Integer previousStatus;
+    private Integer isDeleted;
+    private String creator;
+    private String modifier;
+    private Date createTime;
+    private Date modifyTime;
+    private Integer version;
+
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
similarity index 60%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
copy to 
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
index d92fb31b33..23e8e23c31 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
@@ -15,37 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.enums;
+package org.apache.inlong.manager.dao.mapper;
 
-/**
- * Operation target
- */
-public enum OperationTarget {
-
-    TENANT,
-
-    GROUP,
-
-    STREAM,
-
-    SOURCE,
-
-    SINK,
-
-    CONSUME,
-
-    WORKFLOW,
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
 
-    NODE,
+import org.apache.ibatis.annotations.Param;
+import org.springframework.stereotype.Repository;
 
-    CLUSTER,
+@Repository
+public interface ScheduleEntityMapper {
 
-    TRANSFORM,
+    int insert(ScheduleEntity scheduleEntity);
 
-    INLONG_ROLE,
+    ScheduleEntity selectByPrimaryKey(Integer id);
 
-    TENANT_ROLE,
+    ScheduleEntity selectByGroupId(String groupId);
 
-    TEMPLATE
+    int updateByIdSelective(ScheduleEntity scheduleEntity);
 
+    int deleteByGroupId(@Param("inlongGroupId") String inlongGroupId);
 }
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
new file mode 100644
index 0000000000..d719aa8988
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper">
+    <resultMap id="BaseResultMap" 
type="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="inlong_group_id" jdbcType="VARCHAR" 
property="inlongGroupId"/>
+        <result column="schedule_type" jdbcType="INTEGER" 
property="scheduleType"/>
+        <result column="schedule_unit" jdbcType="VARCHAR" 
property="scheduleUnit"/>
+        <result column="schedule_interval" jdbcType="INTEGER" 
property="scheduleInterval"/>
+        <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
+        <result column="end_time" jdbcType="TIMESTAMP" property="endTime"/>
+        <result column="delay_time" jdbcType="INTEGER" property="delayTime"/>
+        <result column="self_depend" jdbcType="INTEGER" property="selfDepend"/>
+        <result column="task_parallelism" jdbcType="INTEGER" 
property="taskParallelism"/>
+        <result column="crontab_expression" jdbcType="VARCHAR" 
property="crontabExpression"/>
+
+        <result column="status" jdbcType="INTEGER" property="status"/>
+        <result column="previous_status" jdbcType="INTEGER" 
property="previousStatus"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+        <result column="creator" jdbcType="VARCHAR" property="creator"/>
+        <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+        <result column="create_time" jdbcType="TIMESTAMP" 
property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime"/>
+        <result column="version" jdbcType="INTEGER" property="version"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, inlong_group_id, schedule_type, schedule_unit, schedule_interval, 
start_time,
+          end_time, delay_time, self_depend, task_parallelism, 
crontab_expression,
+           status, previous_status, is_deleted, creator, modifier, 
create_time, modify_time, version
+    </sql>
+
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+        insert into schedule_config (id, inlong_group_id, schedule_type, 
schedule_unit,
+                                     schedule_interval, start_time, end_time, 
delay_time,
+                                     self_depend, task_parallelism, 
crontab_expression,
+                                     status, previous_status, creator, 
modifier)
+        values (#{id, jdbcType=INTEGER}, #{inlongGroupId, jdbcType=VARCHAR},
+                #{scheduleType, jdbcType=INTEGER}, #{scheduleUnit, 
jdbcType=VARCHAR},
+                #{scheduleInterval, jdbcType=INTEGER}, #{startTime, 
jdbcType=TIMESTAMP},
+                #{endTime, jdbcType=TIMESTAMP}, #{delayTime, jdbcType=INTEGER},
+                #{selfDepend, jdbcType=INTEGER}, #{taskParallelism, 
jdbcType=INTEGER},
+                #{crontabExpression, jdbcType=VARCHAR}, 
#{status,jdbcType=INTEGER},
+                #{previousStatus,jdbcType=INTEGER}, 
#{creator,jdbcType=VARCHAR},
+                #{modifier,jdbcType=VARCHAR})
+    </insert>
+
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" 
resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from schedule_config
+        where id = #{id,jdbcType=INTEGER}
+        and is_deleted = 0
+    </select>
+
+    <select id="selectByGroupId" parameterType="java.lang.String" 
resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from schedule_config
+        where inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+        and is_deleted = 0
+    </select>
+
+    <update id="updateByIdSelective" 
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+        update schedule_config
+        <set>
+            <if test="inlongGroupId != null">
+                inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
+            </if>
+            <if test="scheduleType != null">
+                schedule_type = #{scheduleType, jdbcType=INTEGER},
+            </if>
+            <if test="scheduleUnit !=null">
+                schedule_unit = #{scheduleUnit, jdbcType=VARCHAR},
+            </if>
+            <if test="scheduleInterval != null">
+                schedule_interval = #{scheduleInterval, jdbcType=INTEGER},
+            </if>
+            <if test="startTime != null">
+                start_time = #{startTime, jdbcType=TIMESTAMP},
+            </if>
+            <if test="endTime != null">
+                end_time = #{endTime, jdbcType=TIMESTAMP},
+            </if>
+            <if test="delayTime != null">
+                delay_time = #{delayTime, jdbcType=INTEGER},
+            </if>
+            <if test="selfDepend != null">
+                self_depend = #{selfDepend, jdbcType=INTEGER},
+            </if>
+            <if test="taskParallelism != null">
+                task_parallelism = #{taskParallelism, jdbcType=INTEGER},
+            </if>
+            <if test="crontabExpression != null">
+                crontab_expression = #{crontabExpression, jdbcType=VARCHAR},
+            </if>
+            <if test="status != null">
+                status = #{status, jdbcType=VARCHAR},
+            </if>
+            <if test="previousStatus != null">
+                previous_status = #{previousStatus, jdbcType=VARCHAR},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted},
+            </if>
+            <if test="creator != null">
+                creator = #{creator, jdbcType=VARCHAR},
+            </if>
+            <if test="modifier != null">
+                modifier = #{modifier, jdbcType=VARCHAR},
+            </if>
+            version = #{version, jdbcType=INTEGER} + 1
+        </set>
+        <where>
+            id = #{id, jdbcType=INTEGER}
+            and version = #{version,jdbcType=INTEGER}
+        </where>
+    </update>
+
+    <delete id="deleteByGroupId">
+        delete
+        from schedule_config
+        where inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+    </delete>
+
+</mapper>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
 
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
new file mode 100644
index 0000000000..ef4207bc84
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.DaoBaseTest;
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.sql.Timestamp;
+import java.util.Date;
+
+public class ScheduleEntityTest extends DaoBaseTest {
+
+    public static final String GROUP_ID_PREFIX = "test_group_";
+    public static final String USER = "admin";
+    public static final int SCHEDULE_TYPE = 0;
+    public static final int SCHEDULE_TYPE_NEW = 1;
+    public static final String SCHEDULE_UNIT = "H";
+    public static final String SCHEDULE_UNIT_NEW = "D";
+    public static final int SCHEDULE_INTERVAL = 1;
+    public static final int SCHEDULE_INTERVAL_NEW = 1;
+    public static final Timestamp DEFAULT_TIME = new 
Timestamp(System.currentTimeMillis());
+
+    @Autowired
+    ScheduleEntityMapper scheduleEntityMapper;
+
+    @Test
+    public void testSelectByGroupId() throws Exception {
+        ScheduleEntity scheduleEntity = genEntity();
+        scheduleEntityMapper.insert(scheduleEntity);
+        ScheduleEntity entityQueried = 
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+        Assertions.assertEquals(scheduleEntity.getInlongGroupId(), 
entityQueried.getInlongGroupId());
+        Assertions.assertEquals(SCHEDULE_TYPE, 
entityQueried.getScheduleType());
+        Assertions.assertEquals(SCHEDULE_UNIT, 
entityQueried.getScheduleUnit());
+        Assertions.assertEquals(SCHEDULE_INTERVAL, 
entityQueried.getScheduleInterval());
+        Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
+        Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime());
+        Assertions.assertEquals(USER, entityQueried.getCreator());
+    }
+
+    @Test
+    public void testUpdate() throws Exception {
+        ScheduleEntity scheduleEntity = genEntity();
+        scheduleEntityMapper.insert(scheduleEntity);
+        ScheduleEntity entityQueried = 
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+        Assertions.assertEquals(scheduleEntity.getInlongGroupId(), 
entityQueried.getInlongGroupId());
+        Assertions.assertEquals(SCHEDULE_TYPE, 
entityQueried.getScheduleType());
+        Assertions.assertEquals(SCHEDULE_UNIT, 
entityQueried.getScheduleUnit());
+        Assertions.assertEquals(SCHEDULE_INTERVAL, 
entityQueried.getScheduleInterval());
+        Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
+        Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime());
+        Assertions.assertEquals(USER, entityQueried.getCreator());
+
+        entityQueried.setScheduleType(SCHEDULE_TYPE_NEW);
+        entityQueried.setScheduleUnit(SCHEDULE_UNIT_NEW);
+        entityQueried.setScheduleInterval(SCHEDULE_INTERVAL_NEW);
+        scheduleEntityMapper.updateByIdSelective(entityQueried);
+        entityQueried = 
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+        Assertions.assertEquals(scheduleEntity.getInlongGroupId(), 
entityQueried.getInlongGroupId());
+        Assertions.assertEquals(SCHEDULE_TYPE_NEW, 
entityQueried.getScheduleType());
+        Assertions.assertEquals(SCHEDULE_UNIT_NEW, 
entityQueried.getScheduleUnit());
+        Assertions.assertEquals(SCHEDULE_INTERVAL_NEW, 
entityQueried.getScheduleInterval());
+        Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
+        Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime());
+        Assertions.assertEquals(USER, entityQueried.getCreator());
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        ScheduleEntity scheduleEntity = genEntity();
+        scheduleEntityMapper.insert(scheduleEntity);
+        ScheduleEntity entityQueried = 
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+        Assertions.assertEquals(scheduleEntity.getInlongGroupId(), 
entityQueried.getInlongGroupId());
+        Assertions.assertEquals(SCHEDULE_TYPE, 
entityQueried.getScheduleType());
+        Assertions.assertEquals(SCHEDULE_UNIT, 
entityQueried.getScheduleUnit());
+        Assertions.assertEquals(SCHEDULE_INTERVAL, 
entityQueried.getScheduleInterval());
+        Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
+        Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime());
+        Assertions.assertEquals(USER, entityQueried.getCreator());
+
+        
scheduleEntityMapper.deleteByGroupId(scheduleEntity.getInlongGroupId());
+        entityQueried = 
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+        Assertions.assertNull(entityQueried);
+    }
+
+    private ScheduleEntity genEntity() {
+        ScheduleEntity entity = new ScheduleEntity();
+        entity.setInlongGroupId(GROUP_ID_PREFIX + System.currentTimeMillis());
+        entity.setScheduleType(SCHEDULE_TYPE);
+        entity.setScheduleUnit(SCHEDULE_UNIT);
+        entity.setScheduleInterval(SCHEDULE_INTERVAL);
+        entity.setStartTime(DEFAULT_TIME);
+        entity.setEndTime(DEFAULT_TIME);
+        entity.setCreator(USER);
+        entity.setCreateTime(new Date());
+        entity.setModifyTime(new Date());
+        entity.setIsDeleted(0);
+        return entity;
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
new file mode 100644
index 0000000000..13afce70a4
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule;
+
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+import java.sql.Timestamp;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Schedule response")
+public class ScheduleInfo {
+
+    @ApiModelProperty(value = "Primary key")
+    @NotNull(groups = UpdateValidation.class)
+    private Integer id;
+
+    @ApiModelProperty("Inlong Group ID")
+    @NotNull
+    private String inlongGroupId;
+
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    @ApiModelProperty("Schedule type")
+    private Integer scheduleType;
+
+    // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
+    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    @ApiModelProperty("TimeUnit for schedule interval")
+    private String scheduleUnit;
+
+    @ApiModelProperty("Schedule interval")
+    private Integer scheduleInterval;
+
+    @ApiModelProperty("Start time")
+    private Timestamp startTime;
+
+    @ApiModelProperty("End time")
+    private Timestamp endTime;
+
+    @ApiModelProperty("Delay time")
+    private Integer delayTime;
+
+    @ApiModelProperty("Self depend")
+    private Integer selfDepend;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer taskParallelism;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer crontabExpression;
+
+    @ApiModelProperty(value = "Version number")
+    @NotNull(groups = UpdateValidation.class, message = "version cannot be 
null")
+    private Integer version;
+
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
new file mode 100644
index 0000000000..eff1660719
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule;
+
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+
+import java.sql.Timestamp;
+
+@Data
+@ApiModel("Schedule request")
+public class ScheduleInfoRequest {
+
+    @ApiModelProperty(value = "Primary key")
+    @NotNull(groups = UpdateValidation.class)
+    private Integer id;
+
+    @ApiModelProperty("Inlong Group ID")
+    @NotNull
+    private String inlongGroupId;
+
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    @ApiModelProperty("Schedule type")
+    private Integer scheduleType;
+
+    // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
+    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    @ApiModelProperty("TimeUnit for schedule interval")
+    private String scheduleUnit;
+
+    @ApiModelProperty("Schedule interval")
+    private Integer scheduleInterval;
+
+    @ApiModelProperty("Start time")
+    private Timestamp startTime;
+
+    @ApiModelProperty("End time")
+    private Timestamp endTime;
+
+    @ApiModelProperty("Delay time")
+    private Integer delayTime;
+
+    @ApiModelProperty("Self depend")
+    private Integer selfDepend;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer taskParallelism;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer crontabExpression;
+
+    @ApiModelProperty(value = "Version number")
+    @NotNull(groups = UpdateValidation.class, message = "version cannot be 
null")
+    private Integer version;
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
new file mode 100644
index 0000000000..d00e7134d4
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+
+public interface ScheduleService {
+
+    /**
+     * Save schedule info.
+     *
+     * @param request schedule request need to save
+     * @param operator name of operator
+     * @return schedule info id in backend storage
+     */
+    int save(@Valid @NotNull(message = "schedule request cannot be null") 
ScheduleInfoRequest request,
+            String operator);
+
+    /**
+     * Query whether schedule info exists for specified inlong group
+     *
+     * @param groupId the group id to be queried
+     * @return does it exist
+     */
+    Boolean exist(String groupId);
+
+    /**
+     * Get schedule info based on inlong group id
+     *
+     * @param groupId inlong group id
+     * @return detail of inlong group
+     */
+    ScheduleInfo get(String groupId);
+
+    /**
+     * Modify schedule information
+     *
+     * @param request schedule request that needs to be modified
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean update(@Valid @NotNull(message = "schedule request cannot be 
null") ScheduleInfoRequest request,
+            String operator);
+
+    /**
+     * Delete schedule info for gropuId.
+     * @param groupId groupId to find a schedule info to delete
+     * @param operator  name of operator
+     * @Return whether succeed
+     * */
+    Boolean deleteByGroupId(String groupId, String operator);
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
new file mode 100644
index 0000000000..480189da9e
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.ScheduleStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Objects;
+
+@Service
+public class ScheduleServiceImpl implements ScheduleService {
+
+    private static Logger LOGGER = 
LoggerFactory.getLogger(ScheduleServiceImpl.class);
+
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private ScheduleEntityMapper scheduleEntityMapper;
+
+    @Override
+    public int save(ScheduleInfoRequest request, String operator) {
+        LOGGER.debug("begin to save schedule info, scheduleInfo={}, 
operator={}", request, operator);
+
+        String groupId = request.getInlongGroupId();
+        checkGroupExist(groupId);
+        if (scheduleEntityMapper.selectByGroupId(groupId) != null) {
+            LOGGER.error("schedule info for group={} already exists", groupId);
+            throw new BusinessException(ErrorCodeEnum.SCHEDULE_DUPLICATE);
+        }
+
+        ScheduleEntity scheduleEntity = 
CommonBeanUtils.copyProperties(request, ScheduleEntity::new);
+        scheduleEntity.setStatus(ScheduleStatus.NEW.getCode());
+        scheduleEntity.setCreator(operator);
+        scheduleEntity.setModifier(operator);
+        return scheduleEntityMapper.insert(scheduleEntity);
+    }
+
+    @Override
+    public Boolean exist(String groupId) {
+        checkGroupExist(groupId);
+        return scheduleEntityMapper.selectByGroupId(groupId) != null;
+    }
+
+    @Override
+    public ScheduleInfo get(String groupId) {
+        LOGGER.debug("begin to get schedule info by groupId={}", groupId);
+        ScheduleEntity entity = getScheduleEntity(groupId);
+        return CommonBeanUtils.copyProperties(entity, ScheduleInfo::new);
+    }
+
+    @Override
+    public Boolean update(ScheduleInfoRequest request, String operator) {
+        LOGGER.debug("begin to update schedule info={}", request);
+        String groupId = request.getInlongGroupId();
+        ScheduleEntity entity = getScheduleEntity(groupId);
+        String errMsg =
+                String.format("schedule info has already been updated with 
groupId=%s, curVersion=%s, expectVersion=%s",
+                        entity.getInlongGroupId(), request.getVersion(), 
entity.getVersion());
+        if (!Objects.equals(entity.getVersion(), request.getVersion())) {
+            LOGGER.error(errMsg);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+        CommonBeanUtils.copyProperties(request, entity, true);
+        entity.setModifier(operator);
+        updateScheduleInfo(entity, errMsg);
+        LOGGER.info("success to update schedule info for groupId={}", groupId);
+        return true;
+    }
+
+    @Override
+    public Boolean deleteByGroupId(String groupId, String operator) {
+        LOGGER.debug("begin to delete schedule info for groupId={}", groupId);
+        checkGroupExist(groupId);
+        ScheduleEntity entity = scheduleEntityMapper.selectByGroupId(groupId);
+        if (entity == null) {
+            LOGGER.error("schedule info for groupId={} does not exist", 
groupId);
+            return false;
+        }
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(ScheduleStatus.DELETED.getCode());
+        entity.setModifier(operator);
+        entity.setIsDeleted(entity.getId());
+        updateScheduleInfo(entity,
+                String.format("schedule info has already been updated with 
groupId=%s, curVersion=%s",
+                        entity.getInlongGroupId(), entity.getVersion()));
+        LOGGER.info("success to delete schedule info for groupId={}", groupId);
+        return true;
+    }
+
+    /**
+     * Check whether InLongGroup exists, throw BusinessException with 
ErrorCodeEnum.GROUP_NOT_FOUND if check failed.
+     * */
+    private void checkGroupExist(String groupId) {
+        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+        if (entity == null) {
+            LOGGER.error("inlong group not found by groupId={}", groupId);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+    }
+
+    private ScheduleEntity getScheduleEntity(String groupId) {
+        checkGroupExist(groupId);
+        ScheduleEntity entity = scheduleEntityMapper.selectByGroupId(groupId);
+        if (entity == null) {
+            LOGGER.error("schedule info for group={} not found", groupId);
+            throw new BusinessException(ErrorCodeEnum.SCHEDULE_NOT_FOUND);
+        }
+        return entity;
+    }
+
+    /**
+     * Update schedule entity and throw exception if update failed.
+     * @param entity to update
+     * @param errorMsg when update failed.
+     * @return
+     *
+     * */
+    private void updateScheduleInfo(ScheduleEntity entity, String errorMsg) {
+        if (scheduleEntityMapper.updateByIdSelective(entity) != 
InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error(errorMsg);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 3864ffcbe3..a8b545f7bf 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -962,4 +962,35 @@ CREATE TABLE IF NOT EXISTS `tenant_template`
 
 -- ----------------------------
 
+-- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+    `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `inlong_group_id`        varchar(256) NOT NULL COMMENT 'Inlong group id, 
undeleted ones cannot be repeated',
+    `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
+    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule 
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_interval`      int(11)      DEFAULT '1' COMMENT 'Schedule 
interval',
+    `start_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Start time for schedule',
+    `end_time`               timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'End time for schedule',
+    `delay_time`             int(11)      DEFAULT '0' COMMENT 'Delay time in 
minutes to schedule',
+    `self_depend`            int(11)      DEFAULT NULL COMMENT 'Self depend 
info',
+    `task_parallelism`       int(11)      DEFAULT NULL COMMENT 'Task 
parallelism',
+    `crontab_expression`     varchar(256) DEFAULT NULL COMMENT 'Crontab 
expression if schedule type is crontab',
+    `status`                 int(4)       DEFAULT '100' COMMENT 'Schedule 
status',
+    `previous_status`        int(4)       DEFAULT '100' COMMENT 'Previous 
schedule status',
+    `is_deleted`             int(11)      DEFAULT '0' COMMENT 'Whether to 
delete, 0: not deleted, > 0: deleted',
+    `creator`                varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`               varchar(64)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`                int(11)      NOT NULL DEFAULT '1' COMMENT 
'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+    ) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
+
+
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index cd31161b1c..f7dc1c0a90 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1015,5 +1015,33 @@ CREATE TABLE IF NOT EXISTS `tenant_template`
 DEFAULT CHARSET = utf8mb4 COMMENT ='Tenant template table';
 
 -- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+    `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `inlong_group_id`        varchar(256) NOT NULL COMMENT 'Inlong group id, 
undeleted ones cannot be repeated',
+    `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
+    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule 
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_interval`      int(11)      DEFAULT '1' COMMENT 'Schedule 
interval',
+    `start_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Start time for schedule',
+    `end_time`               timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'End time for schedule',
+    `delay_time`             int(11)      DEFAULT '0' COMMENT 'Delay time in 
minutes to schedule',
+    `self_depend`            int(11)      DEFAULT NULL COMMENT 'Self depend 
info',
+    `task_parallelism`       int(11)      DEFAULT NULL COMMENT 'Task 
parallelism',
+    `crontab_expression`     varchar(256) DEFAULT NULL COMMENT 'Crontab 
expression if schedule type is crontab',
+    `status`                 int(4)       DEFAULT '100' COMMENT 'Schedule 
status',
+    `previous_status`        int(4)       DEFAULT '100' COMMENT 'Previous 
schedule status',
+    `is_deleted`             int(11)      DEFAULT '0' COMMENT 'Whether to 
delete, 0: not deleted, > 0: deleted',
+    `creator`                varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`               varchar(64)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`                int(11)      NOT NULL DEFAULT '1' COMMENT 
'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
 
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql 
b/inlong-manager/manager-web/sql/changes-1.13.0.sql
index ecb38b543d..38e7627c5e 100644
--- a/inlong-manager/manager-web/sql/changes-1.13.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql
@@ -88,3 +88,33 @@ CREATE TABLE IF NOT EXISTS `tenant_template`
     UNIQUE KEY `unique_tenant_inlong_template` (`tenant`, `template_name`, 
`is_deleted`)
 ) ENGINE = InnoDB
     DEFAULT CHARSET = utf8mb4 COMMENT ='Tenant template table';
+
+-- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+    `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
+    `inlong_group_id`               varchar(256) NOT NULL COMMENT 'Inlong 
group id, undeleted ones cannot be repeated',
+    `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
+    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule 
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_interval`      int(11)      DEFAULT '1' COMMENT 'Schedule 
interval',
+    `start_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Start time for schedule',
+    `end_time`               timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'End time for schedule',
+    `delay_time`             int(11)      DEFAULT '0' COMMENT 'Delay time in 
minutes to schedule',
+    `self_depend`            int(11)      DEFAULT NULL COMMENT 'Self depend 
info',
+    `task_parallelism`       int(11)      DEFAULT NULL COMMENT 'Task 
parallelism',
+    `crontab_expression`     varchar(256) DEFAULT NULL COMMENT 'Crontab 
expression if schedule type is crontab',
+    `status`                 int(4)       DEFAULT '100' COMMENT 'Schedule 
status',
+    `previous_status`        int(4)       DEFAULT '100' COMMENT 'Previous 
schedule status',
+    `is_deleted`             int(11)      DEFAULT '0' COMMENT 'Whether to 
delete, 0: not deleted, > 0: deleted',
+    `creator`                varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`               varchar(64)  DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Create time',
+    `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`                int(11)      NOT NULL DEFAULT '1' COMMENT 
'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+    ) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
new file mode 100644
index 0000000000..801cc09b0e
--- /dev/null
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import org.apache.inlong.manager.common.enums.OperationTarget;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.schedule.ScheduleService;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Schedule-API")
+public class InLongSchedulerController {
+
+    @Autowired
+    private ScheduleService scheduleService;
+
+    @RequestMapping(value = "/schedule/save", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.SCHEDULE)
+    @ApiOperation(value = "Save schedule info")
+    public Response<Integer> save(@RequestBody ScheduleInfoRequest request) {
+        int result = scheduleService.save(request, 
LoginUserUtils.getLoginUser().getName());
+        return Response.success(result);
+    }
+
+    @RequestMapping(value = "/schedule/exist/{groupId}", method = 
RequestMethod.GET)
+    @ApiOperation(value = "Is the schedule info exists for inlong group")
+    @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required 
= true)
+    public Response<Boolean> exist(@PathVariable String groupId) {
+        return Response.success(scheduleService.exist(groupId));
+    }
+
+    @RequestMapping(value = "/schedule/get", method = RequestMethod.GET)
+    @ApiOperation(value = "Get schedule info for inlong group")
+    @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required 
= true)
+    public Response<ScheduleInfo> get(@RequestParam String groupId) {
+        return Response.success(scheduleService.get(groupId));
+    }
+
+    @RequestMapping(value = "/schedule/update", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.SCHEDULE)
+    @ApiOperation(value = "Update schedule info")
+    public Response<Boolean> update(@Validated(UpdateValidation.class) 
@RequestBody ScheduleInfoRequest request) {
+        return Response.success(scheduleService.update(request, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
+    @RequestMapping(value = "/schedule/delete/{groupId}", method = 
RequestMethod.DELETE)
+    @ApiOperation(value = "Delete schedule info")
+    @OperationLog(operation = OperationType.DELETE, operationTarget = 
OperationTarget.SCHEDULE)
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", 
dataTypeClass = String.class, required = true)
+    public Response<Boolean> delete(@PathVariable String groupId) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(scheduleService.deleteByGroupId(groupId, 
operator));
+    }
+
+}

Reply via email to