This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f6d9b6  Handle case of nonexistent file when preparing file path queue (#18998)
3f6d9b6 is described below

commit 3f6d9b6e3421ca36c2320e4ee1c63c71ca0cb85e
Author: Sam Wheating <[email protected]>
AuthorDate: Thu Oct 14 19:10:32 2021 -0700

    Handle case of nonexistent file when preparing file path queue (#18998)
---
 airflow/dag_processing/manager.py    |  6 +++++-
 tests/dag_processing/test_manager.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 64f8609..19a97ff 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -975,7 +975,11 @@ class DagFileProcessorManager(LoggingMixin):
         for file_path in self._file_paths:
 
             if is_mtime_mode:
-                files_with_mtime[file_path] = os.path.getmtime(file_path)
+                try:
+                    files_with_mtime[file_path] = os.path.getmtime(file_path)
+                except FileNotFoundError:
+                    self.log.warning("Skipping processing of missing file: %s", file_path)
+                    continue
                 file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
             else:
                 file_paths.append(file_path)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 1dbd1c9..43038092 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -372,6 +372,34 @@ class TestDagFileProcessorManager:
     @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
     @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
     @mock.patch("airflow.utils.file.os.path.getmtime")
+    def test_file_paths_in_queue_excludes_missing_file(
+        self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
+    ):
+        """Check that a file is not enqueued for processing if it has been deleted"""
+        dag_files = ["file_3.py", "file_2.py", "file_4.py"]
+        mock_getmtime.side_effect = [1.0, 2.0, FileNotFoundError()]
+        mock_find_path.return_value = dag_files
+
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        manager.set_file_paths(dag_files)
+        manager.prepare_file_path_queue()
+        assert manager._file_path_queue == ['file_2.py', 'file_3.py']
+
+    @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("zipfile.is_zipfile", return_value=True)
+    @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+    @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.getmtime")
     def test_recently_modified_file_is_parsed_with_mtime_mode(
         self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
     ):

Reply via email to