This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 95784d9f2d4 Add logging to detect try number race (#62703)
95784d9f2d4 is described below
commit 95784d9f2d4edb5bf3a64ee44c1fa8264a8618df
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Mar 3 21:21:34 2026 +0100
Add logging to detect try number race (#62703)
* Log try_number mismatches during TI scheduling for HA race diagnosis
This adds more logging in select places where a try_number mismatch
could happen, which would help us detect and fix the issue.
Related: https://github.com/apache/airflow/issues/57618
* Add tests
---
.../src/airflow/jobs/scheduler_job_runner.py | 52 ++++++++++++++-
airflow-core/src/airflow/models/dagrun.py | 43 ++++++++++++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 44 +++++++++++-
airflow-core/tests/unit/models/test_dagrun.py | 78 ++++++++++++++++++++++
4 files changed, 213 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 0108a0f0830..2da41877c97 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -870,8 +870,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
Stats.gauge("scheduler.tasks.executable", len(executable_tis))
if executable_tis:
- task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis)
- self.log.info("Setting the following tasks to queued state:\n%s",
task_instance_str)
+ task_instance_str = "\n".join(
+ f"\t{x!r} (id={x.id}, try_number={x.try_number})" for x in
executable_tis
+ )
+ self.log.info(
+ "Setting the following tasks to queued state (scheduler
job_id=%s):\n%s",
+ self.job.id,
+ task_instance_str,
+ )
# set TIs to queued state
filter_for_tis = TI.filter_for_tis(executable_tis)
@@ -938,6 +944,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
continue
+ self.log.debug(
+ "Queueing workload for TI: %s id=%s try_number=%d state=%s
scheduler_job_id=%s executor=%s",
+ ti,
+ ti.id,
+ ti.try_number,
+ ti.state,
+ self.job.id,
+ executor,
+ )
workload = workloads.ExecuteTask.make(
ti,
generator=executor.jwt_generator,
@@ -1133,6 +1148,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Report execution - handle both task and callback events
for key, (state, _) in event_buffer.items():
if isinstance(key, TaskInstanceKey):
+ existing_try =
ti_primary_key_to_try_number_map.get(key.primary)
+ if existing_try is not None and existing_try != key.try_number:
+ cls.logger().warning(
+ "Multiple executor events for same TI with different
try_numbers! "
+ "primary_key=%s existing_try_number=%d
new_try_number=%d new_state=%s. ",
+ key.primary,
+ existing_try,
+ key.try_number,
+ state,
+ )
ti_primary_key_to_try_number_map[key.primary] = key.try_number
cls.logger().info("Received executor event with state %s for
task instance %s", state, key)
if state in (
@@ -1197,6 +1222,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
for ti in tis:
try_number = ti_primary_key_to_try_number_map[ti.key.primary]
buffer_key = ti.key.with_try_number(try_number)
+ if ti.try_number != try_number:
+ cls.logger().warning(
+ "TI try_number mismatch: db_try_number=%d
event_try_number=%d "
+ "ti=%s ti_id=%s state=%s job_id=%s. "
+ "Another scheduler may have already modified this TI.",
+ ti.try_number,
+ try_number,
+ ti,
+ ti.id,
+ ti.state,
+ job_id,
+ )
state, info = event_buffer.pop(buffer_key)
if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
@@ -2475,6 +2512,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# query to update all the TIs across all the logical dates and dag
# IDs in a single query, but it turns out that can be _very very slow_
# see #11147/commit ee90807ac for more details
+ if schedulable_tis and self.log.isEnabledFor(logging.DEBUG):
+ self.log.debug(
+ "Scheduling TIs for dag_run=%s/%s (scheduler job_id=%s): %s",
+ dag_run.dag_id,
+ dag_run.run_id,
+ self.job.id,
+ [
+ f"{ti.task_id} (id={ti.id}, state={ti.state},
try_number={ti.try_number})"
+ for ti in schedulable_tis
+ ],
+ )
dag_run.schedule_tis(schedulable_tis, session,
max_tis_per_query=self.job.max_tis_per_query)
return callback_to_run
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index f308e26c200..0de1a784fa4 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import itertools
+import logging
import os
import re
from collections import defaultdict
@@ -2061,9 +2062,19 @@ class DagRun(Base, LoggingMixin):
# tasks using EmptyOperator and without on_execute_callback /
on_success_callback
empty_ti_ids: list[UUID] = []
schedulable_ti_ids: list[UUID] = []
+ debug_try_number_check = self.log.isEnabledFor(logging.DEBUG)
+ expected_try_number_by_ti_id: dict[UUID, tuple[int, int, str | None]]
= {}
for ti in schedulable_tis:
if ti.is_schedulable:
schedulable_ti_ids.append(ti.id)
+ if debug_try_number_check:
+ expected_try_number_by_ti_id[ti.id] = (
+ ti.try_number
+ if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE
+ else ti.try_number + 1,
+ ti.try_number,
+ ti.state,
+ )
# Check "start_trigger_args" to see whether the operator supports
# start execution from triggerer. If so, we'll check
"start_from_trigger"
# to see whether this feature is turned on and defer this task.
@@ -2108,6 +2119,38 @@ class DagRun(Base, LoggingMixin):
.execution_options(synchronize_session=False)
)
count += getattr(result, "rowcount", 0)
+ if debug_try_number_check:
+ rows = session.execute(
+ select(TI.id, TI.try_number,
TI.state).where(TI.id.in_(id_chunk))
+ ).all()
+ rows_by_ti_id = {
+ ti_id: (db_try_number, db_state) for ti_id,
db_try_number, db_state in rows
+ }
+ for ti_id in id_chunk:
+ expected = expected_try_number_by_ti_id.get(ti_id)
+ if expected is None:
+ continue
+ db_row = rows_by_ti_id.get(ti_id)
+ if db_row is None:
+ continue
+ expected_try_number, pre_update_try_number,
pre_update_state = expected
+ db_try_number, db_state = db_row
+ if db_try_number != expected_try_number:
+ self.log.warning(
+ "schedule_tis: try_number mismatch after
scheduling for ti_id=%s "
+ "dag_run=%s/%s scheduler_job_id=%s "
+ "pre_state=%s pre_try_number=%d
expected_try_number=%d "
+ "db_state=%s db_try_number=%d",
+ ti_id,
+ self.dag_id,
+ self.run_id,
+ self.scheduled_by_job_id,
+ pre_update_state,
+ pre_update_try_number,
+ expected_try_number,
+ db_state,
+ db_try_number,
+ )
# Tasks using EmptyOperator should not be executed, mark them as
success
if empty_ti_ids:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index abbd5f20067..ec4ba77e7d8 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -684,7 +684,9 @@ class TestSchedulerJob:
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
- def test_process_executor_events_ti_requeued(self, mock_stats_incr,
mock_task_callback, dag_maker):
+ def test_process_executor_events_ti_requeued(
+ self, mock_stats_incr, mock_task_callback, dag_maker, caplog
+ ):
dag_id = "test_process_executor_events_ti_requeued"
task_id_1 = "dummy_task"
@@ -712,10 +714,12 @@ class TestSchedulerJob:
executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None
- self.job_runner._process_executor_events(executor=executor,
session=session)
+ with caplog.at_level(logging.WARNING,
logger="airflow.jobs.scheduler_job_runner"):
+ self.job_runner._process_executor_events(executor=executor,
session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.job_runner.executor.callback_sink.send.assert_not_called()
+ assert any("TI try_number mismatch:" in rec.message for rec in
caplog.records)
# ti is queued by another scheduler - do not fail it
ti1.state = State.QUEUED
@@ -745,6 +749,42 @@ class TestSchedulerJob:
self.job_runner.executor.callback_sink.send.assert_not_called()
mock_stats_incr.assert_not_called()
+ @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
+ @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
+ def test_process_executor_events_multiple_try_numbers_warns(
+ self, mock_stats_incr, mock_task_callback, dag_maker, caplog
+ ):
+ dag_id = "test_process_executor_events_multiple_try_numbers_warns"
+ task_id = "dummy_task"
+
+ session = settings.Session()
+ with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
+ task = EmptyOperator(task_id=task_id)
+ ti = dag_maker.create_dagrun().get_task_instance(task.task_id)
+
+ executor = MockExecutor(do_update=False)
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(scheduler_job,
executors=[executor])
+ mock_stats_incr.reset_mock()
+
+ ti.state = State.QUEUED
+ ti.try_number = 2
+ session.merge(ti)
+ session.commit()
+
+ executor.event_buffer[ti.key.with_try_number(1)] = State.RUNNING,
"first_executor_id"
+ executor.event_buffer[ti.key.with_try_number(2)] = State.RUNNING,
"second_executor_id"
+
+ with caplog.at_level(logging.WARNING,
logger="airflow.jobs.scheduler_job_runner"):
+ self.job_runner._process_executor_events(executor=executor,
session=session)
+
+ assert any(
+ "Multiple executor events for same TI with different try_numbers!"
in rec.message
+ for rec in caplog.records
+ )
+ mock_task_callback.assert_not_called()
+ mock_stats_incr.assert_not_called()
+
@pytest.mark.usefixtures("testing_dag_bundle")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_with_asset_events(self, mock_stats_incr,
session, dag_maker):
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index e5bb6e31593..bac07ffe1f2 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -2152,6 +2152,84 @@ def
test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
assert empty_ti.try_number == 1
+def test_schedule_tis_try_number_mismatch_logs_warning(dag_maker, session:
Session, monkeypatch):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ original_execute = session.execute
+
+ class _FakeSelectResult:
+ def all(self):
+ return [(ti.id, ti.try_number + 2, TaskInstanceState.SCHEDULED)]
+
+ def execute_with_mismatch(statement, *args, **kwargs):
+ if getattr(statement, "is_select", False):
+ return _FakeSelectResult()
+ return original_execute(statement, *args, **kwargs)
+
+ monkeypatch.setattr(session, "execute", execute_with_mismatch)
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
+ assert any(
+ "schedule_tis: try_number mismatch after scheduling" in call.args[0]
+ for call in warning_mock.call_args_list
+ )
+
+
+def test_schedule_tis_try_number_match_has_no_warning(dag_maker, session:
Session):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ with (
+ mock.patch.object(dr.log, "isEnabledFor", return_value=True),
+ mock.patch.object(dr.log, "warning") as warning_mock,
+ ):
+ dr.schedule_tis((ti,), session=session)
+
+ assert all(
+ "schedule_tis: try_number mismatch after scheduling" not in
call.args[0]
+ for call in warning_mock.call_args_list
+ )
+
+
+def test_schedule_tis_try_number_check_is_debug_only(dag_maker, session:
Session, monkeypatch):
+ with dag_maker(session=session):
+ BaseOperator(task_id="task_1")
+
+ dr: DagRun = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance("task_1", session=session)
+ assert ti is not None
+
+ original_execute = session.execute
+ select_calls = 0
+
+ def execute_with_counter(statement, *args, **kwargs):
+ nonlocal select_calls
+ if getattr(statement, "is_select", False):
+ select_calls += 1
+ return original_execute(statement, *args, **kwargs)
+
+ monkeypatch.setattr(session, "execute", execute_with_counter)
+
+ with mock.patch.object(dr.log, "isEnabledFor", return_value=False):
+ dr.schedule_tis((ti,), session=session)
+
+ assert select_calls == 0
+
+
@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers
where scheduler can't reach xcom")
def test_schedule_tis_start_trigger_through_expand(dag_maker, session):
"""