This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch backport-95784d9-v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c5e8ea83cd832b0cccbf4bc830d8c1b86f001f1f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Mar 3 21:21:34 2026 +0100

    Add logging to detect try number race (#62703)
    
    * Log try_number mismatches during TI scheduling for HA race diagnosis
    
    This adds more logging at select places where a try_number mismatch
    could happen, which will help us detect and fix the issue.
    
    Related: https://github.com/apache/airflow/issues/57618
    
    * Add tests
    
    (cherry picked from commit 95784d9f2d4edb5bf3a64ee44c1fa8264a8618df)
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 58 ++++++++++++++--
 airflow-core/src/airflow/models/dagrun.py          | 44 ++++++++++++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 44 +++++++++++-
 airflow-core/tests/unit/models/test_dagrun.py      | 78 ++++++++++++++++++++++
 4 files changed, 217 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 53baf765bb2..c66be676137 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import itertools
+import logging
 import multiprocessing
 import operator
 import os
@@ -72,7 +73,7 @@ from airflow.models.dagbag import DBDagBag
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning, DagWarningType
 from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
 from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger, 
TriggerFailureReason
 from airflow.stats import Stats
 from airflow.ti_deps.dependencies_states import EXECUTION_STATES
@@ -99,7 +100,6 @@ if TYPE_CHECKING:
     from airflow._shared.logging.types import Logger
     from airflow.executors.base_executor import BaseExecutor
     from airflow.executors.executor_utils import ExecutorName
-    from airflow.models.taskinstance import TaskInstanceKey
     from airflow.serialization.serialized_objects import SerializedDAG
     from airflow.utils.sqlalchemy import CommitProhibitorGuard
 
@@ -655,8 +655,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         Stats.gauge("scheduler.tasks.executable", len(executable_tis))
 
         if executable_tis:
-            task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis)
-            self.log.info("Setting the following tasks to queued state:\n%s", 
task_instance_str)
+            task_instance_str = "\n".join(
+                f"\t{x!r} (id={x.id}, try_number={x.try_number})" for x in 
executable_tis
+            )
+            self.log.info(
+                "Setting the following tasks to queued state (scheduler 
job_id=%s):\n%s",
+                self.job.id,
+                task_instance_str,
+            )
 
             # set TIs to queued state
             filter_for_tis = TI.filter_for_tis(executable_tis)
@@ -702,7 +708,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     ti,
                 )
                 continue
-
+            self.log.debug(
+                "Queueing workload for TI: %s id=%s try_number=%d state=%s 
scheduler_job_id=%s executor=%s",
+                ti,
+                ti.id,
+                ti.try_number,
+                ti.state,
+                self.job.id,
+                executor,
+            )
             workload = workloads.ExecuteTask.make(ti, 
generator=executor.jwt_generator)
             executor.queue_workload(workload, session=session)
 
@@ -823,6 +837,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
         # Report execution
         for ti_key, (state, _) in event_buffer.items():
+            if isinstance(ti_key, TaskInstanceKey):
+                existing_try = 
ti_primary_key_to_try_number_map.get(ti_key.primary)
+                if existing_try is not None and existing_try != 
ti_key.try_number:
+                    cls.logger().warning(
+                        "Multiple executor events for same TI with different 
try_numbers! "
+                        "primary_key=%s existing_try_number=%d 
new_try_number=%d new_state=%s. ",
+                        ti_key.primary,
+                        existing_try,
+                        ti_key.try_number,
+                        state,
+                    )
             # We create map (dag_id, task_id, logical_date) -> in-memory 
try_number
             ti_primary_key_to_try_number_map[ti_key.primary] = 
ti_key.try_number
 
@@ -858,6 +883,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         for ti in tis:
             try_number = ti_primary_key_to_try_number_map[ti.key.primary]
             buffer_key = ti.key.with_try_number(try_number)
+            if ti.try_number != try_number:
+                cls.logger().warning(
+                    "TI try_number mismatch: db_try_number=%d 
event_try_number=%d "
+                    "ti=%s ti_id=%s state=%s job_id=%s. "
+                    "Another scheduler may have already modified this TI.",
+                    ti.try_number,
+                    try_number,
+                    ti,
+                    ti.id,
+                    ti.state,
+                    job_id,
+                )
             state, info = event_buffer.pop(buffer_key)
 
             if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
@@ -2070,6 +2107,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         "schedulable_tis": [_ti.task_id for _ti in 
schedulable_tis],
                     },
                 )
+            if schedulable_tis and self.log.isEnabledFor(logging.DEBUG):
+                self.log.debug(
+                    "Scheduling TIs for dag_run=%s/%s (scheduler job_id=%s): 
%s",
+                    dag_run.dag_id,
+                    dag_run.run_id,
+                    self.job.id,
+                    [
+                        f"{ti.task_id} (id={ti.id}, state={ti.state}, 
try_number={ti.try_number})"
+                        for ti in schedulable_tis
+                    ],
+                )
             dag_run.schedule_tis(schedulable_tis, session, 
max_tis_per_query=self.job.max_tis_per_query)
 
             return callback_to_run
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index 39fad84be6a..d0dafd97bf6 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -18,11 +18,13 @@
 from __future__ import annotations
 
 import itertools
+import logging
 import os
 import re
 from collections import defaultdict
 from collections.abc import Callable, Iterable, Iterator, Sequence
 from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar, cast, overload
+from uuid import UUID
 
 import structlog
 from natsort import natsorted
@@ -2003,6 +2005,8 @@ class DagRun(Base, LoggingMixin):
         # tasks using EmptyOperator and without on_execute_callback / 
on_success_callback
         empty_ti_ids: list[str] = []
         schedulable_ti_ids: list[str] = []
+        debug_try_number_check = self.log.isEnabledFor(logging.DEBUG)
+        expected_try_number_by_ti_id: dict[UUID, tuple[int, int, str | None]] 
= {}
         for ti in schedulable_tis:
             task = ti.task
             if TYPE_CHECKING:
@@ -2034,6 +2038,14 @@ class DagRun(Base, LoggingMixin):
             #         schedulable_ti_ids.append(ti.id)
             else:
                 schedulable_ti_ids.append(ti.id)
+                if debug_try_number_check:
+                    expected_try_number_by_ti_id[ti.id] = (
+                        ti.try_number
+                        if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE
+                        else ti.try_number + 1,
+                        ti.try_number,
+                        ti.state,
+                    )
 
         count = 0
 
@@ -2058,6 +2070,38 @@ class DagRun(Base, LoggingMixin):
                     )
                     .execution_options(synchronize_session=False)
                 ).rowcount
+                if debug_try_number_check:
+                    rows = session.execute(
+                        select(TI.id, TI.try_number, 
TI.state).where(TI.id.in_(id_chunk))
+                    ).all()
+                    rows_by_ti_id = {
+                        ti_id: (db_try_number, db_state) for ti_id, 
db_try_number, db_state in rows
+                    }
+                    for ti_id in id_chunk:
+                        expected = expected_try_number_by_ti_id.get(ti_id)
+                        if expected is None:
+                            continue
+                        db_row = rows_by_ti_id.get(ti_id)
+                        if db_row is None:
+                            continue
+                        expected_try_number, pre_update_try_number, 
pre_update_state = expected
+                        db_try_number, db_state = db_row
+                        if db_try_number != expected_try_number:
+                            self.log.warning(
+                                "schedule_tis: try_number mismatch after 
scheduling for ti_id=%s "
+                                "dag_run=%s/%s scheduler_job_id=%s "
+                                "pre_state=%s pre_try_number=%d 
expected_try_number=%d "
+                                "db_state=%s db_try_number=%d",
+                                ti_id,
+                                self.dag_id,
+                                self.run_id,
+                                self.scheduled_by_job_id,
+                                pre_update_state,
+                                pre_update_try_number,
+                                expected_try_number,
+                                db_state,
+                                db_try_number,
+                            )
 
         # Tasks using EmptyOperator should not be executed, mark them as 
success
         if empty_ti_ids:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 8fac72ea3ee..a9ec7872003 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -552,7 +552,9 @@ class TestSchedulerJob:
 
     @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
-    def test_process_executor_events_ti_requeued(self, mock_stats_incr, 
mock_task_callback, dag_maker):
+    def test_process_executor_events_ti_requeued(
+        self, mock_stats_incr, mock_task_callback, dag_maker, caplog
+    ):
         dag_id = "test_process_executor_events_ti_requeued"
         task_id_1 = "dummy_task"
 
@@ -580,10 +582,12 @@ class TestSchedulerJob:
 
         executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None
 
-        self.job_runner._process_executor_events(executor=executor, 
session=session)
+        with caplog.at_level(logging.WARNING, 
logger="airflow.jobs.scheduler_job_runner"):
+            self.job_runner._process_executor_events(executor=executor, 
session=session)
         ti1.refresh_from_db(session=session)
         assert ti1.state == State.QUEUED
         scheduler_job.executor.callback_sink.send.assert_not_called()
+        assert any("TI try_number mismatch:" in rec.message for rec in 
caplog.records)
 
         # ti is queued by another scheduler - do not fail it
         ti1.state = State.QUEUED
@@ -613,6 +617,42 @@ class TestSchedulerJob:
         scheduler_job.executor.callback_sink.send.assert_not_called()
         mock_stats_incr.assert_not_called()
 
+    @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
+    @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
+    def test_process_executor_events_multiple_try_numbers_warns(
+        self, mock_stats_incr, mock_task_callback, dag_maker, caplog
+    ):
+        dag_id = "test_process_executor_events_multiple_try_numbers_warns"
+        task_id = "dummy_task"
+
+        session = settings.Session()
+        with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
+            task = EmptyOperator(task_id=task_id)
+        ti = dag_maker.create_dagrun().get_task_instance(task.task_id)
+
+        executor = MockExecutor(do_update=False)
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(scheduler_job, 
executors=[executor])
+        mock_stats_incr.reset_mock()
+
+        ti.state = State.QUEUED
+        ti.try_number = 2
+        session.merge(ti)
+        session.commit()
+
+        executor.event_buffer[ti.key.with_try_number(1)] = State.RUNNING, 
"first_executor_id"
+        executor.event_buffer[ti.key.with_try_number(2)] = State.RUNNING, 
"second_executor_id"
+
+        with caplog.at_level(logging.WARNING, 
logger="airflow.jobs.scheduler_job_runner"):
+            self.job_runner._process_executor_events(executor=executor, 
session=session)
+
+        assert any(
+            "Multiple executor events for same TI with different try_numbers!" 
in rec.message
+            for rec in caplog.records
+        )
+        mock_task_callback.assert_not_called()
+        mock_stats_incr.assert_not_called()
+
     @pytest.mark.usefixtures("testing_dag_bundle")
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
     def test_process_executor_events_with_asset_events(self, mock_stats_incr, 
session, dag_maker):
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index 42d6af6bad3..0194aa059f4 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -2095,6 +2095,84 @@ def 
test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
     assert empty_ti.try_number == 1
 
 
+def test_schedule_tis_try_number_mismatch_logs_warning(dag_maker, session: 
Session, monkeypatch):
+    with dag_maker(session=session):
+        BaseOperator(task_id="task_1")
+
+    dr: DagRun = dag_maker.create_dagrun(session=session)
+    ti = dr.get_task_instance("task_1", session=session)
+    assert ti is not None
+
+    original_execute = session.execute
+
+    class _FakeSelectResult:
+        def all(self):
+            return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)]
+
+    def execute_with_mismatch(statement, *args, **kwargs):
+        if getattr(statement, "is_select", False):
+            return _FakeSelectResult()
+        return original_execute(statement, *args, **kwargs)
+
+    monkeypatch.setattr(session, "execute", execute_with_mismatch)
+
+    with (
+        mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+        mock.patch.object(dr.log, "warning") as warning_mock,
+    ):
+        dr.schedule_tis((ti,), session=session)
+
+    assert any(
+        "schedule_tis: try_number mismatch after scheduling" in call.args[0]
+        for call in warning_mock.call_args_list
+    )
+
+
+def test_schedule_tis_try_number_match_has_no_warning(dag_maker, session: 
Session):
+    with dag_maker(session=session):
+        BaseOperator(task_id="task_1")
+
+    dr: DagRun = dag_maker.create_dagrun(session=session)
+    ti = dr.get_task_instance("task_1", session=session)
+    assert ti is not None
+
+    with (
+        mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+        mock.patch.object(dr.log, "warning") as warning_mock,
+    ):
+        dr.schedule_tis((ti,), session=session)
+
+    assert all(
+        "schedule_tis: try_number mismatch after scheduling" not in 
call.args[0]
+        for call in warning_mock.call_args_list
+    )
+
+
+def test_schedule_tis_try_number_check_is_debug_only(dag_maker, session: 
Session, monkeypatch):
+    with dag_maker(session=session):
+        BaseOperator(task_id="task_1")
+
+    dr: DagRun = dag_maker.create_dagrun(session=session)
+    ti = dr.get_task_instance("task_1", session=session)
+    assert ti is not None
+
+    original_execute = session.execute
+    select_calls = 0
+
+    def execute_with_counter(statement, *args, **kwargs):
+        nonlocal select_calls
+        if getattr(statement, "is_select", False):
+            select_calls += 1
+        return original_execute(statement, *args, **kwargs)
+
+    monkeypatch.setattr(session, "execute", execute_with_counter)
+
+    with mock.patch.object(dr.log, "isEnabledFor", return_value=False):
+        dr.schedule_tis((ti,), session=session)
+
+    assert select_calls == 0
+
+
 @pytest.mark.xfail(reason="We can't keep this behaviour with remote workers 
where scheduler can't reach xcom")
 def test_schedule_tis_start_trigger_through_expand(dag_maker, session):
     """

Reply via email to