This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new befe172c08 [INLONG-11400][Manager] Support Airflow schedule engine (#11479) befe172c08 is described below commit befe172c08b4e07f6de378cd36c392ea452bf0fd Author: Zkplo <87751516+zk...@users.noreply.github.com> AuthorDate: Wed Nov 20 22:04:03 2024 +0800 [INLONG-11400][Manager] Support Airflow schedule engine (#11479) --- .../pojo/schedule/airflow/AirflowConnection.java | 70 +++++ .../inlong/manager/pojo/schedule/airflow/DAG.java | 50 ++++ .../pojo/schedule/airflow/DAGCollection.java} | 30 ++- .../manager/pojo/schedule/airflow/DAGRun.java | 50 ++++ .../manager/pojo/schedule/airflow/DAGRunConf.java | 68 +++++ .../manager/pojo/schedule/airflow/Error.java | 52 ++++ .../manager/schedule/ScheduleEngineType.java | 1 + .../schedule/airflow/AirFlowAPIConstant.java | 40 +++ .../schedule/airflow/AirflowScheduleClient.java | 58 ++++ .../schedule/airflow/AirflowScheduleEngine.java | 258 ++++++++++++++++++ .../schedule/airflow/AirflowServerClient.java | 71 +++++ .../manager/schedule/airflow/api/AirflowApi.java | 75 ++++++ .../api/AirflowResponse.java} | 37 ++- .../schedule/airflow/api/BaseAirflowApi.java | 149 +++++++++++ .../api/connection/AirflowConnectionCreator.java | 99 +++++++ .../api/connection/AirflowConnectionGetter.java | 61 +++++ .../airflow/api/dag/DAGCollectionUpdater.java | 79 ++++++ .../schedule/airflow/api/dag/DAGDeletor.java | 69 +++++ .../schedule/airflow/api/dag/DAGUpdater.java | 78 ++++++ .../airflow/api/dagruns/DAGRunsTrigger.java | 100 +++++++ .../schedule/airflow/config/AirflowConfig.java | 86 ++++++ .../interceptor/AirflowAuthInterceptor.java | 51 ++++ .../interceptor/LoggingInterceptor.java} | 32 ++- .../util/DAGUtil.java} | 19 +- .../manager/schedule/airflow/util/DateUtil.java | 58 ++++ .../exception/AirflowScheduleException.java | 62 +++++ .../schedule/airflow/AirflowContainerEnv.java | 139 ++++++++++ .../airflow/AirflowScheduleEngineTest.java | 110 ++++++++ .../src/test/resources/airflow/dag_cleaner.py | 80 ++++++ .../src/test/resources/airflow/dag_creator.py | 148 +++++++++++ .../src/test/resources/airflow/docker-compose.yaml | 292 +++++++++++++++++++++ .../src/test/resources/airflow/testGroup_cron.py | 112 ++++++++ .../src/test/resources/airflow/testGroup_normal.py | 110 ++++++++ .../src/main/resources/application-dev.properties | 10 + .../src/main/resources/application-prod.properties | 10 + .../src/main/resources/application-test.properties | 10 + 36 files changed, 2776 insertions(+), 48 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java new file mode 100644 index 0000000000..deb056ed4f --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "Full representation of the connection.") +public class AirflowConnection { + + @JsonProperty("connection_id") + @ApiModelProperty("The connection ID.") + private String connectionId; + + @JsonProperty("conn_type") + @ApiModelProperty("The connection type.") + private String connType; + + @JsonProperty("description") + @ApiModelProperty("The description of the connection.") + private String description; + + @JsonProperty("host") + @ApiModelProperty("Host of the connection.") + private String host; + + @JsonProperty("login") + @ApiModelProperty("Login of the connection.") + private String login; + + @JsonProperty("schema") + @ApiModelProperty("Schema of the connection.") + private String schema; + + @JsonProperty("port") + @ApiModelProperty("Port of the connection.") + private Integer port; + + @JsonProperty("password") + @ApiModelProperty("Password of the connection.") + private String password; + + @JsonProperty("extra") + @ApiModelProperty("Additional information description of the connection.") + private String extra; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java new file mode 100644 index 0000000000..578eadb151 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "DAG Description Information.") +public class DAG { + + @JsonProperty("dag_id") + @ApiModelProperty("The ID of the DAG.") + private String dagId; + + @JsonProperty("root_dag_id") + @ApiModelProperty("If the DAG is SubDAG then it is the top level DAG identifier. Otherwise, null.") + private String rootDagId; + + @JsonProperty("is_paused") + @ApiModelProperty("Whether the DAG is paused.") + private Boolean isPaused; + + @JsonProperty("is_active") + @ApiModelProperty("Whether the DAG is currently seen by the scheduler(s).") + private Boolean isActive; + + @JsonProperty("description") + @ApiModelProperty("User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents.") + private String description; +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java similarity index 55% copy from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java index ac71e4e2d1..7a52548f41 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java @@ -15,20 +15,26 @@ * limitations under the License. */ -package org.apache.inlong.manager.schedule; +package org.apache.inlong.manager.pojo.schedule.airflow; -import lombok.Getter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; -@Getter -public enum ScheduleEngineType { +import java.util.List; - NONE("None"), - QUARTZ("Quartz"), - DOLPHINSCHEDULER("DolphinScheduler"); +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "Collection of DAGs.") +public class DAGCollection { - private final String type; + @JsonProperty("dags") + @ApiModelProperty("List of DAGs.") + private List<DAG> dags = null; - ScheduleEngineType(String type) { - this.type = type; - } -} \ No newline at end of file + @JsonProperty("total_entries") + @ApiModelProperty("The length of DAG list.") + private Integer totalEntries; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java new file mode 100644 index 0000000000..e9384c75da --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "DAGRun Description Information.") +public class DAGRun { + + @JsonProperty("conf") + @ApiModelProperty("JSON object describing additional configuration parameters.") + private Object conf; + + @JsonProperty("dag_id") + @ApiModelProperty("Airflow DAG id.") + private String dagId; + + @JsonProperty("dag_run_id") + @ApiModelProperty("Airflow DAGRun id (Nullable).") + private String dagRunId; + + @JsonProperty("end_date") + @ApiModelProperty("The end time of this DAGRun.") + private String endDate; + + @JsonProperty("start_date") + @ApiModelProperty("The start time of this DAGRun.") + private String startDate; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java new file mode 100644 index 0000000000..4154c2526c --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "DAGRunConf Description Information.") +public class DAGRunConf { + + @JsonProperty("inlong_group_id") + @ApiModelProperty("Specify the Inlong group ID") + private String inlongGroupId; + + @JsonProperty("start_time") + @ApiModelProperty("The start time of DAG scheduling.") + private long startTime; + + @JsonProperty("end_time") + @ApiModelProperty("The end time of DAG scheduling.") + private long endTime; + + @JsonProperty("boundary_type") + @ApiModelProperty("The offline task boundary type.") + private String boundaryType; + + @JsonProperty("cron_expr") + @ApiModelProperty("Cron expression.") + private String cronExpr; + + @JsonProperty("seconds_interval") + @ApiModelProperty("Time interval (in seconds).") + private String secondsInterval; + + @JsonProperty("connection_id") + @ApiModelProperty("Airflow Connection Id of Inlong Manager.") + private String connectionId; + + @JsonProperty("timezone") + @ApiModelProperty("The timezone.") + private String timezone; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java new file mode 100644 index 0000000000..3eb76fd677 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.math.BigDecimal; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "[RFC7807](https://tools.ietf.org/html/rfc7807) compliant response. 
") +public class Error { + + @JsonProperty("detail") + @ApiModelProperty("Error Details.") + private String detail; + + @JsonProperty("instance") + @ApiModelProperty("Error of the instance.") + private String instance; + + @JsonProperty("status") + @ApiModelProperty("Error of the status.") + private BigDecimal status; + + @JsonProperty("title") + @ApiModelProperty("Error of the title.") + private String title; + + @JsonProperty("type") + @ApiModelProperty("Error of the type.") + private String type; +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java index ac71e4e2d1..8ae586609d 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java @@ -24,6 +24,7 @@ public enum ScheduleEngineType { NONE("None"), QUARTZ("Quartz"), + AIRFLOW("Airflow"), DOLPHINSCHEDULER("DolphinScheduler"); private final String type; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java new file mode 100644 index 0000000000..e328d8fd0a --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow; + +/** + * Contains constants for interacting with the Airflow API. 
+ */ +public class AirFlowAPIConstant { + + public static final String DEFAULT_TIMEZONE = "Asia/Shanghai"; + public static final String INLONG_OFFLINE_DAG_TASK_PREFIX = "inlong_offline_task_"; + public static final String SUBMIT_OFFLINE_JOB_URI = "/inlong/manager/api/group/submitOfflineJob"; + + // AirflowConnection + public static final String LIST_CONNECTIONS_URI = "/api/v1/connections"; + public static final String GET_CONNECTION_URI = "/api/v1/connections/{connection_id}"; + + // DAG + public static final String LIST_DAGS_URI = "/api/v1/dags"; + public static final String UPDATE_DAG_URI = "/api/v1/dags/{dag_id}"; + + // DAGRun + public static final String TRIGGER_NEW_DAG_RUN_URI = "/api/v1/dags/{dag_id}/dagRuns"; + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java new file mode 100644 index 0000000000..bbb8e59149 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngineClient; +import org.apache.inlong.manager.schedule.ScheduleEngineType; + +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * Built-in implementation of schedule engine client corresponding with {@link AirflowScheduleEngine}. + * AirflowScheduleClient simply invokes the {@link AirflowScheduleEngine} to register/unregister/update + * schedule info instead of calling a remote schedule service. 
+ * */ +@Service +public class AirflowScheduleClient implements ScheduleEngineClient { + + @Resource + public AirflowScheduleEngine scheduleEngine; + + @Override + public boolean accept(String engineType) { + return ScheduleEngineType.AIRFLOW.getType().equalsIgnoreCase(engineType); + } + + @Override + public boolean register(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleRegister(scheduleInfo); + } + + @Override + public boolean unregister(String groupId) { + return scheduleEngine.handleUnregister(groupId); + } + + @Override + public boolean update(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleUpdate(scheduleInfo); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java new file mode 100644 index 0000000000..792307e6ae --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.common.bounded.BoundaryType; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection; +import org.apache.inlong.manager.pojo.schedule.airflow.DAG; +import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection; +import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun; +import org.apache.inlong.manager.pojo.schedule.airflow.DAGRunConf; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.ScheduleUnit; +import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse; +import org.apache.inlong.manager.schedule.airflow.api.connection.AirflowConnectionCreator; +import org.apache.inlong.manager.schedule.airflow.api.connection.AirflowConnectionGetter; +import org.apache.inlong.manager.schedule.airflow.api.dag.DAGCollectionUpdater; +import org.apache.inlong.manager.schedule.airflow.api.dag.DAGDeletor; +import org.apache.inlong.manager.schedule.airflow.api.dag.DAGUpdater; +import org.apache.inlong.manager.schedule.airflow.api.dagruns.DAGRunsTrigger; +import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig; +import org.apache.inlong.manager.schedule.airflow.util.DAGUtil; +import org.apache.inlong.manager.schedule.airflow.util.DateUtil; +import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + +import com.google.common.collect.ImmutableMap; +import org.apache.mina.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.DEFAULT_TIMEZONE; +import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX; +import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.DAG_DUPLICATE; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.INIT_CONNECTION_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_ENGINE_SHUTDOWN_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_REGISTER_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_UPDATE_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.TASK_DAG_SWITCH_FAILED; + +/** + * Response for processing the start/register/unregister/update/stop requests from {@link AirflowScheduleClient} + */ +@Service +public class AirflowScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(AirflowScheduleEngine.class); + private final Set<String> scheduledJobSet = new ConcurrentHashSet<>(); + private AirflowServerClient serverClient; + private AirflowConfig airflowConfig; + + public AirflowScheduleEngine(AirflowServerClient serverClient, AirflowConfig airflowConfig) { + this.serverClient = serverClient; + this.airflowConfig = airflowConfig; + start(); + } + + @Override + public void start() { + try { + // Create authentication information 
for the Inlong Manger API used by AirFlow + initConnection(); + // Check if DagCleaner and DagCreator exist and unpause them + switchOriginalDAG(false); + // Start all task DAGs and load all DAG ID(Group Id) into the local cache + switchAllTaskDAG(false); + LOGGER.info("Airflow initialization succeeded."); + } catch (Exception e) { + LOGGER.error("Airflow initialization failed.", e); + } + } + + private void initConnection() throws Exception { + LOGGER.info("Initializing Inlong Manager AirflowConnection for Airflow ... "); + // Check if Airflow has the Inlong AirflowConnection + AirflowResponse<AirflowConnection> response = serverClient.sendRequest( + new AirflowConnectionGetter(airflowConfig.getConnectionId())); + if (!response.isSuccess()) { + AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "", + airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI, + airflowConfig.getPort(), airflowConfig.getInlongPassword(), ""); + response = serverClient.sendRequest(new AirflowConnectionCreator(newConn)); + LOGGER.info("AirflowConnection registration response: {}", response.toString()); + if (!response.isSuccess()) { + LOGGER.error("Initialization connection failed."); + throw new AirflowScheduleException(INIT_CONNECTION_FAILED, "Initialization connection failed."); + } + } + } + + private void switchOriginalDAG(boolean isPaused) { + for (String dagId : Arrays.asList(airflowConfig.getDagCleanerId(), airflowConfig.getDagCreatorId())) { + try { + AirflowResponse<DAG> response = serverClient.sendRequest(new DAGUpdater(dagId, isPaused)); + LOGGER.info("Response to {} the original DAG : {}", isPaused ? "stop" : "start", response.toString()); + if (!response.isSuccess()) { + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, + String.format("%s does not exist or failed to %s.", dagId, (isPaused ? "stop" : "start"))); + } + } catch (Exception e) { + LOGGER.error("The original DAG {} failed.", isPaused ? "stop" : "start", e); + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, + String.format("The original DAG %s failed: %s.", isPaused ? "stop" : "start", e.getMessage())); + } + } + } + + private void switchAllTaskDAG(boolean isPaused) { + try { + AirflowResponse<DAGCollection> response = serverClient.sendRequest( + new DAGCollectionUpdater(INLONG_OFFLINE_DAG_TASK_PREFIX, isPaused)); + LOGGER.info("Response to {} task DAG : {}", isPaused ? "stop" : "start", response.toString()); + if (!response.isSuccess()) { + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, + String.format("Failed to %s task DAGs.", isPaused ? "stop" : "start")); + } + if (!isPaused) { + List<DAG> dagList = response.getData().getDags(); + if (dagList != null) { + dagList.forEach(dag -> scheduledJobSet + .add(dag.getDagId().substring(INLONG_OFFLINE_DAG_TASK_PREFIX.length() - 1))); + } + } + } catch (Exception e) { + LOGGER.error("Failed to {} task DAGs.", isPaused ? "stop" : "start", e); + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, + String.format("Failed to %s task DAGs: %s", isPaused ? 
"stop" : "start", e.getMessage())); + } + } + + @Override + public boolean handleRegister(ScheduleInfo scheduleInfo) { + try { + LOGGER.info("Registering DAG for {}", scheduleInfo.getInlongGroupId()); + return doRegister(scheduleInfo, true); + } catch (Exception e) { + LOGGER.error("The Airflow scheduling task with Group ID {} failed to register.", + scheduleInfo.getInlongGroupId(), e); + throw new AirflowScheduleException(SCHEDULE_TASK_REGISTER_FAILED, + String.format("The Airflow scheduling task with Group ID %s failed to register: %s", + scheduleInfo.getInlongGroupId(), e.getMessage())); + } + } + + @Override + public boolean handleUnregister(String groupId) { + LOGGER.info("Unregistering Airflow Dag with GroupId {} ", groupId); + if (scheduledJobSet.contains(groupId)) { + try { + if (!completelyDelete(DAGUtil.buildDAGIdByGroupId(groupId))) { + return false; + } + } catch (Exception e) { + LOGGER.warn("May not be completely removed {}", groupId, e); + } + } + scheduledJobSet.remove(groupId); + LOGGER.info("Un-registered airflow schedule info for {}", groupId); + return true; + } + + private boolean completelyDelete(String groupId) throws Exception { + // Trigger the removal of the DAG file for the Cleaner DAG + DAGRunConf dagRunConf = DAGRunConf.builder() + .inlongGroupId(DAGUtil.buildDAGIdByGroupId(groupId)).build(); + AirflowResponse<DAGRun> response = serverClient.sendRequest( + new DAGRunsTrigger(airflowConfig.getDagCleanerId(), ImmutableMap.of("conf", dagRunConf))); + LOGGER.info("Response to DAG file clearing: {}", response.toString()); + if (!response.isSuccess()) { + LOGGER.warn("Failed to delete DAG file corresponding to {}.", groupId); + return false; + } + // Delete DAG tasks that have been loaded into memory + AirflowResponse<Object> deleteResponse = serverClient.sendRequest(new DAGDeletor(groupId)); + LOGGER.info("Response to DAG scheduling instance clearing: {}", deleteResponse.toString()); + if (!deleteResponse.isSuccess()) { + LOGGER.warn("Failed to delete DAG instance corresponding to {}.", groupId); + } + return deleteResponse.isSuccess(); + } + + @Override + public boolean handleUpdate(ScheduleInfo scheduleInfo) { + try { + LOGGER.info("Updating DAG for {}", scheduleInfo.getInlongGroupId()); + return doRegister(scheduleInfo, false); + } catch (Exception e) { + LOGGER.error("The Airflow scheduling task with Group ID {} failed to update.", + scheduleInfo.getInlongGroupId(), e); + throw new AirflowScheduleException(SCHEDULE_TASK_UPDATE_FAILED, + String.format("The Airflow scheduling task with Group ID %s failed to update: %s", + scheduleInfo.getInlongGroupId(), e.getMessage())); + } + } + + public boolean doRegister(ScheduleInfo scheduleInfo, boolean isFirst) throws Exception { + if (isFirst && scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) { + throw new AirflowScheduleException(DAG_DUPLICATE, + String.format("Group %s is already registered", scheduleInfo.getInlongGroupId())); + } + DAGRunConf.DAGRunConfBuilder confBuilder = DAGRunConf.builder() + .inlongGroupId(scheduleInfo.getInlongGroupId()) + .startTime(scheduleInfo.getStartTime().getTime()) + .endTime(scheduleInfo.getEndTime().getTime()) + .boundaryType(BoundaryType.TIME.getType()) + .connectionId(airflowConfig.getConnectionId()) + .timezone(DEFAULT_TIMEZONE); + if (scheduleInfo.getScheduleType() == 1) { + confBuilder = confBuilder.cronExpr(scheduleInfo.getCrontabExpression()); + } else { + confBuilder = confBuilder.secondsInterval(DateUtil.intervalToSeconds(scheduleInfo.getScheduleInterval(), + 
scheduleInfo.getScheduleUnit())) + .startTime(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit()) == ScheduleUnit.ONE_ROUND + ? scheduleInfo.getEndTime().getTime() + : scheduleInfo.getStartTime().getTime()); + } + DAGRunConf dagRunConf = confBuilder.build(); + AirflowResponse<DAGRun> response = serverClient.sendRequest( + new DAGRunsTrigger(airflowConfig.getDagCreatorId(), ImmutableMap.of("conf", dagRunConf))); + LOGGER.info("DAG {} response: {}", isFirst ? "registration" : "update", response.toString()); + if (response.isSuccess()) { + scheduledJobSet.add(scheduleInfo.getInlongGroupId()); + } + return response.isSuccess(); + } + + @Override + public void stop() { + try { + switchOriginalDAG(true); + switchAllTaskDAG(true); + } catch (Exception e) { + LOGGER.error("Airflow Schedule Engine shutdown failed: ", e); + throw new AirflowScheduleException(SCHEDULE_ENGINE_SHUTDOWN_FAILED, + String.format("Airflow Schedule Engine shutdown failed: %s", e.getMessage())); + } + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java new file mode 100644 index 0000000000..be67a36475 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.manager.pojo.schedule.airflow.Error; +import org.apache.inlong.manager.schedule.airflow.api.AirflowApi; +import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse; +import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A unified class used for Airflow RESTful API processing. 
+ */ +public class AirflowServerClient { + + private static final Logger logger = LoggerFactory.getLogger(AirflowServerClient.class); + private final OkHttpClient httpClient; + private final AirflowConfig config; + private final ObjectMapper objectMapper; + + public AirflowServerClient(OkHttpClient httpClient, AirflowConfig config) { + this.httpClient = httpClient; + this.config = config; + this.objectMapper = new ObjectMapper(); + } + + /** + * Send request and parse response + * + * @param apiEndpoint apiEndpoint + * @param <T> Response to Generic Types + * @return Parsed response object + * @throws IOException Network request exception + */ + public <T> AirflowResponse<T> sendRequest(AirflowApi<T> apiEndpoint) throws IOException { + Request request = apiEndpoint.buildRequest(config.getBaseUrl()); + try (Response response = httpClient.newCall(request).execute()) { + String responseBody = response.body().string(); + if (response.isSuccessful()) { + return new AirflowResponse<>(true, objectMapper.readValue(responseBody, apiEndpoint.getResponseType())); + } else { + logger.error("Airflow Web API Request failed, status code: {} , detail: {}", + response.code(), objectMapper.readValue(responseBody, Error.class).getDetail()); + return new AirflowResponse<>(false, null); + } + } + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java new file mode 100644 index 0000000000..4ff1a3284d --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api; + +import okhttp3.Request; +import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; + +import java.util.Map; +/** + * Represents a generic interface for defining and constructing API requests to interact with Airflow. + * This interface provides methods for specifying HTTP methods, endpoint paths, parameters, + * request bodies, and constructing complete requests. + * @param <T> the type of the response expected from the API, allowing flexibility for various response types. + */ +public interface AirflowApi<T> { + + /** + * Get HTTP Method + * @return HTTP Method + */ + HttpMethod getMethod(); + + /** + * Get the requested path (relative to baseUrl) + * @return Request path + */ + String getPath(); + + /** + * Get path parameters to replace placeholders in the path (e.g. : "/api/v1/dags/{dag_id}/dagRuns") + * @return Path parameter map + */ + Map<String, String> getPathParams(); + + /** + * Get query parameters (e.g. 
"?Key=value") + * @return GET parameter map + */ + Map<String, Object> getQueryParams(); + + /** + * Get the request body (applicable to methods such as POST, PUT, etc.) + * @return Post RequestBody Object + */ + RequestBody getRequestBody(); + + /** + * Constructing a complete Request object + * @param baseUrl Base URL + * @return Constructed Request object + */ + Request buildRequest(String baseUrl); + + /** + * Returns the type of the response expected from this method. + * @return The expected response type. + */ + Class<T> getResponseType(); +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java similarity index 51% copy from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java copy to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java index ac71e4e2d1..60e0ef6366 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java @@ -15,20 +15,35 @@ * limitations under the License. */ -package org.apache.inlong.manager.schedule; +package org.apache.inlong.manager.schedule.airflow.api; -import lombok.Getter; +/** + * A generic response wrapper for handling responses from Airflow services. + * @param <T> the type of data included in the response, allowing flexibility for various data types. + */ +public class AirflowResponse<T> { -@Getter -public enum ScheduleEngineType { + private final boolean success; + private final T data; - NONE("None"), - QUARTZ("Quartz"), - DOLPHINSCHEDULER("DolphinScheduler"); + public AirflowResponse(boolean success, T data) { + this.success = success; + this.data = data; + } - private final String type; + public boolean isSuccess() { + return success; + } + + public T getData() { + return data; + } - ScheduleEngineType(String type) { - this.type = type; + @Override + public String toString() { + return "AirflowResponse{" + + "success=" + success + + ", data=" + data + + '}'; } -} \ No newline at end of file +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java new file mode 100644 index 0000000000..18a1ed5206 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api; + +import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; + +import java.util.List; +import java.util.Map; + +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED; + +/** + * The basic implementation of Airflow API interface. + * + * @param <T> the type of the response expected from the API, allowing flexibility for various response types. + */ + +@Slf4j +public abstract class BaseAirflowApi<T> implements AirflowApi<T> { + + protected static final ObjectMapper objectMapper = new ObjectMapper(); + protected Map<String, String> pathParams = Maps.newHashMap(); + protected Map<String, Object> queryParams = Maps.newHashMap(); + protected Map<String, Object> requestBodyParams = Maps.newHashMap(); + + @Override + public abstract HttpMethod getMethod(); + + @Override + public abstract String getPath(); + + @Override + public abstract Class<T> getResponseType(); + + @Override + public Map<String, String> getPathParams() { + return pathParams; + } + + @Override + public Map<String, Object> getQueryParams() { + return queryParams; + } + + /** + * Create JSON request body + * @return RequestBody Object + */ + @Override + public RequestBody getRequestBody() { + try { + return RequestBody.create(MediaType.parse("application/json; charset=utf-8"), + objectMapper.writeValueAsString(requestBodyParams)); + } catch (Exception e) { + log.error("Airflow request body construction failed: {}", e.getMessage(), e); + throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED, + String.format("Airflow request body construction failed: %s", e.getMessage())); + } + } + + @Override + public Request buildRequest(String baseUrl) { + // Build a complete URL + String path = buildPathParams(getPath(), getPathParams()); + String url = baseUrl + path; + + // Add query parameters + if (!getQueryParams().isEmpty()) { + String queryString = buildQueryString(getQueryParams()); + url += "?" + queryString; + } + + // Build Request Builder + Request.Builder builder = new Request.Builder().url(url); + + // Set requests based on HTTP methods + switch (getMethod()) { + case GET: + builder.get(); + break; + case POST: + builder.post(getRequestBody()); + break; + case PATCH: + builder.patch(getRequestBody()); + break; + case PUT: + builder.put(getRequestBody()); + break; + case DELETE: + if (!requestBodyParams.isEmpty()) { + builder.delete(getRequestBody()); + } else { + builder.delete(); + } + break; + default: + throw new IllegalArgumentException("Unsupported HTTP method: " + getMethod()); + } + return builder.build(); + } + + private String buildPathParams(String path, Map<String, String> pathParams) { + for (Map.Entry<String, String> entry : pathParams.entrySet()) { + path = path.replace("{" + entry.getKey() + "}", entry.getValue()); + } + return path; + } + + private String buildQueryString(Map<String, Object> queryParams) { + StringBuilder sb = new StringBuilder(); + // Multiple values can be specified for the same parameter name in the Get parameter. + // (e.g. 
"?Key=value1&Key=value2") + queryParams.forEach((key, value) -> { + if (value instanceof List) { + ((List) value).forEach(item -> sb.append(key).append("=").append(item).append("&")); + } else { + sb.append(key).append("=").append(value).append("&"); + } + }); + if (sb.length() > 0) { + sb.setLength(sb.length() - 1); + } + return sb.toString(); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java new file mode 100644 index 0000000000..1e5b7d737c --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api.connection; + +import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; +import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.MediaType; +import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; + +import java.util.Map; + +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED; + +/** + * Build call for AirflowConnectionGetter<br> + * <table border="10"> + * <tr><th> Request Body Param </th><th> Description </th></tr> + * <tr><td> connection_id </td><td> The connection ID. </td></tr> + * <tr><td> conn_type </td><td> The connection type. </td></tr> + * <tr><td> description </td><td> The description of the connection. </td></tr> + * <tr><td> host </td><td> Host of the connection. </td></tr> + * <tr><td> login </td><td> Login of the connection. </td></tr> + * <tr><td> schema </td><td> Schema of the connection. </td></tr> + * <tr><td> port </td><td> Port of the connection. </td></tr> + * <tr><td> password </td><td> Password of the connection. </td></tr> + * <tr><td> extra </td><td> Other values that cannot be put into another field, e.g. RSA keys.(optional) </td></tr> + * </table> + * + * @http.response.details <table summary="Response Details" border="1"> + * <tr><th> Status Code </th><th> Description </th></tr> + * <tr><td> 200 </td><td> Success. </td></tr> + * <tr><td> 400 </td><td> Client specified an invalid argument. </td></tr> + * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. 
</td></tr> + * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr> + * </table> + */ +@Slf4j +public class AirflowConnectionCreator extends BaseAirflowApi<AirflowConnection> { + + AirflowConnection connection = null; + + public AirflowConnectionCreator(AirflowConnection connection) { + this.connection = connection; + } + + public AirflowConnectionCreator(Map<String, Object> requestBodyParams) { + this.requestBodyParams = requestBodyParams; + } + + @Override + public RequestBody getRequestBody() { + if (connection != null) { + try { + return RequestBody.create(MediaType.parse("application/json; charset=utf-8"), + objectMapper.writeValueAsString(connection)); + } catch (Exception e) { + log.error("Airflow request body construction failed: {}", e.getMessage(), e); + throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED, + String.format("Airflow request body construction failed: %s", e.getMessage())); + } + } + return super.getRequestBody(); + } + + @Override + public Class<AirflowConnection> getResponseType() { + return AirflowConnection.class; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.POST; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.LIST_CONNECTIONS_URI; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java new file mode 100644 index 0000000000..7dc278ae0b --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api.connection; + +import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +/** + * Build call for AirflowConnectionGetter<br> + * <table border="10"> + * <tr><th> Path Param </th><th> Description </th></tr> + * <tr><td> connection_id </td><td> The connection ID. </td></tr> + * </table> + * + * @http.response.details <table summary="Response Details" border="1"> + * <tr><th> Status Code </th><th> Description </th></tr> + * <tr><td> 200 </td><td> Success. </td></tr> + * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. </td></tr> + * <tr><td> 403 </td><td> Client does not have sufficient permission. 
</td></tr> + * <tr><td> 404 </td><td> A specified resource is not found. </td></tr> + * </table> + */ +public class AirflowConnectionGetter extends BaseAirflowApi<AirflowConnection> { + + public AirflowConnectionGetter(String connectionId) { + pathParams.put("connection_id", connectionId); + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.GET; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.GET_CONNECTION_URI; + } + + @Override + public Class<AirflowConnection> getResponseType() { + return AirflowConnection.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java new file mode 100644 index 0000000000..039a18d9d8 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api.dag; + +import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +import java.util.Map; + +/** + * Build call for DAGCollectionUpdater< br> + * <table border="10"> + * <tr><th> GET Param </th><th> Description </th></tr> + * <tr><td> limit </td><td> The numbers of items to return. (optional, default to 100) </td></tr> + * <tr><td> offset </td><td> The number of items to skip before starting to collect the result set. (optional) </td></tr> + * <tr><td> tags </td><td> List of tags to filter results.(optional) </td></tr> + * <tr><td> update_mask </td><td> The fields to update on the resource. If absent or empty, all modifiable fields are updated. A comma-separated list of fully qualified names of fields.(optional) </td></tr> + * <tr><td> only_active </td><td> Only filter active DAGs. (optional, default to true) </td></tr> + * <tr><td> dag_id_pattern </td><td> If set, only return DAGs with dag_ids matching this pattern. (required) </td></tr> + * </table> + * + * <table border="10"> + * <tr><th> Request Body Param </th><th> Description </th></tr> + * <tr><td> is_paused </td><td> Whether the DAG is paused. </td></tr> + * </table> + * + * @http.response.details <table summary="Response Details" border="1"> + * <tr><th> Status Code </th><th> Description </th></tr> + * <tr><td> 200 </td><td> Success. </td><td></tr> + * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. 
</td><td></tr> + * <tr><td> 403 </td><td> Client does not have sufficient permission. </td><td></tr> + * <tr><td> 404 </td><td> A specified resource is not found. </td><td></tr> + * </table> + */ +public class DAGCollectionUpdater extends BaseAirflowApi<DAGCollection> { + + public DAGCollectionUpdater(String dagIdPattern, boolean isPaused) { + this.queryParams.put("dag_id_pattern", dagIdPattern); + this.requestBodyParams.put("is_paused", isPaused); + } + + public DAGCollectionUpdater(Map<String, Object> queryParams, Map<String, Object> requestBodyParams) { + this.queryParams = queryParams; + this.requestBodyParams = requestBodyParams; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.PATCH; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.LIST_DAGS_URI; + } + + @Override + public Class<DAGCollection> getResponseType() { + return DAGCollection.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java new file mode 100644 index 0000000000..23a348d766 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api.dag; + +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +/** + * Build call for DAGDeleter< br> + * <table border="10"> + * <tr><th> Path Param </th><th> Description </th></tr> + * <tr><td> dag_id </td><td> The DAG ID. </td></tr> + * </table> + * + * <table border="10"> + * <tr><th> GET Param </th><th> Description </th></tr> + * <tr><td> update_mask </td><td> The fields to update on the resource. If absent or empty, all modifiable fields are updated. A comma-separated list of fully qualified names of fields.(optional) </td></tr> + * </table> + * + * <table border="10"> + * <tr><th> Request Body Param </th><th> Description </th></tr> + * <tr><td> is_paused </td><td> Whether the DAG is paused. </td></tr> + * </table> + * + * @http.response.details <table summary="Response Details" border="1"> + * <tr><th> Status Code </th><th> Description </th></tr> + * <tr><td> 200 </td><td> Success. </td></tr> + * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. </td></tr> + * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr> + * <tr><td> 404 </td><td> A specified resource is not found. 
</td></tr> + * </table> + */ +public class DAGDeletor extends BaseAirflowApi<Object> { + + public DAGDeletor(String dagId) { + this.pathParams.put("dag_id", dagId); + } + @Override + public HttpMethod getMethod() { + return HttpMethod.DELETE; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.UPDATE_DAG_URI; + } + + @Override + public Class<Object> getResponseType() { + return Object.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java new file mode 100644 index 0000000000..be8313f1b1 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api.dag; + +import org.apache.inlong.manager.pojo.schedule.airflow.DAG; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +/** + * Build call for DAGUpdater< br> + * <table border="10"> + * <tr><th> Path Param </th><th> Description </th></tr> + * <tr><td> dag_id </td><td> The DAG ID. </td></tr> + * </table> + * + * <table border="10"> + * <tr><th> GET Param </th><th> Description </th></tr> + * <tr><td> update_mask </td><td> The fields to update on the resource. If absent or empty, all modifiable fields are updated. A comma-separated list of fully qualified names of fields.(optional) </td></tr> + * </table> + * + * <table border="10"> + * <tr><th> Request Body Param </th><th> Description </th></tr> + * <tr><td> is_paused </td><td> Whether the DAG is paused. </td></tr> + * </table> + * + * @http.response.details <table summary="Response Details" border="1"> + * <tr><th> Status Code </th><th> Description </th></tr> + * <tr><td> 200 </td><td> Success. </td></tr> + * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. </td></tr> + * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr> + * <tr><td> 404 </td><td> A specified resource is not found. 
</td></tr> + * </table> + */ +public class DAGUpdater extends BaseAirflowApi<DAG> { + + public DAGUpdater(String dagId, boolean isPaused) { + this.pathParams.put("dag_id", dagId); + this.requestBodyParams.put("is_paused", isPaused); + } + + public DAGUpdater(String dagId, String updateMask, boolean isPaused) { + this.pathParams.put("dag_id", dagId); + this.queryParams.put("update_mask", updateMask); + this.requestBodyParams.put("is_paused", isPaused); + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.PATCH; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.UPDATE_DAG_URI; + } + + @Override + public Class<DAG> getResponseType() { + return DAG.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java new file mode 100644 index 0000000000..b9fe7b2260 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api.dagruns; + +import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +import java.util.Map; + +/** + * Build call for DAGRunsTrigger <br> + * <table border="10"> + * <tr><th> Path Param </th><th> Description </th></tr> + * <tr><td> dag_id </td><td> The DAG ID. </td></tr> + * </table> + * + * <table border="10"> + * <tr><th> Request Body Param </th><th> Description </th></tr> + * <tr> + * <td> conf </td> + * <td> + * JSON object describing additional configuration parameters. <br> + * The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error.<br> + * </td> + * </tr> + * <tr> + * <td> dag_run_id </td> + * <td> Run ID. <br> + * The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error. <br> + * If not provided, a value will be generated based on execution_date. <br> + * If the specified dag_run_id is in use, the creation request fails with an ALREADY_EXISTS error. <br> + * This together with DAG_ID are a unique key.<br> + * </td> + * </tr> + * <tr><td> data_interval_end </td><td> The end of the interval the DAG run covers. 
</td></tr> + * <tr><td> data_interval_start </td><td> The beginning of the interval the DAG run covers. </td></tr> + * <tr> + * <td> logical_date </td> + * <td> + * The logical date (previously called execution date). This is the time or interval covered by this DAG run, according to the DAG definition. <br> + * The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error.<br> + * This together with DAG_ID are a unique key. <br> + * </td> + * </tr> + * <tr><td> note </td><td> Contains manually entered notes by the user about the DagRun. </td></tr> + * </table> + * + * @http.response.details <table summary="Response Details" border="1"> + * <tr><th> Status Code </th><th> Description </th></tr> + * <tr><td> 200 </td><td> Success. </td></tr> + * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. </td></tr> + * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr> + * <tr><td> 404 </td><td> A specified resource is not found. </td></tr> + * </table> + */ + +public class DAGRunsTrigger extends BaseAirflowApi<DAGRun> { + + public DAGRunsTrigger(String dagId) { + this.pathParams.put("dag_id", dagId); + } + + public DAGRunsTrigger(String dagId, Map<String, Object> requestBodyParams) { + this.pathParams.put("dag_id", dagId); + this.requestBodyParams = requestBodyParams; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.POST; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.TRIGGER_NEW_DAG_RUN_URI; + } + + @Override + public Class<DAGRun> getResponseType() { + return DAGRun.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java new file mode 100644 index 0000000000..60bb6673ce --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow.config; + +import org.apache.inlong.manager.client.api.ClientConfiguration; +import org.apache.inlong.manager.schedule.airflow.AirflowServerClient; +import org.apache.inlong.manager.schedule.airflow.interceptor.AirflowAuthInterceptor; +import org.apache.inlong.manager.schedule.airflow.interceptor.LoggingInterceptor; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import okhttp3.OkHttpClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(callSuper = true) +public class AirflowConfig extends ClientConfiguration { + + @Value("${schedule.engine.airflow.inlong.manager.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String inlongUsername; + + @Value("${default.admin.password:inlong}") + private String inlongPassword; + + @Value("${schedule.engine.airflow.connection.id:inlong_connection}") + private String connectionId; + + @Value("${schedule.engine.airflow.cleaner.id:dag_cleaner}") + private String dagCleanerId; + + @Value("${schedule.engine.airflow.creator.id:dag_creator}") + private String dagCreatorId; + + @Value("${schedule.engine.airflow.username:airflow}") + private String airflowUsername; + + @Value("${schedule.engine.airflow.password:airflow}") + private String airflowPassword; + + @Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}") + private String baseUrl; + + @Bean + public OkHttpClient okHttpClient() { + return new OkHttpClient.Builder() + .addInterceptor(new AirflowAuthInterceptor(this.getAirflowUsername(), this.getAirflowPassword())) + .addInterceptor(new LoggingInterceptor()) + .connectTimeout(this.getConnectTimeout(), this.getTimeUnit()) + .readTimeout(this.getReadTimeout(), this.getTimeUnit()) + .writeTimeout(this.getWriteTimeout(), this.getTimeUnit()) + .retryOnConnectionFailure(true) + .build(); + } + @Bean + public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) { + return new AirflowServerClient(okHttpClient, airflowConfig); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java new file mode 100644 index 0000000000..714614bf94 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.interceptor; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.Interceptor; +import okhttp3.Request; +import okhttp3.Response; + +import java.io.IOException; +import java.util.Base64; + +/** + * AirflowAuthInterceptor + * Before okhttp call a request, uniformly encapsulate the relevant parameters of authentication + */ +@Slf4j +public class AirflowAuthInterceptor implements Interceptor { + + // Airflow Authentication Header + private final String authHeader; + + public AirflowAuthInterceptor(String username, String password) { + String credentials = username + ":" + password; + this.authHeader = "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes()); + } + + @Override + public Response intercept(Chain chain) throws IOException { + Request originalRequest = chain.request(); + Request.Builder requestBuilder = originalRequest + .newBuilder() + .header("Authorization", authHeader); + return chain.proceed(requestBuilder.build()); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java similarity index 52% copy from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java copy to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java index ac71e4e2d1..c3028385b1 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java @@ -15,20 +15,28 @@ * limitations under the License. 
*/ -package org.apache.inlong.manager.schedule; +package org.apache.inlong.manager.schedule.airflow.interceptor; -import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import okhttp3.Interceptor; +import okhttp3.Request; +import okhttp3.Response; -@Getter -public enum ScheduleEngineType { +import java.io.IOException; - NONE("None"), - QUARTZ("Quartz"), - DOLPHINSCHEDULER("DolphinScheduler"); - - private final String type; +/** + * LoggingInterceptor + * Provide unified logging for okhttp + */ +@Slf4j +public class LoggingInterceptor implements Interceptor { - ScheduleEngineType(String type) { - this.type = type; + @Override + public Response intercept(Chain chain) throws IOException { + Request request = chain.request(); + Response response = chain.proceed(request); + log.info("Airflow API request information - Address: {}, URI: {}, Request method: {}, Response status code: {}", + request.url(), request.url().uri(), request.method(), response.code()); + return response; } -} \ No newline at end of file +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java similarity index 71% copy from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java copy to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java index ac71e4e2d1..fad05f2116 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java @@ -15,20 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.manager.schedule; +package org.apache.inlong.manager.schedule.airflow.util; -import lombok.Getter; +import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX; -@Getter -public enum ScheduleEngineType { +public class DAGUtil { - NONE("None"), - QUARTZ("Quartz"), - DOLPHINSCHEDULER("DolphinScheduler"); - - private final String type; - - ScheduleEngineType(String type) { - this.type = type; + public static String buildDAGIdByGroupId(String groupId) { + return INLONG_OFFLINE_DAG_TASK_PREFIX.concat(groupId); } -} \ No newline at end of file +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java new file mode 100644 index 0000000000..950e334921 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.util; + +import org.apache.inlong.manager.schedule.ScheduleUnit; + +import java.math.BigInteger; +import java.util.Objects; + +public class DateUtil { + + public static String intervalToSeconds(long interval, String timeUnit) { + BigInteger seconds = new BigInteger(String.valueOf(interval)); + String intervalStr = ""; + switch (Objects.requireNonNull(ScheduleUnit.getScheduleUnit(timeUnit))) { + case SECOND: + intervalStr = "1"; + break; + case MINUTE: + intervalStr = "60"; + break; + case HOUR: + intervalStr = "3600"; + break; + case DAY: + intervalStr = "86400"; + break; + case WEEK: + intervalStr = "604800"; + break; + case MONTH: + intervalStr = "2592000"; + break; + case YEAR: + intervalStr = "31536000"; + break; + default: + throw new IllegalArgumentException("Unsupported time unit"); + } + return seconds.multiply(new BigInteger(intervalStr)).toString(); + } + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java new file mode 100644 index 0000000000..6b6830fb30 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.exception; + +/** + * Represents exceptions specific to the Airflow scheduling process. + * Each exception is associated with a specific error code for better identification. + */ +public class AirflowScheduleException extends RuntimeException { + + /** + * Enum to define all error codes associated with Airflow scheduling exceptions. 
+ */ + public enum AirflowErrorCode { + INIT_CONNECTION_FAILED, + TASK_DAG_SWITCH_FAILED, + SCHEDULE_TASK_REGISTER_FAILED, + SCHEDULE_TASK_UPDATE_FAILED, + SCHEDULE_ENGINE_SHUTDOWN_FAILED, + BUILD_REQUEST_BODY_FAILED, + DAG_DUPLICATE + } + + private AirflowErrorCode errorCode; + + public AirflowScheduleException(String message) { + super(message); + } + public AirflowScheduleException(AirflowErrorCode errorCode, String message) { + super(message); + this.errorCode = errorCode; + } + + public AirflowScheduleException(AirflowErrorCode errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + } + + public AirflowErrorCode getErrorCode() { + return errorCode; + } + + @Override + public String toString() { + return String.format("ErrorCode: %s, Message: %s", errorCode, getMessage()); + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java new file mode 100644 index 0000000000..f99282a2ed --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.testcontainers.containers.ContainerState; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.utility.MountableFile; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +@Slf4j +public class AirflowContainerEnv { + + public static String BASE_URL = "http://localhost:8080"; + public static String AIRFLOW_USERNAME = "airflow"; + public static String AIRFLOW_PASSWORD = "airflow"; + public static String NORMAL_POSTFIX = "_normal"; + public static String CORN_POSTFIX = "_cron"; + public static String AIRFLOW_SCHEDULER_CONTAINER_NAME = "airflow-scheduler_1"; + public static String DOCKER_COMPOSE_YAML_PATH = "src/test/resources/airflow/docker-compose.yaml"; + public static String DEFAULT_DAGS_PATH = "/opt/airflow/dags/"; + + private static DockerComposeContainer<?> environment; + private static OkHttpClient httpClient = new OkHttpClient(); + + public static void setUp() { + // Step 1: Start only the airflow-init service + environment = new DockerComposeContainer<>(new File(DOCKER_COMPOSE_YAML_PATH)) + .withServices("airflow-init") + .withEnv("AIRFLOW_UID", "$(id -u)"); + // Start the environment + environment.start(); + // Step 2: Wait until the "airflow-init" service has completed initialization + // Once initialized, stop the init-only environment and start the full environment + environment.stop(); + // Step 3: Start all services in detached mode after initialization + environment = new DockerComposeContainer<>(new File(DOCKER_COMPOSE_YAML_PATH)) + .withEnv("AIRFLOW_UID", "0") + .withEnv("AIRFLOW__CORE__LOAD_EXAMPLES", "false") + .withEnv("AIRFLOW__API__AUTH_BACKEND", + "airflow.providers.fab.auth_manager.api.auth.backend.basic_auth"); + environment.start(); + copyTestDAGs(); + waitForDAGsLoad("dag_cleaner"); + log.info("Airflow runtime environment created successfully."); + } + + public static void shutDown() { + if (environment != null) { + environment.stop(); + } + } + + private static void copyTestDAGs() { + // After the DAG file is created, the scheduler will regularly scan the DAG file directory and + // then load it into memory for scheduling. In order to quickly test the update and unregister, two + // test DAGs need to be loaded at the beginning. 
+ Optional<ContainerState> container = environment.getContainerByServiceName(AIRFLOW_SCHEDULER_CONTAINER_NAME); + if (container.isPresent()) { + ContainerState airflowScheduler = container.get(); + Path dagPath1 = Paths.get("src/test/resources/airflow/dag_cleaner.py").toAbsolutePath(); + Path dagPath2 = Paths.get("src/test/resources/airflow/dag_creator.py").toAbsolutePath(); + Path dagPath3 = Paths.get("src/test/resources/airflow/testGroup_cron.py").toAbsolutePath(); + Path dagPath4 = Paths.get("src/test/resources/airflow/testGroup_normal.py").toAbsolutePath(); + airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath1), + DEFAULT_DAGS_PATH.concat("dag_cleaner.py")); + airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath2), + DEFAULT_DAGS_PATH.concat("dag_creator.py")); + airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath3), + DEFAULT_DAGS_PATH.concat("testGroup_cron.py")); + airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath4), + DEFAULT_DAGS_PATH.concat("testGroup_normal.py")); + try { + String result = + airflowScheduler.execInContainer("bash", "-c", "ls ".concat(DEFAULT_DAGS_PATH)).getStdout(); + log.info(DEFAULT_DAGS_PATH.concat(" has file: {}"), result); + } catch (Exception e) { + log.warn(String.format( + "Copying the test DAG file may have failed. Docker Container command(\"%s\") execution failed.", + "ls ".contains(DEFAULT_DAGS_PATH)), e); + } + } else { + log.error(String.format("Copying test DAG file failed. Airflow scheduler container(%s) does not exist.", + AIRFLOW_SCHEDULER_CONTAINER_NAME)); + throw new AirflowScheduleException("Copying test DAG file failed."); + } + log.info("Copy test DAG file successfully."); + } + + public static void waitForDAGsLoad(String dagId) { + int total = 10; + // Waiting for Airflow to load the initial DAG + while (total > 0) { + String credential = okhttp3.Credentials.basic(AIRFLOW_USERNAME, AIRFLOW_PASSWORD); + Request request = new Request.Builder() + .url(BASE_URL + "/api/v1/dags/" + dagId + "/details") + .header("Authorization", credential) + .build(); + try (Response response = httpClient.newCall(request).execute()) { + if (response.code() == 200) { + break; + } + } catch (Exception e) { + log.error("The request to check if the original DAG exists failed: {}", e.getMessage(), e); + } + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + total--; + } + log.info("DAG successfully loaded."); + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java new file mode 100644 index 0000000000..fe5d070afd --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.BaseScheduleTest; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; + +import static org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.CORN_POSTFIX; +import static org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.NORMAL_POSTFIX; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Slf4j +@EnableConfigurationProperties +@ComponentScan(basePackages = "org.apache.inlong.manager") +@SpringBootTest(classes = AirflowScheduleEngineTest.class) +public class AirflowScheduleEngineTest { + + @Autowired + private AirflowScheduleEngine scheduleEngine; + private static BaseScheduleTest baseScheduleTest = new BaseScheduleTest(); + + @BeforeAll + public static void initScheduleEngine() { + try { + AirflowContainerEnv.setUp(); + } catch (Exception e) { + log.error("Airflow runtime environment creation failed.", e); + throw new RuntimeException( + String.format("Airflow runtime environment creation failed: %s", e.getMessage())); + } + } + + @AfterAll + public static void stopScheduleEngine() { + AirflowContainerEnv.shutDown(); + } + + @Test + @Order(1) + public void testRegisterScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo(); + String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX + System.currentTimeMillis(); + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleRegister(scheduleInfo)); + + // 2. test for cron schedule + scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo(); + groupId = scheduleInfo.getInlongGroupId() + CORN_POSTFIX + System.currentTimeMillis(); + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleRegister(scheduleInfo)); + } + + @Test + @Order(2) + public void testUpdateScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo(); + String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX; + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleUpdate(scheduleInfo)); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? + scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo(); + groupId = scheduleInfo.getInlongGroupId() + CORN_POSTFIX; + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleUpdate(scheduleInfo)); + } + + @Test + @Order(3) + public void testUnRegisterScheduleInfo() { + // 1. 
test for normal schedule + ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo(); + String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX; + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId())); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? + scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo(); + groupId = scheduleInfo.getInlongGroupId() + CORN_POSTFIX; + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId())); + } +} diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py new file mode 100644 index 0000000000..be20fe1bb1 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow import DAG +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from airflow.models import Variable +from airflow.utils.dates import days_ago +from datetime import datetime +import os +import logging +import pytz +from croniter import croniter +from airflow.hooks.base_hook import BaseHook +from airflow import configuration + +DAG_PATH = configuration.get('core', 'dags_folder') + "/" + + +def clean_expired_dags(**context): + original_time = context.get('execution_date') + target_timezone = pytz.timezone("Asia/Shanghai") + utc_time = original_time.astimezone(target_timezone) + current_time = utc_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + logging.info(f"Current time: {current_time}") + for dag_file in os.listdir(DAG_PATH): + if dag_file.endswith(".py") and dag_file.startswith("inlong_offline_task_"): + with open(DAG_PATH + dag_file, "r") as file: + line = file.readline() + while line and "end_offset_datetime_str" not in line: + line = file.readline() + end_date_str = None + if len(line.split("=")) > 1: + end_date_str = line.split("=")[1].strip().strip("\"") + logging.info(f"DAG end time: {end_date_str}") + if end_date_str: + try: + if str(current_time) > str(end_date_str): + dag_file_path = os.path.join(DAG_PATH, dag_file) + os.remove(dag_file_path) + # Optionally, delete the end_date variable + logging.info(f"Deleted expired DAG: {dag_file}") + except ValueError: + logging.error(f"Invalid date format for DAG {dag_file}: {end_date_str}") + + +default_args = { + 'owner': 'airflow', + 'start_date': datetime.now() - timedelta(minutes=5), + 'catchup': False, + 'tags': ["inlong"] +} + +dag = DAG( + 'dag_cleaner', + default_args=default_args, + schedule_interval="*/20 * * * *", + is_paused_upon_creation=False +) + 
+clean_task = PythonOperator( + task_id='clean_expired_dags', + python_callable=clean_expired_dags, + provide_context=True, + dag=dag, +) diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py new file mode 100644 index 0000000000..4034cf467c --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago +from airflow.models import Variable +import os +from datetime import datetime +from airflow.hooks.base_hook import BaseHook +from airflow import configuration + +DAG_PATH = configuration.get('core', 'dags_folder') + "/" +DAG_PREFIX = 'inlong_offline_task_' + +def create_dag_file(**context): + conf = context.get('dag_run').conf + print('conf: ', conf) + groupId = conf.get('inlong_group_id') + task_name = DAG_PREFIX + groupId + timezone = conf.get('timezone') + boundaryType = str(conf.get('boundary_type')) + start_time = int(conf.get('start_time')) + end_time = int(conf.get('end_time')) + cron_expr = conf.get('cron_expr') + seconds_interval = conf.get('seconds_interval') + schedule_interval = cron_expr + if cron_expr is None or len(cron_expr) == 0: + schedule_interval = f'timedelta(seconds={seconds_interval})' + else: + schedule_interval = '"' + cron_expr + '"' + connectionId = conf.get('connection_id') + dag_content = f'''from airflow import DAG +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from datetime import datetime +from croniter import croniter +from airflow.hooks.base_hook import BaseHook +import requests +import pytz + +timezone = "{timezone}" +start_offset_datetime_str = {start_time} +end_offset_datetime_str = {end_time} +schedule_interval = {schedule_interval} # Or put cron expression +dag_id = "{task_name}" +groupId = "{groupId}" +connectionId = "{connectionId}" +boundaryType = "{boundaryType}" + +target_timezone = pytz.timezone(timezone) + +start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone) +end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone) + +def taskFunction(**context): + print("#########################") + conn = BaseHook.get_connection(connectionId) + url = f"http://{{conn.host}}:{{conn.port}}/{{conn.schema}}" + params = {{ + "username": conn.login, + "password": conn.password + }} + print("params", params) + headers = {{ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0", + "Accept": "application/json", + "Accept-Language": 
"zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2", + "Accept-Encoding": "gzip, deflate", + "Referer": "http://192.168.101.2:8083/", + "Content-Type": "application/json;charset=UTF-8", + "tenant": "public", + "Origin": "http://192.168.101.2", + "Connection": "close", + "Priority": "u=0" + }} + time_interval = get_time_interval(context) + data = {{ + "boundaryType": boundaryType, + "groupId": groupId, + "lowerBoundary": str(int(time_interval[0])), + "upperBoundary": str(int(int(time_interval[1]))) + }} + print("Request Body: ", data) + response = requests.post(url, params=params, headers=headers, json=data) + if response.status_code == 200: + print(response.json()) + else: + print(response.text) + print("#########################") + + +def get_time_interval(context): + execution_date = context.get('execution_date') + execution_date = execution_date.astimezone(target_timezone) + dag = context.get('dag') + schedule_interval = dag.schedule_interval + if isinstance(schedule_interval, timedelta): + return execution_date.timestamp(), (execution_date + schedule_interval).timestamp() + else: + cron_expr = dag.schedule_interval + cron = croniter(cron_expr, execution_date) + next_run = cron.get_next(datetime) + return execution_date.timestamp(), next_run.timestamp() + + +default_args = {{ + 'owner': 'inlong', + 'start_date': start_date, + 'end_date': end_date, + 'catchup': False, +}} + +dag = DAG( + dag_id, + default_args=default_args, + schedule_interval=schedule_interval, + is_paused_upon_creation=False +) + +clean_task = PythonOperator( + task_id=dag_id, + python_callable=taskFunction, + provide_context=True, + dag=dag, +) + ''' + dag_file_path = os.path.join(DAG_PATH, f'{task_name}.py') + with open(dag_file_path, 'w') as f: + f.write(dag_content) + print(f'Generated DAG file: {dag_file_path}') +default_args = {'owner': 'airflow', 'start_date': days_ago(1), 'catchup': False} +dag = DAG('dag_creator', default_args=default_args, schedule_interval=None, is_paused_upon_creation=False) +create_dag_task = PythonOperator(task_id='create_dag_file', python_callable=create_dag_file, provide_context=True, dag=dag) \ No newline at end of file diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml b/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml new file mode 100644 index 0000000000..c97195c03f --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml @@ -0,0 +1,292 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. +# +# WARNING: This configuration is for local development. Do not use it in a production deployment. 
+# +# This configuration supports basic configuration using environment variables or an .env file +# The following variables are supported: +# +# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. +# Default: apache/airflow:2.6.0 +# AIRFLOW_UID - User ID in Airflow containers +# Default: 50000 +# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. +# Default: . +# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode +# +# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). +# Default: airflow +# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). +# Default: airflow +# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. +# Use this option ONLY for quick checks. Installing requirements at container +# startup is done EVERY TIME the service is started. +# A better way is to build a custom image or extend the official image +# as described in https://airflow.apache.org/docs/docker-stack/build.html. +# Default: '' +# +# Feel free to modify this file to suit your needs. +--- +version: '3.8' +x-airflow-common: + &airflow-common + # In order to add custom dependencies or upgrade provider packages you can use your extended image. + # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml + # and uncomment the "build" line below, Then run `docker-compose build` to build the images. + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.0} + # build: . + environment: + &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + # For backward compatibility, with Airflow <2.3 + AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'true' + AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' + # yamllint disable rule:line-length + # Use simple http server on scheduler for health checks + # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server + # yamllint enable rule:line-length + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks + # for other purpose (development, test and especially production usage) build/extend Airflow image. 
+ _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + user: "${AIRFLOW_UID:-50000}:0" + depends_on: + &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + +services: + postgres: + image: postgres:13 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + + redis: + image: redis:latest + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + test: + - "CMD-SHELL" + - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + # yamllint disable rule:line-length + command: + - -c + - | + function ver() { + printf "%04d%04d%04d%04d" $${1//./ } + } + airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version) + airflow_version_comparable=$$(ver $${airflow_version}) + min_airflow_version=2.2.0 + min_airflow_version_comparable=$$(ver $${min_airflow_version}) + if (( airflow_version_comparable < min_airflow_version_comparable )); then + echo + echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" + echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" + echo + exit 1 + fi + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." 
+ echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" + echo + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" + echo + warning_resources="true" + fi + if [[ $${warning_resources} == "true" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" + echo "Please follow the instructions to increase amount of resources available:" + echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" + echo + fi + exec /entrypoint airflow version + # yamllint enable rule:line-length + environment: + <<: *airflow-common-env + _AIRFLOW_DB_UPGRADE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: '' + user: "0:0" + volumes: + - ${AIRFLOW_PROJ_DIR:-.}:/sources + + airflow-cli: + <<: *airflow-common + profiles: + - debug + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 + command: + - bash + - -c + - airflow + + # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up + # or by explicitly targeted on the command line e.g. docker-compose up flower. + # See: https://docs.docker.com/compose/profiles/ + flower: + <<: *airflow-common + command: celery flower + profiles: + - flower + ports: + - "5555:5555" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:5555/"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + +volumes: + postgres-db-volume: diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py new file mode 100644 index 0000000000..b753eb7587 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow import DAG +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from datetime import datetime +from croniter import croniter +from airflow.hooks.base_hook import BaseHook +import requests +import pytz + +timezone = "Asia/Shanghai" +start_offset_datetime_str = 1731072908243 +end_offset_datetime_str = 1731142800000 +schedule_interval = "*/1 * * * *" # Or put cron expression +dag_id = "inlong_offline_task_testGroup_cron" +groupId = "test_offline_1" +connectionId = "inlong_connection" +boundaryType = str("time") + +target_timezone = pytz.timezone(timezone) + +start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone) +end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone) + + +def taskFunction(**context): + print("#########################") + conn = BaseHook.get_connection(connectionId) + url = f"http://{conn.host}:{conn.port}/{conn.schema}" + params = { + "username": conn.login, + "password": conn.password + } + print("params", params) + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0", + "Accept": "application/json", + "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2", + "Accept-Encoding": "gzip, deflate", + "Referer": "http://192.168.101.2:8083/", + "Content-Type": "application/json;charset=UTF-8", + "tenant": "public", + "Origin": "http://192.168.101.2", + "Connection": "close", + "Priority": "u=0" + } + time_interval = get_time_interval(context) + data = { + "boundaryType": boundaryType, + "groupId": groupId, + "lowerBoundary": time_interval[0], + "upperBoundary": time_interval[1] + } + print("Request Body: ", data) + response = requests.post(url, params=params, headers=headers, json=data) + if response.status_code == 200: + print(response.json()) + else: + print(response.text) + print("#########################") + + +def get_time_interval(context): + execution_date = context.get('execution_date') + execution_date = execution_date.astimezone(target_timezone) + dag = context.get('dag') + schedule_interval = dag.schedule_interval + if isinstance(schedule_interval, timedelta): + return execution_date.timestamp(), (execution_date + schedule_interval).timestamp() + else: + cron_expr = dag.schedule_interval + cron = croniter(cron_expr, execution_date) + next_run = cron.get_next(datetime) + return execution_date.timestamp(), next_run.timestamp() + + +default_args = { + 'owner': 'inlong', + 'start_date': start_date, + 'end_date': end_date, + 'catchup': False, +} + +dag = DAG( + dag_id, + default_args=default_args, + schedule_interval=schedule_interval, + is_paused_upon_creation=False +) + +clean_task = PythonOperator( + task_id=dag_id, + python_callable=taskFunction, + provide_context=True, + dag=dag, +) diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py 
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py new file mode 100644 index 0000000000..5666f9f471 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow import DAG +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from datetime import datetime +from croniter import croniter +from airflow.hooks.base_hook import BaseHook +import requests +import pytz + +timezone = "Asia/Shanghai" +start_offset_datetime_str = 1731072908243 +end_offset_datetime_str = 1731142800000 +schedule_interval = "*/1 * * * *" # Or put cron expression +dag_id = "inlong_offline_task_testGroup_normal" +groupId = "test_offline_1" +connectionId = "inlong_connection" +boundaryType = str("time") + +target_timezone = pytz.timezone(timezone) + +start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone) +end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone) + + +def taskFunction(**context): + print("#########################") + conn = BaseHook.get_connection(connectionId) + url = f"http://{conn.host}:{conn.port}/{conn.schema}" + params = { + "username": conn.login, + "password": conn.password + } + print("params", params) + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0", + "Accept": "application/json", + "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2", + "Accept-Encoding": "gzip, deflate", + "Content-Type": "application/json;charset=UTF-8", + "tenant": "public", + "Connection": "close", + "Priority": "u=0" + } + time_interval = get_time_interval(context) + data = { + "boundaryType": boundaryType, + "groupId": groupId, + "lowerBoundary": time_interval[0], + "upperBoundary": time_interval[1] + } + print("Request Body: ", data) + response = requests.post(url, params=params, headers=headers, json=data) + if response.status_code == 200: + print(response.json()) + else: + print(response.text) + print("#########################") + + +def get_time_interval(context): + execution_date = context.get('execution_date') + execution_date = execution_date.astimezone(target_timezone) + dag = context.get('dag') + schedule_interval = dag.schedule_interval + if isinstance(schedule_interval, timedelta): + return execution_date.timestamp(), (execution_date + schedule_interval).timestamp() + else: + cron_expr = dag.schedule_interval + cron = croniter(cron_expr, execution_date) + next_run = cron.get_next(datetime) + return execution_date.timestamp(), next_run.timestamp() + + +default_args = { + 'owner': 'inlong', + 'start_date': start_date, + 
'end_date': end_date, + 'catchup': False, +} + +dag = DAG( + dag_id, + default_args=default_args, + schedule_interval=schedule_interval, + is_paused_upon_creation=False +) + +clean_task = PythonOperator( + task_id=dag_id, + python_callable=taskFunction, + provide_context=True, + dag=dag, +) diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties index b0193111aa..f17c451aee 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -113,3 +113,13 @@ dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg # DolphinScheduler related config schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler schedule.engine.dolphinscheduler.token=default_token_value + +# Airflow configuration +schedule.engine.airflow.baseUrl= +schedule.engine.airflow.username= +schedule.engine.airflow.password= +schedule.engine.airflow.connection.id= +# Make sure this is a host address that Airflow can reach (not a loopback address) +schedule.engine.airflow.inlong.manager.host= +schedule.engine.airflow.cleaner.id= +schedule.engine.airflow.creator.id= \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties index a549e206f0..129bce9ac6 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -104,3 +104,13 @@ dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg # DolphinScheduler related config schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler schedule.engine.dolphinscheduler.token=default_token_value + +# Airflow configuration +schedule.engine.airflow.baseUrl= +schedule.engine.airflow.username= +schedule.engine.airflow.password= +schedule.engine.airflow.connection.id= +# Make sure this is a host address that Airflow can reach (not a loopback address) +schedule.engine.airflow.inlong.manager.host= +schedule.engine.airflow.cleaner.id= +schedule.engine.airflow.creator.id= \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties index 6a71dc7a05..a062bc2c63 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -105,3 +105,13 @@ dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg # DolphinScheduler related config schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler schedule.engine.dolphinscheduler.token=default_token_value + +# Airflow configuration +schedule.engine.airflow.baseUrl= +schedule.engine.airflow.username= +schedule.engine.airflow.password= +schedule.engine.airflow.connection.id= +# Make sure this is a host address that Airflow can reach (not a loopback address) +schedule.engine.airflow.inlong.manager.host= +schedule.engine.airflow.cleaner.id= +schedule.engine.airflow.creator.id= \ No newline at end of file
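
For reference, a filled-in sketch of the new Airflow properties. Every value below is illustrative only: the base URL and credentials follow the defaults of the bundled docker-compose.yaml, connection.id matches the inlong_connection name used by the test DAGs, the manager host reuses the 192.168.101.2 address that appears in the test DAG headers, and the cleaner/creator IDs are assumed to be the IDs of the cleaner and creator DAGs loaded into your Airflow deployment.

# Illustrative values only; adjust to your own Airflow deployment
schedule.engine.airflow.baseUrl=http://127.0.0.1:8080
schedule.engine.airflow.username=airflow
schedule.engine.airflow.password=airflow
schedule.engine.airflow.connection.id=inlong_connection
# Must be reachable from Airflow, so not a loopback address
schedule.engine.airflow.inlong.manager.host=192.168.101.2
schedule.engine.airflow.cleaner.id=dag_cleaner
schedule.engine.airflow.creator.id=dag_creator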
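
The generated DAGs compute the lowerBoundary/upperBoundary pair they POST to the InLong Manager from the run's execution date and the DAG's schedule_interval (see get_time_interval in testGroup_cron.py and testGroup_normal.py above). Below is a minimal standalone sketch of that calculation, assuming croniter and pytz are installed; the run date and cron expression are made up for the example.

from datetime import datetime, timedelta

import pytz
from croniter import croniter

target_timezone = pytz.timezone("Asia/Shanghai")


def get_time_interval(execution_date, schedule_interval):
    # Lower boundary: the run's execution date.
    # Upper boundary: execution date + delta for timedelta schedules,
    # otherwise the next fire time of the cron expression.
    execution_date = execution_date.astimezone(target_timezone)
    if isinstance(schedule_interval, timedelta):
        return execution_date.timestamp(), (execution_date + schedule_interval).timestamp()
    next_run = croniter(schedule_interval, execution_date).get_next(datetime)
    return execution_date.timestamp(), next_run.timestamp()


# A run of the "*/1 * * * *" DAG whose execution date is 2024-11-08 21:35:00 +08:00
run_date = target_timezone.localize(datetime(2024, 11, 8, 21, 35, 0))
lower, upper = get_time_interval(run_date, "*/1 * * * *")
print(lower, upper)  # epoch seconds, exactly one minute apart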
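
On the manager side, these DAGs are registered and triggered through the Airflow REST API. As a rough illustration of the underlying HTTP call only, assuming the stable REST API with basic auth is enabled as in the bundled docker-compose.yaml; the dag_creator ID and the conf keys are placeholders, not the payload the manager actually sends.

import requests

AIRFLOW_BASE_URL = "http://127.0.0.1:8080"  # assumed schedule.engine.airflow.baseUrl
AUTH = ("airflow", "airflow")               # docker-compose default credentials


def trigger_dag(dag_id, conf):
    # POST /api/v1/dags/{dag_id}/dagRuns creates a new run of the DAG with the given conf.
    response = requests.post(
        f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns",
        auth=AUTH,
        json={"conf": conf},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()


# Placeholder payload; the real fields are defined by the manager's DAG-run configuration.
print(trigger_dag("dag_creator", {"inlong_group_id": "test_offline_1"}))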