This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 8bd3a20cb49f9dc3ddc83c0bcb769f95e793e4b6 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Thu May 23 18:18:31 2024 +0800 [INLONG-10247][Manager] Support schedule information management for offline sync (#10254) --- .../client/api/inner/client/ClientFactory.java | 2 + .../api/inner/client/InLongScheduleClient.java | 67 +++++++++ .../client/api/service/InLongScheduleApi.java | 49 +++++++ .../inlong/manager/common/enums/ErrorCodeEnum.java | 3 + .../manager/common/enums/OperationTarget.java | 4 +- .../{OperationTarget.java => ScheduleStatus.java} | 54 ++++--- .../inlong/manager/dao/entity/ScheduleEntity.java | 61 ++++++++ .../manager/dao/mapper/ScheduleEntityMapper.java} | 36 ++--- .../resources/mappers/ScheduleEntityMapper.xml | 144 +++++++++++++++++++ .../manager/dao/mapper/ScheduleEntityTest.java | 118 ++++++++++++++++ .../inlong/manager/pojo/schedule/ScheduleInfo.java | 82 +++++++++++ .../manager/pojo/schedule/ScheduleInfoRequest.java | 76 ++++++++++ .../manager/service/schedule/ScheduleService.java | 71 ++++++++++ .../service/schedule/ScheduleServiceImpl.java | 155 +++++++++++++++++++++ .../main/resources/h2/apache_inlong_manager.sql | 31 +++++ .../manager-web/sql/apache_inlong_manager.sql | 28 ++++ inlong-manager/manager-web/sql/changes-1.13.0.sql | 30 ++++ .../web/controller/InLongSchedulerController.java | 88 ++++++++++++ 18 files changed, 1043 insertions(+), 56 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java index 3c67f1fa71..01c9fb6473 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java @@ -55,6 +55,7 @@ public class ClientFactory { private final 
AuditClient auditClient; private final InlongTenantClient inlongTenantClient; private final InlongTenantRoleClient inlongTenantRoleClient; + private final InLongScheduleClient inLongScheduleClient; public ClientFactory(ClientConfiguration configuration) { groupClient = new InlongGroupClient(configuration); @@ -74,5 +75,6 @@ public class ClientFactory { auditClient = new AuditClient(configuration); inlongTenantClient = new InlongTenantClient(configuration); inlongTenantRoleClient = new InlongTenantRoleClient(configuration); + inLongScheduleClient = new InLongScheduleClient(configuration); } } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java new file mode 100644 index 0000000000..86638dbae8 --- /dev/null +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.client.api.inner.client; + +import org.apache.inlong.manager.client.api.ClientConfiguration; +import org.apache.inlong.manager.client.api.service.InLongScheduleApi; +import org.apache.inlong.manager.client.api.util.ClientUtils; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; + +public class InLongScheduleClient { + + private InLongScheduleApi scheduleApi; + + public InLongScheduleClient(ClientConfiguration clientConfiguration) { + scheduleApi = ClientUtils.createRetrofit(clientConfiguration).create(InLongScheduleApi.class); + } + + public Integer createScheduleInfo(ScheduleInfoRequest request) { + Response<Integer> response = ClientUtils.executeHttpCall(scheduleApi.createSchedule(request)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + public Boolean scheduleInfoExist(String groupId) { + Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY); + Response<Boolean> response = ClientUtils.executeHttpCall(scheduleApi.exist(groupId)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + public Boolean updateScheduleInfo(ScheduleInfoRequest request) { + Response<Boolean> response = ClientUtils.executeHttpCall(scheduleApi.update(request)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + public ScheduleInfo getScheduleInfo(String groupId) { + Response<ScheduleInfo> response = ClientUtils.executeHttpCall(scheduleApi.get(groupId)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + public Boolean deleteScheduleInfo(String groupId) { + Response<Boolean> response = ClientUtils.executeHttpCall(scheduleApi.delete(groupId)); + 
ClientUtils.assertRespSuccess(response); + return response.getData(); + } +} diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java new file mode 100644 index 0000000000..0da7be3d6f --- /dev/null +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.client.api.service; + +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; + +import retrofit2.Call; +import retrofit2.http.Body; +import retrofit2.http.DELETE; +import retrofit2.http.GET; +import retrofit2.http.POST; +import retrofit2.http.Path; +import retrofit2.http.Query; + +public interface InLongScheduleApi { + + @POST("schedule/save") + Call<Response<Integer>> createSchedule(@Body ScheduleInfoRequest request); + + @GET("schedule/exist/{groupId}") + Call<Response<Boolean>> exist(@Path("groupId") String groupId); + + @POST("schedule/update") + Call<Response<Boolean>> update(@Body ScheduleInfoRequest request); + + @GET("schedule/get") + Call<Response<ScheduleInfo>> get(@Query("groupId") String groupId); + + @DELETE("schedule/delete/{groupId}") + Call<Response<Boolean>> delete(@Path("groupId") String groupId); + +} diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index ed7549b699..6a8f9b4699 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -125,6 +125,9 @@ public enum ErrorCodeEnum { MQ_TYPE_IS_NULL(1600, "MQ type is null"), MQ_TYPE_NOT_SUPPORT(1601, "MQ type '%s' not support"), + SCHEDULE_NOT_FOUND(1700, "Schedule info not found"), + SCHEDULE_DUPLICATE(1701, "Schedule info already exist"), + WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"), WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no operation authority"), WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"), diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java index d92fb31b33..cb9e1638be 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java @@ -46,6 +46,8 @@ public enum OperationTarget { TENANT_ROLE, - TEMPLATE + TEMPLATE, + + SCHEDULE } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java similarity index 58% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java index d92fb31b33..cb256a491c 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java @@ -17,35 +17,29 @@ package org.apache.inlong.manager.common.enums; -/** - * Operation target - */ -public enum OperationTarget { - - TENANT, - - GROUP, - - STREAM, - - SOURCE, - - SINK, - - CONSUME, - - WORKFLOW, - - NODE, - - CLUSTER, - - TRANSFORM, - - INLONG_ROLE, - - TENANT_ROLE, - - TEMPLATE +import lombok.Getter; + +@Getter +public enum ScheduleStatus { + + NEW(100, "new"), + DELETED(40, "deleted"); + + private final Integer code; + private final String description; + + ScheduleStatus(Integer code, String description) { + this.code = code; + this.description = description; + } + + public static ScheduleStatus forCode(int code) { + for (ScheduleStatus status : values()) { + if (status.getCode() 
== code) { + return status; + } + } + throw new IllegalStateException(String.format("Illegal code=%s for ScheduleStatus", code)); + } } diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java new file mode 100644 index 0000000000..75237343cb --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.dao.entity; + +import lombok.Data; + +import java.io.Serializable; +import java.sql.Timestamp; +import java.util.Date; + +@Data +public class ScheduleEntity implements Serializable { + + private static final long serialVersionUID = 1L; + + private Integer id; + // inlong group id this schedule belongs to + private String inlongGroupId; + // schedule type, support [normal, crontab], 0 for normal and 1 for crontab + private Integer scheduleType; + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] + // M=month, W=week, D=day, H=hour, M=minute (TODO confirm: clashes with month), O=oneway + private String scheduleUnit; + private Integer scheduleInterval; + // schedule start time, held as a java.sql.Timestamp + private Timestamp startTime; + // schedule end time, held as a java.sql.Timestamp + private Timestamp endTime; + // delay time to start task, in minutes + private Integer delayTime; + // whether the task depends on itself + private Integer selfDepend; + private Integer taskParallelism; + // expression of crontab, used when scheduleType is crontab + private String crontabExpression; + + private Integer status; + private Integer previousStatus; + private Integer isDeleted; + private String creator; + private String modifier; + private Date createTime; + private Date modifyTime; + private Integer version; + +} diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java similarity index 60% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java index d92fb31b33..23e8e23c31 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java +++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java @@ -15,37 +15,23 @@ * limitations under the License. */ -package org.apache.inlong.manager.common.enums; +package org.apache.inlong.manager.dao.mapper; -/** - * Operation target - */ -public enum OperationTarget { - - TENANT, - - GROUP, - - STREAM, - - SOURCE, - - SINK, - - CONSUME, - - WORKFLOW, +import org.apache.inlong.manager.dao.entity.ScheduleEntity; - NODE, +import org.apache.ibatis.annotations.Param; +import org.springframework.stereotype.Repository; - CLUSTER, +@Repository +public interface ScheduleEntityMapper { - TRANSFORM, + int insert(ScheduleEntity scheduleEntity); - INLONG_ROLE, + ScheduleEntity selectByPrimaryKey(Integer id); - TENANT_ROLE, + ScheduleEntity selectByGroupId(String groupId); - TEMPLATE + int updateByIdSelective(ScheduleEntity scheduleEntity); + int deleteByGroupId(@Param("inlongGroupId") String inlongGroupId); } diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml new file mode 100644 index 0000000000..d719aa8988 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml @@ -0,0 +1,144 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> +<mapper namespace="org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper"> + <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ScheduleEntity"> + <id column="id" jdbcType="INTEGER" property="id"/> + <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/> + <result column="schedule_type" jdbcType="INTEGER" property="scheduleType"/> + <result column="schedule_unit" jdbcType="VARCHAR" property="scheduleUnit"/> + <result column="schedule_interval" jdbcType="INTEGER" property="scheduleInterval"/> + <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/> + <result column="end_time" jdbcType="TIMESTAMP" property="endTime"/> + <result column="delay_time" jdbcType="INTEGER" property="delayTime"/> + <result column="self_depend" jdbcType="INTEGER" property="selfDepend"/> + <result column="task_parallelism" jdbcType="INTEGER" property="taskParallelism"/> + <result column="crontab_expression" jdbcType="VARCHAR" property="crontabExpression"/> + + <result column="status" jdbcType="INTEGER" property="status"/> + <result column="previous_status" jdbcType="INTEGER" property="previousStatus"/> + <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/> + <result column="creator" jdbcType="VARCHAR" property="creator"/> + <result column="modifier" jdbcType="VARCHAR" property="modifier"/> + <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/> + <result column="modify_time" 
jdbcType="TIMESTAMP" property="modifyTime"/> + <result column="version" jdbcType="INTEGER" property="version"/> + </resultMap> + + <sql id="Base_Column_List"> + id, inlong_group_id, schedule_type, schedule_unit, schedule_interval, start_time, + end_time, delay_time, self_depend, task_parallelism, crontab_expression, + status, previous_status, is_deleted, creator, modifier, create_time, modify_time, version + </sql> + + <insert id="insert" useGeneratedKeys="true" keyProperty="id" + parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity"> + insert into schedule_config (id, inlong_group_id, schedule_type, schedule_unit, + schedule_interval, start_time, end_time, delay_time, + self_depend, task_parallelism, crontab_expression, + status, previous_status, creator, modifier) + values (#{id, jdbcType=INTEGER}, #{inlongGroupId, jdbcType=VARCHAR}, + #{scheduleType, jdbcType=INTEGER}, #{scheduleUnit, jdbcType=VARCHAR}, + #{scheduleInterval, jdbcType=INTEGER}, #{startTime, jdbcType=TIMESTAMP}, + #{endTime, jdbcType=TIMESTAMP}, #{delayTime, jdbcType=INTEGER}, + #{selfDepend, jdbcType=INTEGER}, #{taskParallelism, jdbcType=INTEGER}, + #{crontabExpression, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, + #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, + #{modifier,jdbcType=VARCHAR}) + </insert> + + <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap"> + select + <include refid="Base_Column_List"/> + from schedule_config + where id = #{id,jdbcType=INTEGER} + and is_deleted = 0 + </select> + + <select id="selectByGroupId" parameterType="java.lang.String" resultMap="BaseResultMap"> + select + <include refid="Base_Column_List"/> + from schedule_config + where inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} + and is_deleted = 0 + </select> + + <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity"> + update schedule_config + <set> + <if test="inlongGroupId != null"> + 
inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}, + </if> + <if test="scheduleType != null"> + schedule_type = #{scheduleType, jdbcType=INTEGER}, + </if> + <if test="scheduleUnit != null"> + schedule_unit = #{scheduleUnit, jdbcType=VARCHAR}, + </if> + <if test="scheduleInterval != null"> + schedule_interval = #{scheduleInterval, jdbcType=INTEGER}, + </if> + <if test="startTime != null"> + start_time = #{startTime, jdbcType=TIMESTAMP}, + </if> + <if test="endTime != null"> + end_time = #{endTime, jdbcType=TIMESTAMP}, + </if> + <if test="delayTime != null"> + delay_time = #{delayTime, jdbcType=INTEGER}, + </if> + <if test="selfDepend != null"> + self_depend = #{selfDepend, jdbcType=INTEGER}, + </if> + <if test="taskParallelism != null"> + task_parallelism = #{taskParallelism, jdbcType=INTEGER}, + </if> + <if test="crontabExpression != null"> + crontab_expression = #{crontabExpression, jdbcType=VARCHAR}, + </if> + <if test="status != null"> + status = #{status, jdbcType=INTEGER}, + </if> + <if test="previousStatus != null"> + previous_status = #{previousStatus, jdbcType=INTEGER}, + </if> + <if test="isDeleted != null"> + is_deleted = #{isDeleted, jdbcType=INTEGER}, + </if> + <if test="creator != null"> + creator = #{creator, jdbcType=VARCHAR}, + </if> + <if test="modifier != null"> + modifier = #{modifier, jdbcType=VARCHAR}, + </if> + version = #{version, jdbcType=INTEGER} + 1 + </set> + <where> + id = #{id, jdbcType=INTEGER} + and version = #{version,jdbcType=INTEGER} + </where> + </update> + + <delete id="deleteByGroupId"> + delete + from schedule_config + where inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR} + </delete> + +</mapper> \ No newline at end of file diff --git a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java new file mode 100644 index 0000000000..ef4207bc84 --- /dev/null +++
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.mapper; + +import org.apache.inlong.manager.dao.DaoBaseTest; +import org.apache.inlong.manager.dao.entity.ScheduleEntity; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import java.sql.Timestamp; +import java.util.Date; + +public class ScheduleEntityTest extends DaoBaseTest { + + public static final String GROUP_ID_PREFIX = "test_group_"; + public static final String USER = "admin"; + public static final int SCHEDULE_TYPE = 0; + public static final int SCHEDULE_TYPE_NEW = 1; + public static final String SCHEDULE_UNIT = "H"; + public static final String SCHEDULE_UNIT_NEW = "D"; + public static final int SCHEDULE_INTERVAL = 1; + public static final int SCHEDULE_INTERVAL_NEW = 2; + public static final Timestamp DEFAULT_TIME = new Timestamp(System.currentTimeMillis()); + + @Autowired + ScheduleEntityMapper scheduleEntityMapper; + + @Test + public void testSelectByGroupId() throws Exception { + ScheduleEntity
scheduleEntity = genEntity(); + scheduleEntityMapper.insert(scheduleEntity); + ScheduleEntity entityQueried = scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId()); + Assertions.assertEquals(scheduleEntity.getInlongGroupId(), entityQueried.getInlongGroupId()); + Assertions.assertEquals(SCHEDULE_TYPE, entityQueried.getScheduleType()); + Assertions.assertEquals(SCHEDULE_UNIT, entityQueried.getScheduleUnit()); + Assertions.assertEquals(SCHEDULE_INTERVAL, entityQueried.getScheduleInterval()); + Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime()); + Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime()); + Assertions.assertEquals(USER, entityQueried.getCreator()); + } + + @Test + public void testUpdate() throws Exception { + ScheduleEntity scheduleEntity = genEntity(); + scheduleEntityMapper.insert(scheduleEntity); + ScheduleEntity entityQueried = scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId()); + Assertions.assertEquals(scheduleEntity.getInlongGroupId(), entityQueried.getInlongGroupId()); + Assertions.assertEquals(SCHEDULE_TYPE, entityQueried.getScheduleType()); + Assertions.assertEquals(SCHEDULE_UNIT, entityQueried.getScheduleUnit()); + Assertions.assertEquals(SCHEDULE_INTERVAL, entityQueried.getScheduleInterval()); + Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime()); + Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime()); + Assertions.assertEquals(USER, entityQueried.getCreator()); + + entityQueried.setScheduleType(SCHEDULE_TYPE_NEW); + entityQueried.setScheduleUnit(SCHEDULE_UNIT_NEW); + entityQueried.setScheduleInterval(SCHEDULE_INTERVAL_NEW); + scheduleEntityMapper.updateByIdSelective(entityQueried); + entityQueried = scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId()); + Assertions.assertEquals(scheduleEntity.getInlongGroupId(), entityQueried.getInlongGroupId()); + Assertions.assertEquals(SCHEDULE_TYPE_NEW, entityQueried.getScheduleType()); + 
Assertions.assertEquals(SCHEDULE_UNIT_NEW, entityQueried.getScheduleUnit()); + Assertions.assertEquals(SCHEDULE_INTERVAL_NEW, entityQueried.getScheduleInterval()); + Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime()); + Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime()); + Assertions.assertEquals(USER, entityQueried.getCreator()); + } + + @Test + public void testDelete() throws Exception { + ScheduleEntity scheduleEntity = genEntity(); + scheduleEntityMapper.insert(scheduleEntity); + ScheduleEntity entityQueried = scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId()); + Assertions.assertEquals(scheduleEntity.getInlongGroupId(), entityQueried.getInlongGroupId()); + Assertions.assertEquals(SCHEDULE_TYPE, entityQueried.getScheduleType()); + Assertions.assertEquals(SCHEDULE_UNIT, entityQueried.getScheduleUnit()); + Assertions.assertEquals(SCHEDULE_INTERVAL, entityQueried.getScheduleInterval()); + Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime()); + Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime()); + Assertions.assertEquals(USER, entityQueried.getCreator()); + + scheduleEntityMapper.deleteByGroupId(scheduleEntity.getInlongGroupId()); + entityQueried = scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId()); + Assertions.assertNull(entityQueried); + } + + private ScheduleEntity genEntity() { + ScheduleEntity entity = new ScheduleEntity(); + entity.setInlongGroupId(GROUP_ID_PREFIX + System.currentTimeMillis()); + entity.setScheduleType(SCHEDULE_TYPE); + entity.setScheduleUnit(SCHEDULE_UNIT); + entity.setScheduleInterval(SCHEDULE_INTERVAL); + entity.setStartTime(DEFAULT_TIME); + entity.setEndTime(DEFAULT_TIME); + entity.setCreator(USER); + entity.setCreateTime(new Date()); + entity.setModifyTime(new Date()); + entity.setIsDeleted(0); + return entity; + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java new file mode 100644 index 0000000000..13afce70a4 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.schedule; + +import org.apache.inlong.manager.common.validation.UpdateValidation; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotNull; + +import java.sql.Timestamp; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Schedule response") +public class ScheduleInfo { + + @ApiModelProperty(value = "Primary key") + @NotNull(groups = UpdateValidation.class) + private Integer id; + + @ApiModelProperty("Inlong Group ID") + @NotNull + private String inlongGroupId; + + // schedule type, support [normal, crontab], 0 for normal and 1 for crontab + @ApiModelProperty("Schedule type") + private Integer scheduleType; + + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] + // M=month, W=week, D=day, H=hour, M=minute (TODO confirm: clashes with month), O=oneway + @ApiModelProperty("TimeUnit for schedule interval") + private String scheduleUnit; + + @ApiModelProperty("Schedule interval") + private Integer scheduleInterval; + + @ApiModelProperty("Start time") + private Timestamp startTime; + + @ApiModelProperty("End time") + private Timestamp endTime; + + @ApiModelProperty("Delay time") + private Integer delayTime; + + @ApiModelProperty("Self depend") + private Integer selfDepend; + + @ApiModelProperty("Schedule task parallelism") + private Integer taskParallelism; + + @ApiModelProperty("Crontab expression, used when schedule type is crontab") + private String crontabExpression; + + @ApiModelProperty(value = "Version number") + @NotNull(groups = UpdateValidation.class, message = "version cannot be null") + private Integer version; + +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java new file mode 100644 index 0000000000..eff1660719 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.inlong.manager.pojo.schedule;
+
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+
+import java.sql.Timestamp;
+
+/**
+ * Request body used to save or update the schedule information of an inlong group.
+ * The fields mirror the columns of the {@code schedule_config} table.
+ */
+@Data
+@ApiModel("Schedule request")
+public class ScheduleInfoRequest {
+
+    // primary key of the schedule record, mandatory in the update validation group
+    @ApiModelProperty(value = "Primary key")
+    @NotNull(groups = UpdateValidation.class)
+    private Integer id;
+
+    // id of the inlong group this schedule belongs to
+    @ApiModelProperty("Inlong Group ID")
+    @NotNull
+    private String inlongGroupId;
+
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    @ApiModelProperty("Schedule type")
+    private Integer scheduleType;
+
+    // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
+    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    // NOTE(review): 'M' is listed for both month and minute - confirm the code actually used for minute
+    @ApiModelProperty("TimeUnit for schedule interval")
+    private String scheduleUnit;
+
+    // number of schedule units between two runs
+    @ApiModelProperty("Schedule interval")
+    private Integer scheduleInterval;
+
+    // start of the period in which the schedule is active
+    @ApiModelProperty("Start time")
+    private Timestamp startTime;
+
+    // end of the period in which the schedule is active
+    @ApiModelProperty("End time")
+    private Timestamp endTime;
+
+    // delay before a run is triggered; the schedule_config column comment states the unit is minutes
+    @ApiModelProperty("Delay time")
+    private Integer delayTime;
+
+    @ApiModelProperty("Self depend")
+    private Integer selfDepend;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer taskParallelism;
+
+    // crontab expression, used when the schedule type is crontab; it is free text stored in a
+    // varchar(256) column, so it has to be a String rather than an Integer
+    @ApiModelProperty("Crontab expression")
+    private String crontabExpression;
+
+    // version of the record, mandatory in the update validation group
+    @ApiModelProperty(value = "Version number")
+    @NotNull(groups = UpdateValidation.class, message = "version cannot be null")
+    private Integer version;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
new file mode 100644
index 0000000000..d00e7134d4
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Service for managing the schedule information attached to an inlong group:
+ * save, existence check, query, update and delete, all addressed by inlong group id.
+ */
+public interface ScheduleService {
+
+    /**
+     * Save schedule info.
+     *
+     * <p>NOTE(review): ScheduleServiceImpl returns the result of ScheduleEntityMapper#insert as-is;
+     * confirm that this is the generated id and not the number of affected rows.
+     *
+     * @param request schedule request need to save
+     * @param operator name of operator
+     * @return schedule info id in backend storage
+     */
+    int save(@Valid @NotNull(message = "schedule request cannot be null") ScheduleInfoRequest request,
+            String operator);
+
+    /**
+     * Query whether schedule info exists for specified inlong group
+     *
+     * @param groupId the group id to be queried
+     * @return does it exist
+     */
+    Boolean exist(String groupId);
+
+    /**
+     * Get schedule info based on inlong group id
+     *
+     * @param groupId inlong group id
+     * @return schedule info of the inlong group
+     */
+    ScheduleInfo get(String groupId);
+
+    /**
+     * Modify schedule information
+     *
+     * @param request schedule request that needs to be modified
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean update(@Valid @NotNull(message = "schedule request cannot be null") ScheduleInfoRequest request,
+            String operator);
+
+    /**
+     * Delete schedule info for groupId.
+     *
+     * @param groupId groupId to find a schedule info to delete
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean deleteByGroupId(String groupId, String operator);
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
new file mode 100644
index 0000000000..480189da9e
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.ScheduleStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Objects;
+
+/**
+ * {@link ScheduleService} implementation that stores the schedule info of an inlong group
+ * through {@link ScheduleEntityMapper}. Every public operation first verifies that the
+ * inlong group itself exists.
+ */
+@Service
+public class ScheduleServiceImpl implements ScheduleService {
+
+    // class-level constant, never reassigned, hence final
+    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleServiceImpl.class);
+
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private ScheduleEntityMapper scheduleEntityMapper;
+
+    /**
+     * Save the schedule info of an existing inlong group that has no schedule info yet.
+     * The new record gets status {@link ScheduleStatus#NEW}, and the operator is recorded
+     * as both creator and modifier.
+     *
+     * @param request schedule request to save
+     * @param operator name of operator
+     * @return the value returned by the mapper insert call
+     * @throws BusinessException GROUP_NOT_FOUND if the group does not exist,
+     *         SCHEDULE_DUPLICATE if schedule info already exists for the group
+     */
+    @Override
+    public int save(ScheduleInfoRequest request, String operator) {
+        LOGGER.debug("begin to save schedule info, scheduleInfo={}, operator={}", request, operator);
+
+        String groupId = request.getInlongGroupId();
+        checkGroupExist(groupId);
+        if (scheduleEntityMapper.selectByGroupId(groupId) != null) {
+            LOGGER.error("schedule info for group={} already exists", groupId);
+            throw new BusinessException(ErrorCodeEnum.SCHEDULE_DUPLICATE);
+        }
+
+        ScheduleEntity scheduleEntity = CommonBeanUtils.copyProperties(request, ScheduleEntity::new);
+        scheduleEntity.setStatus(ScheduleStatus.NEW.getCode());
+        scheduleEntity.setCreator(operator);
+        scheduleEntity.setModifier(operator);
+        // NOTE(review): ScheduleService#save documents the return value as the schedule info id;
+        // confirm that the mapper insert returns the id and not the affected-row count
+        return scheduleEntityMapper.insert(scheduleEntity);
+    }
+
+    /**
+     * Check whether schedule info exists for the given inlong group.
+     *
+     * @param groupId inlong group id
+     * @return true if a schedule record is found for the group
+     * @throws BusinessException GROUP_NOT_FOUND if the group does not exist
+     */
+    @Override
+    public Boolean exist(String groupId) {
+        checkGroupExist(groupId);
+        return scheduleEntityMapper.selectByGroupId(groupId) != null;
+    }
+
+    /**
+     * Get the schedule info of the given inlong group.
+     *
+     * @param groupId inlong group id
+     * @return schedule info copied from the stored entity
+     * @throws BusinessException GROUP_NOT_FOUND if the group does not exist,
+     *         SCHEDULE_NOT_FOUND if the group has no schedule info
+     */
+    @Override
+    public ScheduleInfo get(String groupId) {
+        LOGGER.debug("begin to get schedule info by groupId={}", groupId);
+        ScheduleEntity entity = getScheduleEntity(groupId);
+        return CommonBeanUtils.copyProperties(entity, ScheduleInfo::new);
+    }
+
+    /**
+     * Update the schedule info of an inlong group. The version carried by the request must equal
+     * the stored version, otherwise the request is rejected as expired.
+     *
+     * @param request schedule request carrying the new values and the version it was read at
+     * @param operator name of operator, recorded as modifier
+     * @return true once the record has been updated
+     * @throws BusinessException GROUP_NOT_FOUND / SCHEDULE_NOT_FOUND if nothing can be updated,
+     *         CONFIG_EXPIRED if the versions differ or the update does not affect exactly one row
+     */
+    @Override
+    public Boolean update(ScheduleInfoRequest request, String operator) {
+        LOGGER.debug("begin to update schedule info={}", request);
+        String groupId = request.getInlongGroupId();
+        ScheduleEntity entity = getScheduleEntity(groupId);
+        // the same message is used for the version check below and for a failed update statement
+        String errMsg =
+                String.format("schedule info has already been updated with groupId=%s, curVersion=%s, expectVersion=%s",
+                        entity.getInlongGroupId(), request.getVersion(), entity.getVersion());
+        if (!Objects.equals(entity.getVersion(), request.getVersion())) {
+            LOGGER.error(errMsg);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+        CommonBeanUtils.copyProperties(request, entity, true);
+        entity.setModifier(operator);
+        updateScheduleInfo(entity, errMsg);
+        LOGGER.info("success to update schedule info for groupId={}", groupId);
+        return true;
+    }
+
+    /**
+     * Logically delete the schedule info of an inlong group: the record is kept, its status is
+     * switched to {@link ScheduleStatus#DELETED} and the former status is remembered.
+     *
+     * @param groupId inlong group id whose schedule info is deleted
+     * @param operator name of operator, recorded as modifier
+     * @return true if the record was marked as deleted, false if the group has no schedule info
+     * @throws BusinessException GROUP_NOT_FOUND if the group does not exist,
+     *         CONFIG_EXPIRED if the update does not affect exactly one row
+     */
+    @Override
+    public Boolean deleteByGroupId(String groupId, String operator) {
+        LOGGER.debug("begin to delete schedule info for groupId={}", groupId);
+        checkGroupExist(groupId);
+        ScheduleEntity entity = scheduleEntityMapper.selectByGroupId(groupId);
+        if (entity == null) {
+            LOGGER.error("schedule info for groupId={} does not exist", groupId);
+            return false;
+        }
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(ScheduleStatus.DELETED.getCode());
+        entity.setModifier(operator);
+        // is_deleted takes the record's own id, a value > 0 that differs for every deleted row
+        entity.setIsDeleted(entity.getId());
+        updateScheduleInfo(entity,
+                String.format("schedule info has already been updated with groupId=%s, curVersion=%s",
+                        entity.getInlongGroupId(), entity.getVersion()));
+        LOGGER.info("success to delete schedule info for groupId={}", groupId);
+        return true;
+    }
+
+    /**
+     * Check whether InLongGroup exists, throw BusinessException with ErrorCodeEnum.GROUP_NOT_FOUND if check failed.
+     * The group id is first passed to {@code Preconditions.expectNotBlank} with
+     * ErrorCodeEnum.GROUP_ID_IS_EMPTY.
+     *
+     * @param groupId inlong group id to check
+     */
+    private void checkGroupExist(String groupId) {
+        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+        if (entity == null) {
+            LOGGER.error("inlong group not found by groupId={}", groupId);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+    }
+
+    /**
+     * Load the schedule entity of an inlong group.
+     *
+     * @param groupId inlong group id
+     * @return the stored schedule entity, never null
+     * @throws BusinessException GROUP_NOT_FOUND if the group does not exist,
+     *         SCHEDULE_NOT_FOUND if the group has no schedule info
+     */
+    private ScheduleEntity getScheduleEntity(String groupId) {
+        checkGroupExist(groupId);
+        ScheduleEntity entity = scheduleEntityMapper.selectByGroupId(groupId);
+        if (entity == null) {
+            LOGGER.error("schedule info for group={} not found", groupId);
+            throw new BusinessException(ErrorCodeEnum.SCHEDULE_NOT_FOUND);
+        }
+        return entity;
+    }
+
+    /**
+     * Update schedule entity and throw exception if update failed.
+     *
+     * @param entity entity to update
+     * @param errorMsg message logged when the update does not affect exactly one row
+     * @throws BusinessException CONFIG_EXPIRED if the update does not affect exactly one row
+     */
+    private void updateScheduleInfo(ScheduleEntity entity, String errorMsg) {
+        if (scheduleEntityMapper.updateByIdSelective(entity) != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error(errorMsg);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+    }
+}
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 3864ffcbe3..a8b545f7bf 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -962,4 +962,35 @@ CREATE TABLE IF NOT EXISTS `tenant_template`
 -- ----------------------------
+-- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated',
+    `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab',
+    `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval',
+    `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule',
+    `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule',
+    `delay_time` int(11) DEFAULT '0' COMMENT 'Delay time in minutes to schedule',
+    `self_depend` int(11) DEFAULT NULL COMMENT 'Self depend info',
+    `task_parallelism` int(11) DEFAULT NULL COMMENT 'Task parallelism',
+    `crontab_expression` varchar(256) DEFAULT NULL COMMENT 'Crontab expression if schedule type is crontab',
+    `status` int(4) DEFAULT '100' COMMENT 'Schedule status',
+    `previous_status` int(4) DEFAULT '100' COMMENT 
'Previous schedule status',
+    `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+    `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    -- is_deleted is part of the key: it is 0 for an undeleted row and > 0 for a deleted one,
+    -- so an inlong group can have only one undeleted schedule row
+    UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+    ) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
+
+
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index cd31161b1c..f7dc1c0a90 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1015,5 +1015,33 @@ CREATE TABLE IF NOT EXISTS `tenant_template`
 DEFAULT CHARSET = utf8mb4 COMMENT ='Tenant template table';
 -- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated',
+    `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab',
+    -- NOTE(review): the column comment lists 'M' for both month and minute; confirm the code used for minute
+    `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval',
+    `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule',
+    `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule',
+    `delay_time` int(11) DEFAULT '0' COMMENT 'Delay time in minutes to schedule',
+    `self_depend` int(11) DEFAULT NULL COMMENT 'Self depend info',
+    `task_parallelism` int(11) DEFAULT NULL COMMENT 'Task parallelism',
+    `crontab_expression` varchar(256) DEFAULT NULL COMMENT 'Crontab expression if schedule type is crontab',
+    `status` int(4) DEFAULT '100' COMMENT 'Schedule status',
+    `previous_status` int(4) DEFAULT '100' COMMENT 'Previous schedule status',
+    `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+    `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    -- is_deleted is part of the key: it is 0 for an undeleted row and > 0 for a deleted one,
+    -- so an inlong group can have only one undeleted schedule row
+    UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql b/inlong-manager/manager-web/sql/changes-1.13.0.sql
index ecb38b543d..38e7627c5e 100644
--- a/inlong-manager/manager-web/sql/changes-1.13.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql
@@ -88,3 +88,33 @@ CREATE TABLE IF NOT EXISTS `tenant_template`
 UNIQUE KEY `unique_tenant_inlong_template` (`tenant`, `template_name`, `is_deleted`)
 ) ENGINE = InnoDB
 DEFAULT CHARSET = utf8mb4 COMMENT ='Tenant template table';
+
+-- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated',
+    `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab',
+    -- NOTE(review): the column comment lists 'M' for both month and minute; confirm the code used for minute
+    `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval',
+    `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule',
+    `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule',
+    `delay_time` int(11) DEFAULT '0' COMMENT 'Delay time in minutes to schedule',
+    `self_depend` int(11) DEFAULT NULL COMMENT 'Self depend info',
+    `task_parallelism` int(11) DEFAULT NULL COMMENT 'Task parallelism',
+    `crontab_expression` varchar(256) DEFAULT NULL COMMENT 'Crontab expression if schedule type is crontab',
+    `status` int(4) DEFAULT '100' COMMENT 'Schedule status',
+    `previous_status` int(4) DEFAULT '100' COMMENT 'Previous schedule status',
+    `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+    `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    -- is_deleted is part of the key: it is 0 for an undeleted row and > 0 for a deleted one,
+    -- so an inlong group can have only one undeleted schedule row
+    UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+    ) ENGINE = InnoDB
+    DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
new file mode 100644
index 0000000000..801cc09b0e
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import org.apache.inlong.manager.common.enums.OperationTarget;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.schedule.ScheduleService;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST endpoints below {@code /api/schedule} for the schedule info of an inlong group.
+ * Each handler delegates to {@link ScheduleService} and wraps the outcome in a {@link Response}.
+ */
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Schedule-API")
+public class InLongSchedulerController {
+
+    @Autowired
+    private ScheduleService scheduleService;
+
+    /**
+     * Save schedule info on behalf of the logged-in user.
+     *
+     * @param request schedule request taken from the request body
+     * @return response wrapping the value returned by {@link ScheduleService#save}
+     */
+    @RequestMapping(value = "/schedule/save", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SCHEDULE)
+    @ApiOperation(value = "Save schedule info")
+    public Response<Integer> save(@RequestBody ScheduleInfoRequest request) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(scheduleService.save(request, operator));
+    }
+
+    /**
+     * Tell whether schedule info exists for an inlong group.
+     *
+     * @param groupId inlong group id taken from the path
+     * @return response wrapping the existence flag
+     */
+    @RequestMapping(value = "/schedule/exist/{groupId}", method = RequestMethod.GET)
+    @ApiOperation(value = "Is the schedule info exists for inlong group")
+    @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true)
+    public Response<Boolean> exist(@PathVariable String groupId) {
+        Boolean exists = scheduleService.exist(groupId);
+        return Response.success(exists);
+    }
+
+    /**
+     * Fetch the schedule info of an inlong group.
+     *
+     * @param groupId inlong group id taken from the query string
+     * @return response wrapping the schedule info
+     */
+    @RequestMapping(value = "/schedule/get", method = RequestMethod.GET)
+    @ApiOperation(value = "Get schedule info for inlong group")
+    @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true)
+    public Response<ScheduleInfo> get(@RequestParam String groupId) {
+        ScheduleInfo scheduleInfo = scheduleService.get(groupId);
+        return Response.success(scheduleInfo);
+    }
+
+    /**
+     * Update schedule info on behalf of the logged-in user; the request body is validated
+     * against the {@link UpdateValidation} group.
+     *
+     * @param request schedule request taken from the request body
+     * @return response wrapping the update result
+     */
+    @RequestMapping(value = "/schedule/update", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SCHEDULE)
+    @ApiOperation(value = "Update schedule info")
+    public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody ScheduleInfoRequest request) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        Boolean updated = scheduleService.update(request, operator);
+        return Response.success(updated);
+    }
+
+    /**
+     * Delete the schedule info of an inlong group on behalf of the logged-in user.
+     *
+     * @param groupId inlong group id taken from the path
+     * @return response wrapping the delete result
+     */
+    @RequestMapping(value = "/schedule/delete/{groupId}", method = RequestMethod.DELETE)
+    @ApiOperation(value = "Delete schedule info")
+    @OperationLog(operation = OperationType.DELETE, operationTarget = OperationTarget.SCHEDULE)
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
+    public Response<Boolean> delete(@PathVariable String groupId) {
+        return Response.success(scheduleService.deleteByGroupId(groupId, LoginUserUtils.getLoginUser().getName()));
+    }
+
+}