This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e17464882ce Fix triggerer logger's file descriptor closed when it removed (#62103)
e17464882ce is described below
commit e17464882ce66eb4784d11213c1af28b951864f2
Author: Jeongwoo Do <[email protected]>
AuthorDate: Sat Mar 7 13:04:23 2026 +0900
Fix triggerer logger's file descriptor closed when it removed (#62103)
closes: #61916
---
.../src/airflow/jobs/triggerer_job_runner.py | 19 ++++++++---
airflow-core/tests/unit/jobs/test_triggerer_job.py | 37 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 5567e4763ea..ca116478110 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -30,7 +30,7 @@ from contextlib import suppress
from datetime import datetime
from socket import socket
from traceback import format_exception
-from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
+from typing import TYPE_CHECKING, Annotated, Any, BinaryIO, ClassVar, Literal, TextIO, TypedDict
import anyio
import attrs
@@ -301,6 +301,8 @@ class TriggerLoggingFactory:
bound_logger: WrappedLogger = attrs.field(init=False, repr=False)
+ _filehandle: TextIO | BinaryIO = attrs.field(init=False, repr=False)
+
def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedLogger:
if hasattr(self, "bound_logger"):
return self.bound_logger
@@ -311,13 +313,20 @@ class TriggerLoggingFactory:
pretty_logs = False
if pretty_logs:
- underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("w", buffering=1))
+ self._filehandle = log_file.open("w", buffering=1)
+ underlying_logger: WrappedLogger = structlog.WriteLogger(self._filehandle)
else:
- underlying_logger = structlog.BytesLogger(log_file.open("wb"))
+ self._filehandle = log_file.open("wb")
+ underlying_logger = structlog.BytesLogger(self._filehandle)
logger = structlog.wrap_logger(underlying_logger, processors=processors).bind()
self.bound_logger = logger
return logger
+ def close(self):
+ # Explicitly close the file descriptor.
+ if hasattr(self, "_filehandle") and self._filehandle and not self._filehandle.closed:
+ self._filehandle.close()
+
def upload_to_remote(self):
from airflow.sdk.log import upload_to_remote
@@ -420,10 +429,10 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
for id in msg.finished or ():
self.running_triggers.discard(id)
self.cancelling_triggers.discard(id)
- # Remove logger from the cache, and since structlog doesn't have an explicit close method, we
- # only need to remove the last reference to it to close the open FH
if factory := self.logger_cache.pop(id, None):
factory.upload_to_remote()
+ # Need to close the FD explicitly, as it is not closed when logger is removed.
+ factory.close()
response = messages.TriggerStateSync(
to_create=[],
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 7ba89562ad0..3d778804279 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -42,6 +42,7 @@ from airflow.jobs.triggerer_job_runner import (
ToTriggerSupervisor,
TriggerCommsDecoder,
TriggererJobRunner,
+ TriggerLoggingFactory,
TriggerRunner,
TriggerRunnerSupervisor,
messages,
@@ -303,6 +304,42 @@ def test_trigger_log(mock_monotonic, trigger, watcher_count, trigger_count, sess
trigger_runner_supervisor.kill(force=False)
+def test_trigger_logger_close():
+ logger = TriggerLoggingFactory(log_path="/tmp/test.log", ti=MagicMock())
+
+ mock_fh = MagicMock()
+ mock_fh.closed = False
+
+ logger._filehandle = mock_fh
+
+ logger.close()
+
+ mock_fh.close.assert_called_once()
+
+
+def test_trigger_logger_fd_closed_when_removed(session):
+
+ trigger = TimeDeltaTrigger(datetime.timedelta(seconds=0.5))
+
+ create_trigger_in_db(session, trigger)
+
+ mock_file = MagicMock()
+ mock_file.closed = False
+
+ with patch("airflow.sdk.log.init_log_file") as mock_init_log_file:
+ mock_init_log_file.return_value.open.return_value = mock_file
+
+ trigger_runner_supervisor = TriggerRunnerSupervisor.start(job=Job(id=123456), capacity=10)
+ trigger_runner_supervisor.load_triggers()
+
+ for _ in range(30):
+ trigger_runner_supervisor._service_subprocess(0.1)
+
+ mock_file.close.assert_called_once()
+
+ trigger_runner_supervisor.kill(force=False)
+
+
class TestTriggerRunner:
def test_run_inline_trigger_canceled(self, session) -> None:
trigger_runner = TriggerRunner()