This is an automated email from the ASF dual-hosted git repository.
xddeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 402fc1e85f7 Isolate per-dag-run failures in _schedule_all_dag_runs()
to prevent single DagRun crashing the Scheduler (#62893)
402fc1e85f7 is described below
commit 402fc1e85f7feb5543c5c300f74bf05239781ec8
Author: Xiaodong DENG <[email protected]>
AuthorDate: Wed Mar 18 21:24:08 2026 -0700
Isolate per-dag-run failures in _schedule_all_dag_runs() to prevent single
DagRun crashing the Scheduler (#62893)
* Isolate per-dag-run failures in _schedule_all_dag_runs()
# What's the issue
Previously, `_schedule_all_dag_runs()` used a list comprehension to process
all DagRuns. If any `_schedule_dag_run()` raised an exception for any single
dag run, the entire comprehension would abort, no other DagRun would be
processed, and the exception would propagate up to crash the scheduler process
— **stopping scheduling for ALL DAGs**.
# How to reproduce
While the specific scenario used to reproduce this (a TaskInstance with
`state=UP_FOR_RETRY` and `end_date=NULL`) is nearly impossible under normal
operation, the lack of **per-dag-run fault isolation** means ANY unexpected
exception from ANY DagRun would have the same fatal effect.
# What's the fix
Replace the list comprehension with an explicit loop that catches
exceptions per DagRun, logs the error with full traceback, and continues
processing the remaining DagRuns.
* Address static check error
* Re-raise DBAPIError in _schedule_all_dag_runs() for proper retry handling
The broad `except Exception` was swallowing DBAPIError, preventing
@retry_db_transaction from seeing DB failures (connection drops,
deadlocks) and retrying the whole function. Add `except DBAPIError:
raise` before the general catch so DB errors propagate correctly.
Also add autospec=True to test mocks per project conventions, and add
a test verifying DBAPIError is not swallowed.
---
.../src/airflow/jobs/scheduler_job_runner.py | 12 ++-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 91 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index f8c043f0c9f..8400e13cd79 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -34,7 +34,7 @@ from itertools import groupby
from typing import TYPE_CHECKING, Any
from sqlalchemy import CTE, and_, delete, exists, func, inspect, or_, select,
text, tuple_, update
-from sqlalchemy.exc import OperationalError
+from sqlalchemy.exc import DBAPIError, OperationalError
from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient,
selectinload
from sqlalchemy.sql import expression
@@ -2218,7 +2218,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
session: Session,
) -> list[tuple[DagRun, DagCallbackRequest | None]]:
"""Make scheduling decisions for all `dag_runs`."""
- callback_tuples = [(run, self._schedule_dag_run(run, session=session))
for run in dag_runs]
+ callback_tuples = []
+ for run in dag_runs:
+ try:
+ callback = self._schedule_dag_run(run, session=session)
+ callback_tuples.append((run, callback))
+ except DBAPIError:
+ raise # let @retry_db_transaction handle DB errors
+ except Exception:
+ self.log.exception("Error scheduling DAG run %s of %s",
run.run_id, run.dag_id)
guard.commit()
return callback_tuples
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 23fbecbf73a..386eeedb608 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5224,6 +5224,97 @@ class TestSchedulerJob:
f"Expected deserialization error log, got:
{scheduler_messages}"
)
+ def
test_schedule_all_dag_runs_does_not_crash_on_single_dag_run_error(self,
dag_maker, caplog, session):
+ """Test that _schedule_all_dag_runs continues processing other DAG runs
+ when one DAG run raises an exception during scheduling.
+
+ Previously, _schedule_all_dag_runs used a list comprehension that would
+ abort entirely if any single _schedule_dag_run call raised, crashing
+ the entire scheduler and stopping scheduling for ALL DAGs.
+
+ While the specific scenario used to reproduce this (a TaskInstance with
+ state=UP_FOR_RETRY and end_date=NULL) is nearly impossible under normal
+ operation, the lack of per-dag-run fault isolation means ANY unexpected
+ exception from ANY dag run would have the same catastrophic effect.
+ """
+ # Create two DAGs with running DAG runs
+ with dag_maker(dag_id="good_dag", schedule="@once"):
+ EmptyOperator(task_id="good_task")
+ good_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+ with dag_maker(dag_id="bad_dag", schedule="@once"):
+ EmptyOperator(task_id="bad_task")
+ bad_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ caplog.clear()
+ with (
+ caplog.at_level("ERROR",
logger="airflow.jobs.scheduler_job_runner"),
+ patch.object(
+ self.job_runner,
+ "_schedule_dag_run",
+ autospec=True,
+ side_effect=[
+ TypeError("simulated crash from corrupted task instance"),
# bad_run
+ None, # good_run
+ ],
+ ) as mock_schedule,
+ ):
+ from airflow.utils.sqlalchemy import prohibit_commit
+
+ with prohibit_commit(session) as guard:
+ result = self.job_runner._schedule_all_dag_runs(guard,
[bad_run, good_run], session=session)
+
+ # The good DAG run should have been processed despite the bad one
failing
+ assert len(result) == 1
+ assert result[0][0] == good_run
+
+ # Both dag runs should have been attempted
+ assert mock_schedule.call_count == 2
+
+ # The error should have been logged
+ error_messages = [r.message for r in caplog.records if r.levelno
>= logging.ERROR]
+ assert any(
+ msg == f"Error scheduling DAG run {bad_run.run_id} of
{bad_run.dag_id}"
+ for msg in error_messages
+ )
+
+ def test_schedule_all_dag_runs_reraises_db_errors(self, dag_maker,
session):
+ """Test that _schedule_all_dag_runs does not catch DBAPIError, allowing
+ it to propagate to @retry_db_transaction for proper retry handling.
+ """
+ from sqlalchemy.exc import DBAPIError
+
+ with dag_maker(dag_id="db_error_dag", schedule="@once"):
+ EmptyOperator(task_id="task1")
+ run = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ with patch.object(
+ self.job_runner,
+ "_schedule_dag_run",
+ autospec=True,
+ side_effect=DBAPIError("select 1", None, Exception("connection
lost")),
+ ) as mock_schedule:
+ from airflow.utils.sqlalchemy import prohibit_commit
+
+ with prohibit_commit(session) as guard:
+ # Bypass @retry_db_transaction to verify the exception escapes
+ # the inner function rather than being swallowed by except
Exception.
+ with pytest.raises(DBAPIError):
+ self.job_runner._schedule_all_dag_runs.__wrapped__(
+ self.job_runner, guard, [run], session=session
+ )
+
+ assert mock_schedule.call_count == 1
+
def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self,
dag_maker, testing_dag_bundle):
"""
Test that externally triggered Dag Runs should not affect (by
skipping) next