This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch backport-95784d9-v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c5e8ea83cd832b0cccbf4bc830d8c1b86f001f1f Author: Ephraim Anierobi <[email protected]> AuthorDate: Tue Mar 3 21:21:34 2026 +0100 Add logging to detect try number race (#62703) * Log try_number mismatches during TI scheduling for HA race diagnosis This adds more logging to select places that try_number mismatch could happen and would help us detect and fix the issue. Related: https://github.com/apache/airflow/issues/57618 * Add tests (cherry picked from commit 95784d9f2d4edb5bf3a64ee44c1fa8264a8618df) --- .../src/airflow/jobs/scheduler_job_runner.py | 58 ++++++++++++++-- airflow-core/src/airflow/models/dagrun.py | 44 ++++++++++++ airflow-core/tests/unit/jobs/test_scheduler_job.py | 44 +++++++++++- airflow-core/tests/unit/models/test_dagrun.py | 78 ++++++++++++++++++++++ 4 files changed, 217 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 53baf765bb2..c66be676137 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -18,6 +18,7 @@ from __future__ import annotations import itertools +import logging import multiprocessing import operator import os @@ -72,7 +73,7 @@ from airflow.models.dagbag import DBDagBag from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning, DagWarningType from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstance import TaskInstance +from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger, TriggerFailureReason from airflow.stats import Stats from airflow.ti_deps.dependencies_states import EXECUTION_STATES @@ -99,7 +100,6 @@ if TYPE_CHECKING: from airflow._shared.logging.types import Logger from airflow.executors.base_executor import BaseExecutor from airflow.executors.executor_utils import ExecutorName - from airflow.models.taskinstance import TaskInstanceKey from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils.sqlalchemy import CommitProhibitorGuard @@ -655,8 +655,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): Stats.gauge("scheduler.tasks.executable", len(executable_tis)) if executable_tis: - task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis) - self.log.info("Setting the following tasks to queued state:\n%s", task_instance_str) + task_instance_str = "\n".join( + f"\t{x!r} (id={x.id}, try_number={x.try_number})" for x in executable_tis + ) + self.log.info( + "Setting the following tasks to queued state (scheduler job_id=%s):\n%s", + self.job.id, + task_instance_str, + ) # set TIs to queued state filter_for_tis = TI.filter_for_tis(executable_tis) @@ -702,7 +708,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): ti, ) continue - + self.log.debug( + "Queueing workload for TI: %s id=%s try_number=%d state=%s scheduler_job_id=%s executor=%s", + ti, + ti.id, + ti.try_number, + ti.state, + self.job.id, + executor, + ) workload = workloads.ExecuteTask.make(ti, generator=executor.jwt_generator) executor.queue_workload(workload, session=session) @@ -823,6 +837,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): # Report execution for ti_key, (state, _) in event_buffer.items(): + if isinstance(ti_key, TaskInstanceKey): + existing_try = ti_primary_key_to_try_number_map.get(ti_key.primary) + if existing_try is not None and existing_try != ti_key.try_number: + cls.logger().warning( + "Multiple executor events for same TI with different try_numbers! " + "primary_key=%s existing_try_number=%d new_try_number=%d new_state=%s. ", + ti_key.primary, + existing_try, + ti_key.try_number, + state, + ) # We create map (dag_id, task_id, logical_date) -> in-memory try_number ti_primary_key_to_try_number_map[ti_key.primary] = ti_key.try_number @@ -858,6 +883,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): for ti in tis: try_number = ti_primary_key_to_try_number_map[ti.key.primary] buffer_key = ti.key.with_try_number(try_number) + if ti.try_number != try_number: + cls.logger().warning( + "TI try_number mismatch: db_try_number=%d event_try_number=%d " + "ti=%s ti_id=%s state=%s job_id=%s. " + "Another scheduler may have already modified this TI.", + ti.try_number, + try_number, + ti, + ti.id, + ti.state, + job_id, + ) state, info = event_buffer.pop(buffer_key) if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING): @@ -2070,6 +2107,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): "schedulable_tis": [_ti.task_id for _ti in schedulable_tis], }, ) + if schedulable_tis and self.log.isEnabledFor(logging.DEBUG): + self.log.debug( + "Scheduling TIs for dag_run=%s/%s (scheduler job_id=%s): %s", + dag_run.dag_id, + dag_run.run_id, + self.job.id, + [ + f"{ti.task_id} (id={ti.id}, state={ti.state}, try_number={ti.try_number})" + for ti in schedulable_tis + ], + ) dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query) return callback_to_run diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 39fad84be6a..d0dafd97bf6 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -18,11 +18,13 @@ from __future__ import annotations import itertools +import logging import os import re from collections import defaultdict from collections.abc import Callable, Iterable, Iterator, Sequence from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar, cast, overload +from uuid import UUID import structlog from natsort import natsorted @@ -2003,6 +2005,8 @@ class DagRun(Base, LoggingMixin): # tasks using EmptyOperator and without on_execute_callback / on_success_callback empty_ti_ids: list[str] = [] schedulable_ti_ids: list[str] = [] + debug_try_number_check = self.log.isEnabledFor(logging.DEBUG) + expected_try_number_by_ti_id: dict[UUID, tuple[int, int, str | None]] = {} for ti in schedulable_tis: task = ti.task if TYPE_CHECKING: @@ -2034,6 +2038,14 @@ class DagRun(Base, LoggingMixin): # schedulable_ti_ids.append(ti.id) else: schedulable_ti_ids.append(ti.id) + if debug_try_number_check: + expected_try_number_by_ti_id[ti.id] = ( + ti.try_number + if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE + else ti.try_number + 1, + ti.try_number, + ti.state, + ) count = 0 @@ -2058,6 +2070,38 @@ class DagRun(Base, LoggingMixin): ) .execution_options(synchronize_session=False) ).rowcount + if debug_try_number_check: + rows = session.execute( + select(TI.id, TI.try_number, TI.state).where(TI.id.in_(id_chunk)) + ).all() + rows_by_ti_id = { + ti_id: (db_try_number, db_state) for ti_id, db_try_number, db_state in rows + } + for ti_id in id_chunk: + expected = expected_try_number_by_ti_id.get(ti_id) + if expected is None: + continue + db_row = rows_by_ti_id.get(ti_id) + if db_row is None: + continue + expected_try_number, pre_update_try_number, pre_update_state = expected + db_try_number, db_state = db_row + if db_try_number != expected_try_number: + self.log.warning( + "schedule_tis: try_number mismatch after scheduling for ti_id=%s " + "dag_run=%s/%s scheduler_job_id=%s " + "pre_state=%s pre_try_number=%d expected_try_number=%d " + "db_state=%s db_try_number=%d", + ti_id, + self.dag_id, + self.run_id, + self.scheduled_by_job_id, + pre_update_state, + pre_update_try_number, + expected_try_number, + db_state, + db_try_number, + ) # Tasks using EmptyOperator should not be executed, mark them as success if empty_ti_ids: diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 8fac72ea3ee..a9ec7872003 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -552,7 +552,9 @@ class TestSchedulerJob: @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr") - def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_callback, dag_maker): + def test_process_executor_events_ti_requeued( + self, mock_stats_incr, mock_task_callback, dag_maker, caplog + ): dag_id = "test_process_executor_events_ti_requeued" task_id_1 = "dummy_task" @@ -580,10 +582,12 @@ class TestSchedulerJob: executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None - self.job_runner._process_executor_events(executor=executor, session=session) + with caplog.at_level(logging.WARNING, logger="airflow.jobs.scheduler_job_runner"): + self.job_runner._process_executor_events(executor=executor, session=session) ti1.refresh_from_db(session=session) assert ti1.state == State.QUEUED scheduler_job.executor.callback_sink.send.assert_not_called() + assert any("TI try_number mismatch:" in rec.message for rec in caplog.records) # ti is queued by another scheduler - do not fail it ti1.state = State.QUEUED @@ -613,6 +617,42 @@ class TestSchedulerJob: scheduler_job.executor.callback_sink.send.assert_not_called() mock_stats_incr.assert_not_called() + @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") + @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr") + def test_process_executor_events_multiple_try_numbers_warns( + self, mock_stats_incr, mock_task_callback, dag_maker, caplog + ): + dag_id = "test_process_executor_events_multiple_try_numbers_warns" + task_id = "dummy_task" + + session = settings.Session() + with dag_maker(dag_id=dag_id, fileloc="/test_path1/"): + task = EmptyOperator(task_id=task_id) + ti = dag_maker.create_dagrun().get_task_instance(task.task_id) + + executor = MockExecutor(do_update=False) + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor]) + mock_stats_incr.reset_mock() + + ti.state = State.QUEUED + ti.try_number = 2 + session.merge(ti) + session.commit() + + executor.event_buffer[ti.key.with_try_number(1)] = State.RUNNING, "first_executor_id" + executor.event_buffer[ti.key.with_try_number(2)] = State.RUNNING, "second_executor_id" + + with caplog.at_level(logging.WARNING, logger="airflow.jobs.scheduler_job_runner"): + self.job_runner._process_executor_events(executor=executor, session=session) + + assert any( + "Multiple executor events for same TI with different try_numbers!" in rec.message + for rec in caplog.records + ) + mock_task_callback.assert_not_called() + mock_stats_incr.assert_not_called() + @pytest.mark.usefixtures("testing_dag_bundle") @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr") def test_process_executor_events_with_asset_events(self, mock_stats_incr, session, dag_maker): diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 42d6af6bad3..0194aa059f4 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -2095,6 +2095,84 @@ def test_schedule_tis_empty_operator_try_number(dag_maker, session: Session): assert empty_ti.try_number == 1 +def test_schedule_tis_try_number_mismatch_logs_warning(dag_maker, session: Session, monkeypatch): + with dag_maker(session=session): + BaseOperator(task_id="task_1") + + dr: DagRun = dag_maker.create_dagrun(session=session) + ti = dr.get_task_instance("task_1", session=session) + assert ti is not None + + original_execute = session.execute + + class _FakeSelectResult: + def all(self): + return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)] + + def execute_with_mismatch(statement, *args, **kwargs): + if getattr(statement, "is_select", False): + return _FakeSelectResult() + return original_execute(statement, *args, **kwargs) + + monkeypatch.setattr(session, "execute", execute_with_mismatch) + + with ( + mock.patch.object(dr.log, "isEnabledFor", return_value=True), + mock.patch.object(dr.log, "warning") as warning_mock, + ): + dr.schedule_tis((ti,), session=session) + + assert any( + "schedule_tis: try_number mismatch after scheduling" in call.args[0] + for call in warning_mock.call_args_list + ) + + +def test_schedule_tis_try_number_match_has_no_warning(dag_maker, session: Session): + with dag_maker(session=session): + BaseOperator(task_id="task_1") + + dr: DagRun = dag_maker.create_dagrun(session=session) + ti = dr.get_task_instance("task_1", session=session) + assert ti is not None + + with ( + mock.patch.object(dr.log, "isEnabledFor", return_value=True), + mock.patch.object(dr.log, "warning") as warning_mock, + ): + dr.schedule_tis((ti,), session=session) + + assert all( + "schedule_tis: try_number mismatch after scheduling" not in call.args[0] + for call in warning_mock.call_args_list + ) + + +def test_schedule_tis_try_number_check_is_debug_only(dag_maker, session: Session, monkeypatch): + with dag_maker(session=session): + BaseOperator(task_id="task_1") + + dr: DagRun = dag_maker.create_dagrun(session=session) + ti = dr.get_task_instance("task_1", session=session) + assert ti is not None + + original_execute = session.execute + select_calls = 0 + + def execute_with_counter(statement, *args, **kwargs): + nonlocal select_calls + if getattr(statement, "is_select", False): + select_calls += 1 + return original_execute(statement, *args, **kwargs) + + monkeypatch.setattr(session, "execute", execute_with_counter) + + with mock.patch.object(dr.log, "isEnabledFor", return_value=False): + dr.schedule_tis((ti,), session=session) + + assert select_calls == 0 + + @pytest.mark.xfail(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom") def test_schedule_tis_start_trigger_through_expand(dag_maker, session): """
