This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new cb35fefc77b [v3-1-test] fix: gracefully handle 404 from worker log server for historical retry attempts (#62475) (#63000)
cb35fefc77b is described below

commit cb35fefc77bdf2e0ca6702c2a89cefa5fe29db5d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 6 18:51:07 2026 +0200

    [v3-1-test] fix: gracefully handle 404 from worker log server for historical retry attempts (#62475) (#63000)
    
    * fix: gracefully handle 404 from worker log server for historical retry attempts
    (cherry picked from commit 25e9284a1ded0cf3bdd819591894c50b743d479a)
    
    Co-authored-by: Pradeep Kalluri <[email protected]>
---
 .../src/airflow/utils/log/file_task_handler.py     | 18 +++++
 .../tests/unit/utils/log/test_file_task_handler.py | 92 ++++++++++++++++++++++
 2 files changed, 110 insertions(+)

diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 432ecffd460..adc393c1689 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -895,9 +895,27 @@ class FileTaskHandler(logging.Handler):
                     "See more at https://airflow.apache.org/docs/apache-airflow/"
                     "stable/configurations-ref.html#secret-key"
                 )
+            elif response.status_code == 404:
+                # Log file not found on the worker's log server.
+                # This typically happens when a task retried on a different worker
+                # and the original worker's logs are no longer accessible.
+                # Fall back to local filesystem read if available.
+                worker_log_full_path = Path(self.local_base, worker_log_rel_path)
+                fallback_sources, fallback_streams = self._read_from_local(worker_log_full_path)
+                if fallback_sources:
+                    sources.extend(fallback_sources)
+                    log_streams.extend(fallback_streams)
+                else:
+                    sources.append(
+                        f"Log file not found on worker '{ti.hostname}'. "
+                        f"This attempt may have run on a different worker whose logs "
+                        f"are no longer accessible. "
+                        f"Consider configuring remote logging (S3, GCS, etc.) for log persistence."
+                    )
             else:
                 # Check if the resource was properly fetched
                 response.raise_for_status()
+
                 if int(response.headers.get("Content-Length", 0)) > 0:
                     sources.append(url)
                     log_streams.append(
diff --git a/airflow-core/tests/unit/utils/log/test_file_task_handler.py b/airflow-core/tests/unit/utils/log/test_file_task_handler.py
new file mode 100644
index 00000000000..cdda0e66de8
--- /dev/null
+++ b/airflow-core/tests/unit/utils/log/test_file_task_handler.py
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+
+class TestFileTaskHandlerLogServer:
+    """Tests for _read_from_logs_server 404 handling."""
+
+    def setup_method(self):
+        self.handler = FileTaskHandler(base_log_folder="/tmp/test_logs")
+
+        self.ti = MagicMock()
+        self.ti.hostname = "worker-1"
+        self.ti.triggerer_job = None
+        self.ti.task = None
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    @patch.object(FileTaskHandler, "_read_from_local")
+    def test_404_falls_back_to_local_when_available(self, mock_read_local, mock_get_url, mock_fetch):
+        """When log server returns 404 and local logs exist, use local logs."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+
+        mock_response = MagicMock()
+        mock_response.status_code = 404
+        mock_fetch.return_value = mock_response
+
+        mock_stream = MagicMock()
+        mock_read_local.return_value = (["/tmp/test_logs/dag/run/task/1.log"], [mock_stream])
+
+        sources, streams = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert sources == ["/tmp/test_logs/dag/run/task/1.log"]
+        assert streams == [mock_stream]
+        mock_read_local.assert_called_once_with(Path("/tmp/test_logs", "dag/run/task/1.log"))
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    @patch.object(FileTaskHandler, "_read_from_local")
+    def test_404_shows_clear_message_when_no_local_fallback(self, mock_read_local, mock_get_url, mock_fetch):
+        """When log server returns 404 and no local logs exist, show helpful message."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+
+        mock_response = MagicMock()
+        mock_response.status_code = 404
+        mock_fetch.return_value = mock_response
+
+        mock_read_local.return_value = ([], [])
+
+        sources, streams = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert len(sources) == 1
+        assert "worker-1" in sources[0]
+        assert "no longer accessible" in sources[0]
+        assert "remote logging" in sources[0]
+        assert streams == []
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    def test_403_shows_secret_key_message(self, mock_get_url, mock_fetch):
+        """When log server returns 403, show secret key configuration message."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+
+        mock_response = MagicMock()
+        mock_response.status_code = 403
+        mock_fetch.return_value = mock_response
+
+        sources, streams = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert len(sources) == 1
+        assert "secret_key" in sources[0]
+        assert streams == []

Reply via email to