AnudeepKonaboina opened a new issue #21579:
URL: https://github.com/apache/airflow/issues/21579


   ### Apache Airflow version
   
   2.1.1
   
   ### What happened
   
   Hi,
   A PythonOperator task lists the files on an SFTP server every 30 seconds. The DAG must run indefinitely until someone manually pauses or deletes it, but the task exits with a SIGABRT error after the DAG has run for about 9 hours. The error is shown below. This has happened a couple of times.
   
   ```
   [2022-02-14 18:39:30,350] {local_task_job.py:151} INFO - Task exited with return code Negsignal.SIGABRT
   ```
   
   I also tried the same thing with a FileSensor operator, but the result is the same. Could you please provide a workaround or a solution so that the DAG runs indefinitely without any manual intervention?
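
For reference when reading the log line above: a minimal sketch of how such a return code can be decoded, assuming it follows the usual POSIX convention in which a negative code -N means the process was terminated by signal N. The helper name `describe_return_code` is hypothetical and is not part of Airflow.

```python
import signal


def describe_return_code(return_code: int) -> str:
    """Describe a process return code using the POSIX subprocess convention."""
    if return_code < 0:
        # A negative code -N means the process was terminated by signal N.
        return f"terminated by signal {signal.Signals(-return_code).name}"
    return f"exited with status {return_code}"


# The value is taken from the signal module rather than hard-coded.
print(describe_return_code(-signal.SIGABRT))
```

This only identifies which signal ended the task process; it does not say which component raised it.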
   
   ### What you expected to happen
   
   Expectation:
   
   The DAG should not stop abruptly after running for a certain length of time. Instead, it should keep running until a user manually pauses it.
   
   
   
   ### How to reproduce
   
   ## The following DAGs were created
   
   ### Case 1: Using an SFTPSensor
   
   ```
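   from airflow import DAG
   from airflow.providers.sftp.sensors.sftp import SFTPSensor

   # default_args, source_path and file_to_poke are defined elsewhere (not shown).
   with DAG(
       dag_id='intg_gb9_pbs_trigger_minirxi',
       max_active_runs=1,
       default_args=default_args,
       catchup=False,
       schedule_interval=None,
   ) as dag:
       # Poke the SFTP server every 30 seconds until the file appears.
       file_sensing_task = SFTPSensor(
           task_id='sense_event_file',
           path=source_path + "/" + file_to_poke,
           sftp_conn_id='moveit_conn',
           poke_interval=30,
       )

   file_sensing_task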
   ```
   After about 9 hours, the task fails with the error below:
   
   
![image](https://user-images.githubusercontent.com/59328701/154031996-1529a9b0-3ba7-48fa-9955-805b6d99cf6b.png)
   
   
   ### Case 2: Using a PythonOperator
   
   ```
   # moveit connection details
   connection = BaseHook.get_connection("sftp_conn")
   
   
   def get_file_name_to_be_processed(src_path, file_pattern, poke_interval, **context):
       matched = False
       try:
           cnopts = pysftp.CnOpts()
           cnopts.hostkeys = None
           sftpconn = pysftp.Connection(host=connection.host
                                        , username=connection.login
                                        , password=connection.password
                                        , cnopts=cnopts)
   
           try:
               logging.info(f"Poking for file with pattern: {file_pattern} in path {src_path}")
               for filename in sftpconn.listdir(src_path):
                   print(filename)
                   if fnmatch.fnmatch(filename, file_pattern):
                       print("matched")
                       matched = True
                       context['ti'].xcom_push(key='file_name', value=filename)
                       break
               if not matched:
                   sftpconn.close()
                   time.sleep(poke_interval)
                   # Retry by calling the function again with the same arguments.
                   get_file_name_to_be_processed(src_path, file_pattern, poke_interval, **context)
           except FileNotFoundError:
               pass
       except paramiko.SSHException as e:
           raise e
   
   
   dag = DAG(
       dag_id='sftp_file_sensor',
       max_active_runs=1,
       default_args=default_args,
       catchup=False,
       schedule_interval=None
   )
   
   sense_file_and_get_name = PythonOperator(
       task_id='sense_file',
       python_callable=get_file_name_to_be_processed,
       op_kwargs={
           'src_path': "",
           'file_pattern': "",
           'poke_interval': 30
   
       },
       provide_context=True,
       retries=0,
       dag=dag
   )
   
   sense_file_and_get_name 
   ```
   After about 9 hours of running, the task fails with the error below:
   
   
![image](https://user-images.githubusercontent.com/59328701/154029181-99919ae6-d95f-47da-a7ba-468f1af8cd10.png)
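
For comparison, the polling in Case 2 can also be written as a loop instead of a recursive call, which avoids adding one stack frame per poke (about 1080 frames over 9 hours at a 30-second interval). The following is only a sketch of that structure and is not claimed to fix the SIGABRT. The names `wait_for_matching_file`, `list_dir`, `max_pokes` and `sleep` are hypothetical; `list_dir` stands in for a call such as `sftpconn.listdir(src_path)`.

```python
import fnmatch
import time
from typing import Callable, Iterable, Optional


def wait_for_matching_file(
    list_dir: Callable[[], Iterable[str]],
    file_pattern: str,
    poke_interval: float,
    max_pokes: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll list_dir() until a file name matches file_pattern.

    Returns the first matching name, or None once max_pokes listings
    have been checked without a match (None means poll forever).
    """
    pokes = 0
    while max_pokes is None or pokes < max_pokes:
        for filename in list_dir():
            if fnmatch.fnmatch(filename, file_pattern):
                return filename
        pokes += 1
        # Wait before the next listing; the call stack stays flat.
        sleep(poke_interval)
    return None


# Usage with a fake directory listing that shows a match on the third poke.
listings = iter([[], ["a.txt"], ["a.txt", "event_1.csv"]])
found = wait_for_matching_file(lambda: next(listings), "event_*.csv", 0)
print(found)
```

In a DAG, the returned name could be pushed to XCom in the same way as in the original callable.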
   
   
   
   ### Operating System
   
   CentOS
   
   ### Versions of Apache Airflow Providers
   
   Version:
   
   apache-airflow-providers:2.1.1
   apache-airflow-providers-sftp:2.1.1
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   This problem has been occurring for the past 2 weeks, and we are forced to restart the DAG manually every time it fails. This is impacting the performance of our jobs.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

