Zkplo opened a new pull request, #11479:
URL: https://github.com/apache/inlong/pull/11479

   <!-- Prepare a Pull Request
   Change the title of pull request refer to the following example:
     [INLONG-XYZ][Component] Title of the pull request 
   -->
   
   <!-- Specify the issue this pull request going to fix.
   The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)-->
   
   Fixes #11400 
   ### Motivation
   
   <!--Explain here the context, and why you're making that change. What is the 
problem you're trying to solve.-->
   
   ### Modifications
   #### Need to know about Airflow
   
   1. By default, Airflow rejects all REST API requests. We need to enable an API authentication backend as described in the [official documentation](https://airflow.apache.org/docs/apache-airflow-providers-fab/stable/auth-manager/api-authentication.html); a hedged request sketch follows this list.
   2. Airflow Connections are used to store the credentials for connecting to other systems, keeping those credentials secure. For details, see https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html.
   3. Airflow does not provide an API for DAG creation, so integrating with InLong requires generating the original DAG files ourselves.
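   
   As a hedged illustration (not code from this PR), the following sketch shows how a client such as the InLong Manager can trigger the `dag_creator` DAG once basic auth is enabled. The host, credentials, and `conf` values are assumptions; the endpoint itself (`POST /api/v1/dags/dag_creator/dagRuns`) matches the request visible in the manager log below, and the `conf` keys mirror what `dag_creator` reads from `dag_run.conf`.
   
   ````python
   # Sketch only: assumes the basic_auth API backend is enabled in airflow.cfg,
   # e.g. [api] auth_backends = airflow.api.auth.backend.basic_auth
   import requests
   
   AIRFLOW_BASE = "http://localhost:8080/api/v1"  # hypothetical Airflow host
   
   payload = {
       "conf": {
           "inlong_group_id": "test_offline_1",
           "timezone": "Asia/Shanghai",
           "boundary_type": "TIME",           # assumed value
           "start_time": 1731038087000,       # epoch milliseconds
           "end_time": 1731124487000,
           "cron_expr": "*/10 * * * *",
           "seconds_interval": None,
           "connection_id": "inlong_manager"  # an Airflow Connection, e.g.
                                              # created via `airflow connections add`
       }
   }
   
   resp = requests.post(
       f"{AIRFLOW_BASE}/dags/dag_creator/dagRuns",
       json=payload,
       auth=("airflow", "airflow"),  # credentials for the basic_auth backend (assumed)
   )
   print(resp.status_code, resp.json())
   ````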
   
   <details>
   <summary>dag_cleaner.py</summary>
   
   ````python
   # Licensed to the Apache Software Foundation (ASF) under one
   # or more contributor license agreements.  See the NOTICE file
   # distributed with this work for additional information
   # regarding copyright ownership.  The ASF licenses this file
   # to you under the Apache License, Version 2.0 (the
   # "License"); you may not use this file except in compliance
   # with the License.  You may obtain a copy of the License at
   #
   #   http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   # software distributed under the License is distributed on an
   # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   # KIND, either express or implied.  See the License for the
   # specific language governing permissions and limitations
   # under the License.
   
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow import configuration
   from datetime import datetime, timedelta
   import os
   import logging
   
   DAG_PATH = configuration.get('core', 'dags_folder') + "/"
   
   
   def clean_expired_dags(**context):
       # The generated DAG files store end_offset_datetime_str as epoch
       # milliseconds (see dag_creator.py below), so compare timestamps
       # numerically instead of comparing formatted date strings.
       execution_date = context.get('execution_date')
       current_millis = execution_date.timestamp() * 1000
       logging.info(f"Current time (epoch millis): {current_millis}")
       for dag_file in os.listdir(DAG_PATH):
           if dag_file.endswith(".py") and dag_file.startswith("inlong_offline_task_"):
               with open(DAG_PATH + dag_file, "r") as file:
                   line = file.readline()
                   while line and "end_offset_datetime_str" not in line:
                       line = file.readline()
                   end_date_str = None
                   if len(line.split("=")) > 1:
                       end_date_str = line.split("=")[1].strip().strip("\"")
                   logging.info(f"DAG end time: {end_date_str}")
                   if end_date_str:
                       try:
                           if current_millis > float(end_date_str):
                               dag_file_path = os.path.join(DAG_PATH, dag_file)
                               os.remove(dag_file_path)
                               logging.info(f"Deleted expired DAG: {dag_file}")
                       except ValueError:
                           logging.error(f"Invalid date format for DAG {dag_file}: {end_date_str}")
   
   
   default_args = {
       'owner': 'airflow',
       'start_date': datetime.now() - timedelta(minutes=5),
   }
   
   # 'catchup' and 'tags' are DAG-level arguments, so they belong on the DAG
   # constructor rather than in default_args (where they would be ignored).
   dag = DAG(
       'dag_cleaner',
       default_args=default_args,
       schedule_interval="*/20 * * * *",
       catchup=False,
       tags=["inlong"],
       is_paused_upon_creation=False
   )
   
   clean_task = PythonOperator(
       task_id='clean_expired_dags',
       python_callable=clean_expired_dags,
       provide_context=True,
       dag=dag,
   )
   
   ````
   </details>
   <details>
   <summary>dag_creator.py</summary>
   
   ````python
   # Licensed to the Apache Software Foundation (ASF) under one
   # or more contributor license agreements.  See the NOTICE file
   # distributed with this work for additional information
   # regarding copyright ownership.  The ASF licenses this file
   # to you under the Apache License, Version 2.0 (the
   # "License"); you may not use this file except in compliance
   # with the License.  You may obtain a copy of the License at
   #
   #   http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   # software distributed under the License is distributed on an
   # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   # KIND, either express or implied.  See the License for the
   # specific language governing permissions and limitations
   # under the License.
   
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow import configuration
   import os
   
   DAG_PATH = configuration.get('core', 'dags_folder') + "/"
   DAG_PREFIX = 'inlong_offline_task_'
   
   def create_dag_file(**context):
       conf = context.get('dag_run').conf
       print('conf: ', conf)
       groupId = conf.get('inlong_group_id')
       task_name = DAG_PREFIX + groupId
       timezone = conf.get('timezone')
       boundaryType = str(conf.get('boundary_type'))
       start_time = int(conf.get('start_time'))
       end_time = int(conf.get('end_time'))
       cron_expr = conf.get('cron_expr')
       seconds_interval = conf.get('seconds_interval')
       # Embed either a timedelta expression or a quoted cron string into the
       # generated DAG file.
       if cron_expr is None or len(cron_expr) == 0:
           schedule_interval = f'timedelta(seconds={seconds_interval})'
       else:
           schedule_interval = '"' + cron_expr + '"'
       connectionId = conf.get('connection_id')
       dag_content = f'''from airflow import DAG
   from datetime import datetime, timedelta
   from airflow.operators.python_operator import PythonOperator
   from croniter import croniter
   from airflow.hooks.base_hook import BaseHook
   import requests
   import pytz
   
   timezone = "{timezone}"
   start_offset_datetime_str = {start_time}
   end_offset_datetime_str = {end_time}
   schedule_interval = {schedule_interval}  # Or put cron expression
   dag_id = "{task_name}"
   groupId = "{groupId}"
   connectionId = "{connectionId}"
   boundaryType = "{boundaryType}"
   
   target_timezone = pytz.timezone(timezone)  
   
   start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
   end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)
   
   def taskFunction(**context):
       print("#########################")
       conn = BaseHook.get_connection(connectionId)
       url = f"http://{{conn.host}}:{{conn.port}}/{{conn.schema}}";
       params = {{
           "username": conn.login,
           "password": conn.password
       }}
       print("params", params)
       headers = {{
           "Accept": "application/json",
           "Content-Type": "application/json;charset=UTF-8",
           "tenant": "public",
           "Connection": "close"
       }}
       time_interval = get_time_interval(context)
       data = {{
           "boundaryType": boundaryType,
           "groupId": groupId,
           "lowerBoundary": str(int(time_interval[0])),
           "upperBoundary": str(int(int(time_interval[1])))
       }}
       print("Request Body: ", data)
       response = requests.post(url, params=params, headers=headers, json=data)
       if response.status_code == 200:
           print(response.json()) 
       else:
           print(response.text)
       print("#########################")
   
   
   def get_time_interval(context):
       # The data window for one run is [execution_date, next scheduled run).
       execution_date = context.get('execution_date')
       execution_date = execution_date.astimezone(target_timezone)
       schedule = context.get('dag').schedule_interval
       if isinstance(schedule, timedelta):
           return execution_date.timestamp(), (execution_date + schedule).timestamp()
       else:
           cron = croniter(schedule, execution_date)
           next_run = cron.get_next(datetime)
           return execution_date.timestamp(), next_run.timestamp()
   
   
   default_args = {{
       'owner': 'inlong',
       'start_date': start_date,
       'end_date': end_date,
   }}
   
   dag = DAG(
       dag_id,
       default_args=default_args,
       schedule_interval=schedule_interval,
       catchup=False,
       is_paused_upon_creation=False
   )
   
   offline_task = PythonOperator(
       task_id=dag_id,
       python_callable=taskFunction,
       provide_context=True,
       dag=dag,
   )
       '''
       dag_file_path = os.path.join(DAG_PATH, f'{task_name}.py')
       with open(dag_file_path, 'w') as f:
           f.write(dag_content)
       print(f'Generated DAG file: {dag_file_path}')
   
   
   default_args = {'owner': 'airflow', 'start_date': days_ago(1)}
   dag = DAG('dag_creator', default_args=default_args, schedule_interval=None,
             catchup=False, is_paused_upon_creation=False)
   create_dag_task = PythonOperator(task_id='create_dag_file', python_callable=create_dag_file,
                                    provide_context=True, dag=dag)
   ````
   </details>
   
   #### System design:
   
   1. To make future maintenance and extension of the Airflow interface support easier, the `AirflowApi` interface and the `BaseAirflowApi` abstract class are introduced; subsequent endpoints can be added by extending them.
   2. A unified request client, `AirflowServerClient`, is implemented for calling these interfaces.
   3. Two interceptors are added to the OkHttpClient: `AirflowAuthInterceptor` performs unified authorization of requests, and `LoggingInterceptor` handles request logging (an analogous sketch follows this list).
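   
   As an analogous sketch of item 3 (the PR implements this in Java with OkHttpClient interceptors; the Python below is illustrative only, not code from this PR):
   
   ````python
   # Python analogue of the two interceptors: one injects authorization on every
   # request, the other logs each request/response pair.
   import base64
   import logging
   import requests
   
   class AirflowSession(requests.Session):
       """Plays the role of AirflowAuthInterceptor plus LoggingInterceptor."""
   
       def __init__(self, username: str, password: str):
           super().__init__()
           token = base64.b64encode(f"{username}:{password}".encode()).decode()
           self.headers["Authorization"] = f"Basic {token}"  # authorization concern
   
       def request(self, method, url, **kwargs):
           response = super().request(method, url, **kwargs)
           # logging concern, mirroring the LoggingInterceptor log format
           logging.info("Airflow API request - URL: %s, method: %s, status: %s",
                        url, method, response.status_code)
           return response
   ````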
   
   #### Results:
   
   When we issue a scheduled task, Airflow's `dag_creator` receives the information from the InLong Manager and creates an offline-task DAG based on it, as shown in the figure below.
   
![image-20241110105752348](https://github.com/user-attachments/assets/514c8606-4766-4fb3-985c-672b48a9c50b)
   <details>
   
   <summary>Inlong Manager Log</summary>
   
   ```
   [ ] 2024-11-08 12:38:22.667 - INFO [inlong-workflow-0] 
.a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 100 
to 101 for groupId=test_offline_1 
   [ ] 2024-11-08 12:38:22.672 - INFO [inlong-workflow-0] 
.a.i.m.s.ScheduleClientFactory:51 - Get schedule engine client success for 
Airflow 
   [ ] 2024-11-08 12:38:22.672 - INFO [inlong-workflow-0] 
.i.m.s.a.AirflowScheduleEngine:138 - Registering DAG for test_offline_1 
   [ ] 2024-11-08 12:38:23.120 - INFO [inlong-workflow-0] 
a.i.m.s.a.i.LoggingInterceptor:38 - Airflow API request information - Address: 
http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, URI: 
http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, Request method: 
POST, Response status code: 200 
   [ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] 
.a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 101 
to 102 for groupId=test_offline_1 
   [ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] 
a.i.m.s.s.ScheduleOperatorImpl:150 - Register schedule info success for group 
test_offline_1 
   [ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] 
.GroupScheduleResourceListener:82 - success to process schedule resource for 
group=test_offline_1 
   [ ] 2024-11-08 12:38:23.163 - INFO [inlong-workflow-0] 
.l.g.InitGroupCompleteListener:78 - begin to execute InitGroupCompleteListener 
for groupId=test_offline_1 
   [ ] 2024-11-08 12:38:23.164 - INFO [inlong-workflow-0] 
i.m.s.g.InlongGroupServiceImpl:540 - begin to update group status to [130] for 
groupId=test_offline_1 by user=admin 
   [ ] 2024-11-08 12:38:23.168 - INFO [inlong-workflow-0] 
i.m.s.g.InlongGroupServiceImpl:558 - success to update group status to [130] 
for groupId=test_offline_1 by user=admin 
   [ ] 2024-11-08 12:38:23.188 - WARN [inlong-workflow-0] 
i.m.s.g.InlongGroupServiceImpl:249 - start time is less than current time, 
re-set to current time for groupId=test_offline_1, 
startTime=2024-11-08T12:34:47.000+0000, 
newStartTime=2024-11-08T12:38:23.188+0000 
   [ ] 2024-11-08 12:38:23.197 - INFO [inlong-workflow-0] 
.a.i.m.s.s.ScheduleServiceImpl:111 - success to update schedule info for 
groupId=test_offline_1 
   [ ] 2024-11-08 12:38:23.202 - INFO [inlong-workflow-0] 
.a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 102 
to 103 for groupId=test_offline_1 
   [ ] 2024-11-08 12:38:23.203 - INFO [inlong-workflow-0] 
.i.m.s.a.AirflowScheduleEngine:203 - Updating DAG for test_offline_1 
   [ ] 2024-11-08 12:38:23.463 - INFO [inlong-workflow-0] 
a.i.m.s.a.i.LoggingInterceptor:38 - Airflow API request information - Address: 
http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, URI: 
http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, Request method: 
POST, Response status code: 200 
   ```
   </details>
   After the task has run for a while, you can see that the interval between the last execution and the next scheduled execution matches expectations (a worked example of the window computation follows the figure).
   
   
![image-20241108205147944](https://github.com/user-attachments/assets/7a32080b-efbb-43d7-b797-b2fcdcbf0ed9)
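   
   As a concrete check of the window computation in `get_time_interval` (values are illustrative): with the cron schedule `*/10 * * * *` and an execution date of 12:00, the boundaries are 12:00 and 12:10 as epoch seconds.
   
   ````python
   # Reproduces the cron branch of get_time_interval for one execution date.
   from datetime import datetime
   import pytz
   from croniter import croniter
   
   tz = pytz.timezone("Asia/Shanghai")
   execution_date = tz.localize(datetime(2024, 11, 8, 12, 0, 0))
   cron = croniter("*/10 * * * *", execution_date)
   next_run = cron.get_next(datetime)
   # lowerBoundary = 12:00:00, upperBoundary = 12:10:00 (local time, epoch seconds)
   print(int(execution_date.timestamp()), int(next_run.timestamp()))
   ````
   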
   Now, I will modify the execution interval of this task through the InLong Dashboard.
   
   
![image-20241108205809834](https://github.com/user-attachments/assets/88e24742-acd6-489a-ac5f-74f648d85c62)
   The modification succeeds. The Last Run field of the Web UI is not updated immediately, but the change can already be seen in Run After.
   
   
![image-20241108210117653](https://github.com/user-attachments/assets/18442c09-8be6-4d47-a4de-52a9534eac7e)
   Next, change the scheduling period of this offline task to a Cron expression.
   
   
![image-20241108212101216](https://github.com/user-attachments/assets/a2f6d9c2-fc5f-46f1-aa21-b19c8cb7fd8a)
   From the figure below we can see that the modification has been successful.
   
   
![image-20241108212237390](https://github.com/user-attachments/assets/7c440b8f-63fe-4d30-a611-b9a746555c1f)
   The execution results of each scheduled task are as follows:
   
   
![image-20241110111215023](https://github.com/user-attachments/assets/e525905e-721a-46f1-9f56-ad7a54284717)
   
   There are two ways to delete the generated DAG files. First, `dag_cleaner` periodically scans the matching files in the DAG directory and removes those whose end time has passed. Second, the InLong Manager triggers `dag_cleaner` through an interface with parameters, and `dag_cleaner` deletes the DAG file directly; in that case, the InLong Manager also deletes the DAG instance already loaded into memory through another interface (a hedged sketch of this path follows the figure).
   
   
![image-20241108222558534](https://github.com/user-attachments/assets/aea0ab45-a167-4da4-88e8-311bf2db8667)
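   
   As a hedged sketch of the second path (the `conf` key and credentials are assumptions; `DELETE /dags/{dag_id}` is part of Airflow's stable REST API):
   
   ````python
   # Sketch: the InLong Manager first triggers dag_cleaner with parameters naming
   # the DAG to delete, then removes the already-loaded DAG via the REST API.
   import requests
   
   AIRFLOW_BASE = "http://localhost:8080/api/v1"  # hypothetical Airflow host
   AUTH = ("airflow", "airflow")                  # assumed credentials
   group_id = "test_offline_1"
   dag_id = f"inlong_offline_task_{group_id}"
   
   # 1) Targeted file deletion through dag_cleaner (conf key is an assumption).
   requests.post(
       f"{AIRFLOW_BASE}/dags/dag_cleaner/dagRuns",
       json={"conf": {"inlong_group_id": group_id}},
       auth=AUTH,
   )
   
   # 2) Remove the DAG instance Airflow has already loaded.
   resp = requests.delete(f"{AIRFLOW_BASE}/dags/{dag_id}", auth=AUTH)
   print(resp.status_code)
   ````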
   
   
   <!--Describe the modifications you've done.-->
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   

