This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new eb55831cf0 [INLONG-11483][Manager] Support multiple scedule engine 
selection (#11484)
eb55831cf0 is described below

commit eb55831cf0408d4c30a12199a4d6f060d74cc670
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Tue Nov 12 17:00:41 2024 +0800

    [INLONG-11483][Manager] Support multiple scedule engine selection (#11484)
    
    Co-authored-by: Aloys Zhang <aloyszh...@apche.org>
---
 .../inlong/manager/dao/entity/ScheduleEntity.java  |  2 ++
 .../resources/mappers/ScheduleEntityMapper.xml     | 26 ++++++++++++---------
 .../manager/dao/mapper/ScheduleEntityTest.java     |  3 +++
 .../manager/pojo/group/InlongGroupRequest.java     |  5 ++++
 .../inlong/manager/pojo/schedule/ScheduleInfo.java | 10 +++++---
 .../manager/pojo/schedule/ScheduleInfoRequest.java | 10 +++++---
 .../manager/schedule/ScheduleClientFactory.java    |  6 +----
 .../service/schedule/ScheduleOperatorImpl.java     |  8 +++----
 .../main/resources/h2/apache_inlong_manager.sql    |  1 +
 .../manager-web/sql/apache_inlong_manager.sql      |  1 +
 inlong-manager/manager-web/sql/changes-2.1.0.sql   | 27 ++++++++++++++++++++++
 11 files changed, 73 insertions(+), 26 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
index 6d301703fc..a9798c91f1 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
@@ -33,6 +33,8 @@ public class ScheduleEntity implements Serializable {
     private String inlongGroupId;
     // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
     private Integer scheduleType;
+    // schedule engine type, support [Quartz, Airflow, DolphinScheduler]
+    private String scheduleEngine;
     // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneround]
     // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
     private String scheduleUnit;
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
index d719aa8988..33d25ad78a 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
@@ -22,6 +22,7 @@
         <id column="id" jdbcType="INTEGER" property="id"/>
         <result column="inlong_group_id" jdbcType="VARCHAR" 
property="inlongGroupId"/>
         <result column="schedule_type" jdbcType="INTEGER" 
property="scheduleType"/>
+        <result column="schedule_engine" jdbcType="VARCHAR" 
property="scheduleEngine"/>
         <result column="schedule_unit" jdbcType="VARCHAR" 
property="scheduleUnit"/>
         <result column="schedule_interval" jdbcType="INTEGER" 
property="scheduleInterval"/>
         <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
@@ -42,25 +43,25 @@
     </resultMap>
 
     <sql id="Base_Column_List">
-        id, inlong_group_id, schedule_type, schedule_unit, schedule_interval, 
start_time,
+        id, inlong_group_id, schedule_type, schedule_engine, schedule_unit, 
schedule_interval, start_time,
           end_time, delay_time, self_depend, task_parallelism, 
crontab_expression,
            status, previous_status, is_deleted, creator, modifier, 
create_time, modify_time, version
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
-        insert into schedule_config (id, inlong_group_id, schedule_type, 
schedule_unit,
-                                     schedule_interval, start_time, end_time, 
delay_time,
-                                     self_depend, task_parallelism, 
crontab_expression,
+        insert into schedule_config (id, inlong_group_id, schedule_type, 
schedule_engine,
+                                     schedule_unit, schedule_interval, 
start_time, end_time,
+                                     delay_time, self_depend, 
task_parallelism, crontab_expression,
                                      status, previous_status, creator, 
modifier)
         values (#{id, jdbcType=INTEGER}, #{inlongGroupId, jdbcType=VARCHAR},
-                #{scheduleType, jdbcType=INTEGER}, #{scheduleUnit, 
jdbcType=VARCHAR},
-                #{scheduleInterval, jdbcType=INTEGER}, #{startTime, 
jdbcType=TIMESTAMP},
-                #{endTime, jdbcType=TIMESTAMP}, #{delayTime, jdbcType=INTEGER},
-                #{selfDepend, jdbcType=INTEGER}, #{taskParallelism, 
jdbcType=INTEGER},
-                #{crontabExpression, jdbcType=VARCHAR}, 
#{status,jdbcType=INTEGER},
-                #{previousStatus,jdbcType=INTEGER}, 
#{creator,jdbcType=VARCHAR},
-                #{modifier,jdbcType=VARCHAR})
+                #{scheduleType, jdbcType=INTEGER},  #{scheduleEngine, 
jdbcType=VARCHAR},
+                #{scheduleUnit, jdbcType=VARCHAR}, #{scheduleInterval, 
jdbcType=INTEGER},
+                #{startTime, jdbcType=TIMESTAMP}, #{endTime, 
jdbcType=TIMESTAMP},
+                #{delayTime, jdbcType=INTEGER}, #{selfDepend, 
jdbcType=INTEGER},
+                #{taskParallelism, jdbcType=INTEGER}, #{crontabExpression, 
jdbcType=VARCHAR},
+                #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
+                #{creator,jdbcType=VARCHAR},  #{modifier,jdbcType=VARCHAR})
     </insert>
 
     <select id="selectByPrimaryKey" parameterType="java.lang.Integer" 
resultMap="BaseResultMap">
@@ -88,6 +89,9 @@
             <if test="scheduleType != null">
                 schedule_type = #{scheduleType, jdbcType=INTEGER},
             </if>
+            <if test="scheduleEngine != null">
+                schedule_engine = #{scheduleEngine, jdbcType=VARCHAR},
+            </if>
             <if test="scheduleUnit !=null">
                 schedule_unit = #{scheduleUnit, jdbcType=VARCHAR},
             </if>
diff --git 
a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
 
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
index ef4207bc84..6d2cdaa661 100644
--- 
a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
+++ 
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
@@ -32,6 +32,7 @@ public class ScheduleEntityTest extends DaoBaseTest {
     public static final String GROUP_ID_PREFIX = "test_group_";
     public static final String USER = "admin";
     public static final int SCHEDULE_TYPE = 0;
+    public static final String SCHEDULE_ENGINE = "Quartz";
     public static final int SCHEDULE_TYPE_NEW = 1;
     public static final String SCHEDULE_UNIT = "H";
     public static final String SCHEDULE_UNIT_NEW = "D";
@@ -63,6 +64,7 @@ public class ScheduleEntityTest extends DaoBaseTest {
         ScheduleEntity entityQueried = 
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
         Assertions.assertEquals(scheduleEntity.getInlongGroupId(), 
entityQueried.getInlongGroupId());
         Assertions.assertEquals(SCHEDULE_TYPE, 
entityQueried.getScheduleType());
+        Assertions.assertEquals(SCHEDULE_ENGINE, 
entityQueried.getScheduleEngine());
         Assertions.assertEquals(SCHEDULE_UNIT, 
entityQueried.getScheduleUnit());
         Assertions.assertEquals(SCHEDULE_INTERVAL, 
entityQueried.getScheduleInterval());
         Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
@@ -105,6 +107,7 @@ public class ScheduleEntityTest extends DaoBaseTest {
         ScheduleEntity entity = new ScheduleEntity();
         entity.setInlongGroupId(GROUP_ID_PREFIX + System.currentTimeMillis());
         entity.setScheduleType(SCHEDULE_TYPE);
+        entity.setScheduleEngine(SCHEDULE_ENGINE);
         entity.setScheduleUnit(SCHEDULE_UNIT);
         entity.setScheduleInterval(SCHEDULE_INTERVAL);
         entity.setStartTime(DEFAULT_TIME);
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index 1749d1427c..194bf35454 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -135,6 +135,11 @@ public abstract class InlongGroupRequest extends 
BaseInlongGroup {
     @ApiModelProperty("Schedule type")
     private Integer scheduleType;
 
+    // schedule engine type, support [Quartz, Airflow, DolphinScheduler]
+    @ApiModelProperty(value = "Schedule engine, support Quartz, Airflow and 
DolphinScheduler")
+    @Length(min = 1, max = 20, message = "length must be between 1 and 20")
+    private String scheduleEngine;
+
     // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneround]
     // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
     @ApiModelProperty("TimeUnit for schedule interval")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
index bb4fb2ca41..b6527cf308 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -50,6 +50,10 @@ public class ScheduleInfo {
     @ApiModelProperty("Schedule type")
     private Integer scheduleType;
 
+    // schedule engine type, support [Quartz, Airflow, DolphinScheduler]
+    @ApiModelProperty("Schedule engine")
+    private String scheduleEngine;
+
     // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneround]
     // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
     @ApiModelProperty("TimeUnit for schedule interval")
@@ -91,6 +95,7 @@ public class ScheduleInfo {
         ScheduleInfo that = (ScheduleInfo) o;
         return Objects.equals(inlongGroupId, that.inlongGroupId)
                 && Objects.equals(scheduleType, that.scheduleType)
+                && Objects.equals(scheduleEngine, that.scheduleEngine)
                 && Objects.equals(scheduleUnit, that.scheduleUnit)
                 && Objects.equals(scheduleInterval, that.scheduleInterval)
                 && Objects.equals(startTime, that.startTime)
@@ -103,9 +108,8 @@ public class ScheduleInfo {
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, 
scheduleInterval, startTime, endTime,
-                delayTime,
-                selfDepend, taskParallelism, crontabExpression, version);
+        return Objects.hash(id, inlongGroupId, scheduleType, scheduleEngine, 
scheduleUnit, scheduleInterval, startTime,
+                endTime, delayTime, selfDepend, taskParallelism, 
crontabExpression, version);
     }
 
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
index bf6fb298bf..882b490cf1 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -44,6 +44,10 @@ public class ScheduleInfoRequest {
     @ApiModelProperty("Schedule type")
     private Integer scheduleType;
 
+    // schedule engine type, support [Quartz, Airflow, DolphinScheduler]
+    @ApiModelProperty(value = "Schedule engine")
+    private String scheduleEngine;
+
     // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneround]
     // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
     @ApiModelProperty("TimeUnit for schedule interval")
@@ -85,6 +89,7 @@ public class ScheduleInfoRequest {
         ScheduleInfoRequest that = (ScheduleInfoRequest) o;
         return Objects.equals(inlongGroupId, that.inlongGroupId)
                 && Objects.equals(scheduleType, that.scheduleType)
+                && Objects.equals(scheduleEngine, that.scheduleEngine)
                 && Objects.equals(scheduleUnit, that.scheduleUnit)
                 && Objects.equals(scheduleInterval, that.scheduleInterval)
                 && Objects.equals(startTime, that.startTime)
@@ -97,8 +102,7 @@ public class ScheduleInfoRequest {
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, 
scheduleInterval, startTime, endTime,
-                delayTime,
-                selfDepend, taskParallelism, crontabExpression, version);
+        return Objects.hash(id, inlongGroupId, scheduleType, scheduleEngine, 
scheduleUnit, scheduleInterval,
+                startTime, endTime, delayTime, selfDepend, taskParallelism, 
crontabExpression, version);
     }
 }
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
index 13f87b3c45..26570fa36b 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
@@ -23,7 +23,6 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
@@ -34,13 +33,10 @@ public class ScheduleClientFactory {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ScheduleClientFactory.class);
 
-    @Value("${inlong.schedule.engine:none}")
-    private String scheduleEngineName;
-
     @Autowired
     List<ScheduleEngineClient> scheduleEngineClients;
 
-    public ScheduleEngineClient getInstance() {
+    public ScheduleEngineClient getInstance(String scheduleEngineName) {
         Optional<ScheduleEngineClient> optScheduleClient =
                 scheduleEngineClients.stream().filter(t -> 
t.accept(scheduleEngineName)).findFirst();
         if (!optScheduleClient.isPresent()) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
index 61f847f244..6ef899ab9e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -88,9 +88,9 @@ public class ScheduleOperatorImpl implements ScheduleOperator 
{
         }
     }
 
-    private ScheduleEngineClient getScheduleEngineClient() {
+    private ScheduleEngineClient getScheduleEngineClient(String 
scheduleEngine) {
         if (scheduleEngineClient == null) {
-            scheduleEngineClient = scheduleClientFactory.getInstance();
+            scheduleEngineClient = 
scheduleClientFactory.getInstance(scheduleEngine);
         }
         return scheduleEngineClient;
     }
@@ -143,8 +143,8 @@ public class ScheduleOperatorImpl implements 
ScheduleOperator {
      * */
     private Boolean registerToScheduleEngine(ScheduleInfo scheduleInfo, String 
operator, boolean isUpdate) {
         // update(un-register and then register) or register
-        boolean res = isUpdate ? getScheduleEngineClient().update(scheduleInfo)
-                : getScheduleEngineClient().register(scheduleInfo);
+        boolean res = isUpdate ? 
getScheduleEngineClient(scheduleInfo.getScheduleEngine()).update(scheduleInfo)
+                : 
getScheduleEngineClient(scheduleInfo.getScheduleEngine()).register(scheduleInfo);
         // update status to REGISTERED
         scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), 
REGISTERED, operator);
         LOGGER.info("{} schedule info success for group {}",
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index bfaa28b1cd..1d92f14f46 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -992,6 +992,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
     `inlong_group_id`        varchar(256) NOT NULL COMMENT 'Inlong group id, 
undeleted ones cannot be repeated',
     `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
+    `schedule_engine`        varchar(64)  NOT NULL DEFAULT 'Quartz' COMMENT 
'Schedule engine, support Quartz, Airflow and DolphinScheduler',
     `schedule_unit`          varchar(64)  DEFAULT NULL COMMENT 'Schedule unit, 
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
     `schedule_interval`      int(11)      DEFAULT '1' COMMENT 'Schedule 
interval',
     `start_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Start time for schedule',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 8697d3c0e7..58259c9047 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1045,6 +1045,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
     `inlong_group_id`        varchar(256) NOT NULL COMMENT 'Inlong group id, 
undeleted ones cannot be repeated',
     `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
+    `schedule_engine`        varchar(64)  NOT NULL DEFAULT 'Quartz' COMMENT 
'Schedule engine, support Quartz, Airflow and DolphinScheduler',
     `schedule_unit`          varchar(64)  DEFAULT NULL COMMENT 'Schedule unit, 
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
     `schedule_interval`      int(11)      DEFAULT '1' COMMENT 'Schedule 
interval',
     `start_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Start time for schedule',
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql 
b/inlong-manager/manager-web/sql/changes-2.1.0.sql
new file mode 100644
index 0000000000..0d4c984778
--- /dev/null
+++ b/inlong-manager/manager-web/sql/changes-2.1.0.sql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
+-- When upgrading to version 1.5.0, please execute those SQLs in the DB (such 
as MySQL) used by the Manager module.
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+USE `apache_inlong_manager`;
+
+ALTER TABLE `schedule_config`
+    ADD COLUMN  `schedule_engine` varchar(64)  NOT NULL DEFAULT 'Quartz' 
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';

Reply via email to