This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 95784d9f2d4 Add logging to detect try number race (#62703)
95784d9f2d4 is described below

commit 95784d9f2d4edb5bf3a64ee44c1fa8264a8618df
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Mar 3 21:21:34 2026 +0100

    Add logging to detect try number race (#62703)
    
    * Log try_number mismatches during TI scheduling for HA race diagnosis
    
    This adds more logging at selected places where a try_number mismatch
    could happen, which will help us detect and fix the issue.
    
    Related: https://github.com/apache/airflow/issues/57618
    
    * Add tests
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 52 ++++++++++++++-
 airflow-core/src/airflow/models/dagrun.py          | 43 ++++++++++++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 44 +++++++++++-
 airflow-core/tests/unit/models/test_dagrun.py      | 78 ++++++++++++++++++++++
 4 files changed, 213 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 0108a0f0830..2da41877c97 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -870,8 +870,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         Stats.gauge("scheduler.tasks.executable", len(executable_tis))
 
         if executable_tis:
-            task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis)
-            self.log.info("Setting the following tasks to queued state:\n%s", 
task_instance_str)
+            task_instance_str = "\n".join(
+                f"\t{x!r} (id={x.id}, try_number={x.try_number})" for x in 
executable_tis
+            )
+            self.log.info(
+                "Setting the following tasks to queued state (scheduler 
job_id=%s):\n%s",
+                self.job.id,
+                task_instance_str,
+            )
 
             # set TIs to queued state
             filter_for_tis = TI.filter_for_tis(executable_tis)
@@ -938,6 +944,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 )
                 continue
 
+            self.log.debug(
+                "Queueing workload for TI: %s id=%s try_number=%d state=%s 
scheduler_job_id=%s executor=%s",
+                ti,
+                ti.id,
+                ti.try_number,
+                ti.state,
+                self.job.id,
+                executor,
+            )
             workload = workloads.ExecuteTask.make(
                 ti,
                 generator=executor.jwt_generator,
@@ -1133,6 +1148,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # Report execution - handle both task and callback events
         for key, (state, _) in event_buffer.items():
             if isinstance(key, TaskInstanceKey):
+                existing_try = 
ti_primary_key_to_try_number_map.get(key.primary)
+                if existing_try is not None and existing_try != key.try_number:
+                    cls.logger().warning(
+                        "Multiple executor events for same TI with different 
try_numbers! "
+                        "primary_key=%s existing_try_number=%d 
new_try_number=%d new_state=%s. ",
+                        key.primary,
+                        existing_try,
+                        key.try_number,
+                        state,
+                    )
                 ti_primary_key_to_try_number_map[key.primary] = key.try_number
                 cls.logger().info("Received executor event with state %s for 
task instance %s", state, key)
                 if state in (
@@ -1197,6 +1222,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         for ti in tis:
             try_number = ti_primary_key_to_try_number_map[ti.key.primary]
             buffer_key = ti.key.with_try_number(try_number)
+            if ti.try_number != try_number:
+                cls.logger().warning(
+                    "TI try_number mismatch: db_try_number=%d 
event_try_number=%d "
+                    "ti=%s ti_id=%s state=%s job_id=%s. "
+                    "Another scheduler may have already modified this TI.",
+                    ti.try_number,
+                    try_number,
+                    ti,
+                    ti.id,
+                    ti.state,
+                    job_id,
+                )
             state, info = event_buffer.pop(buffer_key)
 
             if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
@@ -2475,6 +2512,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # query to update all the TIs across all the logical dates and dag
         # IDs in a single query, but it turns out that can be _very very slow_
         # see #11147/commit ee90807ac for more details
+        if schedulable_tis and self.log.isEnabledFor(logging.DEBUG):
+            self.log.debug(
+                "Scheduling TIs for dag_run=%s/%s (scheduler job_id=%s): %s",
+                dag_run.dag_id,
+                dag_run.run_id,
+                self.job.id,
+                [
+                    f"{ti.task_id} (id={ti.id}, state={ti.state}, 
try_number={ti.try_number})"
+                    for ti in schedulable_tis
+                ],
+            )
         dag_run.schedule_tis(schedulable_tis, session, 
max_tis_per_query=self.job.max_tis_per_query)
 
         return callback_to_run
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index f308e26c200..0de1a784fa4 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import itertools
+import logging
 import os
 import re
 from collections import defaultdict
@@ -2061,9 +2062,19 @@ class DagRun(Base, LoggingMixin):
         # tasks using EmptyOperator and without on_execute_callback / 
on_success_callback
         empty_ti_ids: list[UUID] = []
         schedulable_ti_ids: list[UUID] = []
+        debug_try_number_check = self.log.isEnabledFor(logging.DEBUG)
+        expected_try_number_by_ti_id: dict[UUID, tuple[int, int, str | None]] 
= {}
         for ti in schedulable_tis:
             if ti.is_schedulable:
                 schedulable_ti_ids.append(ti.id)
+                if debug_try_number_check:
+                    expected_try_number_by_ti_id[ti.id] = (
+                        ti.try_number
+                        if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE
+                        else ti.try_number + 1,
+                        ti.try_number,
+                        ti.state,
+                    )
             # Check "start_trigger_args" to see whether the operator supports
             # start execution from triggerer. If so, we'll check 
"start_from_trigger"
             # to see whether this feature is turned on and defer this task.
@@ -2108,6 +2119,38 @@ class DagRun(Base, LoggingMixin):
                     .execution_options(synchronize_session=False)
                 )
                 count += getattr(result, "rowcount", 0)
+                if debug_try_number_check:
+                    rows = session.execute(
+                        select(TI.id, TI.try_number, 
TI.state).where(TI.id.in_(id_chunk))
+                    ).all()
+                    rows_by_ti_id = {
+                        ti_id: (db_try_number, db_state) for ti_id, 
db_try_number, db_state in rows
+                    }
+                    for ti_id in id_chunk:
+                        expected = expected_try_number_by_ti_id.get(ti_id)
+                        if expected is None:
+                            continue
+                        db_row = rows_by_ti_id.get(ti_id)
+                        if db_row is None:
+                            continue
+                        expected_try_number, pre_update_try_number, 
pre_update_state = expected
+                        db_try_number, db_state = db_row
+                        if db_try_number != expected_try_number:
+                            self.log.warning(
+                                "schedule_tis: try_number mismatch after 
scheduling for ti_id=%s "
+                                "dag_run=%s/%s scheduler_job_id=%s "
+                                "pre_state=%s pre_try_number=%d 
expected_try_number=%d "
+                                "db_state=%s db_try_number=%d",
+                                ti_id,
+                                self.dag_id,
+                                self.run_id,
+                                self.scheduled_by_job_id,
+                                pre_update_state,
+                                pre_update_try_number,
+                                expected_try_number,
+                                db_state,
+                                db_try_number,
+                            )
 
         # Tasks using EmptyOperator should not be executed, mark them as 
success
         if empty_ti_ids:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index abbd5f20067..ec4ba77e7d8 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -684,7 +684,9 @@ class TestSchedulerJob:
 
     @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
-    def test_process_executor_events_ti_requeued(self, mock_stats_incr, 
mock_task_callback, dag_maker):
+    def test_process_executor_events_ti_requeued(
+        self, mock_stats_incr, mock_task_callback, dag_maker, caplog
+    ):
         dag_id = "test_process_executor_events_ti_requeued"
         task_id_1 = "dummy_task"
 
@@ -712,10 +714,12 @@ class TestSchedulerJob:
 
         executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None
 
-        self.job_runner._process_executor_events(executor=executor, 
session=session)
+        with caplog.at_level(logging.WARNING, 
logger="airflow.jobs.scheduler_job_runner"):
+            self.job_runner._process_executor_events(executor=executor, 
session=session)
         ti1.refresh_from_db(session=session)
         assert ti1.state == State.QUEUED
         self.job_runner.executor.callback_sink.send.assert_not_called()
+        assert any("TI try_number mismatch:" in rec.message for rec in 
caplog.records)
 
         # ti is queued by another scheduler - do not fail it
         ti1.state = State.QUEUED
@@ -745,6 +749,42 @@ class TestSchedulerJob:
         self.job_runner.executor.callback_sink.send.assert_not_called()
         mock_stats_incr.assert_not_called()
 
+    @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
+    @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
+    def test_process_executor_events_multiple_try_numbers_warns(
+        self, mock_stats_incr, mock_task_callback, dag_maker, caplog
+    ):
+        dag_id = "test_process_executor_events_multiple_try_numbers_warns"
+        task_id = "dummy_task"
+
+        session = settings.Session()
+        with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
+            task = EmptyOperator(task_id=task_id)
+        ti = dag_maker.create_dagrun().get_task_instance(task.task_id)
+
+        executor = MockExecutor(do_update=False)
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(scheduler_job, 
executors=[executor])
+        mock_stats_incr.reset_mock()
+
+        ti.state = State.QUEUED
+        ti.try_number = 2
+        session.merge(ti)
+        session.commit()
+
+        executor.event_buffer[ti.key.with_try_number(1)] = State.RUNNING, 
"first_executor_id"
+        executor.event_buffer[ti.key.with_try_number(2)] = State.RUNNING, 
"second_executor_id"
+
+        with caplog.at_level(logging.WARNING, 
logger="airflow.jobs.scheduler_job_runner"):
+            self.job_runner._process_executor_events(executor=executor, 
session=session)
+
+        assert any(
+            "Multiple executor events for same TI with different try_numbers!" 
in rec.message
+            for rec in caplog.records
+        )
+        mock_task_callback.assert_not_called()
+        mock_stats_incr.assert_not_called()
+
     @pytest.mark.usefixtures("testing_dag_bundle")
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
     def test_process_executor_events_with_asset_events(self, mock_stats_incr, 
session, dag_maker):
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index e5bb6e31593..bac07ffe1f2 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -2152,6 +2152,84 @@ def 
test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
     assert empty_ti.try_number == 1
 
 
+def test_schedule_tis_try_number_mismatch_logs_warning(dag_maker, session: 
Session, monkeypatch):
+    with dag_maker(session=session):
+        BaseOperator(task_id="task_1")
+
+    dr: DagRun = dag_maker.create_dagrun(session=session)
+    ti = dr.get_task_instance("task_1", session=session)
+    assert ti is not None
+
+    original_execute = session.execute
+
+    class _FakeSelectResult:
+        def all(self):
+            return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)]
+
+    def execute_with_mismatch(statement, *args, **kwargs):
+        if getattr(statement, "is_select", False):
+            return _FakeSelectResult()
+        return original_execute(statement, *args, **kwargs)
+
+    monkeypatch.setattr(session, "execute", execute_with_mismatch)
+
+    with (
+        mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+        mock.patch.object(dr.log, "warning") as warning_mock,
+    ):
+        dr.schedule_tis((ti,), session=session)
+
+    assert any(
+        "schedule_tis: try_number mismatch after scheduling" in call.args[0]
+        for call in warning_mock.call_args_list
+    )
+
+
+def test_schedule_tis_try_number_match_has_no_warning(dag_maker, session: 
Session):
+    with dag_maker(session=session):
+        BaseOperator(task_id="task_1")
+
+    dr: DagRun = dag_maker.create_dagrun(session=session)
+    ti = dr.get_task_instance("task_1", session=session)
+    assert ti is not None
+
+    with (
+        mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+        mock.patch.object(dr.log, "warning") as warning_mock,
+    ):
+        dr.schedule_tis((ti,), session=session)
+
+    assert all(
+        "schedule_tis: try_number mismatch after scheduling" not in 
call.args[0]
+        for call in warning_mock.call_args_list
+    )
+
+
+def test_schedule_tis_try_number_check_is_debug_only(dag_maker, session: 
Session, monkeypatch):
+    with dag_maker(session=session):
+        BaseOperator(task_id="task_1")
+
+    dr: DagRun = dag_maker.create_dagrun(session=session)
+    ti = dr.get_task_instance("task_1", session=session)
+    assert ti is not None
+
+    original_execute = session.execute
+    select_calls = 0
+
+    def execute_with_counter(statement, *args, **kwargs):
+        nonlocal select_calls
+        if getattr(statement, "is_select", False):
+            select_calls += 1
+        return original_execute(statement, *args, **kwargs)
+
+    monkeypatch.setattr(session, "execute", execute_with_counter)
+
+    with mock.patch.object(dr.log, "isEnabledFor", return_value=False):
+        dr.schedule_tis((ti,), session=session)
+
+    assert select_calls == 0
+
+
 @pytest.mark.xfail(reason="We can't keep this behaviour with remote workers 
where scheduler can't reach xcom")
 def test_schedule_tis_start_trigger_through_expand(dag_maker, session):
     """

Reply via email to