This is an automated email from the ASF dual-hosted git repository.

xddeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 402fc1e85f7 Isolate per-dag-run failures in _schedule_all_dag_runs() 
to prevent single DagRun crashing the Scheduler (#62893)
402fc1e85f7 is described below

commit 402fc1e85f7feb5543c5c300f74bf05239781ec8
Author: Xiaodong DENG <[email protected]>
AuthorDate: Wed Mar 18 21:24:08 2026 -0700

    Isolate per-dag-run failures in _schedule_all_dag_runs() to prevent single 
DagRun crashing the Scheduler (#62893)
    
    * Isolate per-dag-run failures in _schedule_all_dag_runs()
    
    # What's the issue
    Previously, `_schedule_all_dag_runs()` used a list comprehension to process 
all DagRuns. If any `_schedule_dag_run()` raised an exception for any single 
dag run, the entire comprehension would abort, no other DagRun would be 
processed, and the exception would propagate up to crash the scheduler process 
— **stopping scheduling for ALL DAGs**.
    
    # How to reproduce
    While the specific scenario used to reproduce this (a TaskInstance with 
`state=UP_FOR_RETRY` and `end_date=NULL`) is nearly impossible under normal 
operation, the lack of **per-dag-run fault isolation** means ANY unexpected 
exception from ANY DagRun would have the same fatal effect.
    
    # What's the fix
    Replace the list comprehension with an explicit loop that catches 
exceptions per DagRun, logs the error with full traceback, and continues 
processing the remaining DagRuns.
    
    * Address static check error
    
    * Re-raise DBAPIError in _schedule_all_dag_runs() for proper retry handling
    
    The broad `except Exception` was swallowing DBAPIError, preventing
    @retry_db_transaction from seeing DB failures (connection drops,
    deadlocks) and retrying the whole function. Add `except DBAPIError:
    raise` before the general catch so DB errors propagate correctly.
    
    Also add autospec=True to test mocks per project conventions, and add
    a test verifying DBAPIError is not swallowed.
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 12 ++-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 91 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index f8c043f0c9f..8400e13cd79 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -34,7 +34,7 @@ from itertools import groupby
 from typing import TYPE_CHECKING, Any
 
 from sqlalchemy import CTE, and_, delete, exists, func, inspect, or_, select, 
text, tuple_, update
-from sqlalchemy.exc import OperationalError
+from sqlalchemy.exc import DBAPIError, OperationalError
 from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, 
selectinload
 from sqlalchemy.sql import expression
 
@@ -2218,7 +2218,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         session: Session,
     ) -> list[tuple[DagRun, DagCallbackRequest | None]]:
         """Make scheduling decisions for all `dag_runs`."""
-        callback_tuples = [(run, self._schedule_dag_run(run, session=session)) 
for run in dag_runs]
+        callback_tuples = []
+        for run in dag_runs:
+            try:
+                callback = self._schedule_dag_run(run, session=session)
+                callback_tuples.append((run, callback))
+            except DBAPIError:
+                raise  # let @retry_db_transaction handle DB errors
+            except Exception:
+                self.log.exception("Error scheduling DAG run %s of %s", 
run.run_id, run.dag_id)
         guard.commit()
         return callback_tuples
 
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 23fbecbf73a..386eeedb608 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5224,6 +5224,97 @@ class TestSchedulerJob:
                 f"Expected deserialization error log, got: 
{scheduler_messages}"
             )
 
+    def 
test_schedule_all_dag_runs_does_not_crash_on_single_dag_run_error(self, 
dag_maker, caplog, session):
+        """Test that _schedule_all_dag_runs continues processing other DAG runs
+        when one DAG run raises an exception during scheduling.
+
+        Previously, _schedule_all_dag_runs used a list comprehension that would
+        abort entirely if any single _schedule_dag_run call raised, crashing
+        the entire scheduler and stopping scheduling for ALL DAGs.
+
+        While the specific scenario used to reproduce this (a TaskInstance with
+        state=UP_FOR_RETRY and end_date=NULL) is nearly impossible under normal
+        operation, the lack of per-dag-run fault isolation means ANY unexpected
+        exception from ANY dag run would have the same catastrophic effect.
+        """
+        # Create two DAGs with running DAG runs
+        with dag_maker(dag_id="good_dag", schedule="@once"):
+            EmptyOperator(task_id="good_task")
+        good_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        with dag_maker(dag_id="bad_dag", schedule="@once"):
+            EmptyOperator(task_id="bad_task")
+        bad_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        caplog.clear()
+        with (
+            caplog.at_level("ERROR", 
logger="airflow.jobs.scheduler_job_runner"),
+            patch.object(
+                self.job_runner,
+                "_schedule_dag_run",
+                autospec=True,
+                side_effect=[
+                    TypeError("simulated crash from corrupted task instance"), 
 # bad_run
+                    None,  # good_run
+                ],
+            ) as mock_schedule,
+        ):
+            from airflow.utils.sqlalchemy import prohibit_commit
+
+            with prohibit_commit(session) as guard:
+                result = self.job_runner._schedule_all_dag_runs(guard, 
[bad_run, good_run], session=session)
+
+            # The good DAG run should have been processed despite the bad one 
failing
+            assert len(result) == 1
+            assert result[0][0] == good_run
+
+            # Both dag runs should have been attempted
+            assert mock_schedule.call_count == 2
+
+            # The error should have been logged
+            error_messages = [r.message for r in caplog.records if r.levelno 
>= logging.ERROR]
+            assert any(
+                msg == f"Error scheduling DAG run {bad_run.run_id} of 
{bad_run.dag_id}"
+                for msg in error_messages
+            )
+
+    def test_schedule_all_dag_runs_reraises_db_errors(self, dag_maker, 
session):
+        """Test that _schedule_all_dag_runs does not catch DBAPIError, allowing
+        it to propagate to @retry_db_transaction for proper retry handling.
+        """
+        from sqlalchemy.exc import DBAPIError
+
+        with dag_maker(dag_id="db_error_dag", schedule="@once"):
+            EmptyOperator(task_id="task1")
+        run = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        with patch.object(
+            self.job_runner,
+            "_schedule_dag_run",
+            autospec=True,
+            side_effect=DBAPIError("select 1", None, Exception("connection 
lost")),
+        ) as mock_schedule:
+            from airflow.utils.sqlalchemy import prohibit_commit
+
+            with prohibit_commit(session) as guard:
+                # Bypass @retry_db_transaction to verify the exception escapes
+                # the inner function rather than being swallowed by except 
Exception.
+                with pytest.raises(DBAPIError):
+                    self.job_runner._schedule_all_dag_runs.__wrapped__(
+                        self.job_runner, guard, [run], session=session
+                    )
+
+            assert mock_schedule.call_count == 1
+
     def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, 
dag_maker, testing_dag_bundle):
         """
         Test that externally triggered Dag Runs should not affect (by 
skipping) next

Reply via email to