This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new cb35fefc77b [v3-1-test] fix: gracefully handle 404 from worker log server for historical retry attempts (#62475) (#63000)
cb35fefc77b is described below
commit cb35fefc77bdf2e0ca6702c2a89cefa5fe29db5d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 6 18:51:07 2026 +0200
[v3-1-test] fix: gracefully handle 404 from worker log server for historical retry attempts (#62475) (#63000)
* fix: gracefully handle 404 from worker log server for historical retry attempts
(cherry picked from commit 25e9284a1ded0cf3bdd819591894c50b743d479a)
Co-authored-by: Pradeep Kalluri
<[email protected]>
---
.../src/airflow/utils/log/file_task_handler.py | 18 +++++
.../tests/unit/utils/log/test_file_task_handler.py | 92 ++++++++++++++++++++++
2 files changed, 110 insertions(+)
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py
b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 432ecffd460..adc393c1689 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -895,9 +895,27 @@ class FileTaskHandler(logging.Handler):
"See more at
https://airflow.apache.org/docs/apache-airflow/"
"stable/configurations-ref.html#secret-key"
)
+ elif response.status_code == 404:
+ # Log file not found on the worker's log server.
+ # This typically happens when a task retried on a different
worker
+ # and the original worker's logs are no longer accessible.
+ # Fall back to local filesystem read if available.
+ worker_log_full_path = Path(self.local_base,
worker_log_rel_path)
+ fallback_sources, fallback_streams =
self._read_from_local(worker_log_full_path)
+ if fallback_sources:
+ sources.extend(fallback_sources)
+ log_streams.extend(fallback_streams)
+ else:
+ sources.append(
+ f"Log file not found on worker '{ti.hostname}'. "
+ f"This attempt may have run on a different worker
whose logs "
+ f"are no longer accessible. "
+ f"Consider configuring remote logging (S3, GCS, etc.)
for log persistence."
+ )
else:
# Check if the resource was properly fetched
response.raise_for_status()
+
if int(response.headers.get("Content-Length", 0)) > 0:
sources.append(url)
log_streams.append(
diff --git a/airflow-core/tests/unit/utils/log/test_file_task_handler.py
b/airflow-core/tests/unit/utils/log/test_file_task_handler.py
new file mode 100644
index 00000000000..cdda0e66de8
--- /dev/null
+++ b/airflow-core/tests/unit/utils/log/test_file_task_handler.py
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+
+class TestFileTaskHandlerLogServer:
+    """Tests for _read_from_logs_server 404 handling."""
+
+    def setup_method(self):
+        self.handler = FileTaskHandler(base_log_folder="/tmp/test_logs")
+
+        self.ti = MagicMock()
+        self.ti.hostname = "worker-1"
+        self.ti.triggerer_job = None
+        self.ti.task = None
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    @patch.object(FileTaskHandler, "_read_from_local")
+    def test_404_falls_back_to_local_when_available(self, mock_read_local, mock_get_url, mock_fetch):
+        """When log server returns 404 and local logs exist, use local logs."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+
+        mock_response = MagicMock()
+        mock_response.status_code = 404
+        mock_fetch.return_value = mock_response
+
+        mock_stream = MagicMock()
+        mock_read_local.return_value = (["/tmp/test_logs/dag/run/task/1.log"], [mock_stream])
+
+        sources, streams = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert sources == ["/tmp/test_logs/dag/run/task/1.log"]
+        assert streams == [mock_stream]
+        mock_read_local.assert_called_once_with(Path("/tmp/test_logs", "dag/run/task/1.log"))
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    @patch.object(FileTaskHandler, "_read_from_local")
+    def test_404_shows_clear_message_when_no_local_fallback(self, mock_read_local, mock_get_url, mock_fetch):
+        """When log server returns 404 and no local logs exist, show helpful message."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+
+        mock_response = MagicMock()
+        mock_response.status_code = 404
+        mock_fetch.return_value = mock_response
+
+        mock_read_local.return_value = ([], [])
+
+        sources, streams = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert len(sources) == 1
+        assert "worker-1" in sources[0]
+        assert "no longer accessible" in sources[0]
+        assert "remote logging" in sources[0]
+        assert streams == []
+
+    @patch("airflow.utils.log.file_task_handler._fetch_logs_from_service")
+    @patch.object(FileTaskHandler, "_get_log_retrieval_url")
+    def test_403_shows_secret_key_message(self, mock_get_url, mock_fetch):
+        """When log server returns 403, show secret key configuration message."""
+        mock_get_url.return_value = ("http://worker-1/log", "dag/run/task/1.log")
+
+        mock_response = MagicMock()
+        mock_response.status_code = 403
+        mock_fetch.return_value = mock_response
+
+        sources, streams = self.handler._read_from_logs_server(self.ti, "dag/run/task/1.log")
+
+        assert len(sources) == 1
+        assert "secret_key" in sources[0]
+        assert streams == []