This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new eb55831cf0 [INLONG-11483][Manager] Support multiple scedule engine selection (#11484) eb55831cf0 is described below commit eb55831cf0408d4c30a12199a4d6f060d74cc670 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Tue Nov 12 17:00:41 2024 +0800 [INLONG-11483][Manager] Support multiple scedule engine selection (#11484) Co-authored-by: Aloys Zhang <aloyszh...@apche.org> --- .../inlong/manager/dao/entity/ScheduleEntity.java | 2 ++ .../resources/mappers/ScheduleEntityMapper.xml | 26 ++++++++++++--------- .../manager/dao/mapper/ScheduleEntityTest.java | 3 +++ .../manager/pojo/group/InlongGroupRequest.java | 5 ++++ .../inlong/manager/pojo/schedule/ScheduleInfo.java | 10 +++++--- .../manager/pojo/schedule/ScheduleInfoRequest.java | 10 +++++--- .../manager/schedule/ScheduleClientFactory.java | 6 +---- .../service/schedule/ScheduleOperatorImpl.java | 8 +++---- .../main/resources/h2/apache_inlong_manager.sql | 1 + .../manager-web/sql/apache_inlong_manager.sql | 1 + inlong-manager/manager-web/sql/changes-2.1.0.sql | 27 ++++++++++++++++++++++ 11 files changed, 73 insertions(+), 26 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java index 6d301703fc..a9798c91f1 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java @@ -33,6 +33,8 @@ public class ScheduleEntity implements Serializable { private String inlongGroupId; // schedule type, support [normal, crontab], 0 for normal and 1 for crontab private Integer scheduleType; + // schedule engine type, support [Quartz, Airflow, DolphinScheduler] + private String scheduleEngine; // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround] // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround private String scheduleUnit; diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml index d719aa8988..33d25ad78a 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml @@ -22,6 +22,7 @@ <id column="id" jdbcType="INTEGER" property="id"/> <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/> <result column="schedule_type" jdbcType="INTEGER" property="scheduleType"/> + <result column="schedule_engine" jdbcType="VARCHAR" property="scheduleEngine"/> <result column="schedule_unit" jdbcType="VARCHAR" property="scheduleUnit"/> <result column="schedule_interval" jdbcType="INTEGER" property="scheduleInterval"/> <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/> @@ -42,25 +43,25 @@ </resultMap> <sql id="Base_Column_List"> - id, inlong_group_id, schedule_type, schedule_unit, schedule_interval, start_time, + id, inlong_group_id, schedule_type, schedule_engine, schedule_unit, schedule_interval, start_time, end_time, delay_time, self_depend, task_parallelism, crontab_expression, status, previous_status, is_deleted, creator, modifier, create_time, modify_time, version </sql> <insert id="insert" useGeneratedKeys="true" keyProperty="id" parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity"> - insert into schedule_config (id, inlong_group_id, schedule_type, schedule_unit, - schedule_interval, start_time, end_time, delay_time, - self_depend, task_parallelism, crontab_expression, + insert into schedule_config (id, inlong_group_id, schedule_type, schedule_engine, + schedule_unit, schedule_interval, start_time, end_time, + delay_time, self_depend, task_parallelism, crontab_expression, status, previous_status, creator, modifier) values (#{id, jdbcType=INTEGER}, #{inlongGroupId, jdbcType=VARCHAR}, - #{scheduleType, jdbcType=INTEGER}, #{scheduleUnit, jdbcType=VARCHAR}, - #{scheduleInterval, jdbcType=INTEGER}, #{startTime, jdbcType=TIMESTAMP}, - #{endTime, jdbcType=TIMESTAMP}, #{delayTime, jdbcType=INTEGER}, - #{selfDepend, jdbcType=INTEGER}, #{taskParallelism, jdbcType=INTEGER}, - #{crontabExpression, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, - #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, - #{modifier,jdbcType=VARCHAR}) + #{scheduleType, jdbcType=INTEGER}, #{scheduleEngine, jdbcType=VARCHAR}, + #{scheduleUnit, jdbcType=VARCHAR}, #{scheduleInterval, jdbcType=INTEGER}, + #{startTime, jdbcType=TIMESTAMP}, #{endTime, jdbcType=TIMESTAMP}, + #{delayTime, jdbcType=INTEGER}, #{selfDepend, jdbcType=INTEGER}, + #{taskParallelism, jdbcType=INTEGER}, #{crontabExpression, jdbcType=VARCHAR}, + #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, + #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) </insert> <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap"> @@ -88,6 +89,9 @@ <if test="scheduleType != null"> schedule_type = #{scheduleType, jdbcType=INTEGER}, </if> + <if test="scheduleEngine != null"> + schedule_engine = #{scheduleEngine, jdbcType=VARCHAR}, + </if> <if test="scheduleUnit !=null"> schedule_unit = #{scheduleUnit, jdbcType=VARCHAR}, </if> diff --git a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java index ef4207bc84..6d2cdaa661 100644 --- a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java +++ b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java @@ -32,6 +32,7 @@ public class ScheduleEntityTest extends DaoBaseTest { public static final String GROUP_ID_PREFIX = "test_group_"; public static final String USER = "admin"; public static final int SCHEDULE_TYPE = 0; + public static final String SCHEDULE_ENGINE = "Quartz"; public static final int SCHEDULE_TYPE_NEW = 1; public static final String SCHEDULE_UNIT = "H"; public static final String SCHEDULE_UNIT_NEW = "D"; @@ -63,6 +64,7 @@ public class ScheduleEntityTest extends DaoBaseTest { ScheduleEntity entityQueried = scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId()); Assertions.assertEquals(scheduleEntity.getInlongGroupId(), entityQueried.getInlongGroupId()); Assertions.assertEquals(SCHEDULE_TYPE, entityQueried.getScheduleType()); + Assertions.assertEquals(SCHEDULE_ENGINE, entityQueried.getScheduleEngine()); Assertions.assertEquals(SCHEDULE_UNIT, entityQueried.getScheduleUnit()); Assertions.assertEquals(SCHEDULE_INTERVAL, entityQueried.getScheduleInterval()); Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime()); @@ -105,6 +107,7 @@ public class ScheduleEntityTest extends DaoBaseTest { ScheduleEntity entity = new ScheduleEntity(); entity.setInlongGroupId(GROUP_ID_PREFIX + System.currentTimeMillis()); entity.setScheduleType(SCHEDULE_TYPE); + entity.setScheduleEngine(SCHEDULE_ENGINE); entity.setScheduleUnit(SCHEDULE_UNIT); entity.setScheduleInterval(SCHEDULE_INTERVAL); entity.setStartTime(DEFAULT_TIME); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java index 1749d1427c..194bf35454 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java @@ -135,6 +135,11 @@ public abstract class InlongGroupRequest extends BaseInlongGroup { @ApiModelProperty("Schedule type") private Integer scheduleType; + // schedule engine type, support [Quartz, Airflow, DolphinScheduler] + @ApiModelProperty(value = "Schedule engine, support Quartz, Airflow and DolphinScheduler") + @Length(min = 1, max = 20, message = "length must be between 1 and 20") + private String scheduleEngine; + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround] // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround @ApiModelProperty("TimeUnit for schedule interval") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java index bb4fb2ca41..b6527cf308 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java @@ -50,6 +50,10 @@ public class ScheduleInfo { @ApiModelProperty("Schedule type") private Integer scheduleType; + // schedule engine type, support [Quartz, Airflow, DolphinScheduler] + @ApiModelProperty("Schedule engine") + private String scheduleEngine; + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround] // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround @ApiModelProperty("TimeUnit for schedule interval") @@ -91,6 +95,7 @@ public class ScheduleInfo { ScheduleInfo that = (ScheduleInfo) o; return Objects.equals(inlongGroupId, that.inlongGroupId) && Objects.equals(scheduleType, that.scheduleType) + && Objects.equals(scheduleEngine, that.scheduleEngine) && Objects.equals(scheduleUnit, that.scheduleUnit) && Objects.equals(scheduleInterval, that.scheduleInterval) && Objects.equals(startTime, that.startTime) @@ -103,9 +108,8 @@ public class ScheduleInfo { @Override public int hashCode() { - return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, scheduleInterval, startTime, endTime, - delayTime, - selfDepend, taskParallelism, crontabExpression, version); + return Objects.hash(id, inlongGroupId, scheduleType, scheduleEngine, scheduleUnit, scheduleInterval, startTime, + endTime, delayTime, selfDepend, taskParallelism, crontabExpression, version); } } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java index bf6fb298bf..882b490cf1 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java @@ -44,6 +44,10 @@ public class ScheduleInfoRequest { @ApiModelProperty("Schedule type") private Integer scheduleType; + // schedule engine type, support [Quartz, Airflow, DolphinScheduler] + @ApiModelProperty(value = "Schedule engine") + private String scheduleEngine; + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround] // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround @ApiModelProperty("TimeUnit for schedule interval") @@ -85,6 +89,7 @@ public class ScheduleInfoRequest { ScheduleInfoRequest that = (ScheduleInfoRequest) o; return Objects.equals(inlongGroupId, that.inlongGroupId) && Objects.equals(scheduleType, that.scheduleType) + && Objects.equals(scheduleEngine, that.scheduleEngine) && Objects.equals(scheduleUnit, that.scheduleUnit) && Objects.equals(scheduleInterval, that.scheduleInterval) && Objects.equals(startTime, that.startTime) @@ -97,8 +102,7 @@ public class ScheduleInfoRequest { @Override public int hashCode() { - return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, scheduleInterval, startTime, endTime, - delayTime, - selfDepend, taskParallelism, crontabExpression, version); + return Objects.hash(id, inlongGroupId, scheduleType, scheduleEngine, scheduleUnit, scheduleInterval, + startTime, endTime, delayTime, selfDepend, taskParallelism, crontabExpression, version); } } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java index 13f87b3c45..26570fa36b 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java @@ -23,7 +23,6 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.List; @@ -34,13 +33,10 @@ public class ScheduleClientFactory { private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleClientFactory.class); - @Value("${inlong.schedule.engine:none}") - private String scheduleEngineName; - @Autowired List<ScheduleEngineClient> scheduleEngineClients; - public ScheduleEngineClient getInstance() { + public ScheduleEngineClient getInstance(String scheduleEngineName) { Optional<ScheduleEngineClient> optScheduleClient = scheduleEngineClients.stream().filter(t -> t.accept(scheduleEngineName)).findFirst(); if (!optScheduleClient.isPresent()) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java index 61f847f244..6ef899ab9e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java @@ -88,9 +88,9 @@ public class ScheduleOperatorImpl implements ScheduleOperator { } } - private ScheduleEngineClient getScheduleEngineClient() { + private ScheduleEngineClient getScheduleEngineClient(String scheduleEngine) { if (scheduleEngineClient == null) { - scheduleEngineClient = scheduleClientFactory.getInstance(); + scheduleEngineClient = scheduleClientFactory.getInstance(scheduleEngine); } return scheduleEngineClient; } @@ -143,8 +143,8 @@ public class ScheduleOperatorImpl implements ScheduleOperator { * */ private Boolean registerToScheduleEngine(ScheduleInfo scheduleInfo, String operator, boolean isUpdate) { // update(un-register and then register) or register - boolean res = isUpdate ? getScheduleEngineClient().update(scheduleInfo) - : getScheduleEngineClient().register(scheduleInfo); + boolean res = isUpdate ? getScheduleEngineClient(scheduleInfo.getScheduleEngine()).update(scheduleInfo) + : getScheduleEngineClient(scheduleInfo.getScheduleEngine()).register(scheduleInfo); // update status to REGISTERED scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), REGISTERED, operator); LOGGER.info("{} schedule info success for group {}", diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index bfaa28b1cd..1d92f14f46 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -992,6 +992,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config` `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated', `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab', + `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler', `schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround', `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval', `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 8697d3c0e7..58259c9047 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -1045,6 +1045,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config` `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated', `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab', + `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler', `schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround', `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval', `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule', diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql b/inlong-manager/manager-web/sql/changes-2.1.0.sql new file mode 100644 index 0000000000..0d4c984778 --- /dev/null +++ b/inlong-manager/manager-web/sql/changes-2.1.0.sql @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This is the SQL change file from version 1.4.0 to the current version 1.5.0. +-- When upgrading to version 1.5.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. + +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +USE `apache_inlong_manager`; + +ALTER TABLE `schedule_config` + ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';