kaxil commented on code in PR #62103:
URL: https://github.com/apache/airflow/pull/62103#discussion_r2886721636
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -310,13 +312,20 @@ def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedL
pretty_logs = False
if pretty_logs:
- underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("w", buffering=1))
+ self._filehandle = log_file.open("w", buffering=1)
+ underlying_logger: WrappedLogger = structlog.WriteLogger(self._filehandle)
else:
- underlying_logger = structlog.BytesLogger(log_file.open("wb"))
+ self._filehandle = log_file.open("wb")
+ underlying_logger = structlog.BytesLogger(self._filehandle)
logger = structlog.wrap_logger(underlying_logger, processors=processors).bind()
self.bound_logger = logger
return logger
+ def __del__(self):
+ # Explicitly close the file descriptor when the logger is garbage collected.
+ if hasattr(self, "_filehandle") and self._filehandle:
+ self._filehandle.close()
+
def upload_to_remote(self):
from airflow.sdk.log import upload_to_remote
Review Comment:
Should also add a `not self._filehandle.closed` guard here. If `close()` is
ever called explicitly before GC runs `__del__`, this would call `close()` a
second time. That is a no-op for standard file objects (`io.IOBase.close()` has
no effect on an already-closed file), but the guard is good practice.
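A minimal sketch of the guarded finalizer, assuming a stand-in class rather than the real logger factory. `FileHolder` and `CountingBytesIO` are hypothetical names used only to make the double-close visible:

```python
import io


class CountingBytesIO(io.BytesIO):
    """In-memory file that counts close() calls (illustration only)."""

    close_calls = 0

    def close(self):
        self.close_calls += 1
        super().close()


class FileHolder:
    """Hypothetical stand-in for the object that owns `_filehandle`."""

    def __init__(self, filehandle):
        self._filehandle = filehandle

    def __del__(self):
        # getattr covers an instance whose __init__ failed part-way;
        # the `closed` check skips close() if it already happened.
        fh = getattr(self, "_filehandle", None)
        if fh is not None and not fh.closed:
            fh.close()


buf = CountingBytesIO()
holder = FileHolder(buf)
buf.close()        # explicit close happens first
holder.__del__()   # simulate finalization: the guard skips a second close()
```

Without the `closed` check the second call would still succeed, but `close_calls` would end up at 2 instead of 1.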
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -303,6 +303,29 @@ def test_trigger_log(mock_monotonic, trigger, watcher_count, trigger_count, sess
trigger_runner_supervisor.kill(force=False)
+def test_trigger_logger_fd_closed_when_removed(session):
+
+ trigger = TimeDeltaTrigger(datetime.timedelta(seconds=0.5))
+
+ create_trigger_in_db(session, trigger)
+
+ mock_file = MagicMock()
+ mock_file.closed = False
+
+ with patch("airflow.sdk.log.init_log_file") as mock_init_log_file:
+ mock_init_log_file.return_value.open.return_value = mock_file
+
+ trigger_runner_supervisor = TriggerRunnerSupervisor.start(job=Job(id=123456), capacity=10)
+ trigger_runner_supervisor.load_triggers()
+
+ for _ in range(30):
+ trigger_runner_supervisor._service_subprocess(0.1)
+
+ mock_file.close.assert_called_once()
+
Review Comment:
This assertion depends on `__del__` being triggered by GC during the loop,
which is non-deterministic across Python implementations. If you switch to an
explicit `close()` method (called from `_handle_request`), this test becomes
deterministic — you're verifying the cleanup lifecycle, not GC timing.
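To illustrate the difference, here is a rough sketch of a test that exercises an explicit `close()` instead of waiting for GC. `LogFactorySketch` is a hypothetical stand-in, not the Airflow class, and the mocks replace the real log file:

```python
from unittest.mock import MagicMock


class LogFactorySketch:
    """Hypothetical stand-in for the factory that opens the log file."""

    def __init__(self, log_file):
        self._filehandle = log_file.open("wb")

    def close(self):
        if self._filehandle and not self._filehandle.closed:
            self._filehandle.close()


def test_close_is_deterministic():
    mock_file = MagicMock()
    mock_file.closed = False
    log_file = MagicMock()
    log_file.open.return_value = mock_file

    factory = LogFactorySketch(log_file)
    factory.close()  # explicit call: no dependency on when GC runs

    mock_file.close.assert_called_once()


test_close_is_deterministic()
```

The assertion holds on any Python implementation because nothing in the test relies on reference counting or a collector pass.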
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -310,13 +312,20 @@ def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedL
pretty_logs = False
if pretty_logs:
- underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("w", buffering=1))
+ self._filehandle = log_file.open("w", buffering=1)
+ underlying_logger: WrappedLogger = structlog.WriteLogger(self._filehandle)
else:
- underlying_logger = structlog.BytesLogger(log_file.open("wb"))
+ self._filehandle = log_file.open("wb")
+ underlying_logger = structlog.BytesLogger(self._filehandle)
logger = structlog.wrap_logger(underlying_logger, processors=processors).bind()
self.bound_logger = logger
return logger
+ def __del__(self):
+ # Explicitly close the file descriptor when the logger is garbage collected.
+ if hasattr(self, "_filehandle") and self._filehandle:
+ self._filehandle.close()
+
Review Comment:
`__del__` is not guaranteed to run promptly (or at all during interpreter
shutdown / reference cycles), and exceptions raised inside it are ignored: Python
only prints a warning to `sys.stderr`, so the caller never sees the failure.
This is the same class of problem the DAG processor had, solved in #47574
with an explicit `close()` method plus an explicit call at the cleanup site.
I'd prefer we follow the same pattern here for consistency and reliability:
```suggestion
def close(self):
    """Explicitly close the underlying log file handle."""
    if hasattr(self, "_filehandle") and self._filehandle and not self._filehandle.closed:
        self._filehandle.close()
```
And then call `factory.close()` after `factory.upload_to_remote()` in
`_handle_request`.
If you want to keep `__del__` as a safety net that's fine, but the primary
cleanup path should be the explicit call.
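As a rough, self-contained sketch of that pattern: `LogFactorySketch` and `handle_finished` are hypothetical stand-ins for the real factory and the cleanup branch of `_handle_request`, the upload is a placeholder, and the `try`/`finally` is an assumption on my part rather than something the existing code does:

```python
import tempfile
from pathlib import Path


class LogFactorySketch:
    """Hypothetical stand-in for the trigger logger factory."""

    def __init__(self, log_file: Path):
        self._filehandle = log_file.open("wb")
        self.uploaded = False

    def upload_to_remote(self):
        # Placeholder: the real method ships the log file to remote storage.
        self.uploaded = True

    def close(self):
        """Primary cleanup path: close the handle exactly once."""
        fh = getattr(self, "_filehandle", None)
        if fh is not None and not fh.closed:
            fh.close()

    def __del__(self):
        # Safety net only; the explicit close() call is what we rely on.
        self.close()


def handle_finished(factory):
    """Hypothetical cleanup site: upload first, then always close."""
    try:
        factory.upload_to_remote()
    finally:
        factory.close()


with tempfile.TemporaryDirectory() as tmp:
    factory = LogFactorySketch(Path(tmp) / "trigger.log")
    handle_finished(factory)
    closed_after_cleanup = factory._filehandle.closed
```

With this shape the descriptor is released as soon as the trigger is cleaned up, even if the upload raises, and `__del__` has nothing left to do.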