This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 042e568624 [INLONG-11401][Manager] Support Dolphinscheduler schedule 
engine (#11468)
042e568624 is described below

commit 042e56862440422ca0e6f045996908ed6a54a8ba
Author: emptyOVO <118812562+empty...@users.noreply.github.com>
AuthorDate: Wed Nov 20 10:27:58 2024 +0800

    [INLONG-11401][Manager] Support Dolphinscheduler schedule engine (#11468)
---
 .../schedule/dolphinschedule/DSTaskDefinition.java | 120 ++++
 .../schedule/dolphinschedule/DSTaskParams.java     |  46 ++
 .../schedule/dolphinschedule/DSTaskRelation.java   |  59 ++
 .../schedule/dolphinschedule/DScheduleInfo.java}   |  32 +-
 inlong-manager/manager-schedule/pom.xml            |  30 +
 .../manager/schedule/ScheduleEngineType.java       |   3 +-
 .../dolphinscheduler/DolphinScheduleClient.java    |  58 ++
 .../dolphinscheduler/DolphinScheduleConstants.java |  76 ++
 .../dolphinscheduler/DolphinScheduleEngine.java    | 268 +++++++
 .../dolphinscheduler/DolphinScheduleOperator.java  | 173 +++++
 .../dolphinscheduler/DolphinScheduleUtils.java     | 790 +++++++++++++++++++++
 .../exception/DolphinScheduleException.java        | 105 +++
 .../DolphinScheduleContainerTestEnv.java           | 170 +++++
 .../DolphinScheduleEngineTest.java                 | 127 ++++
 .../DolphinSchedulerContainerEnvConstants.java     |  51 ++
 .../src/main/resources/application-dev.properties  |  10 +-
 .../src/main/resources/application-prod.properties |  11 +-
 .../src/main/resources/application-test.properties |  11 +-
 18 files changed, 2114 insertions(+), 26 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java
new file mode 100644
index 0000000000..700638b63c
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+public class DSTaskDefinition {
+
+    @ApiModelProperty("DolphinScheduler task definition code")
+    @JsonProperty("code")
+    private long code;
+
+    @ApiModelProperty("DolphinScheduler task definition code")
+    @JsonProperty("delayTime")
+    private String delayTime;
+
+    @ApiModelProperty("DolphinScheduler task definition description")
+    @JsonProperty("description")
+    private String description;
+
+    @ApiModelProperty("DolphinScheduler task definition environment code")
+    @JsonProperty("environmentCode")
+    private int environmentCode;
+
+    @ApiModelProperty("DolphinScheduler task fail retry interval")
+    @JsonProperty("failRetryInterval")
+    private String failRetryInterval;
+
+    @ApiModelProperty("DolphinScheduler task definition fail retry times")
+    @JsonProperty("failRetryTimes")
+    private String failRetryTimes;
+
+    @ApiModelProperty("DolphinScheduler task definition flag")
+    @JsonProperty("flag")
+    private String flag;
+
+    @ApiModelProperty("DolphinScheduler task definition isCache")
+    @JsonProperty("isCache")
+    private String isCache;
+
+    @ApiModelProperty("DolphinScheduler task definition name")
+    @JsonProperty("name")
+    private String name;
+
+    @ApiModelProperty("DolphinScheduler task definition params")
+    @JsonProperty("taskParams")
+    private DSTaskParams taskParams;
+
+    @ApiModelProperty("DolphinScheduler task definition priority")
+    @JsonProperty("taskPriority")
+    private String taskPriority;
+
+    @ApiModelProperty("DolphinScheduler task definition type")
+    @JsonProperty("taskType")
+    private String taskType;
+
+    @ApiModelProperty("DolphinScheduler task definition timeout")
+    @JsonProperty("timeout")
+    private int timeout;
+
+    @ApiModelProperty("DolphinScheduler task definition timeout flag")
+    @JsonProperty("timeoutFlag")
+    private String timeoutFlag;
+
+    @ApiModelProperty("DolphinScheduler task definition timeout notify 
strategy")
+    @JsonProperty("timeoutNotifyStrategy")
+    private String timeoutNotifyStrategy;
+
+    @ApiModelProperty("DolphinScheduler task definition worker group")
+    @JsonProperty("workerGroup")
+    private String workerGroup;
+
+    @ApiModelProperty("DolphinScheduler task definition apu quota")
+    @JsonProperty("cpuQuota")
+    private int cpuQuota;
+
+    @ApiModelProperty("DolphinScheduler task definition memory max")
+    @JsonProperty("memoryMax")
+    private int memoryMax;
+
+    @ApiModelProperty("DolphinScheduler task definition execute type")
+    @JsonProperty("taskExecuteType")
+    private String taskExecuteType;
+
+    public DSTaskDefinition() {
+        this.delayTime = "0";
+        this.description = "";
+        this.environmentCode = -1;
+        this.failRetryInterval = "1";
+        this.failRetryTimes = "0";
+        this.flag = "YES";
+        this.isCache = "NO";
+        this.taskPriority = "MEDIUM";
+        this.taskType = "SHELL";
+        this.timeoutFlag = "CLOSE";
+        this.timeoutNotifyStrategy = "";
+        this.workerGroup = "default";
+        this.cpuQuota = -1;
+        this.memoryMax = -1;
+        this.taskExecuteType = "BATCH";
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java
new file mode 100644
index 0000000000..a5344f5fac
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.List;
+@Data
+public class DSTaskParams {
+
+    @ApiModelProperty("DolphinScheduler task params local params")
+    @JsonProperty("localParams")
+    private List<Object> localParams;
+
+    @ApiModelProperty("DolphinScheduler task params raw script")
+    @JsonProperty("rawScript")
+    private String rawScript;
+
+    @ApiModelProperty("DolphinScheduler task params resource list")
+    @JsonProperty("resourceList")
+    private List<Object> resourceList;
+
+    public DSTaskParams() {
+        this.localParams = new ArrayList<>();
+        this.resourceList = new ArrayList<>();
+        this.rawScript = "";
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java
new file mode 100644
index 0000000000..e853317df7
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+public class DSTaskRelation {
+
+    @ApiModelProperty("DolphinScheduler task relation name")
+    @JsonProperty("name")
+    private String name;
+
+    @ApiModelProperty("DolphinScheduler task relation pre-task code")
+    @JsonProperty("preTaskCode")
+    private int preTaskCode;
+
+    @ApiModelProperty("DolphinScheduler task relation pre-task version")
+    @JsonProperty("preTaskVersion")
+    private int preTaskVersion;
+
+    @ApiModelProperty("DolphinScheduler task relation post-task code")
+    @JsonProperty("postTaskCode")
+    private long postTaskCode;
+
+    @ApiModelProperty("DolphinScheduler task relation post-task version")
+    @JsonProperty("postTaskVersion")
+    private int postTaskVersion;
+
+    @ApiModelProperty("DolphinScheduler task relation condition type")
+    @JsonProperty("conditionType")
+    private String conditionType;
+
+    @ApiModelProperty("DolphinScheduler task relation condition params")
+    @JsonProperty("conditionParams")
+    private Object conditionParams;
+
+    public DSTaskRelation() {
+        this.name = "";
+        this.conditionType = "NONE";
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java
similarity index 52%
copy from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java
index 71949ef744..ac45b26f19 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java
@@ -15,19 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
 
-import lombok.Getter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
-@Getter
-public enum ScheduleEngineType {
+@Data
+public class DScheduleInfo {
 
-    NONE("None"),
-    QUARTZ("Quartz");
+    @ApiModelProperty("DolphinScheduler schedule start time")
+    @JsonProperty("startTime")
+    private String startTime;
 
-    private final String type;
+    @ApiModelProperty("DolphinScheduler schedule end time")
+    @JsonProperty("endTime")
+    private String endTime;
 
-    ScheduleEngineType(String type) {
-        this.type = type;
-    }
-}
\ No newline at end of file
+    @ApiModelProperty("DolphinScheduler schedule crontab expression")
+    @JsonProperty("crontab")
+    private String crontab;
+
+    @ApiModelProperty("DolphinScheduler schedule timezone id")
+    @JsonProperty("timezoneId")
+    private String timezoneId;
+
+}
diff --git a/inlong-manager/manager-schedule/pom.xml 
b/inlong-manager/manager-schedule/pom.xml
index a9d9fb3e1e..82632a1aff 100644
--- a/inlong-manager/manager-schedule/pom.xml
+++ b/inlong-manager/manager-schedule/pom.xml
@@ -73,5 +73,35 @@
             <artifactId>junit-jupiter</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-test</artifactId>
+            <version>${spring.boot.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.vaadin.external.google</groupId>
+                    <artifactId>android-json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.jayway.jsonpath</groupId>
+                    <artifactId>json-path</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.junit.jupiter</groupId>
+                    <artifactId>junit-jupiter-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
index 71949ef744..ac71e4e2d1 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
@@ -23,7 +23,8 @@ import lombok.Getter;
 public enum ScheduleEngineType {
 
     NONE("None"),
-    QUARTZ("Quartz");
+    QUARTZ("Quartz"),
+    DOLPHINSCHEDULER("DolphinScheduler");
 
     private final String type;
 
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java
new file mode 100644
index 0000000000..a75085b9ac
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleEngineClient;
+import org.apache.inlong.manager.schedule.ScheduleEngineType;
+
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+/**
+ * Built-in implementation of third-party schedule engine client corresponding 
with {@link DolphinScheduleEngine}.
+ * DolphinScheduleClient simply invokes the {@link DolphinScheduleEngine} to 
register/unregister/update
+ * schedule info, all the logic for invoking the remote scheduling service is 
implemented in {@link DolphinScheduleEngine}
+ */
+@Service
+public class DolphinScheduleClient implements ScheduleEngineClient {
+
+    @Resource
+    public DolphinScheduleEngine scheduleEngine;
+
+    @Override
+    public boolean accept(String engineType) {
+        return 
ScheduleEngineType.DOLPHINSCHEDULER.getType().equalsIgnoreCase(engineType);
+    }
+
+    @Override
+    public boolean register(ScheduleInfo scheduleInfo) {
+        return scheduleEngine.handleRegister(scheduleInfo);
+    }
+
+    @Override
+    public boolean unregister(String groupId) {
+        return scheduleEngine.handleUnregister(groupId);
+    }
+
+    @Override
+    public boolean update(ScheduleInfo scheduleInfo) {
+        return scheduleEngine.handleUpdate(scheduleInfo);
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
new file mode 100644
index 0000000000..89dcda5b77
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+public class DolphinScheduleConstants {
+
+    // DS public constants
+    public static final String DS_ID = "id";
+    public static final String DS_CODE = "code";
+    public static final String DS_TOKEN = "token";
+    public static final String DS_PAGE_SIZE = "pageSize";
+    public static final String DS_PAGE_NO = "pageNo";
+    public static final String DS_SEARCH_VAL = "searchVal";
+    public static final String DS_RESPONSE_DATA = "data";
+    public static final String DS_RESPONSE_NAME = "name";
+    public static final String DS_RESPONSE_TOTAL_LIST = "totalList";
+    public static final String DS_DEFAULT_PAGE_SIZE = "10";
+    public static final String DS_DEFAULT_PAGE_NO = "1";
+    public static final String DS_DEFAULT_TIMEZONE_ID = "Asia/Shanghai";
+
+    // DS project related constants
+    public static final String DS_PROJECT_URL = "/projects";
+    public static final String DS_PROJECT_NAME = "projectName";
+    public static final String DS_PROJECT_DESC = "description";
+    public static final String DS_DEFAULT_PROJECT_NAME = 
"default_inlong_offline_scheduler";
+    public static final String DS_DEFAULT_PROJECT_DESC = "default scheduler 
project for inlong offline job";
+
+    // DS task related constants
+    public static final String DS_TASK_CODE_URL = 
"/task-definition/gen-task-codes";
+    public static final String DS_TASK_RELATION = "taskRelationJson";
+    public static final String DS_TASK_DEFINITION = "taskDefinitionJson";
+    public static final String DS_TASK_GEN_NUM = "genNum";
+    public static final String DS_DEFAULT_TASK_GEN_NUM = "1";
+    public static final String DS_DEFAULT_TASK_NAME = 
"default-inlong-http-callback";
+    public static final String DS_DEFAULT_TASK_DESC = "default http request 
using shell script callbacks to inlong";
+
+    // DS process definition related constants
+    public static final String DS_PROCESS_URL = "/process-definition";
+    public static final String DS_PROCESS_QUERY_URL = 
"/query-process-definition-list";
+    public static final String DS_PROCESS_NAME = "name";
+    public static final String DS_PROCESS_DESC = "description";
+    public static final String DS_PROCESS_CODE = "processDefinitionCode";
+    public static final String DS_DEFAULT_PROCESS_NAME = 
"_inlong_offline_process_definition";
+    public static final String DS_DEFAULT_PROCESS_DESC = "scheduler process 
definition for inlong group: ";
+
+    // DS release related constants
+    public static final String DS_RELEASE_URL = "/release";
+    public static final String DS_RELEASE_STATE = "releaseState";
+
+    // DS schedule related constants
+    public static final String DS_SCHEDULE_URL = "/schedules";
+    public static final String DS_SCHEDULE_DEF = "schedule";
+    public static final String DS_DEFAULT_SCHEDULE_TIME_FORMAT = "yyyy-MM-dd 
HH:mm:ss";
+
+    // DS online/offline related constants
+    public static final String DS_ONLINE_URL = "/online";
+    public static final String DS_ONLINE_STATE = "ONLINE";
+    public static final String DS_OFFLINE_URL = "/offline";
+    public static final String DS_OFFLINE_STATE = "OFFLINE";
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
new file mode 100644
index 0000000000..7b09481cea
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleEngine;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL;
+
+/**
+ * The default implementation of DolphinScheduler engine based on 
DolphinScheduler API. Response for processing
+ * the register/unregister/update requests from {@link DolphinScheduleClient}
+ */
+@Data
+@Service
+public class DolphinScheduleEngine implements ScheduleEngine {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DolphinScheduleEngine.class);
+
+    @Value("${server.host:127.0.0.1}")
+    private String host;
+
+    @Value("${server.port:8083}")
+    private int port;
+
+    @Value("${default.admin.user:admin}")
+    private String username;
+
+    @Value("${default.admin.password:inlong}")
+    private String password;
+
+    
@Value("${schedule.engine.dolphinscheduler.url:http://127.0.0.1:12345/dolphinscheduler}";)
+    private String dolphinUrl;
+
+    @Value("${schedule.engine.dolphinscheduler.token:default_token_value}")
+    private String token;
+
+    @Resource
+    private DolphinScheduleOperator dolphinScheduleOperator;
+
+    private long projectCode;
+    private String projectBaseUrl;
+    private final Map<Long, String> scheduledProcessMap;
+
+    @PostConstruct
+    public void init() {
+        this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
+    }
+
+    public DolphinScheduleEngine(String host, int port, String username, 
String password, String dolphinUrl,
+            String token) {
+        this.host = host;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+        this.dolphinUrl = dolphinUrl;
+        this.token = token;
+        this.scheduledProcessMap = new ConcurrentHashMap<>();
+    }
+
+    public DolphinScheduleEngine() {
+        this.scheduledProcessMap = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * check if there already exists a project for inlong offline schedule
+     * if no then build a new project for inlong-group-id in DolphinScheduler
+     */
+    @Override
+    public void start() {
+        LOGGER.info("Starting dolphin scheduler engine, Checking project 
exists...");
+        long code = 
dolphinScheduleOperator.checkAndGetUniqueId(projectBaseUrl, token, 
DS_DEFAULT_PROJECT_NAME);
+        if (code != 0) {
+            LOGGER.info("Project exists, project code: {}", code);
+            this.projectCode = code;
+
+            LOGGER.info("Starting synchronize existing process definition");
+            String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + 
DS_PROCESS_URL + DS_PROCESS_QUERY_URL;
+            
scheduledProcessMap.putAll(dolphinScheduleOperator.queryAllProcessDef(queryProcessDefUrl,
 token));
+
+        } else {
+            LOGGER.info("There is no inlong offline project exists, default 
project will be created");
+            this.projectCode =
+                    dolphinScheduleOperator.creatProject(projectBaseUrl, 
token, DS_DEFAULT_PROJECT_NAME,
+                            DS_DEFAULT_PROJECT_DESC);
+        }
+    }
+
+    /**
+     * Handle schedule register.
+     * @param scheduleInfo schedule info to register
+     */
+    @Override
+    @VisibleForTesting
+    public boolean handleRegister(ScheduleInfo scheduleInfo) {
+        String processDefUrl = projectBaseUrl + "/" + projectCode + 
DS_PROCESS_URL;
+        String scheduleUrl = projectBaseUrl + "/" + projectCode + 
DS_SCHEDULE_URL;
+        String processName = scheduleInfo.getInlongGroupId() + 
DS_DEFAULT_PROCESS_NAME;
+        String processDesc = DS_DEFAULT_PROCESS_DESC + 
scheduleInfo.getInlongGroupId();
+
+        LOGGER.info("Dolphin Scheduler handle register begin for {}, Checking 
process definition id uniqueness...",
+                scheduleInfo.getInlongGroupId());
+        try {
+            long processDefCode = 
dolphinScheduleOperator.checkAndGetUniqueId(processDefUrl, token, processName);
+
+            boolean online = false;
+            if (processDefCode != 0 || 
scheduledProcessMap.containsKey(processDefCode)) {
+
+                // process definition already exists, delete and rebuild
+                LOGGER.info("Process definition exists, process definition id: 
{}, deleting...", processDefCode);
+                if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, 
processDefCode, token, DS_OFFLINE_STATE)) {
+                    dolphinScheduleOperator.deleteProcessDef(processDefUrl, 
token, processDefCode);
+                    scheduledProcessMap.remove(processDefCode);
+                }
+            }
+            String taskCodeUrl = projectBaseUrl + "/" + projectCode + 
DS_TASK_CODE_URL;
+
+            long taskCode = dolphinScheduleOperator.genTaskCode(taskCodeUrl, 
token);
+            LOGGER.info("Generate task code for process definition success, 
task code: {}", taskCode);
+
+            long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo);
+            processDefCode =
+                    dolphinScheduleOperator.createProcessDef(processDefUrl, 
token, processName, processDesc, taskCode,
+                            host, port,
+                            username, password, offset, 
scheduleInfo.getInlongGroupId());
+            LOGGER.info("Create process definition success, process definition 
code: {}", processDefCode);
+
+            if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, 
processDefCode, token, DS_ONLINE_STATE)) {
+                LOGGER.info("Release process definition success, release 
status: {}", DS_ONLINE_STATE);
+
+                int scheduleId = 
dolphinScheduleOperator.createScheduleForProcessDef(scheduleUrl, 
processDefCode, token,
+                        scheduleInfo);
+                LOGGER.info("Create schedule for process definition success, 
schedule info: {}", scheduleInfo);
+
+                online = 
dolphinScheduleOperator.onlineScheduleForProcessDef(scheduleUrl, scheduleId, 
token);
+                LOGGER.info("Online schedule for process definition, status: 
{}", online);
+            }
+
+            scheduledProcessMap.putIfAbsent(processDefCode, processName);
+            return online;
+        } catch (Exception e) {
+            LOGGER.error("Failed to handle unregister dolphin scheduler: ", e);
+            throw new DolphinScheduleException(
+                    String.format("Failed to handle unregister dolphin 
scheduler: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Handle schedule unregister.
+     * @param groupId group to un-register schedule info
+     */
+    @Override
+    @VisibleForTesting
+    public boolean handleUnregister(String groupId) {
+        String processName = groupId + DS_DEFAULT_PROCESS_NAME;
+        String processDefUrl = projectBaseUrl + "/" + projectCode + 
DS_PROCESS_URL;
+
+        LOGGER.info("Dolphin Scheduler handle Unregister begin for {}, 
Checking process definition id uniqueness...",
+                groupId);
+        try {
+            long processDefCode = 
dolphinScheduleOperator.checkAndGetUniqueId(processDefUrl, token, processName);
+            if (processDefCode != 0 || 
scheduledProcessMap.containsKey(processDefCode)) {
+
+                LOGGER.info("Deleting process definition, process definition 
id: {}", processDefCode);
+                if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, 
processDefCode, token, DS_OFFLINE_STATE)) {
+
+                    dolphinScheduleOperator.deleteProcessDef(processDefUrl, 
token, processDefCode);
+                    scheduledProcessMap.remove(processDefCode);
+                    LOGGER.info("Process definition deleted");
+                }
+            }
+            LOGGER.info("Un-registered dolphin schedule info for {}", groupId);
+            return !scheduledProcessMap.containsKey(processDefCode);
+        } catch (Exception e) {
+            LOGGER.error("Failed to handle unregister dolphin scheduler: ", e);
+            throw new DolphinScheduleException(
+                    String.format("Failed to handle unregister dolphin 
scheduler: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Handle schedule update.
+     * @param scheduleInfo schedule info to update
+     */
+    @Override
+    @VisibleForTesting
+    public boolean handleUpdate(ScheduleInfo scheduleInfo) {
+        LOGGER.info("Update dolphin schedule info for {}", 
scheduleInfo.getInlongGroupId());
+        try {
+            return handleUnregister(scheduleInfo.getInlongGroupId()) && 
handleRegister(scheduleInfo);
+        } catch (Exception e) {
+            LOGGER.error("Failed to handle update dolphin scheduler: ", e);
+            throw new DolphinScheduleException(
+                    String.format("Failed to handle update dolphin scheduler: 
%s", e.getMessage()));
+        }
+    }
+
+    /**
+     * stop and delete all process definition in DolphinScheduler
+     * remove all process stored in scheduledProcessMap
+     * delete project for inlong-group-id in DolphinScheduler
+     */
+    @Override
+    public void stop() {
+        LOGGER.info("Stopping dolphin scheduler engine...");
+        String processDefUrl = projectBaseUrl + "/" + projectCode + 
DS_PROCESS_URL;
+        try {
+
+            String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + 
DS_PROCESS_URL + DS_PROCESS_QUERY_URL;
+            Map<Long, String> allProcessDef = 
dolphinScheduleOperator.queryAllProcessDef(queryProcessDefUrl, token);
+
+            for (Long processDefCode : allProcessDef.keySet()) {
+
+                LOGGER.info("delete process definition id: {}", 
processDefCode);
+                dolphinScheduleOperator.releaseProcessDef(processDefUrl, 
processDefCode, token, DS_OFFLINE_STATE);
+                dolphinScheduleOperator.deleteProcessDef(processDefUrl, token, 
processDefCode);
+                scheduledProcessMap.remove(processDefCode);
+            }
+
+            dolphinScheduleOperator.deleteProject(projectBaseUrl, token, 
projectCode);
+            LOGGER.info("Dolphin scheduler engine stopped");
+
+        } catch (Exception e) {
+            LOGGER.error("Failed to stop dolphin scheduler: ", e);
+            throw new DolphinScheduleException(String.format("Failed to stop 
dolphin scheduler: %s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
new file mode 100644
index 0000000000..e317478c64
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNEXPECTED_ERROR;
+
+/**
+ * DolphinScheduler operator, This class includes methods for creating, 
updating, and deleting projects,
+ * tasks, and process definitions in DolphinScheduler.
+ */
+@Service
+public class DolphinScheduleOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DolphinScheduleOperator.class);
+
+    /**
+     * Checks the uniqueness of a DolphinScheduler project ID based on the 
given search value.
+     */
+    public long checkAndGetUniqueId(String url, String token, String 
searchVal) {
+        try {
+            return DolphinScheduleUtils.checkAndGetUniqueId(url, token, 
searchVal);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected wrong in check id uniqueness: ", e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected wrong in check id uniqueness: 
%s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Creates a new project in DolphinScheduler.
+     */
+    public long creatProject(String url, String token, String projectName, 
String description) {
+        try {
+            return DolphinScheduleUtils.creatProject(url, token, projectName, 
description);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected error while creating new project: ", e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected error while creating new 
project: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Query all process definition in DolphinScheduler project.
+     */
+    public Map<Long, String> queryAllProcessDef(String url, String token) {
+        try {
+            return DolphinScheduleUtils.queryAllProcessDef(url, token);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected error while querying process definition: 
", e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected error while querying process 
definition: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Generates a new task code in DolphinScheduler.
+     */
+    public long genTaskCode(String url, String token) {
+        try {
+            return DolphinScheduleUtils.genTaskCode(url, token);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected wrong in generating task code: ", e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected wrong in generating task code: 
%s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Creates a process definition in DolphinScheduler.
+     */
+    public long createProcessDef(String url, String token, String name, String 
desc, long taskCode, String host,
+            int port, String username, String password, long offset, String 
groupId) {
+        try {
+            return DolphinScheduleUtils.createProcessDef(url, token, name, 
desc, taskCode, host,
+                    port, username, password, offset, groupId);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected wrong in creating process definition: ", 
e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected wrong in creating process 
definition: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Releases a process definition in DolphinScheduler.
+     */
+    public boolean releaseProcessDef(String processDefUrl, long 
processDefCode, String token, String status) {
+        try {
+            return DolphinScheduleUtils.releaseProcessDef(processDefUrl, 
processDefCode, token, status);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected wrong in release process definition: ", 
e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected wrong in release process 
definition: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Create a schedule for process definition in DolphinScheduler.
+     */
+    public int createScheduleForProcessDef(String url, long processDefCode, 
String token, ScheduleInfo scheduleInfo) {
+        try {
+            return DolphinScheduleUtils.createScheduleForProcessDef(url, 
processDefCode, token,
+                    scheduleInfo);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected wrong in creating schedule for process 
definition: ", e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected wrong in creating schedule for 
process definition: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Online the schedule for process definition in DolphinScheduler.
+     */
+    public boolean onlineScheduleForProcessDef(String scheduleUrl, int 
scheduleId, String token) {
+        try {
+            return 
DolphinScheduleUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, 
token);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected wrong in online process definition: ", e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected wrong in online process 
definition: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Delete the process definition in DolphinScheduler.
+     */
+    public void deleteProcessDef(String processDefUrl, String token, long 
processDefCode) {
+        try {
+            DolphinScheduleUtils.delete(processDefUrl, token, processDefCode);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected wrong in deleting process definition: ", 
e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected wrong in deleting process 
definition: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Delete the project in DolphinScheduler.
+     */
+    public void deleteProject(String projectBaseUrl, String token, long 
projectCode) {
+        try {
+            DolphinScheduleUtils.delete(projectBaseUrl, token, projectCode);
+        } catch (Exception e) {
+            LOGGER.error("Unexpected wrong in deleting project definition: ", 
e);
+            throw new DolphinScheduleException(UNEXPECTED_ERROR,
+                    String.format("Unexpected wrong in deleting project 
definition: %s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
new file mode 100644
index 0000000000..87cb1c5127
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.common.bounded.BoundaryType;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import 
org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskDefinition;
+import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskParams;
+import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskRelation;
+import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleUnit;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.core.util.CronExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_SIZE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_CODE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_DESC;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_NAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_DESC;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_NAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_STATE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_DATA;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_NAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TOKEN;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.DELETION_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.GEN_TASK_CODE_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.HTTP_REQUEST_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.INVALID_HTTP_METHOD;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.JSON_PARSE_ERROR;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.NETWORK_ERROR;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_CREATION_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_QUERY_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_RELEASE_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROJECT_CREATION_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.SCHEDULE_CREATION_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.SCHEDULE_ONLINE_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNEXPECTED_ERROR;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNIQUE_CHECK_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNSUPPORTED_SCHEDULE_TYPE;
+
+/**
+ * DolphinScheduler utils
+ * A utility class for interacting with DolphinScheduler API.
+ */
+public class DolphinScheduleUtils {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DolphinScheduleEngine.class);
+
+    private static final String POST = "POST";
+    private static final String GET = "GET";
+    private static final String DELETE = "DELETE";
+    private static final long MILLIS_IN_SECOND = 1000L;
+    private static final long MILLIS_IN_MINUTE = 60 * MILLIS_IN_SECOND;
+    private static final long MILLIS_IN_HOUR = 60 * MILLIS_IN_MINUTE;
+    private static final long MILLIS_IN_DAY = 24 * MILLIS_IN_HOUR;
+    private static final long MILLIS_IN_WEEK = 7 * MILLIS_IN_DAY;
+    private static final long MILLIS_IN_MONTH = 30 * MILLIS_IN_DAY;
+    private static final long MILLIS_IN_YEAR = 365 * MILLIS_IN_DAY;
+    private static final String CONTENT_TYPE = "Content-Type: 
application/json; charset=utf-8";
+    private static final String SHELL_REQUEST_API = 
"/inlong/manager/api/group/submitOfflineJob";
+    private static final OkHttpClient CLIENT = new OkHttpClient();
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private DolphinScheduleUtils() {
+    }
+
+    /**
+     * Checks the uniqueness of a project ID based on the given search value.
+     *
+     * @param url       The base URL of the DolphinScheduler API.
+     * @param token     The authentication token to be used in the request 
header.
+     * @param searchVal The name of the project to search for.
+     * @return The unique project ID if found, or 0 if not found or an error 
occurs.
+     */
+    public static long checkAndGetUniqueId(String url, String token, String 
searchVal) {
+        try {
+            Map<String, String> header = buildHeader(token);
+            Map<String, String> queryParams = buildPageParam(searchVal);
+
+            JsonObject response = executeHttpRequest(url, GET, queryParams, 
header);
+
+            JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject();
+            JsonArray totalList = data.getAsJsonArray(DS_RESPONSE_TOTAL_LIST);
+
+            // check uniqueness
+            if (totalList != null && totalList.size() == 1) {
+                JsonObject project = totalList.get(0).getAsJsonObject();
+                String name = project.get(DS_RESPONSE_NAME).getAsString();
+                if (name.equals(searchVal)) {
+                    return project.get(DS_CODE).getAsLong();
+                }
+            }
+            return 0;
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during checkAndGetUniqueId", e);
+            throw new DolphinScheduleException(JSON_PARSE_ERROR,
+                    String.format("Error parsing json during unique ID check 
for: %s at URL: %s", searchVal, url), e);
+
+        } catch (DolphinScheduleException e) {
+            LOGGER.error("DolphinScheduleException during unique ID check: 
{}", e.getDetailedMessage(), e);
+            throw new DolphinScheduleException(UNIQUE_CHECK_FAILED,
+                    String.format("Error checking unique ID for %s at URL: 
%s", searchVal, url));
+        }
+    }
+
+    /**
+     * Creates a new project in DolphinScheduler.
+     *
+     * @param url         The base URL of the DolphinScheduler API.
+     * @param token       The authentication token to be used in the request 
header.
+     * @param projectName The name of the new project.
+     * @param description The description of the new project.
+     * @return The project code (ID) if creation is successful, or 0 if an 
error occurs.
+     */
+    public static long creatProject(String url, String token, String 
projectName,
+            String description) {
+        try {
+            Map<String, String> header = buildHeader(token);
+
+            Map<String, String> queryParams = new HashMap<>();
+            queryParams.put(DS_PROJECT_NAME, projectName);
+            queryParams.put(DS_PROJECT_DESC, description);
+
+            JsonObject response = executeHttpRequest(url, POST, queryParams, 
header);
+
+            JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject();
+            LOGGER.info("create project success, project data: {}", data);
+
+            return data != null ? data.get(DS_CODE).getAsLong() : 0;
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during creating project", e);
+            throw new DolphinScheduleException(
+                    JSON_PARSE_ERROR,
+                    String.format("Error creating project with name: %s and 
description: %s at URL: %s",
+                            projectName, description, url));
+
+        } catch (DolphinScheduleException e) {
+            LOGGER.error("Creating project failed: {}", e.getMessage());
+            throw new DolphinScheduleException(
+                    PROJECT_CREATION_FAILED,
+                    String.format("Error creating project with name: %s and 
description: %s at URL: %s",
+                            projectName, description, url));
+        }
+    }
+
+    /**
+     * Query all process definition in project
+     *
+     * @param url The base URL of the DolphinScheduler API.
+     * @param token The authentication token to be used in the request header.
+     * @return Map of all the process definition
+     */
+    public static Map<Long, String> queryAllProcessDef(String url, String 
token) {
+        Map<String, String> header = buildHeader(token);
+        try {
+            JsonObject response = executeHttpRequest(url, GET, new 
HashMap<>(), header);
+
+            Map<Long, String> processDef =
+                    
StreamSupport.stream(response.get(DS_RESPONSE_DATA).getAsJsonArray().spliterator(),
 false)
+                            .map(JsonElement::getAsJsonObject)
+                            .collect(Collectors.toMap(
+                                    jsonObject -> 
jsonObject.get(DS_CODE).getAsLong(),
+                                    jsonObject -> 
jsonObject.get(DS_PROCESS_NAME).getAsString()));
+
+            LOGGER.info("Query all process definition success, processes info: 
{}", processDef);
+            return processDef;
+
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during query all process 
definition", e);
+            throw new DolphinScheduleException(
+                    JSON_PARSE_ERROR,
+                    String.format("Error querying all process definitions at 
URL: %s", url));
+
+        } catch (DolphinScheduleException e) {
+            LOGGER.info("Query all process definition failed: {}", 
e.getMessage());
+            throw new DolphinScheduleException(
+                    PROCESS_DEFINITION_QUERY_FAILED,
+                    String.format("Error querying all process definitions at 
URL: %s", url));
+        }
+
+    }
+
+    /**
+     * Generates a new task code in DolphinScheduler.
+     *
+     * @param url   The base URL of the DolphinScheduler API.
+     * @param token The authentication token to be used in the request header.
+     * @return The task code (ID) if generation is successful, or 0 if an 
error occurs.
+     */
+    public static long genTaskCode(String url, String token) {
+        try {
+            Map<String, String> header = buildHeader(token);
+
+            Map<String, String> queryParams = new HashMap<>();
+            queryParams.put(DS_TASK_GEN_NUM, DS_DEFAULT_TASK_GEN_NUM);
+
+            JsonObject response = executeHttpRequest(url, GET, queryParams, 
header);
+
+            JsonArray data = response.get(DS_RESPONSE_DATA).getAsJsonArray();
+
+            LOGGER.info("Generate task code success, task code data: {}", 
data);
+            return data != null && data.size() == 1 ? data.get(0).getAsLong() 
: 0;
+
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during generate task code", e);
+            throw new DolphinScheduleException(
+                    JSON_PARSE_ERROR,
+                    String.format("Error generate task code at URL: %s", url));
+
+        } catch (DolphinScheduleException e) {
+            LOGGER.info("generate task code failed: {}", e.getMessage());
+            throw new DolphinScheduleException(
+                    GEN_TASK_CODE_FAILED,
+                    String.format("Error generate task code at URL: %s", url));
+        }
+    }
+
+    /**
+     * Creates a process definition in DolphinScheduler.
+     *
+     * @param url         The base URL of the DolphinScheduler API.
+     * @param token       The authentication token to be used in the request 
header.
+     * @param name        The name of the process definition.
+     * @param desc        The description of the process definition.
+     * @param taskCode    The task code to be associated with this process 
definition.
+     * @param host        The host where the process will run.
+     * @param port        The port where the process will run.
+     * @param username    The username for authentication.
+     * @param password    The password for authentication.
+     * @param offset      The offset for the scheduling.
+     * @param groupId     The group ID of the process.
+     * @return The process definition code (ID) if creation is successful, or 
0 if an error occurs.
+     */
+    public static long createProcessDef(String url, String token, String name, 
String desc,
+            long taskCode, String host,
+            int port, String username, String password, long offset, String 
groupId) throws Exception {
+        try {
+            Map<String, String> header = buildHeader(token);
+
+            DSTaskRelation taskRelation = new DSTaskRelation();
+            taskRelation.setPostTaskCode(taskCode);
+            String taskRelationJson = 
MAPPER.writeValueAsString(Collections.singletonList(taskRelation));
+
+            DSTaskParams taskParams = new DSTaskParams();
+            taskParams.setRawScript(buildScript(host, port, username, 
password, offset, groupId));
+
+            DSTaskDefinition taskDefinition = new DSTaskDefinition();
+            taskDefinition.setCode(taskCode);
+            taskDefinition.setName(DS_DEFAULT_TASK_NAME);
+            taskDefinition.setDescription(DS_DEFAULT_TASK_DESC);
+            taskDefinition.setTaskParams(taskParams);
+            String taskDefinitionJson = 
MAPPER.writeValueAsString(Collections.singletonList(taskDefinition));
+
+            Map<String, String> queryParams = new HashMap<>();
+            queryParams.put(DS_TASK_RELATION, taskRelationJson);
+            queryParams.put(DS_TASK_DEFINITION, taskDefinitionJson);
+            queryParams.put(DS_PROCESS_NAME, name);
+            queryParams.put(DS_PROCESS_DESC, desc);
+
+            JsonObject data = executeHttpRequest(url, POST, queryParams, 
header);
+
+            LOGGER.info("create process definition success, process definition 
data: {}", data);
+            return data != null ? 
data.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_CODE).getAsLong() : 0;
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during creating process 
definition", e);
+            throw new DolphinScheduleException(
+                    JSON_PARSE_ERROR,
+                    String.format("Error creating process definition with 
name: %s and description: %s at URL: %s",
+                            name, desc, url));
+
+        } catch (DolphinScheduleException e) {
+            throw new DolphinScheduleException(
+                    PROCESS_DEFINITION_CREATION_FAILED,
+                    String.format("Error creating process definition with 
name: %s and description: %s at URL: %s",
+                            name, desc, url));
+        }
+    }
+
+    /**
+     * Releases a process definition in DolphinScheduler.
+     *
+     * @param processDefUrl The URL to release the process definition.
+     * @param processDefCode The ID of the process definition.
+     * @param token          The authentication token to be used in the 
request header.
+     * @param status         The status to set for the process definition 
(e.g., "online" or "offline").
+     * @return true if the process definition was successfully released, false 
otherwise.
+     */
+    public static boolean releaseProcessDef(String processDefUrl, long 
processDefCode,
+            String token, String status) {
+        try {
+            String url = processDefUrl + "/" + processDefCode + DS_RELEASE_URL;
+            Map<String, String> header = buildHeader(token);
+
+            Map<String, String> queryParam = new HashMap<>();
+            queryParam.put(DS_RELEASE_STATE, status);
+
+            JsonObject response = executeHttpRequest(url, POST, queryParam, 
header);
+            LOGGER.info("release process definition success, response data: 
{}", response);
+
+            return response.get(DS_RESPONSE_DATA).getAsBoolean();
+
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during releasing process 
definition", e);
+            throw new DolphinScheduleException(
+                    JSON_PARSE_ERROR,
+                    String.format("Error releasing process definition with 
code: %d and status: %s at URL: %s",
+                            processDefCode, status, processDefUrl));
+
+        } catch (DolphinScheduleException e) {
+            throw new DolphinScheduleException(
+                    PROCESS_DEFINITION_RELEASE_FAILED,
+                    String.format("Error releasing process definition with 
code: %d and status: %s at URL: %s",
+                            processDefCode, status, processDefUrl));
+        }
+    }
+
+    /**
+     * Create a schedule for process definition in DolphinScheduler.
+     *
+     * @param url The URL to create a schedule for the process definition.
+     * @param processDefCode The ID of the process definition.
+     * @param token          The authentication token to be used in the 
request header.
+     * @param scheduleInfo    The schedule info
+     * @return The schedule id
+     */
+    public static int createScheduleForProcessDef(String url, long 
processDefCode,
+            String token, ScheduleInfo scheduleInfo) throws Exception {
+
+        try {
+            Map<String, String> header = buildHeader(token);
+
+            DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern(DS_DEFAULT_SCHEDULE_TIME_FORMAT);
+            String startTime = scheduleInfo.getStartTime().toLocalDateTime()
+                    
.atZone(ZoneId.of(DS_DEFAULT_TIMEZONE_ID)).format(formatter);
+            String endTime = scheduleInfo.getEndTime().toLocalDateTime()
+                    
.atZone(ZoneId.of(DS_DEFAULT_TIMEZONE_ID)).format(formatter);
+
+            String crontab;
+            switch (scheduleInfo.getScheduleType()) {
+                case 0:
+                    crontab = 
generateCrontabExpression(scheduleInfo.getScheduleUnit(),
+                            scheduleInfo.getScheduleInterval());
+                    break;
+
+                case 1:
+                    crontab = scheduleInfo.getCrontabExpression();
+                    break;
+
+                default:
+                    LOGGER.error("Unsupported schedule type: {}", 
scheduleInfo.getScheduleType());
+                    throw new DolphinScheduleException("Unsupported schedule 
type: " + scheduleInfo.getScheduleType());
+            }
+
+            DScheduleInfo dScheduleInfo = new DScheduleInfo();
+            dScheduleInfo.setStartTime(startTime);
+            dScheduleInfo.setEndTime(endTime);
+            dScheduleInfo.setCrontab(crontab);
+            dScheduleInfo.setTimezoneId(DS_DEFAULT_TIMEZONE_ID);
+            String scheduleDef = MAPPER.writeValueAsString(dScheduleInfo);
+
+            Map<String, String> queryParams = new HashMap<>();
+            queryParams.put(DS_PROCESS_CODE, String.valueOf(processDefCode));
+            queryParams.put(DS_SCHEDULE_DEF, scheduleDef);
+
+            JsonObject response = executeHttpRequest(url, POST, queryParams, 
header);
+            LOGGER.info("create schedule for process definition success, 
response data: {}", response);
+
+            return 
response.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_ID).getAsInt();
+
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during releasing process 
definition", e);
+            throw new DolphinScheduleException(
+                    JSON_PARSE_ERROR,
+                    String.format("Error creating schedule for process 
definition code: %d at URL: %s",
+                            processDefCode, url));
+
+        } catch (DolphinScheduleException e) {
+            throw new DolphinScheduleException(
+                    SCHEDULE_CREATION_FAILED,
+                    String.format("Error creating schedule for process 
definition code: %d at URL: %s",
+                            processDefCode, url));
+        }
+    }
+
+    /**
+     * Online the schedule for process definition in DolphinScheduler.
+     *
+     * @param scheduleUrl The URL to online the schedule for process 
definition.
+     * @param scheduleId The ID of the schedule of process definition.
+     * @param token          The authentication token to be used in the 
request header.
+     * @return whether online is succeeded
+     */
+    public static boolean onlineScheduleForProcessDef(String scheduleUrl, int 
scheduleId,
+            String token) {
+        try {
+            Map<String, String> header = buildHeader(token);
+
+            String url = scheduleUrl + "/" + scheduleId + DS_ONLINE_URL;
+            JsonObject response = executeHttpRequest(url, POST, new 
HashMap<>(), header);
+            LOGGER.info("online schedule for process definition success, 
response data: {}", response);
+
+            if (response != null && 
!response.get(DS_RESPONSE_DATA).isJsonNull()) {
+                return response.get(DS_RESPONSE_DATA).getAsBoolean();
+            }
+            return false;
+
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during online schedule", e);
+            throw new DolphinScheduleException(
+                    JSON_PARSE_ERROR,
+                    String.format("Error online schedule with ID: %d online at 
URL: %s", scheduleId, scheduleUrl));
+
+        } catch (DolphinScheduleException e) {
+            throw new DolphinScheduleException(
+                    SCHEDULE_ONLINE_FAILED,
+                    String.format("Error online schedule with ID: %d online at 
URL: %s", scheduleId, scheduleUrl));
+        }
+    }
+
+    /**
+     * Delete the process definition in DolphinScheduler.
+     *
+     * @param url The URL to delete the project or process definition.
+     * @param token          The authentication token to be used in the 
request header.
+     * @param code          The project code or process definition code
+     */
+    public static void delete(String url, String token, long code) {
+        try {
+            Map<String, String> header = buildHeader(token);
+
+            String requestUrl = url + "/" + code;
+
+            JsonObject response = executeHttpRequest(requestUrl, DELETE, new 
HashMap<>(), header);
+            LOGGER.info("delete process or project success, response data: 
{}", response);
+
+        } catch (JsonParseException e) {
+            LOGGER.error("JsonParseException during deleting process or 
project", e);
+            throw new DolphinScheduleException(
+                    JSON_PARSE_ERROR,
+                    String.format("Error deleting process or project with 
code: %d at URL: %s", code, url), e);
+
+        } catch (DolphinScheduleException e) {
+            throw new DolphinScheduleException(
+                    DELETION_FAILED,
+                    String.format("Error deleting process or project with 
code: %d at URL: %s", code, url), e);
+        }
+    }
+
+    /**
+     * Builds the header map for HTTP requests, including the authentication 
token.
+     *
+     * @param token The authentication token for the request.
+     * @return A map representing the headers of the HTTP request.
+     */
+    private static Map<String, String> buildHeader(String token) {
+        Map<String, String> headers = new HashMap<>();
+        if (StringUtils.isNotEmpty(token)) {
+            headers.put(DS_TOKEN, token);
+        }
+        return headers;
+    }
+
+    /**
+     * Builds a query parameter map used for API calls that need to paginate 
or filter results.
+     * This method can be used for searching projects or tasks.
+     *
+     * @param searchVal The value to search for.
+     * @return A map containing the necessary query parameters.
+     */
+    private static Map<String, String> buildPageParam(String searchVal) {
+        Map<String, String> queryParams = new HashMap<>();
+        queryParams.put(DS_SEARCH_VAL, searchVal);
+        queryParams.put(DS_PAGE_SIZE, DS_DEFAULT_PAGE_SIZE);
+        queryParams.put(DS_PAGE_NO, DS_DEFAULT_PAGE_NO);
+        return queryParams;
+    }
+
+    /**
+     * Calculate the offset according to schedule info
+     *
+     * @param scheduleInfo The schedule info
+     * @return timestamp between two schedule task
+     */
+    public static long calculateOffset(ScheduleInfo scheduleInfo) {
+        if (scheduleInfo == null) {
+            LOGGER.error("ScheduleInfo cannot be null");
+            throw new DolphinScheduleException("ScheduleInfo cannot be null");
+        }
+
+        long offset = 0;
+
+        // Determine offset based on schedule type
+        if (scheduleInfo.getScheduleType() == null) {
+            LOGGER.error("Schedule type cannot be null");
+            throw new DolphinScheduleException("Schedule type cannot be null");
+        }
+
+        switch (scheduleInfo.getScheduleType()) {
+            case 0: // Normal scheduling
+                offset = calculateNormalOffset(scheduleInfo);
+                break;
+            case 1: // Crontab scheduling
+                offset = calculateCronOffset(scheduleInfo);
+                break;
+            default:
+                LOGGER.error("Invalid schedule type");
+                throw new DolphinScheduleException(
+                        UNSUPPORTED_SCHEDULE_TYPE, "Invalid schedule type");
+        }
+
+        // Add delay time if specified
+        if (scheduleInfo.getDelayTime() != null) {
+            offset += scheduleInfo.getDelayTime() * MILLIS_IN_SECOND;
+        }
+
+        return offset;
+    }
+
+    private static long calculateNormalOffset(ScheduleInfo scheduleInfo) {
+        if (scheduleInfo.getScheduleInterval() == null || 
scheduleInfo.getScheduleUnit() == null) {
+            LOGGER.error("Schedule interval and unit cannot be null for normal 
scheduling");
+            throw new IllegalArgumentException("Schedule interval and unit 
cannot be null for normal scheduling");
+        }
+        switch 
(Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit())))
 {
+            case YEAR:
+                return scheduleInfo.getScheduleInterval() * MILLIS_IN_YEAR;
+            case MONTH:
+                return scheduleInfo.getScheduleInterval() * MILLIS_IN_MONTH;
+            case WEEK:
+                return scheduleInfo.getScheduleInterval() * MILLIS_IN_WEEK;
+            case DAY:
+                return scheduleInfo.getScheduleInterval() * MILLIS_IN_DAY;
+            case HOUR:
+                return scheduleInfo.getScheduleInterval() * MILLIS_IN_HOUR;
+            case MINUTE:
+                return scheduleInfo.getScheduleInterval() * MILLIS_IN_MINUTE;
+            case SECOND:
+                return scheduleInfo.getScheduleInterval() * MILLIS_IN_SECOND;
+            case ONE_ROUND:
+                return scheduleInfo.getScheduleInterval();
+            default:
+                LOGGER.error("Invalid schedule unit");
+                throw new DolphinScheduleException("Invalid schedule unit");
+        }
+    }
+
+    private static long calculateCronOffset(ScheduleInfo scheduleInfo) {
+        if (scheduleInfo.getCrontabExpression() == null) {
+            LOGGER.error("Crontab expression cannot be null for schedule type 
crontab");
+            throw new DolphinScheduleException("Crontab expression cannot be 
null for schedule type crontab");
+        }
+
+        try {
+            CronExpression cronExpression = new 
CronExpression(scheduleInfo.getCrontabExpression());
+            Date firstExecution = cronExpression.getNextValidTimeAfter(new 
Date());
+            Date secondExecution = 
cronExpression.getNextValidTimeAfter(firstExecution);
+
+            if (secondExecution != null) {
+                return secondExecution.getTime() - firstExecution.getTime();
+            } else {
+                LOGGER.error("Unable to calculate the next execution times for 
the cron expression");
+                throw new DolphinScheduleException(
+                        "Unable to calculate the next execution times for the 
cron expression");
+            }
+        } catch (Exception e) {
+            LOGGER.error("Invalid cron expression: ", e);
+            throw new DolphinScheduleException(String.format("Invalid cron 
expression: %s", e.getMessage()));
+        }
+    }
+
+    private static String generateCrontabExpression(String scheduleUnit, 
Integer scheduleInterval) {
+        if (scheduleUnit.isEmpty()) {
+            LOGGER.error("Schedule unit and interval must not be null for 
generating crontab expression");
+            throw new DolphinScheduleException(
+                    "Schedule unit and interval must not be null for 
generating crontab expression");
+        }
+        String crontabExpression;
+
+        switch 
(Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleUnit))) {
+            case SECOND:
+                crontabExpression = String.format("0/%d * * * * ? *", 
scheduleInterval);
+                break;
+            case MINUTE:
+                crontabExpression = String.format("* 0/%d * * * ? *", 
scheduleInterval);
+                break;
+            case HOUR:
+                crontabExpression = String.format("* * 0/%d * * ? *", 
scheduleInterval);
+                break;
+            case DAY:
+                crontabExpression = String.format("* * * 1/%d * ? *", 
scheduleInterval);
+                break;
+            case WEEK:
+                crontabExpression = String.format("* * * 1/%d * ? *", 
scheduleInterval * 7);
+                break;
+            case MONTH:
+                crontabExpression = String.format("* * * * 0/%d ? *", 
scheduleInterval);
+                break;
+            case YEAR:
+                crontabExpression = String.format("* * * * * ? 0/%d", 
scheduleInterval);
+                break;
+            default:
+                LOGGER.error("Unsupported schedule unit for generating 
crontab: {}", scheduleUnit);
+                throw new DolphinScheduleException("Unsupported schedule unit 
for generating crontab: " + scheduleUnit);
+        }
+
+        return crontabExpression;
+    }
+
+    /**
+     * Executes an HTTP request using OkHttp. Supports various HTTP methods 
(GET, POST, PUT, DELETE).
+     *
+     * @param url         The URL of the request.
+     * @param method      The HTTP method (GET, POST, PUT, DELETE).
+     * @param queryParams The query parameters for the request (optional).
+     * @param headers     The headers for the request.
+     * @return A JsonObject containing the response from the server.
+     * @throws DolphinScheduleException If an error occurs during the request.
+     */
+    private static JsonObject executeHttpRequest(String url, String method, 
Map<String, String> queryParams,
+            Map<String, String> headers) {
+        HttpUrl.Builder urlBuilder = 
Objects.requireNonNull(HttpUrl.parse(url)).newBuilder();
+
+        for (Map.Entry<String, String> entry : queryParams.entrySet()) {
+            urlBuilder.addQueryParameter(entry.getKey(), entry.getValue());
+        }
+        HttpUrl httpUrl = urlBuilder.build();
+
+        Request.Builder requestBuilder = new Request.Builder()
+                .url(httpUrl);
+
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            requestBuilder.addHeader(entry.getKey(), entry.getValue());
+        }
+        RequestBody body = RequestBody.create(MediaType.parse(CONTENT_TYPE), 
"");
+
+        switch (method.toUpperCase()) {
+            case POST:
+                requestBuilder.post(body);
+                break;
+            case GET:
+                requestBuilder.get();
+                break;
+            case DELETE:
+                requestBuilder.delete(body);
+                break;
+            default:
+                throw new DolphinScheduleException(INVALID_HTTP_METHOD,
+                        String.format("Unsupported request method: %s", 
method));
+        }
+
+        Request request = requestBuilder.build();
+
+        // get response
+        try (Response response = CLIENT.newCall(request).execute()) {
+            String responseBody = response.body() != null ? 
response.body().string() : null;
+            LOGGER.debug("HTTP request to {} completed with status code {}", 
httpUrl, response.code());
+
+            if (response.isSuccessful() && responseBody != null) {
+                return JsonParser.parseString(responseBody).getAsJsonObject();
+            } else {
+                LOGGER.error("HTTP request to {} failed. HTTP Status: {}, 
Response Body: {}", httpUrl, response.code(),
+                        responseBody != null ? responseBody : "No response 
body");
+
+                throw new DolphinScheduleException(
+                        HTTP_REQUEST_FAILED,
+                        String.format("HTTP request to %s failed. Status: %d, 
Response: %s",
+                                httpUrl, response.code(), responseBody != null 
? responseBody : "No response body"));
+            }
+        } catch (IOException e) {
+            throw new DolphinScheduleException(
+                    NETWORK_ERROR,
+                    String.format("Network error during HTTP request to %s. 
Reason: %s", httpUrl, e.getMessage()));
+        } catch (Exception e) {
+            throw new DolphinScheduleException(
+                    UNEXPECTED_ERROR,
+                    String.format("Unexpected error during HTTP request to %s. 
Reason: %s", httpUrl, e.getMessage()));
+        }
+    }
+
+    /**
+     * Shell node in DolphinScheduler need to write in a script
+     * When process definition schedule run, the shell node run,
+     * Call back in inlong, sending a request with parameters required
+     */
+    private static String buildScript(String host, int port, String username, 
String password, long offset,
+            String groupId) {
+        LOGGER.info("build script for host: {}, port: {}, username: {}, 
password: {}, offset: {}, groupId: {}", host,
+                port, username, password, offset, groupId);
+        return "#!/bin/bash\n\n" +
+
+        // Get current timestamp
+                "# Get current timestamp\n" +
+                "lowerBoundary=$(date +%s)\n" +
+                "echo \"get lowerBoundary: ${lowerBoundary}\"\n" +
+                "upperBoundary=$(($lowerBoundary + " + offset + "))\n" +
+                "echo \"get upperBoundary: ${upperBoundary}\"\n\n" +
+
+                // Set URL
+                "# Set URL and HTTP method\n" +
+                "url=\"http://"; + host + ":" + port + SHELL_REQUEST_API +
+                "?username=" + username + "&password=" + password + "\"\n" +
+                "echo \"get url: ${url}\"\n" +
+
+                // Set HTTP method
+                "httpMethod=\"POST\"\n\n" +
+
+                // Set request body
+                "# Build request body\n" +
+                "jsonBody=$(cat <<EOF\n" +
+                "{\n" +
+                "  \"boundaryType\": \"" + BoundaryType.TIME.getType() + 
"\",\n" +
+                "  \"groupId\": \"" + groupId + "\",\n" +
+                "  \"lowerBoundary\": \"${lowerBoundary}\",\n" +
+                "  \"upperBoundary\": \"${upperBoundary}\"\n" +
+                "}\n" +
+                "EOF\n)\n\n" +
+                "echo \"${jsonBody}\"\n\n" +
+
+                // Send request
+                "# Send request\n" +
+                "response=$(curl -s -X \"$httpMethod\" \"$url\" \\\n" +
+                "  -H \"" + CONTENT_TYPE + "\" \\\n" +
+                "  -d \"$jsonBody\")\n\n" +
+
+                // Log response
+                "# Log response\n" +
+                "echo \"Request Sending success, Response: $response\"";
+    }
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
new file mode 100644
index 0000000000..348697b672
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.exception;
+
+import lombok.Getter;
+
+/**
+ * Custom exception for DolphinScheduler operations.
+ * Provides error codes, detailed messages, and optional nested exceptions.
+ */
+@Getter
+public class DolphinScheduleException extends RuntimeException {
+
+    // Common error codes
+    public static final String UNIQUE_CHECK_FAILED = "UNIQUE_CHECK_FAILED";
+    public static final String JSON_PARSE_ERROR = "JSON_PARSE_ERROR";
+    public static final String DELETION_FAILED = "DELETION_FAILED";
+    public static final String INVALID_HTTP_METHOD = "INVALID_HTTP_METHOD";
+    public static final String HTTP_REQUEST_FAILED = "HTTP_REQUEST_FAILED";
+    public static final String NETWORK_ERROR = "NETWORK_ERROR";
+    public static final String UNEXPECTED_ERROR = "UNEXPECTED_ERROR";
+
+    // Project-related error codes
+    public static final String PROJECT_CREATION_FAILED = 
"PROJECT_CREATION_FAILED";
+
+    // TaskCode-related error codes
+    public static final String GEN_TASK_CODE_FAILED = "GEN_TASK_CODE_FAILED";
+
+    // Process-related error codes
+    public static final String PROCESS_DEFINITION_QUERY_FAILED = 
"PROCESS_DEFINITION_QUERY_FAILED";
+    public static final String PROCESS_DEFINITION_CREATION_FAILED = 
"PROCESS_DEFINITION_CREATION_FAILED";
+    public static final String PROCESS_DEFINITION_RELEASE_FAILED = 
"PROCESS_DEFINITION_RELEASE_FAILED";
+    public static final String SCHEDULE_CREATION_FAILED = 
"SCHEDULE_CREATION_FAILED";
+    public static final String SCHEDULE_ONLINE_FAILED = 
"SCHEDULE_ONLINE_FAILED";
+    public static final String UNSUPPORTED_SCHEDULE_TYPE = 
"UNSUPPORTED_SCHEDULE_TYPE";
+
+    private final String errorCode;
+    private final String detailedMessage;
+
+    /**
+     * Constructor with message only.
+     *
+     * @param message The error message.
+     */
+    public DolphinScheduleException(String message) {
+        this(null, message, null);
+    }
+
+    /**
+     * Constructor with message and cause.
+     *
+     * @param message The error message.
+     * @param cause   The underlying cause of the exception.
+     */
+    public DolphinScheduleException(String message, Throwable cause) {
+        this(null, message, cause);
+    }
+
+    /**
+     * Constructor with error code, message, and cause.
+     *
+     * @param errorCode       A specific error code for the exception.
+     * @param detailedMessage A detailed error message providing additional 
context.
+     * @param cause           The underlying cause of the exception (optional).
+     */
+    public DolphinScheduleException(String errorCode, String detailedMessage, 
Throwable cause) {
+        super(detailedMessage, cause);
+        this.errorCode = errorCode;
+        this.detailedMessage = detailedMessage;
+    }
+
+    /**
+     * Constructor with error code and message.
+     *
+     * @param errorCode       A specific error code for the exception.
+     * @param detailedMessage A detailed error message providing additional 
context.
+     */
+    public DolphinScheduleException(String errorCode, String detailedMessage) {
+        this(errorCode, detailedMessage, null);
+    }
+
+    @Override
+    public String toString() {
+        return "DolphinScheduleException{" +
+                "errorCode='" + errorCode + '\'' +
+                ", detailedMessage='" + detailedMessage + '\'' +
+                ", cause=" + getCause() +
+                '}';
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleContainerTestEnv.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleContainerTestEnv.java
new file mode 100644
index 0000000000..c57bd6e783
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleContainerTestEnv.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.schedule.BaseScheduleTest;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.http.HttpHeaders.CONTENT_TYPE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_DATA;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE_SC_TYPE;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE_SESSION_ID;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_PASSWORD;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_SERVICE_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_USERID;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_USERNAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_EXPIRE_TIME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_EXPIRE_TIME_FORMAT;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_IMAGE_NAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_IMAGE_TAG;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_LOGIN_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_PASSWORD;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_RESPONSE_TOKEN;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_TOKEN_GEN_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_TOKEN_URL;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_USERID;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_USERNAME;
+import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.INTER_CONTAINER_DS_ALIAS;
+
+public abstract class DolphinScheduleContainerTestEnv extends BaseScheduleTest 
{
+
+    private static final Logger DS_LOG = 
LoggerFactory.getLogger(DolphinScheduleEngineTest.class);
+
+    private static final Network NETWORK = Network.newNetwork();
+
+    protected static final GenericContainer<?> dolphinSchedulerContainer =
+            new GenericContainer<>(DS_IMAGE_NAME + ":" + DS_IMAGE_TAG)
+                    .withExposedPorts(12345, 25333)
+                    .withEnv("TZ", DS_DEFAULT_TIMEZONE_ID)
+                    .withNetwork(NETWORK)
+                    .withAccessToHost(true)
+                    .withNetworkAliases(INTER_CONTAINER_DS_ALIAS)
+                    .withLogConsumer(new Slf4jLogConsumer(DS_LOG));
+
+    /**
+     * This method just for DS testing, login by default admin username and 
password
+     * generate a 1-day expiring token for test, the token will disappear with 
the DS container shutting down
+     *
+     * @return the DS token
+     */
+    protected static String accessToken() {
+        Map<String, String> loginParams = new HashMap<>();
+        loginParams.put(DS_USERNAME, DS_DEFAULT_USERNAME);
+        loginParams.put(DS_PASSWORD, DS_DEFAULT_PASSWORD);
+        try {
+            JsonObject loginResponse =
+                    executeHttpRequest(DS_DEFAULT_SERVICE_URL + DS_LOGIN_URL, 
loginParams, new HashMap<>());
+            if (loginResponse.get("success").getAsBoolean()) {
+                String tokenGenUrl = DS_DEFAULT_SERVICE_URL + DS_TOKEN_URL + 
DS_TOKEN_GEN_URL;
+                Map<String, String> tokenParams = new HashMap<>();
+                tokenParams.put(DS_USERID, String.valueOf(DS_DEFAULT_USERID));
+
+                LocalDateTime now = LocalDateTime.now();
+                LocalDateTime tomorrow = now.plusDays(1);
+                DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern(DS_EXPIRE_TIME_FORMAT);
+                String expireTime = tomorrow.format(formatter);
+                tokenParams.put(DS_EXPIRE_TIME, expireTime);
+
+                Map<String, String> cookies = new HashMap<>();
+                cookies.put(DS_COOKIE_SC_TYPE, 
loginResponse.get(DS_RESPONSE_DATA)
+                        
.getAsJsonObject().get(DS_COOKIE_SC_TYPE).getAsString());
+                cookies.put(DS_COOKIE_SESSION_ID, 
loginResponse.get(DS_RESPONSE_DATA)
+                        
.getAsJsonObject().get(DS_COOKIE_SESSION_ID).getAsString());
+
+                JsonObject tokenGenResponse = executeHttpRequest(tokenGenUrl, 
tokenParams, cookies);
+
+                String accessTokenUrl = DS_DEFAULT_SERVICE_URL + DS_TOKEN_URL;
+                tokenParams.put(DS_RESPONSE_TOKEN, 
tokenGenResponse.get(DS_RESPONSE_DATA).getAsString());
+                JsonObject result = executeHttpRequest(accessTokenUrl, 
tokenParams, cookies);
+                String token = 
result.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_RESPONSE_TOKEN).getAsString();
+                DS_LOG.info("login and generate token success, token: {}", 
token);
+                return token;
+            }
+            return null;
+        } catch (Exception e) {
+            DS_LOG.error("login and generate token fail: ", e);
+            throw new DolphinScheduleException(String.format("login and 
generate token fail: %s", e.getMessage()));
+        }
+    }
+
+    private static JsonObject executeHttpRequest(String url, Map<String, 
String> queryParams,
+            Map<String, String> cookies) throws IOException {
+        OkHttpClient client = new OkHttpClient();
+
+        // Build query parameters
+        HttpUrl.Builder urlBuilder = 
Objects.requireNonNull(HttpUrl.parse(url)).newBuilder();
+        for (Map.Entry<String, String> entry : queryParams.entrySet()) {
+            urlBuilder.addQueryParameter(entry.getKey(), entry.getValue());
+        }
+        HttpUrl httpUrl = urlBuilder.build();
+
+        // Build the request
+        Request.Builder requestBuilder = new Request.Builder()
+                .url(httpUrl);
+
+        // Add cookies to the request
+        if (cookies != null && !cookies.isEmpty()) {
+            String cookieHeader = cookies.entrySet()
+                    .stream()
+                    .map(entry -> entry.getKey() + "=" + entry.getValue())
+                    .collect(Collectors.joining("; "));
+            requestBuilder.header(DS_COOKIE, cookieHeader);
+        }
+
+        RequestBody body = RequestBody.create(MediaType.parse(CONTENT_TYPE), 
"");
+        requestBuilder.post(body);
+
+        Request request = requestBuilder.build();
+
+        // Execute the request and parse the response
+        try (Response response = client.newCall(request).execute()) {
+            if (response.isSuccessful() && response.body() != null) {
+                String responseBody = response.body().string();
+                return JsonParser.parseString(responseBody).getAsJsonObject();
+            } else {
+                DS_LOG.error("Unexpected http response error: {}", response);
+                throw new DolphinScheduleException("Unexpected http response 
error " + response);
+            }
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
new file mode 100644
index 0000000000..f95a5268ee
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.ComponentScan;
+
+import javax.annotation.Resource;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+@SpringBootTest(classes = DolphinScheduleEngineTest.class)
+@ComponentScan(basePackages = "org.apache.inlong.manager")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class DolphinScheduleEngineTest extends DolphinScheduleContainerTestEnv 
{
+
+    @Resource
+    private DolphinScheduleEngine dolphinScheduleEngine;
+
+    @BeforeAll
+    public void beforeAll() {
+        dolphinSchedulerContainer.setPortBindings(Arrays.asList("12345:12345", 
"25333:25333"));
+        dolphinSchedulerContainer.start();
+        assertTrue(dolphinSchedulerContainer.isRunning(), "DolphinScheduler 
container should be running");
+
+        String token = accessToken();
+        dolphinScheduleEngine.setToken(token);
+        dolphinScheduleEngine.start();
+    }
+
+    @AfterAll
+    public void afterAll() {
+        dolphinScheduleEngine.stop();
+        if (dolphinSchedulerContainer != null) {
+            dolphinSchedulerContainer.stop();
+        }
+    }
+
+    @Test
+    @Order(1)
+    @Timeout(30)
+    public void testRegisterScheduleInfo() {
+        // 1. test for normal schedule
+        ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+        testRegister(scheduleInfo);
+
+        // 2. test for cron schedule
+        scheduleInfo = genDefaultCronScheduleInfo();
+        testRegister(scheduleInfo);
+    }
+
+    private void testRegister(ScheduleInfo scheduleInfo) {
+        // register schedule info
+        dolphinScheduleEngine.handleRegister(scheduleInfo);
+        assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size());
+    }
+
+    @Test
+    @Order(2)
+    @Timeout(30)
+    public void testUnRegisterScheduleInfo() {
+        // 1. test for normal schedule
+        ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+        testUnRegister(scheduleInfo);
+
+        // 2. test for cron schedule, gen cron schedule info, */2 * * * * ?
+        scheduleInfo = genDefaultCronScheduleInfo();
+        testUnRegister(scheduleInfo);
+    }
+
+    private void testUnRegister(ScheduleInfo scheduleInfo) {
+        // register schedule info
+        dolphinScheduleEngine.handleRegister(scheduleInfo);
+        assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size());
+
+        // Un-register schedule info
+        
dolphinScheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId());
+        assertEquals(0, dolphinScheduleEngine.getScheduledProcessMap().size());
+    }
+
+    @Test
+    @Order(3)
+    @Timeout(30)
+    public void testUpdateScheduleInfo() {
+        // 1. test for normal schedule
+        ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+        testRegister(scheduleInfo);
+
+        // 2. test for cron schedule, gen cron schedule info, */2 * * * * ?
+        scheduleInfo = genDefaultCronScheduleInfo();
+        testUpdate(scheduleInfo);
+    }
+
+    private void testUpdate(ScheduleInfo scheduleInfo) {
+        // register schedule info
+        dolphinScheduleEngine.handleUpdate(scheduleInfo);
+        assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size());
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java
new file mode 100644
index 0000000000..a2f6d97e0c
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+public class DolphinSchedulerContainerEnvConstants {
+
+    // DS env image related constants
+    protected static final String DS_IMAGE_NAME = 
"apache/dolphinscheduler-standalone-server";
+    protected static final String DS_IMAGE_TAG = "3.2.2";
+    protected static final String INTER_CONTAINER_DS_ALIAS = 
"dolphinscheduler";
+
+    // DS env url related constants
+    protected static final String DS_DEFAULT_SERVICE_URL = 
"http://127.0.0.1:12345/dolphinscheduler";;
+    protected static final String DS_LOGIN_URL = "/login";
+    protected static final String DS_TOKEN_URL = "/access-tokens";
+    protected static final String DS_TOKEN_GEN_URL = "/generate";
+
+    // DS env api params related constants
+    protected static final String DS_USERNAME = "userName";
+    protected static final String DS_PASSWORD = "userPassword";
+    protected static final String DS_USERID = "userId";
+    protected static final String DS_COOKIE = "Cookie";
+    protected static final String DS_COOKIE_SC_TYPE = "securityConfigType";
+    protected static final String DS_COOKIE_SESSION_ID = "sessionId";
+    protected static final String DS_EXPIRE_TIME = "expireTime";
+    protected static final String DS_EXPIRE_TIME_FORMAT = "yyyy-MM-dd 
HH:mm:ss";
+
+    // DS env token related constants
+    protected static final String DS_RESPONSE_TOKEN = "token";
+
+    // DS env default admin user info
+    protected static final String DS_DEFAULT_USERNAME = "admin";
+    protected static final String DS_DEFAULT_PASSWORD = "dolphinscheduler123";
+    protected static final Integer DS_DEFAULT_USERID = 1;
+
+}
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 2bad5f801f..e05c66a674 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -103,11 +103,13 @@ agent.install.temp.path=inlong/agent-installer-temp/
 
 # The primary key id of the default agent module used
 default.module.id=1
-# schedule engine type
-# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
 
+# Dirty log
 dirty.log.clean.enabled=false
 dirty.log.clean.interval.minutes=5
 dirty.dirty.retention.minutes=10
-dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
+
+# DolphinScheduler related config
+schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
+schedule.engine.dolphinscheduler.token=default_token_value
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 040c868bcf..7441ea55e6 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -95,11 +95,12 @@ group.deleted.enabled=false
 # Tencent cloud log service endpoint, The Operator cls resource by it
 cls.manager.endpoint=127.0.0.1
 
-# schedule engine type
-# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
-
+# Dirty log
 dirty.log.clean.enabled=false
 dirty.log.clean.interval.minutes=5
 dirty.dirty.retention.minutes=10
-dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
+
+# DolphinScheduler related config
+schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
+schedule.engine.dolphinscheduler.token=default_token_value
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 393eef6b05..f0e42182c5 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -96,11 +96,12 @@ group.deleted.enabled=false
 # Tencent cloud log service endpoint, The Operator cls resource by it
 cls.manager.endpoint=127.0.0.1
 
-# schedule engine type
-# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
-
+# Dirty log
 dirty.log.clean.enabled=false
 dirty.log.clean.interval.minutes=5
 dirty.dirty.retention.minutes=10
-dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
+
+# DolphinScheduler related config
+schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
+schedule.engine.dolphinscheduler.token=default_token_value

Reply via email to