This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch backport-62128 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ec1c019e4479278ff8c18ce96b0f72aeb4946b4e Author: Nick Stenning <[email protected]> AuthorDate: Sat Feb 21 00:39:46 2026 +0100 Fix broken `dag_processing.total_parse_time` metric (#62128) DagFileProcessorManager has been emitting a nonsense value for `dag_processing.total_parse_time` since 8774f28d76, which reversed the order in which `emit_metrics` and `prepare_file_queue` (then called `prepare_file_path_queue`) were called. As `prepare_file_path_queue` was responsible for resetting the value of `self._parsing_start_time`, the assumption made by `emit_metrics` was that it would be called once the file queue had been cleared, but crucially before `prepare_file_queue` was called to refill the queue. Additionally, there was no guarantee that we'd parsed any files at all since the last time the metric was emitted. If no work was due, we'd gladly emit near-zero metrics every time around the while loop. I've rearranged things in a way that I hope will be harder to accidentally break in future: - `self._parsing_start_time` is set whenever files are added to the queue, if it was not already set. - metrics are emitted when `prepare_file_queue` is called -- when the queue is empty -- but only if `self._parsing_start_time` is set, meaning only if we've actually parsed any files since the last time metrics were emitted. Together, this means we should now emit metrics once per parsing loop. I've added a test which fails on main and passes on this branch. 
(cherry picked from commit 57a7c64a77503fef4eb7c6801a28a628a4098535) --- airflow-core/src/airflow/dag_processing/manager.py | 101 ++++++++++++--------- airflow-core/src/airflow/typing_compat.py | 5 +- .../tests/unit/dag_processing/test_manager.py | 32 ++++++- 3 files changed, 94 insertions(+), 44 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 65d41533e24..93dc19237ed 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -37,7 +37,7 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta from operator import attrgetter, itemgetter from pathlib import Path -from typing import TYPE_CHECKING, Any, NamedTuple, cast +from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast import attrs import structlog @@ -65,6 +65,7 @@ from airflow.sdk import SecretCache from airflow.sdk.log import init_log_file, logging_processors from airflow.stats import Stats from airflow.traces.tracer import DebugTrace +from airflow.typing_compat import assert_never from airflow.utils.file import list_py_file_paths, might_contain_dag from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname @@ -206,7 +207,7 @@ class DagFileProcessorManager(LoggingMixin): _processors: dict[DagFileInfo, DagFileProcessorProcess] = attrs.field(factory=dict, init=False) - _parsing_start_time: float = attrs.field(init=False) + _parsing_start_time: float | None = attrs.field(default=None, init=False) _num_run: int = attrs.field(default=0, init=False) _callback_to_execute: dict[DagFileInfo, list[CallbackRequest]] = attrs.field( @@ -391,7 +392,6 @@ class DagFileProcessorManager(LoggingMixin): # clear down, we must have cleared all files found from scanning the dags dir _and_ have # cleared all files added as a result of callbacks self.prepare_file_queue(known_files=known_files) - 
self.emit_metrics() self._start_new_processes() @@ -449,16 +449,8 @@ class DagFileProcessorManager(LoggingMixin): def _queue_requested_files_for_parsing(self) -> None: """Queue any files requested for parsing as requested by users via UI/API.""" files = self._get_priority_files() - bundles_to_refresh: set[str] = set() - for file in files: - # Try removing the file if already present - with contextlib.suppress(ValueError): - self._file_queue.remove(file) - # enqueue file to the start of the queue. - self._file_queue.appendleft(file) - bundles_to_refresh.add(file.bundle_name) - - self._force_refresh_bundles |= bundles_to_refresh + self._add_files_to_queue(files, mode="frontprio") + self._force_refresh_bundles |= {file.bundle_name for file in files} if self._force_refresh_bundles: self.log.info("Bundles being force refreshed: %s", ", ".join(self._force_refresh_bundles)) @@ -535,7 +527,7 @@ class DagFileProcessorManager(LoggingMixin): bundle_version=request.bundle_version, ) self._callback_to_execute[file_info].append(request) - self._add_files_to_queue([file_info], True) + self._add_files_to_queue([file_info], mode="front") Stats.incr("dag_processing.other_callback_count") def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]): @@ -1005,7 +997,7 @@ class DagFileProcessorManager(LoggingMixin): if new_files: self.log.info("Adding %d new files to the front of the queue", len(new_files)) - self._add_files_to_queue(new_files, True) + self._add_files_to_queue(new_files, mode="front") def _resort_file_queue(self): if self._file_parsing_sort_mode == "modified_time" and self._file_queue: @@ -1060,7 +1052,15 @@ class DagFileProcessorManager(LoggingMixin): Note this method is only called when the file path queue is empty """ - self._parsing_start_time = time.perf_counter() + # We only emit metrics after processing all files in the queue. 
If `self._parsing_start_time` is None + # when this method is called, no files have yet been added to the queue so we shouldn't emit metrics. + if self._parsing_start_time is not None: + emit_metrics( + parse_time=time.perf_counter() - self._parsing_start_time, + stats=list(self._file_stats.values()), + ) + self._parsing_start_time = None + # If the file path is already being processed, or if a file was # processed recently, wait until the next batch in_progress = set(self._processors) @@ -1106,7 +1106,7 @@ class DagFileProcessorManager(LoggingMixin): "Queuing the following files for processing:\n\t%s", "\n\t".join(str(f.rel_path) for f in to_queue), ) - self._add_files_to_queue(to_queue, False) + self._add_files_to_queue(to_queue, mode="back") Stats.incr("dag_processing.file_path_queue_update_count") def _kill_timed_out_processors(self): @@ -1144,13 +1144,34 @@ class DagFileProcessorManager(LoggingMixin): processor = self._processors.pop(proc) processor.logger_filehandle.close() - def _add_files_to_queue(self, files: list[DagFileInfo], add_at_front: bool): + def _add_files_to_queue( + self, + files: list[DagFileInfo], + *, + mode: Literal["front", "back", "frontprio"], + ): """Add stuff to the back or front of the file queue, unless it's already present.""" - new_files = list(f for f in files if f not in self._file_queue) - if add_at_front: + if mode == "frontprio": + for file in files: + # Try removing the file if already present + with contextlib.suppress(ValueError): + self._file_queue.remove(file) + # enqueue file to the start of the queue. 
+ self._file_queue.appendleft(file) + elif mode == "front": + new_files = list(f for f in files if f not in self._file_queue) self._file_queue.extendleft(new_files) - else: + elif mode == "back": + new_files = list(f for f in files if f not in self._file_queue) self._file_queue.extend(new_files) + else: + assert_never(mode) + + # If we've just added files to the queue for the first time since metrics were last emitted, reset the + # parse time counter. + if self._parsing_start_time is None and self._file_queue: + self._parsing_start_time = time.perf_counter() + Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue)) def max_runs_reached(self): @@ -1177,27 +1198,25 @@ class DagFileProcessorManager(LoggingMixin): if pids_to_kill: kill_child_processes_by_pids(pids_to_kill) - def emit_metrics(self): - """ - Emit metrics about dag parsing summary. - This is called once every time around the parsing "loop" - i.e. after - all files have been parsed. - """ - with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span: - parse_time = time.perf_counter() - self._parsing_start_time - Stats.gauge("dag_processing.total_parse_time", parse_time) - Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values())) - Stats.gauge( - "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values()) - ) - span.set_attributes( - { - "total_parse_time": parse_time, - "dag_bag_size": sum(stat.num_dags for stat in self._file_stats.values()), - "import_errors": sum(stat.import_errors for stat in self._file_stats.values()), - } - ) +def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]): + """ + Emit metrics about dag parsing summary. + + This is called once every time around the parsing "loop" - i.e. after + all files have been parsed. 
+ """ + with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span: + Stats.gauge("dag_processing.total_parse_time", parse_time) + Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats)) + Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats)) + span.set_attributes( + { + "total_parse_time": parse_time, + "dag_bag_size": sum(stat.num_dags for stat in stats), + "import_errors": sum(stat.import_errors for stat in stats), + } + ) def process_parse_results( diff --git a/airflow-core/src/airflow/typing_compat.py b/airflow-core/src/airflow/typing_compat.py index 8a00ac06bd7..c72d5607bd1 100644 --- a/airflow-core/src/airflow/typing_compat.py +++ b/airflow-core/src/airflow/typing_compat.py @@ -25,6 +25,7 @@ __all__ = [ "Self", "TypeAlias", "TypeGuard", + "assert_never", ] import sys @@ -33,6 +34,6 @@ import sys from typing import Literal, ParamSpec, TypeAlias, TypeGuard if sys.version_info >= (3, 11): - from typing import Self + from typing import Self, assert_never else: - from typing_extensions import Self + from typing_extensions import Self, assert_never diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 00de3148c96..2cfc0a541b5 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -26,7 +26,7 @@ import shutil import signal import textwrap import time -from collections import deque +from collections import defaultdict, deque from datetime import datetime, timedelta from pathlib import Path from socket import socket, socketpair @@ -1379,3 +1379,33 @@ class TestDagFileProcessorManager: bundle_names_being_parsed = {b.name for b in manager._dag_bundles} assert bundle_names_being_parsed == expected + + @mock.patch("airflow.dag_processing.manager.Stats.gauge") + def test_stats_total_parse_time(self, statsd_gauge_mock, tmp_path, 
configure_testing_dag_bundle): + key = "dag_processing.total_parse_time" + gauge_values = defaultdict(list) + statsd_gauge_mock.side_effect = lambda name, value: gauge_values[name].append(value) + + dag_path = tmp_path / "temp_dag.py" + dag_code = textwrap.dedent( + """ + from airflow import DAG + dag = DAG(dag_id='temp_dag') + """ + ) + dag_path.write_text(dag_code) + + with configure_testing_dag_bundle(tmp_path): + manager = DagFileProcessorManager(max_runs=0) + + for _ in range(3): + manager.max_runs += 1 + manager.run() + + assert key in gauge_values + assert len(gauge_values[key]) == 1 + assert gauge_values[key][0] >= 1e-4 + + dag_path.touch() # make the loop run faster + gauge_values.clear() +
