Zkplo opened a new pull request, #11479: URL: https://github.com/apache/inlong/pull/11479
Fixes #11400

### Motivation

Integrate Apache Airflow as a third-party schedule engine for InLong offline synchronization tasks (#11400).

### Modifications

#### Need to know about Airflow

1. By default, Airflow rejects all REST API requests. An API authentication backend has to be configured as described in the [official documentation](https://airflow.apache.org/docs/apache-airflow-providers-fab/stable/auth-manager/api-authentication.html) (a hedged sketch follows this list).
2. Airflow Connections are used to store credentials for connecting to other systems, so the credentials themselves are kept secure and out of DAG code. See https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html.
3. Airflow does not provide an API for DAG creation, so integrating with InLong requires pre-deploying the two original DAGs below: `dag_creator` generates the per-group offline-task DAG files, and `dag_cleaner` removes expired ones.
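To make points 1–3 concrete, here is a minimal sketch (not code from this PR; credentials, host, and all conf values are placeholders) of how a client can trigger the pre-deployed `dag_creator` DAG through Airflow's stable REST API once an auth backend such as basic auth is enabled. The conf fields mirror what `create_dag_file` in `dag_creator.py` below reads from the dag-run conf, and `connection_id` must refer to an Airflow Connection created in advance (for example with `airflow connections add`):

````java
import java.io.IOException;

import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

public class TriggerDagCreatorExample {

    public static void main(String[] args) throws IOException {
        // Basic-auth interceptor: Airflow rejects unauthenticated REST calls
        // unless a matching auth backend is enabled in airflow.cfg (point 1).
        OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(chain -> chain.proceed(chain.request().newBuilder()
                        .header("Authorization", Credentials.basic("airflow", "airflow"))
                        .build()))
                .build();

        // Placeholder values; start_time/end_time are epoch milliseconds,
        // and connection_id names a pre-created Airflow Connection (point 2).
        String payload = "{\"conf\":{"
                + "\"inlong_group_id\":\"test_offline_1\","
                + "\"timezone\":\"Asia/Shanghai\","
                + "\"boundary_type\":\"time\","
                + "\"start_time\":1731038087000,"
                + "\"end_time\":1731638087000,"
                + "\"cron_expr\":\"\","
                + "\"seconds_interval\":30,"
                + "\"connection_id\":\"inlong_connection\""
                + "}}";

        Request request = new Request.Builder()
                .url("http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns")
                .post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), payload))
                .build();

        try (Response response = client.newCall(request).execute()) {
            System.out.println(response.code() + " " + response.body().string());
        }
    }
}
````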
<details>
<summary>dag_cleaner.py</summary>

````python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow import configuration
from datetime import datetime, timedelta
import os
import logging
import pytz

DAG_PATH = configuration.get('core', 'dags_folder') + "/"


def clean_expired_dags(**context):
    # Compare each generated DAG's embedded end time with the current time
    # and delete the inlong offline-task DAG files that have expired.
    original_time = context.get('execution_date')
    target_timezone = pytz.timezone("Asia/Shanghai")
    utc_time = original_time.astimezone(target_timezone)
    current_time = utc_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    logging.info(f"Current time: {current_time}")
    for dag_file in os.listdir(DAG_PATH):
        if dag_file.endswith(".py") and dag_file.startswith("inlong_offline_task_"):
            with open(DAG_PATH + dag_file, "r") as file:
                line = file.readline()
                while line and "end_offset_datetime_str" not in line:
                    line = file.readline()
                end_date_str = None
                if len(line.split("=")) > 1:
                    end_date_str = line.split("=")[1].strip().strip("\"")
                logging.info(f"DAG end time: {end_date_str}")
                if end_date_str:
                    try:
                        if str(current_time) > str(end_date_str):
                            dag_file_path = os.path.join(DAG_PATH, dag_file)
                            os.remove(dag_file_path)
                            # Optionally, delete the end_date variable
                            logging.info(f"Deleted expired DAG: {dag_file}")
                    except ValueError:
                        logging.error(f"Invalid date format for DAG {dag_file}: {end_date_str}")


default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(minutes=5),
    'catchup': False,
    'tags': ["inlong"]
}

dag = DAG(
    'dag_cleaner',
    default_args=default_args,
    schedule_interval="*/20 * * * *",
    is_paused_upon_creation=False
)

clean_task = PythonOperator(
    task_id='clean_expired_dags',
    python_callable=clean_expired_dags,
    provide_context=True,
    dag=dag,
)
````
</details>

<details>
<summary>dag_creator.py</summary>

````python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow import configuration
import os

DAG_PATH = configuration.get('core', 'dags_folder') + "/"
DAG_PREFIX = 'inlong_offline_task_'


def create_dag_file(**context):
    # All parameters are passed by the InLong Manager through the dag-run conf.
    conf = context.get('dag_run').conf
    print('conf: ', conf)
    groupId = conf.get('inlong_group_id')
    task_name = DAG_PREFIX + groupId
    timezone = conf.get('timezone')
    boundaryType = str(conf.get('boundary_type'))
    start_time = int(conf.get('start_time'))
    end_time = int(conf.get('end_time'))
    cron_expr = conf.get('cron_expr')
    seconds_interval = conf.get('seconds_interval')
    # Prefer the cron expression; fall back to a fixed interval in seconds.
    if cron_expr is None or len(cron_expr) == 0:
        schedule_interval = f'timedelta(seconds={seconds_interval})'
    else:
        schedule_interval = '"' + cron_expr + '"'
    connectionId = conf.get('connection_id')
    dag_content = f'''from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from croniter import croniter
from airflow.hooks.base_hook import BaseHook
import requests
import pytz

timezone = "{timezone}"
start_offset_datetime_str = {start_time}
end_offset_datetime_str = {end_time}
schedule_interval = {schedule_interval}  # Or put cron expression
dag_id = "{task_name}"
groupId = "{groupId}"
connectionId = "{connectionId}"
boundaryType = "{boundaryType}"

target_timezone = pytz.timezone(timezone)
start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)


def taskFunction(**context):
    print("#########################")
    # Credentials for calling back to the InLong Manager are stored in an
    # Airflow Connection and resolved at run time.
    conn = BaseHook.get_connection(connectionId)
    url = f"http://{{conn.host}}:{{conn.port}}/{{conn.schema}}"
    params = {{
        "username": conn.login,
        "password": conn.password
    }}
    print("params", params)
    headers = {{
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
        "Accept": "application/json",
        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
        "Accept-Encoding": "gzip, deflate",
        "Referer": "http://192.168.101.2:8083/",
        "Content-Type": "application/json;charset=UTF-8",
        "tenant": "public",
        "Origin": "http://192.168.101.2",
        "Connection": "close",
        "Priority": "u=0"
    }}
    time_interval = get_time_interval(context)
    data = {{
        "boundaryType": boundaryType,
        "groupId": groupId,
        "lowerBoundary": str(int(time_interval[0])),
        "upperBoundary": str(int(time_interval[1]))
    }}
    print("Request Body: ", data)
    response = requests.post(url, params=params, headers=headers, json=data)
    if response.status_code == 200:
        print(response.json())
    else:
        print(response.text)
    print("#########################")


def get_time_interval(context):
    # The boundary pair is [execution time, next run time), derived either
    # from the timedelta interval or from the cron expression.
    execution_date = context.get('execution_date')
    execution_date = execution_date.astimezone(target_timezone)
    dag = context.get('dag')
    schedule_interval = dag.schedule_interval
    if isinstance(schedule_interval, timedelta):
        return execution_date.timestamp(), (execution_date + schedule_interval).timestamp()
    else:
        cron = croniter(schedule_interval, execution_date)
        next_run = cron.get_next(datetime)
        return execution_date.timestamp(), next_run.timestamp()


default_args = {{
    'owner': 'inlong',
    'start_date': start_date,
    'end_date': end_date,
    'catchup': False,
}}

dag = DAG(
    dag_id,
    default_args=default_args,
    schedule_interval=schedule_interval,
    is_paused_upon_creation=False
)

run_task = PythonOperator(
    task_id=dag_id,
    python_callable=taskFunction,
    provide_context=True,
    dag=dag,
)
'''
    dag_file_path = os.path.join(DAG_PATH, f'{task_name}.py')
    with open(dag_file_path, 'w') as f:
        f.write(dag_content)
    print(f'Generated DAG file: {dag_file_path}')


default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'catchup': False
}

dag = DAG(
    'dag_creator',
    default_args=default_args,
    schedule_interval=None,
    is_paused_upon_creation=False
)

create_dag_task = PythonOperator(
    task_id='create_dag_file',
    python_callable=create_dag_file,
    provide_context=True,
    dag=dag
)
````
</details>

#### System design:

1. To make the Airflow API support easy to maintain and extend, the `AirflowApi` interface and the `BaseAirflowApi` abstract class are introduced; subsequent endpoints only need to be built on top of them.
2. A unified request class, `AirflowServerClient`, executes these API objects.
3. Two interceptors are added to the `OkHttpClient`: `AirflowAuthInterceptor` handles unified authorization of the API requests, and `LoggingInterceptor` handles logging (a sketch of this pair is shown after this list).
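The interceptor pair referenced in point 3 can be sketched roughly as follows (a minimal illustration of the design assuming HTTP basic auth, not the PR's exact implementation):

````java
import java.io.IOException;

import okhttp3.Credentials;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

// Adds the Authorization header to every Airflow API request so that
// individual API classes never handle credentials themselves.
class AirflowAuthInterceptor implements Interceptor {
    private final String credentials;

    AirflowAuthInterceptor(String username, String password) {
        this.credentials = Credentials.basic(username, password);
    }

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request authorized = chain.request().newBuilder()
                .header("Authorization", credentials)
                .build();
        return chain.proceed(authorized);
    }
}

// Logs the URL, method, and status code of every request/response pair,
// producing lines like the "Airflow API request information" entries below.
class LoggingInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        Response response = chain.proceed(request);
        System.out.printf("Airflow API request information - URI: %s, Request method: %s, Response status code: %d%n",
                request.url(), request.method(), response.code());
        return response;
    }
}

// Wiring both into the shared client used by AirflowServerClient.
class AirflowClientFactory {
    static OkHttpClient create(String username, String password) {
        return new OkHttpClient.Builder()
                .addInterceptor(new AirflowAuthInterceptor(username, password))
                .addInterceptor(new LoggingInterceptor())
                .build();
    }
}
````

Keeping authorization and logging in interceptors means every endpoint added on top of `BaseAirflowApi` inherits both behaviors with no per-endpoint code.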
#### Results:

When a scheduled task is issued, Airflow's `dag_creator` receives the schedule information from the InLong Manager and creates an offline-task DAG based on it, as shown in the figure below.

<details>
<summary>Inlong Manager Log</summary>

```
[ ] 2024-11-08 12:38:22.667 - INFO [inlong-workflow-0] .a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 100 to 101 for groupId=test_offline_1
[ ] 2024-11-08 12:38:22.672 - INFO [inlong-workflow-0] .a.i.m.s.ScheduleClientFactory:51 - Get schedule engine client success for Airflow
[ ] 2024-11-08 12:38:22.672 - INFO [inlong-workflow-0] .i.m.s.a.AirflowScheduleEngine:138 - Registering DAG for test_offline_1
[ ] 2024-11-08 12:38:23.120 - INFO [inlong-workflow-0] a.i.m.s.a.i.LoggingInterceptor:38 - Airflow API request information - Address: http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, URI: http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, Request method: POST, Response status code: 200
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] .a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 101 to 102 for groupId=test_offline_1
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] a.i.m.s.s.ScheduleOperatorImpl:150 - Register schedule info success for group test_offline_1
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] .GroupScheduleResourceListener:82 - success to process schedule resource for group=test_offline_1
[ ] 2024-11-08 12:38:23.163 - INFO [inlong-workflow-0] .l.g.InitGroupCompleteListener:78 - begin to execute InitGroupCompleteListener for groupId=test_offline_1
[ ] 2024-11-08 12:38:23.164 - INFO [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:540 - begin to update group status to [130] for groupId=test_offline_1 by user=admin
[ ] 2024-11-08 12:38:23.168 - INFO [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:558 - success to update group status to [130] for groupId=test_offline_1 by user=admin
[ ] 2024-11-08 12:38:23.188 - WARN [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:249 - start time is less than current time, re-set to current time for groupId=test_offline_1, startTime=2024-11-08T12:34:47.000+0000, newStartTime=2024-11-08T12:38:23.188+0000
[ ] 2024-11-08 12:38:23.197 - INFO [inlong-workflow-0] .a.i.m.s.s.ScheduleServiceImpl:111 - success to update schedule info for groupId=test_offline_1
[ ] 2024-11-08 12:38:23.202 - INFO [inlong-workflow-0] .a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 102 to 103 for groupId=test_offline_1
[ ] 2024-11-08 12:38:23.203 - INFO [inlong-workflow-0] .i.m.s.a.AirflowScheduleEngine:203 - Updating DAG for test_offline_1
[ ] 2024-11-08 12:38:23.463 - INFO [inlong-workflow-0] a.i.m.s.a.i.LoggingInterceptor:38 - Airflow API request information - Address: http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, URI: http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, Request method: POST, Response status code: 200
```
</details>

After the task has run for a while, you can see that the interval between the last execution and the next scheduled execution matches expectations.

Next, the execution interval of this task is modified through the InLong Dashboard.

The modification succeeds; the change is not reflected immediately in the Last Run field of the Web UI, but it is visible in Run After.

Next, the scheduling period of this offline task is changed to a cron expression. From the figure below we can see that the modification has been successful.

The execution results of each scheduled task are as follows.

There are two ways to delete DAG files. First, `dag_cleaner` periodically scans the DAG directory for files matching the naming rule and removes those whose end time has passed. Second, the InLong Manager triggers `dag_cleaner` with parameters through an API, and `dag_cleaner` deletes the target DAG file directly; in that case the Manager also removes the DAG instance already loaded into memory through another API call (a sketch of this two-call flow follows the checklist below).

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*
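As referenced above, here is a hedged sketch of the manager-triggered cleanup flow (placeholder host, credentials, and conf key; the PR's actual parameter names may differ): first trigger `dag_cleaner` with the DAG to remove, then delete the DAG instance Airflow has already loaded via the stable REST API's delete endpoint.

````java
import java.io.IOException;

import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

public class DeleteOfflineDagExample {

    private static final String AIRFLOW = "http://192.168.3.110:8080/api/v1";

    public static void main(String[] args) throws IOException {
        OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(chain -> chain.proceed(chain.request().newBuilder()
                        .header("Authorization", Credentials.basic("airflow", "airflow"))
                        .build()))
                .build();

        // 1) Trigger dag_cleaner with parameters so it deletes the DAG file
        //    directly (the conf key is a placeholder, not the PR's exact one).
        String payload = "{\"conf\":{\"dag_id\":\"inlong_offline_task_test_offline_1\"}}";
        Request trigger = new Request.Builder()
                .url(AIRFLOW + "/dags/dag_cleaner/dagRuns")
                .post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), payload))
                .build();
        try (Response response = client.newCall(trigger).execute()) {
            System.out.println("trigger dag_cleaner: " + response.code());
        }

        // 2) Remove the DAG instance already loaded by Airflow through the
        //    stable REST API's DELETE /dags/{dag_id} endpoint.
        Request delete = new Request.Builder()
                .url(AIRFLOW + "/dags/inlong_offline_task_test_offline_1")
                .delete()
                .build();
        try (Response response = client.newCall(delete).execute()) {
            System.out.println("delete DAG metadata: " + response.code());
        }
    }
}
````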