This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e17464882ce Fix triggerer logger's file descriptor closed when it 
removed (#62103)
e17464882ce is described below

commit e17464882ce66eb4784d11213c1af28b951864f2
Author: Jeongwoo Do <[email protected]>
AuthorDate: Sat Mar 7 13:04:23 2026 +0900

    Fix triggerer logger's file descriptor closed when it removed (#62103)
    
    closes: #61916
---
 .../src/airflow/jobs/triggerer_job_runner.py       | 19 ++++++++---
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 37 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py 
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 5567e4763ea..ca116478110 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -30,7 +30,7 @@ from contextlib import suppress
 from datetime import datetime
 from socket import socket
 from traceback import format_exception
-from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
+from typing import TYPE_CHECKING, Annotated, Any, BinaryIO, ClassVar, Literal, 
TextIO, TypedDict
 
 import anyio
 import attrs
@@ -301,6 +301,8 @@ class TriggerLoggingFactory:
 
     bound_logger: WrappedLogger = attrs.field(init=False, repr=False)
 
+    _filehandle: TextIO | BinaryIO = attrs.field(init=False, repr=False)
+
     def __call__(self, processors: Iterable[structlog.typing.Processor]) -> 
WrappedLogger:
         if hasattr(self, "bound_logger"):
             return self.bound_logger
@@ -311,13 +313,20 @@ class TriggerLoggingFactory:
 
         pretty_logs = False
         if pretty_logs:
-            underlying_logger: WrappedLogger = 
structlog.WriteLogger(log_file.open("w", buffering=1))
+            self._filehandle = log_file.open("w", buffering=1)
+            underlying_logger: WrappedLogger = 
structlog.WriteLogger(self._filehandle)
         else:
-            underlying_logger = structlog.BytesLogger(log_file.open("wb"))
+            self._filehandle = log_file.open("wb")
+            underlying_logger = structlog.BytesLogger(self._filehandle)
         logger = structlog.wrap_logger(underlying_logger, 
processors=processors).bind()
         self.bound_logger = logger
         return logger
 
+    def close(self):
+        # Explicitly close the file descriptor.
+        if hasattr(self, "_filehandle") and self._filehandle and not 
self._filehandle.closed:
+            self._filehandle.close()
+
     def upload_to_remote(self):
         from airflow.sdk.log import upload_to_remote
 
@@ -420,10 +429,10 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
             for id in msg.finished or ():
                 self.running_triggers.discard(id)
                 self.cancelling_triggers.discard(id)
-                # Remove logger from the cache, and since structlog doesn't 
have an explicit close method, we
-                # only need to remove the last reference to it to close the 
open FH
                 if factory := self.logger_cache.pop(id, None):
                     factory.upload_to_remote()
+                    # Need to close the FD explicitly, as it is not closed 
when logger is removed.
+                    factory.close()
 
             response = messages.TriggerStateSync(
                 to_create=[],
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 7ba89562ad0..3d778804279 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -42,6 +42,7 @@ from airflow.jobs.triggerer_job_runner import (
     ToTriggerSupervisor,
     TriggerCommsDecoder,
     TriggererJobRunner,
+    TriggerLoggingFactory,
     TriggerRunner,
     TriggerRunnerSupervisor,
     messages,
@@ -303,6 +304,42 @@ def test_trigger_log(mock_monotonic, trigger, 
watcher_count, trigger_count, sess
     trigger_runner_supervisor.kill(force=False)
 
 
+def test_trigger_logger_close():
+    logger = TriggerLoggingFactory(log_path="/tmp/test.log", ti=MagicMock())
+
+    mock_fh = MagicMock()
+    mock_fh.closed = False
+
+    logger._filehandle = mock_fh
+
+    logger.close()
+
+    mock_fh.close.assert_called_once()
+
+
+def test_trigger_logger_fd_closed_when_removed(session):
+
+    trigger = TimeDeltaTrigger(datetime.timedelta(seconds=0.5))
+
+    create_trigger_in_db(session, trigger)
+
+    mock_file = MagicMock()
+    mock_file.closed = False
+
+    with patch("airflow.sdk.log.init_log_file") as mock_init_log_file:
+        mock_init_log_file.return_value.open.return_value = mock_file
+
+        trigger_runner_supervisor = 
TriggerRunnerSupervisor.start(job=Job(id=123456), capacity=10)
+        trigger_runner_supervisor.load_triggers()
+
+        for _ in range(30):
+            trigger_runner_supervisor._service_subprocess(0.1)
+
+    mock_file.close.assert_called_once()
+
+    trigger_runner_supervisor.kill(force=False)
+
+
 class TestTriggerRunner:
     def test_run_inline_trigger_canceled(self, session) -> None:
         trigger_runner = TriggerRunner()

Reply via email to