This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new befe172c08 [INLONG-11400][Manager] Support Airflow schedule engine 
(#11479)
befe172c08 is described below

commit befe172c08b4e07f6de378cd36c392ea452bf0fd
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Wed Nov 20 22:04:03 2024 +0800

    [INLONG-11400][Manager] Support Airflow schedule engine (#11479)
---
 .../pojo/schedule/airflow/AirflowConnection.java   |  70 +++++
 .../inlong/manager/pojo/schedule/airflow/DAG.java  |  50 ++++
 .../pojo/schedule/airflow/DAGCollection.java}      |  30 ++-
 .../manager/pojo/schedule/airflow/DAGRun.java      |  50 ++++
 .../manager/pojo/schedule/airflow/DAGRunConf.java  |  68 +++++
 .../manager/pojo/schedule/airflow/Error.java       |  52 ++++
 .../manager/schedule/ScheduleEngineType.java       |   1 +
 .../schedule/airflow/AirFlowAPIConstant.java       |  40 +++
 .../schedule/airflow/AirflowScheduleClient.java    |  58 ++++
 .../schedule/airflow/AirflowScheduleEngine.java    | 258 ++++++++++++++++++
 .../schedule/airflow/AirflowServerClient.java      |  71 +++++
 .../manager/schedule/airflow/api/AirflowApi.java   |  75 ++++++
 .../api/AirflowResponse.java}                      |  37 ++-
 .../schedule/airflow/api/BaseAirflowApi.java       | 149 +++++++++++
 .../api/connection/AirflowConnectionCreator.java   |  99 +++++++
 .../api/connection/AirflowConnectionGetter.java    |  61 +++++
 .../airflow/api/dag/DAGCollectionUpdater.java      |  79 ++++++
 .../schedule/airflow/api/dag/DAGDeletor.java       |  69 +++++
 .../schedule/airflow/api/dag/DAGUpdater.java       |  78 ++++++
 .../airflow/api/dagruns/DAGRunsTrigger.java        | 100 +++++++
 .../schedule/airflow/config/AirflowConfig.java     |  86 ++++++
 .../interceptor/AirflowAuthInterceptor.java        |  51 ++++
 .../interceptor/LoggingInterceptor.java}           |  32 ++-
 .../util/DAGUtil.java}                             |  19 +-
 .../manager/schedule/airflow/util/DateUtil.java    |  58 ++++
 .../exception/AirflowScheduleException.java        |  62 +++++
 .../schedule/airflow/AirflowContainerEnv.java      | 139 ++++++++++
 .../airflow/AirflowScheduleEngineTest.java         | 110 ++++++++
 .../src/test/resources/airflow/dag_cleaner.py      |  80 ++++++
 .../src/test/resources/airflow/dag_creator.py      | 148 +++++++++++
 .../src/test/resources/airflow/docker-compose.yaml | 292 +++++++++++++++++++++
 .../src/test/resources/airflow/testGroup_cron.py   | 112 ++++++++
 .../src/test/resources/airflow/testGroup_normal.py | 110 ++++++++
 .../src/main/resources/application-dev.properties  |  10 +
 .../src/main/resources/application-prod.properties |  10 +
 .../src/main/resources/application-test.properties |  10 +
 36 files changed, 2776 insertions(+), 48 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java
new file mode 100644
index 0000000000..deb056ed4f
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "Full representation of the connection.")
+public class AirflowConnection {
+
+    @JsonProperty("connection_id")
+    @ApiModelProperty("The connection ID.")
+    private String connectionId;
+
+    @JsonProperty("conn_type")
+    @ApiModelProperty("The connection type.")
+    private String connType;
+
+    @JsonProperty("description")
+    @ApiModelProperty("The description of the connection.")
+    private String description;
+
+    @JsonProperty("host")
+    @ApiModelProperty("Host of the connection.")
+    private String host;
+
+    @JsonProperty("login")
+    @ApiModelProperty("Login of the connection.")
+    private String login;
+
+    @JsonProperty("schema")
+    @ApiModelProperty("Schema of the connection.")
+    private String schema;
+
+    @JsonProperty("port")
+    @ApiModelProperty("Port of the connection.")
+    private Integer port;
+
+    @JsonProperty("password")
+    @ApiModelProperty("Password of the connection.")
+    private String password;
+
+    @JsonProperty("extra")
+    @ApiModelProperty("Additional information description of the connection.")
+    private String extra;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java
new file mode 100644
index 0000000000..578eadb151
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "DAG Description Information.")
+public class DAG {
+
+    @JsonProperty("dag_id")
+    @ApiModelProperty("The ID of the DAG.")
+    private String dagId;
+
+    @JsonProperty("root_dag_id")
+    @ApiModelProperty("If the DAG is SubDAG then it is the top level DAG 
identifier. Otherwise, null.")
+    private String rootDagId;
+
+    @JsonProperty("is_paused")
+    @ApiModelProperty("Whether the DAG is paused.")
+    private Boolean isPaused;
+
+    @JsonProperty("is_active")
+    @ApiModelProperty("Whether the DAG is currently seen by the scheduler(s).")
+    private Boolean isActive;
+
+    @JsonProperty("description")
+    @ApiModelProperty("User-provided DAG description, which can consist of 
several sentences or paragraphs that describe DAG contents.")
+    private String description;
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java
similarity index 55%
copy from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java
index ac71e4e2d1..7a52548f41 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java
@@ -15,20 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.pojo.schedule.airflow;
 
-import lombok.Getter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
-@Getter
-public enum ScheduleEngineType {
+import java.util.List;
 
-    NONE("None"),
-    QUARTZ("Quartz"),
-    DOLPHINSCHEDULER("DolphinScheduler");
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "Collection of DAGs.")
+public class DAGCollection {
 
-    private final String type;
+    @JsonProperty("dags")
+    @ApiModelProperty("List of DAGs.")
+    private List<DAG> dags = null;
 
-    ScheduleEngineType(String type) {
-        this.type = type;
-    }
-}
\ No newline at end of file
+    @JsonProperty("total_entries")
+    @ApiModelProperty("The length of DAG list.")
+    private Integer totalEntries;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java
new file mode 100644
index 0000000000..e9384c75da
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "DAGRun Description Information.")
+public class DAGRun {
+
+    @JsonProperty("conf")
+    @ApiModelProperty("JSON object describing additional configuration 
parameters.")
+    private Object conf;
+
+    @JsonProperty("dag_id")
+    @ApiModelProperty("Airflow DAG id.")
+    private String dagId;
+
+    @JsonProperty("dag_run_id")
+    @ApiModelProperty("Airflow DAGRun id (Nullable).")
+    private String dagRunId;
+
+    @JsonProperty("end_date")
+    @ApiModelProperty("The end time of this DAGRun.")
+    private String endDate;
+
+    @JsonProperty("start_date")
+    @ApiModelProperty("The start time of this DAGRun.")
+    private String startDate;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java
new file mode 100644
index 0000000000..4154c2526c
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "DAGRunConf Description Information.")
+public class DAGRunConf {
+
+    @JsonProperty("inlong_group_id")
+    @ApiModelProperty("Specify the Inlong group ID")
+    private String inlongGroupId;
+
+    @JsonProperty("start_time")
+    @ApiModelProperty("The start time of DAG scheduling.")
+    private long startTime;
+
+    @JsonProperty("end_time")
+    @ApiModelProperty("The end time of DAG scheduling.")
+    private long endTime;
+
+    @JsonProperty("boundary_type")
+    @ApiModelProperty("The offline task boundary type.")
+    private String boundaryType;
+
+    @JsonProperty("cron_expr")
+    @ApiModelProperty("Cron expression.")
+    private String cronExpr;
+
+    @JsonProperty("seconds_interval")
+    @ApiModelProperty("Time interval (in seconds).")
+    private String secondsInterval;
+
+    @JsonProperty("connection_id")
+    @ApiModelProperty("Airflow Connection Id of Inlong Manager.")
+    private String connectionId;
+
+    @JsonProperty("timezone")
+    @ApiModelProperty("The timezone.")
+    private String timezone;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java
new file mode 100644
index 0000000000..3eb76fd677
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "[RFC7807](https://tools.ietf.org/html/rfc7807) 
compliant response. ")
+public class Error {
+
+    @JsonProperty("detail")
+    @ApiModelProperty("Error Details.")
+    private String detail;
+
+    @JsonProperty("instance")
+    @ApiModelProperty("Error of the instance.")
+    private String instance;
+
+    @JsonProperty("status")
+    @ApiModelProperty("Error of the status.")
+    private BigDecimal status;
+
+    @JsonProperty("title")
+    @ApiModelProperty("Error of the title.")
+    private String title;
+
+    @JsonProperty("type")
+    @ApiModelProperty("Error of the type.")
+    private String type;
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
index ac71e4e2d1..8ae586609d 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
@@ -24,6 +24,7 @@ public enum ScheduleEngineType {
 
     NONE("None"),
     QUARTZ("Quartz"),
+    AIRFLOW("Airflow"),
     DOLPHINSCHEDULER("DolphinScheduler");
 
     private final String type;
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java
new file mode 100644
index 0000000000..e328d8fd0a
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+/**
+ * Contains constants for interacting with the Airflow API.
+ */
+public class AirFlowAPIConstant {
+
+    public static final String DEFAULT_TIMEZONE = "Asia/Shanghai";
+    public static final String INLONG_OFFLINE_DAG_TASK_PREFIX = 
"inlong_offline_task_";
+    public static final String SUBMIT_OFFLINE_JOB_URI = 
"/inlong/manager/api/group/submitOfflineJob";
+
+    // AirflowConnection
+    public static final String LIST_CONNECTIONS_URI = "/api/v1/connections";
+    public static final String GET_CONNECTION_URI = 
"/api/v1/connections/{connection_id}";
+
+    // DAG
+    public static final String LIST_DAGS_URI = "/api/v1/dags";
+    public static final String UPDATE_DAG_URI = "/api/v1/dags/{dag_id}";
+
+    // DAGRun
+    public static final String TRIGGER_NEW_DAG_RUN_URI = 
"/api/v1/dags/{dag_id}/dagRuns";
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java
new file mode 100644
index 0000000000..bbb8e59149
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleEngineClient;
+import org.apache.inlong.manager.schedule.ScheduleEngineType;
+
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+/**
+ * Built-in implementation of schedule engine client corresponding with {@link 
AirflowScheduleEngine}.
+ * AirflowScheduleClient simply invokes the {@link AirflowScheduleEngine} to 
register/unregister/update
+ * schedule info instead of calling a remote schedule service.
+ * */
+@Service
+public class AirflowScheduleClient implements ScheduleEngineClient {
+
+    @Resource
+    public AirflowScheduleEngine scheduleEngine;
+
+    @Override
+    public boolean accept(String engineType) {
+        return 
ScheduleEngineType.AIRFLOW.getType().equalsIgnoreCase(engineType);
+    }
+
+    @Override
+    public boolean register(ScheduleInfo scheduleInfo) {
+        return scheduleEngine.handleRegister(scheduleInfo);
+    }
+
+    @Override
+    public boolean unregister(String groupId) {
+        return scheduleEngine.handleUnregister(groupId);
+    }
+
+    @Override
+    public boolean update(ScheduleInfo scheduleInfo) {
+        return scheduleEngine.handleUpdate(scheduleInfo);
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
new file mode 100644
index 0000000000..792307e6ae
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.common.bounded.BoundaryType;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAG;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGRunConf;
+import org.apache.inlong.manager.schedule.ScheduleEngine;
+import org.apache.inlong.manager.schedule.ScheduleUnit;
+import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse;
+import 
org.apache.inlong.manager.schedule.airflow.api.connection.AirflowConnectionCreator;
+import 
org.apache.inlong.manager.schedule.airflow.api.connection.AirflowConnectionGetter;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGCollectionUpdater;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGDeletor;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGUpdater;
+import org.apache.inlong.manager.schedule.airflow.api.dagruns.DAGRunsTrigger;
+import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig;
+import org.apache.inlong.manager.schedule.airflow.util.DAGUtil;
+import org.apache.inlong.manager.schedule.airflow.util.DateUtil;
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.mina.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.DEFAULT_TIMEZONE;
+import static 
org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX;
+import static 
org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI;
+import static 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.DAG_DUPLICATE;
+import static 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.INIT_CONNECTION_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_ENGINE_SHUTDOWN_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_REGISTER_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_UPDATE_FAILED;
+import static 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.TASK_DAG_SWITCH_FAILED;
+
+/**
+ * Responsible for processing the start/register/unregister/update/stop requests 
from {@link AirflowScheduleClient}
+ */
+@Service
+public class AirflowScheduleEngine implements ScheduleEngine {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AirflowScheduleEngine.class);
+    private final Set<String> scheduledJobSet = new ConcurrentHashSet<>();
+    private AirflowServerClient serverClient;
+    private AirflowConfig airflowConfig;
+
+    /**
+     * Creates the engine and immediately runs {@link #start()}.
+     *
+     * @param serverClient  client used to send every request to the Airflow REST API
+     * @param airflowConfig configuration read for the connection, cleaner DAG and creator DAG IDs
+     */
+    public AirflowScheduleEngine(AirflowServerClient serverClient, AirflowConfig airflowConfig) {
+        this.airflowConfig = airflowConfig;
+        this.serverClient = serverClient;
+        // start() catches and logs its own failures, so construction itself does not fail on them
+        start();
+    }
+
+    /**
+     * Initializes the Airflow side of the engine: registers the connection, unpauses the
+     * cleaner/creator DAGs and unpauses all task DAGs while caching their group IDs.
+     * Any exception raised by these steps is logged and not propagated.
+     */
+    @Override
+    public void start() {
+        try {
+            // Create the authentication information Airflow uses to call the InLong Manager API
+            initConnection();
+            // Check that the cleaner and creator DAGs exist and unpause them
+            switchOriginalDAG(false);
+            // Unpause all task DAGs and load their group IDs into the local cache
+            switchAllTaskDAG(false);
+            LOGGER.info("Airflow initialization succeeded.");
+        } catch (Exception e) {
+            LOGGER.error("Airflow initialization failed.", e);
+        }
+    }
+
+    /**
+     * Makes sure Airflow holds the connection identified by {@code airflowConfig.getConnectionId()}.
+     * The connection is looked up first; only when that lookup is unsuccessful is a new one created
+     * from the host, port and InLong credentials in the configuration.
+     *
+     * @throws AirflowScheduleException with {@code INIT_CONNECTION_FAILED} when the creation request fails
+     * @throws Exception                when sending either request fails
+     */
+    private void initConnection() throws Exception {
+        LOGGER.info("Initializing Inlong Manager AirflowConnection for Airflow ... ");
+        String connectionId = airflowConfig.getConnectionId();
+
+        // Nothing to do when Airflow already knows the connection
+        AirflowResponse<AirflowConnection> lookup =
+                serverClient.sendRequest(new AirflowConnectionGetter(connectionId));
+        if (lookup.isSuccess()) {
+            return;
+        }
+
+        AirflowConnection inlongConnection = new AirflowConnection(connectionId, "HTTP", "",
+                airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
+                airflowConfig.getPort(), airflowConfig.getInlongPassword(), "");
+        AirflowResponse<AirflowConnection> created =
+                serverClient.sendRequest(new AirflowConnectionCreator(inlongConnection));
+        LOGGER.info("AirflowConnection registration response: {}", created.toString());
+        if (created.isSuccess()) {
+            return;
+        }
+        LOGGER.error("Initialization connection failed.");
+        throw new AirflowScheduleException(INIT_CONNECTION_FAILED, "Initialization connection failed.");
+    }
+
+    private void switchOriginalDAG(boolean isPaused) {
+        for (String dagId : Arrays.asList(airflowConfig.getDagCleanerId(), 
airflowConfig.getDagCreatorId())) {
+            try {
+                AirflowResponse<DAG> response = serverClient.sendRequest(new 
DAGUpdater(dagId, isPaused));
+                LOGGER.info("Response to {} the original DAG : {}", isPaused ? 
"stop" : "start", response.toString());
+                if (!response.isSuccess()) {
+                    throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
+                            String.format("%s does not exist or failed to 
%s.", dagId, (isPaused ? "stop" : "start")));
+                }
+            } catch (Exception e) {
+                LOGGER.error("The original DAG {} failed.", isPaused ? "stop" 
: "start", e);
+                throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
+                        String.format("The original DAG %s failed: %s.", 
isPaused ? "stop" : "start", e.getMessage()));
+            }
+        }
+    }
+
+    private void switchAllTaskDAG(boolean isPaused) {
+        try {
+            AirflowResponse<DAGCollection> response = serverClient.sendRequest(
+                    new DAGCollectionUpdater(INLONG_OFFLINE_DAG_TASK_PREFIX, 
isPaused));
+            LOGGER.info("Response to {} task DAG : {}", isPaused ? "stop" : 
"start", response.toString());
+            if (!response.isSuccess()) {
+                throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
+                        String.format("Failed to %s task DAGs.", isPaused ? 
"stop" : "start"));
+            }
+            if (!isPaused) {
+                List<DAG> dagList = response.getData().getDags();
+                if (dagList != null) {
+                    dagList.forEach(dag -> scheduledJobSet
+                            
.add(dag.getDagId().substring(INLONG_OFFLINE_DAG_TASK_PREFIX.length() - 1)));
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to {} task DAGs.", isPaused ? "stop" : 
"start", e);
+            throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
+                    String.format("Failed to %s task DAGs: %s", isPaused ? 
"stop" : "start", e.getMessage()));
+        }
+    }
+
+    @Override
+    public boolean handleRegister(ScheduleInfo scheduleInfo) {
+        try {
+            LOGGER.info("Registering DAG for {}", 
scheduleInfo.getInlongGroupId());
+            return doRegister(scheduleInfo, true);
+        } catch (Exception e) {
+            LOGGER.error("The Airflow scheduling task with Group ID {} failed 
to register.",
+                    scheduleInfo.getInlongGroupId(), e);
+            throw new AirflowScheduleException(SCHEDULE_TASK_REGISTER_FAILED,
+                    String.format("The Airflow scheduling task with Group ID 
%s failed to register: %s",
+                            scheduleInfo.getInlongGroupId(), e.getMessage()));
+        }
+    }
+
+    @Override
+    public boolean handleUnregister(String groupId) {
+        LOGGER.info("Unregistering Airflow Dag with GroupId {} ", groupId);
+        if (scheduledJobSet.contains(groupId)) {
+            try {
+                if (!completelyDelete(DAGUtil.buildDAGIdByGroupId(groupId))) {
+                    return false;
+                }
+            } catch (Exception e) {
+                LOGGER.warn("May not be completely removed {}", groupId, e);
+            }
+        }
+        scheduledJobSet.remove(groupId);
+        LOGGER.info("Un-registered airflow schedule info for {}", groupId);
+        return true;
+    }
+
+    /**
+     * Deletes a task DAG in two steps: triggers the cleaner DAG, then deletes the DAG through the
+     * Airflow API. The second step is skipped when the first is unsuccessful.
+     *
+     * @param dagId identifier passed by the caller; {@code handleUnregister} passes the value of
+     *              {@code DAGUtil.buildDAGIdByGroupId(groupId)}
+     * @return {@code true} only when both requests were successful
+     * @throws Exception when sending either request fails
+     */
+    private boolean completelyDelete(String dagId) throws Exception {
+        // Trigger the cleaner DAG so that it removes the DAG file.
+        // NOTE(review): the caller already passes a built DAG ID and buildDAGIdByGroupId is applied
+        // again here, so the cleaner receives a doubly-built ID - confirm this is what it expects.
+        DAGRunConf cleanerConf = DAGRunConf.builder()
+                .inlongGroupId(DAGUtil.buildDAGIdByGroupId(dagId))
+                .build();
+        AirflowResponse<DAGRun> cleanerResponse = serverClient.sendRequest(
+                new DAGRunsTrigger(airflowConfig.getDagCleanerId(), ImmutableMap.of("conf", cleanerConf)));
+        LOGGER.info("Response to DAG file clearing: {}", cleanerResponse.toString());
+        if (!cleanerResponse.isSuccess()) {
+            LOGGER.warn("Failed to delete DAG file corresponding to {}.", dagId);
+            return false;
+        }
+
+        // Delete the DAG that Airflow has already loaded
+        AirflowResponse<Object> deleteResponse = serverClient.sendRequest(new DAGDeletor(dagId));
+        LOGGER.info("Response to DAG scheduling instance clearing: {}", deleteResponse.toString());
+        boolean deleted = deleteResponse.isSuccess();
+        if (!deleted) {
+            LOGGER.warn("Failed to delete DAG instance corresponding to {}.", dagId);
+        }
+        return deleted;
+    }
+
+    @Override
+    public boolean handleUpdate(ScheduleInfo scheduleInfo) {
+        try {
+            LOGGER.info("Updating DAG for {}", 
scheduleInfo.getInlongGroupId());
+            return doRegister(scheduleInfo, false);
+        } catch (Exception e) {
+            LOGGER.error("The Airflow scheduling task with Group ID {} failed 
to update.",
+                    scheduleInfo.getInlongGroupId(), e);
+            throw new AirflowScheduleException(SCHEDULE_TASK_UPDATE_FAILED,
+                    String.format("The Airflow scheduling task with Group ID 
%s failed to update: %s",
+                            scheduleInfo.getInlongGroupId(), e.getMessage()));
+        }
+    }
+
+    public boolean doRegister(ScheduleInfo scheduleInfo, boolean isFirst) 
throws Exception {
+        if (isFirst && 
scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
+            throw new AirflowScheduleException(DAG_DUPLICATE,
+                    String.format("Group %s is already registered", 
scheduleInfo.getInlongGroupId()));
+        }
+        DAGRunConf.DAGRunConfBuilder confBuilder = DAGRunConf.builder()
+                .inlongGroupId(scheduleInfo.getInlongGroupId())
+                .startTime(scheduleInfo.getStartTime().getTime())
+                .endTime(scheduleInfo.getEndTime().getTime())
+                .boundaryType(BoundaryType.TIME.getType())
+                .connectionId(airflowConfig.getConnectionId())
+                .timezone(DEFAULT_TIMEZONE);
+        if (scheduleInfo.getScheduleType() == 1) {
+            confBuilder = 
confBuilder.cronExpr(scheduleInfo.getCrontabExpression());
+        } else {
+            confBuilder = 
confBuilder.secondsInterval(DateUtil.intervalToSeconds(scheduleInfo.getScheduleInterval(),
+                    scheduleInfo.getScheduleUnit()))
+                    
.startTime(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit()) == 
ScheduleUnit.ONE_ROUND
+                            ? scheduleInfo.getEndTime().getTime()
+                            : scheduleInfo.getStartTime().getTime());
+        }
+        DAGRunConf dagRunConf = confBuilder.build();
+        AirflowResponse<DAGRun> response = serverClient.sendRequest(
+                new DAGRunsTrigger(airflowConfig.getDagCreatorId(), 
ImmutableMap.of("conf", dagRunConf)));
+        LOGGER.info("DAG {} response: {}", isFirst ? "registration" : 
"update", response.toString());
+        if (response.isSuccess()) {
+            scheduledJobSet.add(scheduleInfo.getInlongGroupId());
+        }
+        return response.isSuccess();
+    }
+
+    @Override
+    public void stop() {
+        try {
+            switchOriginalDAG(true);
+            switchAllTaskDAG(true);
+        } catch (Exception e) {
+            LOGGER.error("Airflow Schedule Engine shutdown failed: ", e);
+            throw new AirflowScheduleException(SCHEDULE_ENGINE_SHUTDOWN_FAILED,
+                    String.format("Airflow Schedule Engine shutdown failed: 
%s", e.getMessage()));
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java
new file mode 100644
index 0000000000..be67a36475
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.Error;
+import org.apache.inlong.manager.schedule.airflow.api.AirflowApi;
+import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse;
+import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A unified class used for Airflow RESTful API processing.
+ */
+public class AirflowServerClient {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AirflowServerClient.class);
+    private final OkHttpClient httpClient;
+    private final AirflowConfig config;
+    private final ObjectMapper objectMapper;
+
+    public AirflowServerClient(OkHttpClient httpClient, AirflowConfig config) {
+        this.httpClient = httpClient;
+        this.config = config;
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /**
+     * Send request and parse response
+     *
+     * @param apiEndpoint apiEndpoint
+     * @param <T>         Response to Generic Types
+     * @return Parsed response object
+     * @throws IOException Network request exception
+     */
+    public <T> AirflowResponse<T> sendRequest(AirflowApi<T> apiEndpoint) 
throws IOException {
+        Request request = apiEndpoint.buildRequest(config.getBaseUrl());
+        try (Response response = httpClient.newCall(request).execute()) {
+            String responseBody = response.body().string();
+            if (response.isSuccessful()) {
+                return new AirflowResponse<>(true, 
objectMapper.readValue(responseBody, apiEndpoint.getResponseType()));
+            } else {
+                logger.error("Airflow Web API Request failed, status code: {} 
, detail: {}",
+                        response.code(), objectMapper.readValue(responseBody, 
Error.class).getDetail());
+                return new AirflowResponse<>(false, null);
+            }
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java
new file mode 100644
index 0000000000..4ff1a3284d
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api;
+
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import org.springframework.http.HttpMethod;
+
+import java.util.Map;
+/**
+ * Represents a generic interface for defining and constructing API requests to interact with Airflow.
+ * This interface provides methods for specifying the HTTP method, endpoint path, parameters and
+ * request body, and for constructing the complete request.
+ *
+ * @param <T> the type of the response expected from the API, allowing flexibility for various response types.
+ */
+public interface AirflowApi<T> {
+
+    /**
+     * Get the HTTP method of the request.
+     * @return HTTP method
+     */
+    HttpMethod getMethod();
+
+    /**
+     * Get the requested path (relative to baseUrl).
+     * @return Request path
+     */
+    String getPath();
+
+    /**
+     * Get path parameters to replace placeholders in the path (e.g. "/api/v1/dags/{dag_id}/dagRuns").
+     * @return Path parameter map
+     */
+    Map<String, String> getPathParams();
+
+    /**
+     * Get query parameters (e.g. "?Key=value").
+     * @return Query parameter map
+     */
+    Map<String, Object> getQueryParams();
+
+    /**
+     * Get the request body (applicable to methods such as POST, PUT, etc.).
+     * @return Request body object
+     */
+    RequestBody getRequestBody();
+
+    /**
+     * Construct the complete Request object.
+     * @param baseUrl Base URL
+     * @return Constructed Request object
+     */
+    Request buildRequest(String baseUrl);
+
+    /**
+     * Returns the type of the response expected from this API.
+     * @return The expected response type.
+     */
+    Class<T> getResponseType();
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java
similarity index 51%
copy from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java
index ac71e4e2d1..60e0ef6366 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java
@@ -15,20 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.schedule.airflow.api;
 
-import lombok.Getter;
+/**
+ * A generic response wrapper for handling responses from Airflow services.
+ * @param <T> the type of data included in the response, allowing flexibility 
for various data types.
+ */
+public class AirflowResponse<T> {
 
-@Getter
-public enum ScheduleEngineType {
+    private final boolean success;
+    private final T data;
 
-    NONE("None"),
-    QUARTZ("Quartz"),
-    DOLPHINSCHEDULER("DolphinScheduler");
+    public AirflowResponse(boolean success, T data) {
+        this.success = success;
+        this.data = data;
+    }
 
-    private final String type;
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public T getData() {
+        return data;
+    }
 
-    ScheduleEngineType(String type) {
-        this.type = type;
+    @Override
+    public String toString() {
+        return "AirflowResponse{" +
+                "success=" + success +
+                ", data=" + data +
+                '}';
     }
-}
\ No newline at end of file
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java
new file mode 100644
index 0000000000..18a1ed5206
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api;
+
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.MediaType;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import org.springframework.http.HttpMethod;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED;
+
+/**
+ * The basic implementation of Airflow API interface.
+ *
+ * @param <T> the type of the response expected from the API, allowing 
flexibility for various response types.
+ */
+
+@Slf4j
+public abstract class BaseAirflowApi<T> implements AirflowApi<T> {
+
+    protected static final ObjectMapper objectMapper = new ObjectMapper();
+    protected Map<String, String> pathParams = Maps.newHashMap();
+    protected Map<String, Object> queryParams = Maps.newHashMap();
+    protected Map<String, Object> requestBodyParams = Maps.newHashMap();
+
+    @Override
+    public abstract HttpMethod getMethod();
+
+    @Override
+    public abstract String getPath();
+
+    @Override
+    public abstract Class<T> getResponseType();
+
+    @Override
+    public Map<String, String> getPathParams() {
+        return pathParams;
+    }
+
+    @Override
+    public Map<String, Object> getQueryParams() {
+        return queryParams;
+    }
+
+    /**
+     * Create JSON request body
+     * @return RequestBody Object
+     */
+    @Override
+    public RequestBody getRequestBody() {
+        try {
+            return RequestBody.create(MediaType.parse("application/json; 
charset=utf-8"),
+                    objectMapper.writeValueAsString(requestBodyParams));
+        } catch (Exception e) {
+            log.error("Airflow request body construction failed: {}", 
e.getMessage(), e);
+            throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED,
+                    String.format("Airflow request body construction failed: 
%s", e.getMessage()));
+        }
+    }
+
+    @Override
+    public Request buildRequest(String baseUrl) {
+        // Build a complete URL
+        String path = buildPathParams(getPath(), getPathParams());
+        String url = baseUrl + path;
+
+        // Add query parameters
+        if (!getQueryParams().isEmpty()) {
+            String queryString = buildQueryString(getQueryParams());
+            url += "?" + queryString;
+        }
+
+        // Build Request Builder
+        Request.Builder builder = new Request.Builder().url(url);
+
+        // Set requests based on HTTP methods
+        switch (getMethod()) {
+            case GET:
+                builder.get();
+                break;
+            case POST:
+                builder.post(getRequestBody());
+                break;
+            case PATCH:
+                builder.patch(getRequestBody());
+                break;
+            case PUT:
+                builder.put(getRequestBody());
+                break;
+            case DELETE:
+                if (!requestBodyParams.isEmpty()) {
+                    builder.delete(getRequestBody());
+                } else {
+                    builder.delete();
+                }
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported HTTP method: " 
+ getMethod());
+        }
+        return builder.build();
+    }
+
+    private String buildPathParams(String path, Map<String, String> 
pathParams) {
+        for (Map.Entry<String, String> entry : pathParams.entrySet()) {
+            path = path.replace("{" + entry.getKey() + "}", entry.getValue());
+        }
+        return path;
+    }
+
+    private String buildQueryString(Map<String, Object> queryParams) {
+        StringBuilder sb = new StringBuilder();
+        // Multiple values can be specified for the same parameter name in the 
Get parameter.
+        // (e.g. "?Key=value1&Key=value2")
+        queryParams.forEach((key, value) -> {
+            if (value instanceof List) {
+                ((List) value).forEach(item -> 
sb.append(key).append("=").append(item).append("&"));
+            } else {
+                sb.append(key).append("=").append(value).append("&");
+            }
+        });
+        if (sb.length() > 0) {
+            sb.setLength(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java
new file mode 100644
index 0000000000..1e5b7d737c
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.connection;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.MediaType;
+import okhttp3.RequestBody;
+import org.springframework.http.HttpMethod;
+
+import java.util.Map;
+
+import static 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED;
+
+/**
+ * Build call for AirflowConnectionGetter<br>
+ * <table border="10">
+ * <tr><th> Request Body Param </th><th> Description </th></tr>
+ * <tr><td> connection_id </td><td> The connection ID. </td></tr>
+ * <tr><td> conn_type </td><td> The connection type. </td></tr>
+ * <tr><td> description </td><td> The description of the connection. </td></tr>
+ * <tr><td> host </td><td> Host of the connection. </td></tr>
+ * <tr><td> login </td><td> Login of the connection. </td></tr>
+ * <tr><td> schema </td><td> Schema of the connection. </td></tr>
+ * <tr><td> port </td><td> Port of the connection. </td></tr>
+ * <tr><td> password </td><td> Password of the connection. </td></tr>
+ * <tr><td> extra </td><td> Other values that cannot be put into another 
field, e.g. RSA keys.(optional) </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 400 </td><td> Client specified an invalid argument. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, 
authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. 
</td></tr>
+ * </table>
+ */
+@Slf4j
+public class AirflowConnectionCreator extends 
BaseAirflowApi<AirflowConnection> {
+
+    AirflowConnection connection = null;
+
+    public AirflowConnectionCreator(AirflowConnection connection) {
+        this.connection = connection;
+    }
+
+    public AirflowConnectionCreator(Map<String, Object> requestBodyParams) {
+        this.requestBodyParams = requestBodyParams;
+    }
+
+    @Override
+    public RequestBody getRequestBody() {
+        if (connection != null) {
+            try {
+                return RequestBody.create(MediaType.parse("application/json; 
charset=utf-8"),
+                        objectMapper.writeValueAsString(connection));
+            } catch (Exception e) {
+                log.error("Airflow request body construction failed: {}", 
e.getMessage(), e);
+                throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED,
+                        String.format("Airflow request body construction 
failed: %s", e.getMessage()));
+            }
+        }
+        return super.getRequestBody();
+    }
+
+    @Override
+    public Class<AirflowConnection> getResponseType() {
+        return AirflowConnection.class;
+    }
+
+    @Override
+    public HttpMethod getMethod() {
+        return HttpMethod.POST;
+    }
+
+    @Override
+    public String getPath() {
+        return AirFlowAPIConstant.LIST_CONNECTIONS_URI;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java
new file mode 100644
index 0000000000..7dc278ae0b
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.connection;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Build call for AirflowConnectionGetter<br>
+ * <table border="10">
+ * <tr><th> Path Param </th><th> Description </th></tr>
+ * <tr><td> connection_id </td><td> The connection ID. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class AirflowConnectionGetter extends BaseAirflowApi<AirflowConnection> {
+
+    /**
+     * @param connectionId value substituted for the "connection_id" path parameter
+     */
+    public AirflowConnectionGetter(String connectionId) {
+        pathParams.put("connection_id", connectionId);
+    }
+
+    @Override
+    public String getPath() {
+        return AirFlowAPIConstant.GET_CONNECTION_URI;
+    }
+
+    @Override
+    public HttpMethod getMethod() {
+        return HttpMethod.GET;
+    }
+
+    @Override
+    public Class<AirflowConnection> getResponseType() {
+        return AirflowConnection.class;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java
new file mode 100644
index 0000000000..039a18d9d8
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.dag;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Build call for DAGCollectionUpdater<br>
+ * Describes a PATCH request against the DAG list endpoint: the DAG filters travel as query
+ * parameters and the fields to change travel in the request body.
+ * <table border="10">
+ * <tr><th> Query Param </th><th> Description </th></tr>
+ * <tr><td> limit </td><td> The numbers of items to return. (optional, default to 100) </td></tr>
+ * <tr><td> offset </td><td> The number of items to skip before starting to collect the result set.
+ * (optional) </td></tr>
+ * <tr><td> tags </td><td> List of tags to filter results. (optional) </td></tr>
+ * <tr><td> update_mask </td><td> The fields to update on the resource. If absent or empty, all
+ * modifiable fields are updated. A comma-separated list of fully qualified names of fields.
+ * (optional) </td></tr>
+ * <tr><td> only_active </td><td> Only filter active DAGs. (optional, default to true) </td></tr>
+ * <tr><td> dag_id_pattern </td><td> If set, only return DAGs with dag_ids matching this pattern.
+ * (required) </td></tr>
+ * </table>
+ *
+ * <table border="10">
+ * <tr><th> Request Body Param </th><th> Description </th></tr>
+ * <tr><td> is_paused </td><td> Whether the DAG is paused. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing or invalid authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class DAGCollectionUpdater extends BaseAirflowApi<DAGCollection> {
+
+    /**
+     * @param dagIdPattern value sent as the {@code dag_id_pattern} query parameter
+     * @param isPaused value sent as {@code is_paused} in the request body
+     */
+    public DAGCollectionUpdater(String dagIdPattern, boolean isPaused) {
+        queryParams.put("dag_id_pattern", dagIdPattern);
+        requestBodyParams.put("is_paused", isPaused);
+    }
+
+    /**
+     * Uses the supplied maps directly as this request's query and body parameters
+     * (the references are stored, not copied).
+     *
+     * @param queryParams query parameters of the request
+     * @param requestBodyParams request body parameters of the request
+     */
+    public DAGCollectionUpdater(Map<String, Object> queryParams, Map<String, Object> requestBodyParams) {
+        this.requestBodyParams = requestBodyParams;
+        this.queryParams = queryParams;
+    }
+
+    /** The response body is mapped to {@link DAGCollection}. */
+    @Override
+    public Class<DAGCollection> getResponseType() {
+        return DAGCollection.class;
+    }
+
+    /** URI of the DAG list endpoint. */
+    @Override
+    public String getPath() {
+        return AirFlowAPIConstant.LIST_DAGS_URI;
+    }
+
+    /** The collection is modified with an HTTP PATCH. */
+    @Override
+    public HttpMethod getMethod() {
+        return HttpMethod.PATCH;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java
new file mode 100644
index 0000000000..23a348d766
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.dag;
+
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Build call for DAGDeletor<br>
+ * Describes an HTTP DELETE against the single-DAG endpoint. This class sets only the
+ * {@code dag_id} path parameter; it adds no query or request body parameters.
+ * <table border="10">
+ * <tr><th> Path Param </th><th> Description </th></tr>
+ * <tr><td> dag_id </td><td> The DAG ID. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 204 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing or invalid authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * <tr><td> 409 </td><td> An existing resource conflicts with the request. </td></tr>
+ * </table>
+ */
+public class DAGDeletor extends BaseAirflowApi<Object> {
+
+    /**
+     * @param dagId value registered under the {@code dag_id} path parameter
+     */
+    public DAGDeletor(String dagId) {
+        this.pathParams.put("dag_id", dagId);
+    }
+
+    @Override
+    public HttpMethod getMethod() {
+        return HttpMethod.DELETE;
+    }
+
+    /**
+     * Reuses the single-DAG URI template that DAGUpdater also targets; only the HTTP method differs.
+     */
+    @Override
+    public String getPath() {
+        return AirFlowAPIConstant.UPDATE_DAG_URI;
+    }
+
+    /** No dedicated response POJO is declared for this call, so the body is mapped to {@link Object}. */
+    @Override
+    public Class<Object> getResponseType() {
+        return Object.class;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java
new file mode 100644
index 0000000000..be8313f1b1
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.dag;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.DAG;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Build call for DAGUpdater<br>
+ * Describes a PATCH request against the single-DAG endpoint.
+ * <table border="10">
+ * <tr><th> Path Param </th><th> Description </th></tr>
+ * <tr><td> dag_id </td><td> The DAG ID. </td></tr>
+ * </table>
+ *
+ * <table border="10">
+ * <tr><th> Query Param </th><th> Description </th></tr>
+ * <tr><td> update_mask </td><td> The fields to update on the resource. If absent or empty, all
+ * modifiable fields are updated. A comma-separated list of fully qualified names of fields.
+ * (optional) </td></tr>
+ * </table>
+ *
+ * <table border="10">
+ * <tr><th> Request Body Param </th><th> Description </th></tr>
+ * <tr><td> is_paused </td><td> Whether the DAG is paused. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing or invalid authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class DAGUpdater extends BaseAirflowApi<DAG> {
+
+    /**
+     * @param dagId value registered under the {@code dag_id} path parameter
+     * @param isPaused value sent as {@code is_paused} in the request body
+     */
+    public DAGUpdater(String dagId, boolean isPaused) {
+        this.pathParams.put("dag_id", dagId);
+        this.requestBodyParams.put("is_paused", isPaused);
+    }
+
+    /**
+     * @param dagId value registered under the {@code dag_id} path parameter
+     * @param updateMask value sent as the {@code update_mask} query parameter
+     * @param isPaused value sent as {@code is_paused} in the request body
+     */
+    public DAGUpdater(String dagId, String updateMask, boolean isPaused) {
+        // Path and body parameters are identical to the two-argument form.
+        this(dagId, isPaused);
+        this.queryParams.put("update_mask", updateMask);
+    }
+
+    /** The response body is mapped to {@link DAG}. */
+    @Override
+    public Class<DAG> getResponseType() {
+        return DAG.class;
+    }
+
+    /** URI template of the single-DAG endpoint. */
+    @Override
+    public String getPath() {
+        return AirFlowAPIConstant.UPDATE_DAG_URI;
+    }
+
+    /** The DAG is modified with an HTTP PATCH. */
+    @Override
+    public HttpMethod getMethod() {
+        return HttpMethod.PATCH;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java
new file mode 100644
index 0000000000..b9fe7b2260
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.dagruns;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Build call for DAGRunsTrigger <br>
+ * Describes a POST request that triggers a new run of the given DAG.
+ * <table border="10">
+ * <tr><th> Path Param </th><th> Description </th></tr>
+ * <tr><td> dag_id </td><td> The DAG ID. </td></tr>
+ * </table>
+ *
+ * <table border="10">
+ * <tr><th> Request Body Param </th><th> Description </th></tr>
+ * <tr>
+ *     <td> conf </td>
+ *     <td>
+ *         JSON object describing additional configuration parameters. <br>
+ *         The value of this field can be set only when creating the object. If you try to modify
+ *         the field of an existing object, the request fails with a BAD_REQUEST error.<br>
+ *     </td>
+ * </tr>
+ * <tr>
+ *     <td> dag_run_id </td>
+ *     <td> Run ID. <br>
+ *         The value of this field can be set only when creating the object. If you try to modify
+ *         the field of an existing object, the request fails with a BAD_REQUEST error. <br>
+ *         If not provided, a value will be generated based on execution_date. <br>
+ *         If the specified dag_run_id is in use, the creation request fails with an
+ *         ALREADY_EXISTS error. <br>
+ *         This together with DAG_ID are a unique key.<br>
+ *     </td>
+ * </tr>
+ * <tr><td> data_interval_end </td><td> The end of the interval the DAG run covers. </td></tr>
+ * <tr><td> data_interval_start </td><td> The beginning of the interval the DAG run covers. </td></tr>
+ * <tr>
+ *     <td> logical_date </td>
+ *     <td>
+ *         The logical date (previously called execution date). This is the time or interval
+ *         covered by this DAG run, according to the DAG definition. <br>
+ *         The value of this field can be set only when creating the object. If you try to modify
+ *         the field of an existing object, the request fails with a BAD_REQUEST error.<br>
+ *         This together with DAG_ID are a unique key. <br>
+ *     </td>
+ * </tr>
+ * <tr><td> note </td><td> Contains manually entered notes by the user about the DagRun. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing or invalid authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class DAGRunsTrigger extends BaseAirflowApi<DAGRun> {
+
+    /**
+     * @param dagId value registered under the {@code dag_id} path parameter
+     */
+    public DAGRunsTrigger(String dagId) {
+        this.pathParams.put("dag_id", dagId);
+    }
+
+    /**
+     * @param dagId value registered under the {@code dag_id} path parameter
+     * @param requestBodyParams map used directly as the request body (the reference is stored, not copied)
+     */
+    public DAGRunsTrigger(String dagId, Map<String, Object> requestBodyParams) {
+        this(dagId);
+        this.requestBodyParams = requestBodyParams;
+    }
+
+    /** The response body is mapped to {@link DAGRun}. */
+    @Override
+    public Class<DAGRun> getResponseType() {
+        return DAGRun.class;
+    }
+
+    /** URI template of the trigger-DAG-run endpoint. */
+    @Override
+    public String getPath() {
+        return AirFlowAPIConstant.TRIGGER_NEW_DAG_RUN_URI;
+    }
+
+    /** A run is created with an HTTP POST. */
+    @Override
+    public HttpMethod getMethod() {
+        return HttpMethod.POST;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
new file mode 100644
index 0000000000..60bb6673ce
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.config;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.schedule.airflow.AirflowServerClient;
+import 
org.apache.inlong.manager.schedule.airflow.interceptor.AirflowAuthInterceptor;
+import 
org.apache.inlong.manager.schedule.airflow.interceptor.LoggingInterceptor;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import okhttp3.OkHttpClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Spring configuration for the Airflow schedule engine.
+ * <p>
+ * Binds the Airflow-related properties (each with a built-in default) and exposes the
+ * {@link OkHttpClient} and {@link AirflowServerClient} beans that are built from them.
+ * Timeouts come from the getters inherited from {@link ClientConfiguration}.
+ */
+@Data
+@Configuration
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class AirflowConfig extends ClientConfiguration {
+
+    // InLong Manager address and admin credentials.
+    // NOTE(review): presumably what the Airflow-side connection uses to call back into
+    // the Manager - confirm against AirflowScheduleEngine.
+    @Value("${schedule.engine.airflow.inlong.manager.host:127.0.0.1}")
+    private String host;
+
+    @Value("${server.port:8083}")
+    private int port;
+
+    @Value("${default.admin.user:admin}")
+    private String inlongUsername;
+
+    @Value("${default.admin.password:inlong}")
+    private String inlongPassword;
+
+    // Identifiers of the Airflow connection and of the cleaner / creator DAGs.
+    @Value("${schedule.engine.airflow.connection.id:inlong_connection}")
+    private String connectionId;
+
+    @Value("${schedule.engine.airflow.cleaner.id:dag_cleaner}")
+    private String dagCleanerId;
+
+    @Value("${schedule.engine.airflow.creator.id:dag_creator}")
+    private String dagCreatorId;
+
+    // Credentials and base URL used when calling the Airflow REST API.
+    @Value("${schedule.engine.airflow.username:airflow}")
+    private String airflowUsername;
+
+    @Value("${schedule.engine.airflow.password:airflow}")
+    private String airflowPassword;
+
+    @Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}")
+    private String baseUrl;
+
+    /**
+     * Builds the HTTP client used for Airflow calls.
+     * <p>
+     * Application interceptors run in the order they are added, so the Authorization header is
+     * attached before the logging interceptor sees the request.
+     *
+     * @return client with auth and logging interceptors, the configured timeouts and
+     *         retry-on-connection-failure enabled
+     */
+    @Bean
+    public OkHttpClient okHttpClient() {
+        return new OkHttpClient.Builder()
+                .addInterceptor(new AirflowAuthInterceptor(this.getAirflowUsername(), this.getAirflowPassword()))
+                .addInterceptor(new LoggingInterceptor())
+                .connectTimeout(this.getConnectTimeout(), this.getTimeUnit())
+                .readTimeout(this.getReadTimeout(), this.getTimeUnit())
+                .writeTimeout(this.getWriteTimeout(), this.getTimeUnit())
+                .retryOnConnectionFailure(true)
+                .build();
+    }
+
+    /**
+     * @param okHttpClient HTTP client to issue requests with
+     * @param airflowConfig configuration handed to the server client
+     * @return a new {@link AirflowServerClient} wrapping both arguments
+     */
+    @Bean
+    public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) {
+        return new AirflowServerClient(okHttpClient, airflowConfig);
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java
new file mode 100644
index 0000000000..714614bf94
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.interceptor;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * AirflowAuthInterceptor
+ * <p>
+ * OkHttp interceptor that sets an HTTP Basic {@code Authorization} header on every request
+ * before it proceeds down the chain. The header value is computed once, at construction time.
+ */
+@Slf4j
+public class AirflowAuthInterceptor implements Interceptor {
+
+    // Pre-computed "Basic <base64(username:password)>" header value.
+    private final String authHeader;
+
+    /**
+     * @param username Airflow user name
+     * @param password Airflow password
+     */
+    public AirflowAuthInterceptor(String username, String password) {
+        String credentials = username + ":" + password;
+        // Encode with an explicit charset: the no-arg getBytes() uses the platform default,
+        // which would make the header differ between JVMs for non-ASCII credentials.
+        this.authHeader = "Basic "
+                + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Copies the incoming request, sets (replacing any existing) {@code Authorization} header
+     * and forwards the copy.
+     *
+     * @param chain interceptor chain supplying the request
+     * @return the response produced by the rest of the chain
+     * @throws IOException if the downstream call fails
+     */
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+        Request originalRequest = chain.request();
+        Request.Builder requestBuilder = originalRequest
+                .newBuilder()
+                .header("Authorization", authHeader);
+        return chain.proceed(requestBuilder.build());
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java
similarity index 52%
copy from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java
index ac71e4e2d1..c3028385b1 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java
@@ -15,20 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.schedule.airflow.interceptor;
 
-import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
 
-@Getter
-public enum ScheduleEngineType {
+import java.io.IOException;
 
-    NONE("None"),
-    QUARTZ("Quartz"),
-    DOLPHINSCHEDULER("DolphinScheduler");
-
-    private final String type;
+/**
+ * LoggingInterceptor
+ * <p>
+ * OkHttp interceptor that writes one INFO log line per Airflow API call, containing the
+ * request URL, its URI form, the HTTP method and the response status code.
+ */
+@Slf4j
+public class LoggingInterceptor implements Interceptor {
 
-    ScheduleEngineType(String type) {
-        this.type = type;
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+        Request request = chain.request();
+        // Proceed first so that the response status code is available for the log line.
+        Response response = chain.proceed(request);
+        log.info("Airflow API request information - Address: {}, URI: {}, 
Request method: {}, Response status code: {}",
+                request.url(), request.url().uri(), request.method(), 
response.code());
+        return response;
     }
-}
\ No newline at end of file
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java
similarity index 71%
copy from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java
index ac71e4e2d1..fad05f2116 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java
@@ -15,20 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.schedule.airflow.util;
 
-import lombok.Getter;
+import static 
org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX;
 
-@Getter
-public enum ScheduleEngineType {
+/**
+ * Helper for deriving Airflow DAG identifiers.
+ */
+public class DAGUtil {
 
-    NONE("None"),
-    QUARTZ("Quartz"),
-    DOLPHINSCHEDULER("DolphinScheduler");
-
-    private final String type;
-
-    ScheduleEngineType(String type) {
-        this.type = type;
+    /**
+     * Builds a DAG ID by appending the group ID to {@code INLONG_OFFLINE_DAG_TASK_PREFIX}.
+     *
+     * @param groupId group ID to append
+     * @return the prefix followed by {@code groupId}
+     * @throws NullPointerException if {@code groupId} is null (raised by {@link String#concat})
+     */
+    public static String buildDAGIdByGroupId(String groupId) {
+        return INLONG_OFFLINE_DAG_TASK_PREFIX.concat(groupId);
     }
-}
\ No newline at end of file
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java
new file mode 100644
index 0000000000..950e334921
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.util;
+
+import org.apache.inlong.manager.schedule.ScheduleUnit;
+
+import java.math.BigInteger;
+import java.util.Objects;
+
+/**
+ * Time conversion helper for the Airflow schedule engine.
+ */
+public class DateUtil {
+
+    /**
+     * Converts an interval expressed in the given schedule unit into seconds.
+     * <p>
+     * A month is counted as 2592000 seconds (30 days) and a year as 31536000 seconds (365 days).
+     * The multiplication is done with {@link BigInteger}, so the result cannot overflow.
+     *
+     * @param interval number of units
+     * @param timeUnit textual unit, resolved through {@code ScheduleUnit.getScheduleUnit}
+     * @return the interval in seconds, as a decimal string
+     * @throws NullPointerException if {@code ScheduleUnit.getScheduleUnit} returns null
+     * @throws IllegalArgumentException if the resolved unit is not handled below
+     */
+    public static String intervalToSeconds(long interval, String timeUnit) {
+        final long secondsPerUnit;
+        switch (Objects.requireNonNull(ScheduleUnit.getScheduleUnit(timeUnit))) {
+            case SECOND:
+                secondsPerUnit = 1L;
+                break;
+            case MINUTE:
+                secondsPerUnit = 60L;
+                break;
+            case HOUR:
+                secondsPerUnit = 3600L;
+                break;
+            case DAY:
+                secondsPerUnit = 86400L;
+                break;
+            case WEEK:
+                secondsPerUnit = 604800L;
+                break;
+            case MONTH:
+                secondsPerUnit = 2592000L;
+                break;
+            case YEAR:
+                secondsPerUnit = 31536000L;
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported time unit");
+        }
+        BigInteger units = BigInteger.valueOf(interval);
+        return units.multiply(BigInteger.valueOf(secondsPerUnit)).toString();
+    }
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java
new file mode 100644
index 0000000000..6b6830fb30
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.exception;
+
+/**
+ * Unchecked exception raised by the Airflow scheduling integration.
+ * An instance may carry an {@link AirflowErrorCode} identifying which step failed; the code is
+ * {@code null} when the message-only constructor is used.
+ */
+public class AirflowScheduleException extends RuntimeException {
+
+    /**
+     * Error codes that can be attached to an {@link AirflowScheduleException}.
+     */
+    public enum AirflowErrorCode {
+        INIT_CONNECTION_FAILED,
+        TASK_DAG_SWITCH_FAILED,
+        SCHEDULE_TASK_REGISTER_FAILED,
+        SCHEDULE_TASK_UPDATE_FAILED,
+        SCHEDULE_ENGINE_SHUTDOWN_FAILED,
+        BUILD_REQUEST_BODY_FAILED,
+        DAG_DUPLICATE
+    }
+
+    // Null when the exception was created without an error code.
+    private final AirflowErrorCode errorCode;
+
+    /**
+     * @param message detail message; no error code is attached
+     */
+    public AirflowScheduleException(String message) {
+        this(null, message);
+    }
+
+    /**
+     * @param errorCode code identifying the failure
+     * @param message detail message
+     */
+    public AirflowScheduleException(AirflowErrorCode errorCode, String message) {
+        super(message);
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * @param errorCode code identifying the failure
+     * @param message detail message
+     * @param cause underlying cause
+     */
+    public AirflowScheduleException(AirflowErrorCode errorCode, String message, Throwable cause) {
+        super(message, cause);
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * @return the attached error code, or {@code null} if none was given
+     */
+    public AirflowErrorCode getErrorCode() {
+        return this.errorCode;
+    }
+
+    /**
+     * @return {@code "ErrorCode: <code>, Message: <message>"}
+     */
+    @Override
+    public String toString() {
+        return "ErrorCode: " + errorCode + ", Message: " + getMessage();
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java
new file mode 100644
index 0000000000..f99282a2ed
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.testcontainers.containers.ContainerState;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+@Slf4j
+public class AirflowContainerEnv {
+
+    public static String BASE_URL = "http://localhost:8080";;
+    public static String AIRFLOW_USERNAME = "airflow";
+    public static String AIRFLOW_PASSWORD = "airflow";
+    public static String NORMAL_POSTFIX = "_normal";
+    public static String CORN_POSTFIX = "_cron";
+    public static String AIRFLOW_SCHEDULER_CONTAINER_NAME = 
"airflow-scheduler_1";
+    public static String DOCKER_COMPOSE_YAML_PATH = 
"src/test/resources/airflow/docker-compose.yaml";
+    public static String DEFAULT_DAGS_PATH = "/opt/airflow/dags/";
+
+    private static DockerComposeContainer<?> environment;
+    private static OkHttpClient httpClient = new OkHttpClient();
+
+    public static void setUp() {
+        // Step 1: Start only the airflow-init service
+        environment = new DockerComposeContainer<>(new 
File(DOCKER_COMPOSE_YAML_PATH))
+                .withServices("airflow-init")
+                .withEnv("AIRFLOW_UID", "$(id -u)");
+        // Start the environment
+        environment.start();
+        // Step 2: Wait until the "airflow-init" service has completed 
initialization
+        // Once initialized, stop the init-only environment and start the full 
environment
+        environment.stop();
+        // Step 3: Start all services in detached mode after initialization
+        environment = new DockerComposeContainer<>(new 
File(DOCKER_COMPOSE_YAML_PATH))
+                .withEnv("AIRFLOW_UID", "0")
+                .withEnv("AIRFLOW__CORE__LOAD_EXAMPLES", "false")
+                .withEnv("AIRFLOW__API__AUTH_BACKEND",
+                        
"airflow.providers.fab.auth_manager.api.auth.backend.basic_auth");
+        environment.start();
+        copyTestDAGs();
+        waitForDAGsLoad("dag_cleaner");
+        log.info("Airflow runtime environment created successfully.");
+    }
+
+    public static void shutDown() {
+        if (environment != null) {
+            environment.stop();
+        }
+    }
+
+    private static void copyTestDAGs() {
+        // After the DAG file is created, the scheduler will regularly scan 
the DAG file directory and
+        // then load it into memory for scheduling. In order to quickly test 
the update and unregister, two
+        // test DAGs need to be loaded at the beginning.
+        Optional<ContainerState> container = 
environment.getContainerByServiceName(AIRFLOW_SCHEDULER_CONTAINER_NAME);
+        if (container.isPresent()) {
+            ContainerState airflowScheduler = container.get();
+            Path dagPath1 = 
Paths.get("src/test/resources/airflow/dag_cleaner.py").toAbsolutePath();
+            Path dagPath2 = 
Paths.get("src/test/resources/airflow/dag_creator.py").toAbsolutePath();
+            Path dagPath3 = 
Paths.get("src/test/resources/airflow/testGroup_cron.py").toAbsolutePath();
+            Path dagPath4 = 
Paths.get("src/test/resources/airflow/testGroup_normal.py").toAbsolutePath();
+            
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath1),
+                    DEFAULT_DAGS_PATH.concat("dag_cleaner.py"));
+            
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath2),
+                    DEFAULT_DAGS_PATH.concat("dag_creator.py"));
+            
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath3),
+                    DEFAULT_DAGS_PATH.concat("testGroup_cron.py"));
+            
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath4),
+                    DEFAULT_DAGS_PATH.concat("testGroup_normal.py"));
+            try {
+                String result =
+                        airflowScheduler.execInContainer("bash", "-c", "ls 
".concat(DEFAULT_DAGS_PATH)).getStdout();
+                log.info(DEFAULT_DAGS_PATH.concat(" has file: {}"), result);
+            } catch (Exception e) {
+                log.warn(String.format(
+                        "Copying the test DAG file may have failed. Docker 
Container command(\"%s\") execution failed.",
+                        "ls ".contains(DEFAULT_DAGS_PATH)), e);
+            }
+        } else {
+            log.error(String.format("Copying test DAG file failed. Airflow 
scheduler container(%s) does not exist.",
+                    AIRFLOW_SCHEDULER_CONTAINER_NAME));
+            throw new AirflowScheduleException("Copying test DAG file 
failed.");
+        }
+        log.info("Copy test DAG file successfully.");
+    }
+
+    public static void waitForDAGsLoad(String dagId) {
+        int total = 10;
+        // Waiting for Airflow to load the initial DAG
+        while (total > 0) {
+            String credential = okhttp3.Credentials.basic(AIRFLOW_USERNAME, 
AIRFLOW_PASSWORD);
+            Request request = new Request.Builder()
+                    .url(BASE_URL + "/api/v1/dags/" + dagId + "/details")
+                    .header("Authorization", credential)
+                    .build();
+            try (Response response = httpClient.newCall(request).execute()) {
+                if (response.code() == 200) {
+                    break;
+                }
+            } catch (Exception e) {
+                log.error("The request to check if the original DAG exists 
failed: {}", e.getMessage(), e);
+            }
+            try {
+                Thread.sleep(30000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            total--;
+        }
+        log.info("DAG successfully loaded.");
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java
new file mode 100644
index 0000000000..fe5d070afd
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.BaseScheduleTest;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.springframework.beans.factory.annotation.Autowired;
+import 
org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.ComponentScan;
+
+import static 
org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.CORN_POSTFIX;
+import static 
org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.NORMAL_POSTFIX;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test for {@link AirflowScheduleEngine} running against the docker-compose environment
+ * provided by {@link AirflowContainerEnv}.
+ * <p>
+ * The tests must run in the declared {@link Order}: update and unregister work on the DAGs that are
+ * copied into Airflow during set-up, and unregister removes them. {@code @Order} is only honoured when
+ * the class selects {@link MethodOrderer.OrderAnnotation}, hence the {@link TestMethodOrder} annotation.
+ */
+@Slf4j
+@EnableConfigurationProperties
+@ComponentScan(basePackages = "org.apache.inlong.manager")
+@SpringBootTest(classes = AirflowScheduleEngineTest.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class AirflowScheduleEngineTest {
+
+    @Autowired
+    private AirflowScheduleEngine scheduleEngine;
+    private static final BaseScheduleTest baseScheduleTest = new BaseScheduleTest();
+
+    @BeforeAll
+    public static void initScheduleEngine() {
+        try {
+            AirflowContainerEnv.setUp();
+        } catch (Exception e) {
+            log.error("Airflow runtime environment creation failed.", e);
+            // Keep the original exception as the cause so the root failure is not lost.
+            throw new RuntimeException(
+                    String.format("Airflow runtime environment creation failed: %s", e.getMessage()), e);
+        }
+    }
+
+    @AfterAll
+    public static void stopScheduleEngine() {
+        AirflowContainerEnv.shutDown();
+    }
+
+    @Test
+    @Order(1)
+    public void testRegisterScheduleInfo() {
+        // 1. test for normal schedule; the timestamp keeps the group id unique per run
+        ScheduleInfo scheduleInfo = withGroupIdSuffix(
+                baseScheduleTest.genDefaultScheduleInfo(), NORMAL_POSTFIX + System.currentTimeMillis());
+        assertTrue(scheduleEngine.handleRegister(scheduleInfo));
+
+        // 2. test for cron schedule
+        scheduleInfo = withGroupIdSuffix(
+                baseScheduleTest.genDefaultCronScheduleInfo(), CORN_POSTFIX + System.currentTimeMillis());
+        assertTrue(scheduleEngine.handleRegister(scheduleInfo));
+    }
+
+    @Test
+    @Order(2)
+    public void testUpdateScheduleInfo() {
+        // 1. test for normal schedule
+        ScheduleInfo scheduleInfo = withGroupIdSuffix(baseScheduleTest.genDefaultScheduleInfo(), NORMAL_POSTFIX);
+        assertTrue(scheduleEngine.handleUpdate(scheduleInfo));
+
+        // 2. test for cron schedule
+        scheduleInfo = withGroupIdSuffix(baseScheduleTest.genDefaultCronScheduleInfo(), CORN_POSTFIX);
+        assertTrue(scheduleEngine.handleUpdate(scheduleInfo));
+    }
+
+    @Test
+    @Order(3)
+    public void testUnRegisterScheduleInfo() {
+        // 1. test for normal schedule
+        ScheduleInfo scheduleInfo = withGroupIdSuffix(baseScheduleTest.genDefaultScheduleInfo(), NORMAL_POSTFIX);
+        assertTrue(scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId()));
+
+        // 2. test for cron schedule
+        scheduleInfo = withGroupIdSuffix(baseScheduleTest.genDefaultCronScheduleInfo(), CORN_POSTFIX);
+        assertTrue(scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId()));
+    }
+
+    /** Appends {@code suffix} to the group id of {@code scheduleInfo} and returns the same instance. */
+    private static ScheduleInfo withGroupIdSuffix(ScheduleInfo scheduleInfo, String suffix) {
+        scheduleInfo.setInlongGroupId(scheduleInfo.getInlongGroupId() + suffix);
+        return scheduleInfo;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py 
b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
new file mode 100644
index 0000000000..be20fe1bb1
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.python_operator import PythonOperator
+from airflow.models import Variable
+from airflow.utils.dates import days_ago
+from datetime import datetime
+import os
+import logging
+import pytz
+from croniter import croniter
+from airflow.hooks.base_hook import BaseHook
+from airflow import configuration
+
+DAG_PATH = configuration.get('core', 'dags_folder') + "/"
+
+
+def clean_expired_dags(**context):
+    original_time = context.get('execution_date')
+    target_timezone = pytz.timezone("Asia/Shanghai")
+    utc_time = original_time.astimezone(target_timezone)
+    current_time = utc_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
+    logging.info(f"Current time: {current_time}")
+    for dag_file in os.listdir(DAG_PATH):
+        if dag_file.endswith(".py") and 
dag_file.startswith("inlong_offline_task_"):
+            with open(DAG_PATH + dag_file, "r") as file:
+                line = file.readline()
+                while line and "end_offset_datetime_str" not in line:
+                    line = file.readline()
+                end_date_str = None
+                if len(line.split("=")) > 1:
+                    end_date_str = line.split("=")[1].strip().strip("\"")
+                logging.info(f"DAG end time: {end_date_str}")
+                if end_date_str:
+                    try:
+                        if str(current_time) > str(end_date_str):
+                            dag_file_path = os.path.join(DAG_PATH, dag_file)
+                            os.remove(dag_file_path)
+                            # Optionally, delete the end_date variable
+                            logging.info(f"Deleted expired DAG: {dag_file}")
+                    except ValueError:
+                        logging.error(f"Invalid date format for DAG 
{dag_file}: {end_date_str}")
+
+
+default_args = {
+    'owner': 'airflow',
+    'start_date': datetime.now() - timedelta(minutes=5),
+    'catchup': False,
+    'tags': ["inlong"]
+}
+
+dag = DAG(
+    'dag_cleaner',
+    default_args=default_args,
+    schedule_interval="*/20 * * * *",
+    is_paused_upon_creation=False
+)
+
+clean_task = PythonOperator(
+    task_id='clean_expired_dags',
+    python_callable=clean_expired_dags,
+    provide_context=True,
+    dag=dag,
+)
diff --git 
a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py 
b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
new file mode 100644
index 0000000000..4034cf467c
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.dates import days_ago
+from airflow.models import Variable
+import os
+from datetime import datetime
+from airflow.hooks.base_hook import BaseHook
+from airflow import configuration
+
+DAG_PATH = configuration.get('core', 'dags_folder') + "/"
+DAG_PREFIX = 'inlong_offline_task_'
+
+def create_dag_file(**context):
+    conf = context.get('dag_run').conf
+    print('conf: ', conf)
+    groupId = conf.get('inlong_group_id')
+    task_name = DAG_PREFIX + groupId
+    timezone = conf.get('timezone')
+    boundaryType = str(conf.get('boundary_type'))
+    start_time = int(conf.get('start_time'))
+    end_time = int(conf.get('end_time'))
+    cron_expr = conf.get('cron_expr')
+    seconds_interval = conf.get('seconds_interval')
+    schedule_interval = cron_expr
+    if cron_expr is None or len(cron_expr) == 0:
+        schedule_interval = f'timedelta(seconds={seconds_interval})'
+    else:
+        schedule_interval = '"' + cron_expr + '"'
+    connectionId = conf.get('connection_id')
+    dag_content = f'''from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.python_operator import PythonOperator
+from datetime import datetime
+from croniter import croniter
+from airflow.hooks.base_hook import BaseHook
+import requests
+import pytz
+
+timezone = "{timezone}"
+start_offset_datetime_str = {start_time}
+end_offset_datetime_str = {end_time}
+schedule_interval = {schedule_interval}  # Or put cron expression
+dag_id = "{task_name}"
+groupId = "{groupId}"
+connectionId = "{connectionId}"
+boundaryType = "{boundaryType}"
+
+target_timezone = pytz.timezone(timezone)
+
+start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, 
tz=target_timezone)
+end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, 
tz=target_timezone)
+
+def taskFunction(**context):
+    print("#########################")
+    conn = BaseHook.get_connection(connectionId)
+    url = f"http://{{conn.host}}:{{conn.port}}/{{conn.schema}}"
+    params = {{
+        "username": conn.login,
+        "password": conn.password
+    }}
+    print("params", params)
+    headers = {{
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
+        "Accept": "application/json",
+        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
+        "Accept-Encoding": "gzip, deflate",
+        "Referer": "http://192.168.101.2:8083/",
+        "Content-Type": "application/json;charset=UTF-8",
+        "tenant": "public",
+        "Origin": "http://192.168.101.2",
+        "Connection": "close",
+        "Priority": "u=0"
+    }}
+    time_interval = get_time_interval(context)
+    data = {{
+        "boundaryType": boundaryType,
+        "groupId": groupId,
+        "lowerBoundary": str(int(time_interval[0])),
+        "upperBoundary": str(int(int(time_interval[1])))
+    }}
+    print("Request Body: ", data)
+    response = requests.post(url, params=params, headers=headers, json=data)
+    if response.status_code == 200:
+        print(response.json())
+    else:
+        print(response.text)
+    print("#########################")
+
+
+def get_time_interval(context):
+    execution_date = context.get('execution_date')
+    execution_date = execution_date.astimezone(target_timezone)
+    dag = context.get('dag')
+    schedule_interval = dag.schedule_interval
+    if isinstance(schedule_interval, timedelta):
+        return execution_date.timestamp(), (execution_date + 
schedule_interval).timestamp()
+    else:
+        cron_expr = dag.schedule_interval
+        cron = croniter(cron_expr, execution_date)
+        next_run = cron.get_next(datetime)
+        return execution_date.timestamp(), next_run.timestamp()
+
+
+default_args = {{
+    'owner': 'inlong',
+    'start_date': start_date,
+    'end_date': end_date,
+    'catchup': False,
+}}
+
+dag = DAG(
+    dag_id,
+    default_args=default_args,
+    schedule_interval=schedule_interval,
+    is_paused_upon_creation=False
+)
+
+clean_task = PythonOperator(
+    task_id=dag_id,
+    python_callable=taskFunction,
+    provide_context=True,
+    dag=dag,
+)
+    '''
+    dag_file_path = os.path.join(DAG_PATH, f'{task_name}.py')
+    with open(dag_file_path, 'w') as f:
+        f.write(dag_content)
+    print(f'Generated DAG file: {dag_file_path}')
+default_args = {'owner': 'airflow', 'start_date': days_ago(1), 'catchup': 
False}
+dag = DAG('dag_creator', default_args=default_args, schedule_interval=None, 
is_paused_upon_creation=False)
+create_dag_task = PythonOperator(task_id='create_dag_file', 
python_callable=create_dag_file, provide_context=True, dag=dag)
\ No newline at end of file
diff --git 
a/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml
 
b/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml
new file mode 100644
index 0000000000..c97195c03f
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml
@@ -0,0 +1,292 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Basic Airflow cluster configuration for CeleryExecutor with Redis and 
PostgreSQL.
+#
+# WARNING: This configuration is for local development. Do not use it in a 
production deployment.
+#
+# This configuration supports basic configuration using environment variables 
or an .env file
+# The following variables are supported:
+#
+# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
+#                                Default: apache/airflow:2.6.0
+# AIRFLOW_UID                  - User ID in Airflow containers
+#                                Default: 50000
+# AIRFLOW_PROJ_DIR             - Base path to which all the files will be 
volumed.
+#                                Default: .
+# Those configurations are useful mostly in case of standalone testing/running 
Airflow in test/try-out mode
+#
+# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if 
requested).
+#                                Default: airflow
+# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if 
requested).
+#                                Default: airflow
+# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when 
starting all containers.
+#                                Use this option ONLY for quick checks. 
Installing requirements at container
+#                                startup is done EVERY TIME the service is 
started.
+#                                A better way is to build a custom image or 
extend the official image
+#                                as described in 
https://airflow.apache.org/docs/docker-stack/build.html.
+#                                Default: ''
+#
+# Feel free to modify this file to suit your needs.
+---
+version: '3.8'
+x-airflow-common:
+  &airflow-common
+  # In order to add custom dependencies or upgrade provider packages you can 
use your extended image.
+  # Comment the image line, place your Dockerfile in the directory where you 
placed the docker-compose.yaml
+  # and uncomment the "build" line below, Then run `docker-compose build` to 
build the images.
+  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.0}
+  # build: .
+  environment:
+    &airflow-common-env
+    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
+    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@postgres/airflow
+    # For backward compatibility, with Airflow <2.3
+    AIRFLOW__CORE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@postgres/airflow
+    AIRFLOW__CELERY__RESULT_BACKEND: 
db+postgresql://airflow:airflow@postgres/airflow
+    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
+    AIRFLOW__CORE__FERNET_KEY: ''
+    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
+    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
+    AIRFLOW__API__AUTH_BACKENDS: 
'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
+    # yamllint disable rule:line-length
+    # Use simple http server on scheduler for health checks
+    # See 
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
+    # yamllint enable rule:line-length
+    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
+    # WARNING: Use the _PIP_ADDITIONAL_REQUIREMENTS option ONLY for quick checks;
+    # for other purpose (development, test and especially production usage) 
build/extend Airflow image.
+    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
+  user: "${AIRFLOW_UID:-50000}:0"
+  depends_on:
+    &airflow-common-depends-on
+    redis:
+      condition: service_healthy
+    postgres:
+      condition: service_healthy
+
+services:
+  postgres:
+    image: postgres:13
+    environment:
+      POSTGRES_USER: airflow
+      POSTGRES_PASSWORD: airflow
+      POSTGRES_DB: airflow
+    volumes:
+      - postgres-db-volume:/var/lib/postgresql/data
+    healthcheck:
+      test: ["CMD", "pg_isready", "-U", "airflow"]
+      interval: 10s
+      retries: 5
+      start_period: 5s
+    restart: always
+
+  redis:
+    image: redis:latest
+    expose:
+      - 6379
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 10s
+      timeout: 30s
+      retries: 50
+      start_period: 30s
+    restart: always
+
+  airflow-webserver:
+    <<: *airflow-common
+    command: webserver
+    ports:
+      - "8080:8080"
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-scheduler:
+    <<: *airflow-common
+    command: scheduler
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-worker:
+    <<: *airflow-common
+    command: celery worker
+    healthcheck:
+      test:
+        - "CMD-SHELL"
+        - 'celery --app airflow.executors.celery_executor.app inspect ping -d 
"celery@$${HOSTNAME}"'
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    environment:
+      <<: *airflow-common-env
+      # Required to handle warm shutdown of the celery workers properly
+      # See 
https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
+      DUMB_INIT_SETSID: "0"
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-triggerer:
+    <<: *airflow-common
+    command: triggerer
+    healthcheck:
+      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob 
--hostname "$${HOSTNAME}"']
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-init:
+    <<: *airflow-common
+    entrypoint: /bin/bash
+    # yamllint disable rule:line-length
+    command:
+      - -c
+      - |
+        function ver() {
+          printf "%04d%04d%04d%04d" $${1//./ }
+        }
+        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu 
airflow airflow version)
+        airflow_version_comparable=$$(ver $${airflow_version})
+        min_airflow_version=2.2.0
+        min_airflow_version_comparable=$$(ver $${min_airflow_version})
+        if (( airflow_version_comparable < min_airflow_version_comparable )); 
then
+          echo
+          echo -e "\033[1;31mERROR!!!: Too old Airflow version 
$${airflow_version}!\e[0m"
+          echo "The minimum Airflow version supported: 
$${min_airflow_version}. Only use this or higher!"
+          echo
+          exit 1
+        fi
+        if [[ -z "${AIRFLOW_UID}" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
+          echo "If you are on Linux, you SHOULD follow the instructions below 
to set "
+          echo "AIRFLOW_UID environment variable, otherwise files will be 
owned by root."
+          echo "For other operating systems you can get rid of the warning 
with manually created .env file:"
+          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
+          echo
+        fi
+        one_meg=1048576
+        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / 
one_meg))
+        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
+        disk_available=$$(df / | tail -1 | awk '{print $$4}')
+        warning_resources="false"
+        if (( mem_available < 4000 )) ; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough memory available for 
Docker.\e[0m"
+          echo "At least 4GB of memory required. You have $$(numfmt --to iec 
$$((mem_available * one_meg)))"
+          echo
+          warning_resources="true"
+        fi
+        if (( cpus_available < 2 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for 
Docker.\e[0m"
+          echo "At least 2 CPUs recommended. You have $${cpus_available}"
+          echo
+          warning_resources="true"
+        fi
+        if (( disk_available < one_meg * 10 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for 
Docker.\e[0m"
+          echo "At least 10 GBs recommended. You have $$(numfmt --to iec 
$$((disk_available * 1024 )))"
+          echo
+          warning_resources="true"
+        fi
+        if [[ $${warning_resources} == "true" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run 
Airflow (see above)!\e[0m"
+          echo "Please follow the instructions to increase amount of resources 
available:"
+          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
+          echo
+        fi
+        exec /entrypoint airflow version
+    # yamllint enable rule:line-length
+    environment:
+      <<: *airflow-common-env
+      _AIRFLOW_DB_UPGRADE: 'true'
+      _AIRFLOW_WWW_USER_CREATE: 'true'
+      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
+      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+      _PIP_ADDITIONAL_REQUIREMENTS: ''
+    user: "0:0"
+    volumes:
+      - ${AIRFLOW_PROJ_DIR:-.}:/sources
+
+  airflow-cli:
+    <<: *airflow-common
+    profiles:
+      - debug
+    environment:
+      <<: *airflow-common-env
+      CONNECTION_CHECK_MAX_COUNT: "0"
+    # Workaround for entrypoint issue. See: 
https://github.com/apache/airflow/issues/16252
+    command:
+      - bash
+      - -c
+      - airflow
+
+  # You can enable flower by adding "--profile flower" option e.g. 
docker-compose --profile flower up
+  # or by explicitly targeted on the command line e.g. docker-compose up 
flower.
+  # See: https://docs.docker.com/compose/profiles/
+  flower:
+    <<: *airflow-common
+    command: celery flower
+    profiles:
+      - flower
+    ports:
+      - "5555:5555"
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+volumes:
+  postgres-db-volume:
diff --git 
a/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py 
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py
new file mode 100644
index 0000000000..b753eb7587
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.python_operator import PythonOperator
+from datetime import datetime
+from croniter import croniter
+from airflow.hooks.base_hook import BaseHook
+import requests
+import pytz
+
+# Timezone name resolved with pytz below; start/end dates and execution
+# dates are expressed in this zone.
+timezone = "Asia/Shanghai"
+# Despite the "_str" suffix these are integers: epoch timestamps in
+# milliseconds (they are divided by 1000 before datetime.fromtimestamp).
+start_offset_datetime_str = 1731072908243
+end_offset_datetime_str = 1731142800000
+schedule_interval = "*/1 * * * *"  # Or put cron expression
+dag_id = "inlong_offline_task_testGroup_cron"
+groupId = "test_offline_1"
+# Id of the Airflow connection looked up with BaseHook.get_connection().
+connectionId = "inlong_connection"
+boundaryType = str("time")
+
+target_timezone = pytz.timezone(timezone)
+
+start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
+end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)
+
+
+def taskFunction(**context):
+    """Post one offline-task request for ``groupId`` and print the response.
+
+    The target URL and the credentials are read from the Airflow connection
+    named by ``connectionId``. The time window placed in the request body is
+    computed from the Airflow ``context`` by ``get_time_interval``. The
+    response is only printed: a non-200 status does not fail the task.
+    """
+    print("#########################")
+    conn = BaseHook.get_connection(connectionId)
+    url = f"http://{conn.host}:{conn.port}/{conn.schema}"
+    params = {
+        "username": conn.login,
+        "password": conn.password
+    }
+    # Never write the connection password to the task log.
+    print("params", {**params, "password": "******"})
+    # NOTE(review): Referer/Origin carry a hard-coded private address that
+    # looks copied from a browser session; confirm the endpoint needs them.
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
+        "Accept": "application/json",
+        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
+        "Accept-Encoding": "gzip, deflate",
+        "Referer": "http://192.168.101.2:8083/",
+        "Content-Type": "application/json;charset=UTF-8",
+        "tenant": "public",
+        "Origin": "http://192.168.101.2",
+        "Connection": "close",
+        "Priority": "u=0"
+    }
+    time_interval = get_time_interval(context)
+    data = {
+        "boundaryType": boundaryType,
+        "groupId": groupId,
+        "lowerBoundary": time_interval[0],
+        "upperBoundary": time_interval[1]
+    }
+    print("Request Body: ", data)
+    response = requests.post(url, params=params, headers=headers, json=data)
+    if response.status_code == 200:
+        print(response.json())
+    else:
+        print(response.text)
+    print("#########################")
+
+
+def get_time_interval(context):
+    """Return ``(lower, upper)`` bounds of the current run as POSIX timestamps.
+
+    ``lower`` is the run's ``execution_date`` converted to ``target_timezone``.
+    ``upper`` is ``lower`` plus the DAG's ``schedule_interval`` when that is a
+    ``timedelta``; otherwise the interval is handed to croniter as a cron
+    expression and ``upper`` is the next fire time after ``lower``.
+    """
+    run_start = context.get('execution_date').astimezone(target_timezone)
+    interval = context.get('dag').schedule_interval
+    if not isinstance(interval, timedelta):
+        # Cron expression: let croniter compute the following fire time.
+        upcoming = croniter(interval, run_start).get_next(datetime)
+        return run_start.timestamp(), upcoming.timestamp()
+    return run_start.timestamp(), (run_start + interval).timestamp()
+
+
+# Defaults applied to the operators of this DAG. 'catchup' is a DAG
+# argument, not an operator argument, so it is passed to DAG() below
+# instead of being listed here (where it would have no effect).
+default_args = {
+    'owner': 'inlong',
+    'start_date': start_date,
+    'end_date': end_date,
+}
+
+dag = DAG(
+    dag_id,
+    default_args=default_args,
+    schedule_interval=schedule_interval,
+    catchup=False,
+    is_paused_upon_creation=False
+)
+
+# The DAG's only task; it reuses the DAG id as its task id.
+clean_task = PythonOperator(
+    task_id=dag_id,
+    python_callable=taskFunction,
+    provide_context=True,
+    dag=dag,
+)
diff --git 
a/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py
 
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py
new file mode 100644
index 0000000000..5666f9f471
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.python_operator import PythonOperator
+from datetime import datetime
+from croniter import croniter
+from airflow.hooks.base_hook import BaseHook
+import requests
+import pytz
+
+# Timezone name resolved with pytz below; start/end dates and execution
+# dates are expressed in this zone.
+timezone = "Asia/Shanghai"
+# Despite the "_str" suffix these are integers: epoch timestamps in
+# milliseconds (they are divided by 1000 before datetime.fromtimestamp).
+start_offset_datetime_str = 1731072908243
+end_offset_datetime_str = 1731142800000
+schedule_interval = "*/1 * * * *"  # Or put cron expression
+dag_id = "inlong_offline_task_testGroup_normal"
+groupId = "test_offline_1"
+# Id of the Airflow connection looked up with BaseHook.get_connection().
+connectionId = "inlong_connection"
+boundaryType = str("time")
+
+target_timezone = pytz.timezone(timezone)
+
+start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
+end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)
+
+
+def taskFunction(**context):
+    """Post one offline-task request for ``groupId`` and print the response.
+
+    The target URL and the credentials are read from the Airflow connection
+    named by ``connectionId``. The time window placed in the request body is
+    computed from the Airflow ``context`` by ``get_time_interval``. The
+    response is only printed: a non-200 status does not fail the task.
+    """
+    print("#########################")
+    conn = BaseHook.get_connection(connectionId)
+    url = f"http://{conn.host}:{conn.port}/{conn.schema}"
+    params = {
+        "username": conn.login,
+        "password": conn.password
+    }
+    # Never write the connection password to the task log.
+    print("params", {**params, "password": "******"})
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
+        "Accept": "application/json",
+        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
+        "Accept-Encoding": "gzip, deflate",
+        "Content-Type": "application/json;charset=UTF-8",
+        "tenant": "public",
+        "Connection": "close",
+        "Priority": "u=0"
+    }
+    time_interval = get_time_interval(context)
+    data = {
+        "boundaryType": boundaryType,
+        "groupId": groupId,
+        "lowerBoundary": time_interval[0],
+        "upperBoundary": time_interval[1]
+    }
+    print("Request Body: ", data)
+    response = requests.post(url, params=params, headers=headers, json=data)
+    if response.status_code == 200:
+        print(response.json())
+    else:
+        print(response.text)
+    print("#########################")
+
+
+def get_time_interval(context):
+    """Compute the ``(lower, upper)`` window of this run as POSIX timestamps.
+
+    The lower bound is ``execution_date`` from ``context`` converted to
+    ``target_timezone``. For a ``timedelta`` schedule the upper bound is the
+    lower bound plus that delta; for any other schedule value it is the next
+    fire time croniter reports after the lower bound.
+    """
+    window_start = context.get('execution_date').astimezone(target_timezone)
+    schedule = context.get('dag').schedule_interval
+    if isinstance(schedule, timedelta):
+        window_end = window_start + schedule
+    else:
+        # Non-timedelta schedules are treated as cron expressions.
+        window_end = croniter(schedule, window_start).get_next(datetime)
+    return window_start.timestamp(), window_end.timestamp()
+
+
+# Defaults applied to the operators of this DAG. 'catchup' is a DAG
+# argument, not an operator argument, so it is passed to DAG() below
+# instead of being listed here (where it would have no effect).
+default_args = {
+    'owner': 'inlong',
+    'start_date': start_date,
+    'end_date': end_date,
+}
+
+dag = DAG(
+    dag_id,
+    default_args=default_args,
+    schedule_interval=schedule_interval,
+    catchup=False,
+    is_paused_upon_creation=False
+)
+
+# The DAG's only task; it reuses the DAG id as its task id.
+clean_task = PythonOperator(
+    task_id=dag_id,
+    python_callable=taskFunction,
+    provide_context=True,
+    dag=dag,
+)
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index b0193111aa..f17c451aee 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -113,3 +113,13 @@ 
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
 # DolphinScheduler related config
 schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
 schedule.engine.dolphinscheduler.token=default_token_value
+
+# Airflow configuration
+schedule.engine.airflow.baseUrl=
+schedule.engine.airflow.username=
+schedule.engine.airflow.password=
+schedule.engine.airflow.connection.id=
+# Please confirm if it is a loopback address
+schedule.engine.airflow.inlong.manager.host=
+schedule.engine.airflow.cleaner.id=
+schedule.engine.airflow.creator.id=
\ No newline at end of file
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index a549e206f0..129bce9ac6 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -104,3 +104,13 @@ 
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
 # DolphinScheduler related config
 schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
 schedule.engine.dolphinscheduler.token=default_token_value
+
+# Airflow configuration
+schedule.engine.airflow.baseUrl=
+schedule.engine.airflow.username=
+schedule.engine.airflow.password=
+schedule.engine.airflow.connection.id=
+# Please confirm if it is a loopback address
+schedule.engine.airflow.inlong.manager.host=
+schedule.engine.airflow.cleaner.id=
+schedule.engine.airflow.creator.id=
\ No newline at end of file
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 6a71dc7a05..a062bc2c63 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -105,3 +105,13 @@ 
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
 # DolphinScheduler related config
 schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
 schedule.engine.dolphinscheduler.token=default_token_value
+
+# Airflow configuration
+schedule.engine.airflow.baseUrl=
+schedule.engine.airflow.username=
+schedule.engine.airflow.password=
+schedule.engine.airflow.connection.id=
+# Please confirm if it is a loopback address
+schedule.engine.airflow.inlong.manager.host=
+schedule.engine.airflow.cleaner.id=
+schedule.engine.airflow.creator.id=
\ No newline at end of file

Reply via email to