This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6d1794ac49e Fix downstream tasks being incorrectly skipped in HA
scheduler mode (#63266)
6d1794ac49e is described below
commit 6d1794ac49ed60872068f5df254849e04a6cc954
Author: Sam Dumont <[email protected]>
AuthorDate: Thu Mar 19 05:46:30 2026 +0100
Fix downstream tasks being incorrectly skipped in HA scheduler mode (#63266)
In HA deployments, ti_skip_downstream() issues a bulk UPDATE without
a state guard. When a BranchOperator decides to skip downstream tasks,
that UPDATE can overwrite a task that is already RUNNING on a worker
to SKIPPED; the worker's next heartbeat then fails with a 409 conflict,
which kills the task mid-execution.
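Add a skippable_state_clause to the UPDATE's WHERE clause so that tasks
in the RUNNING, SUCCESS, or FAILED state are never overwritten to SKIPPED.
Tasks with no state (NULL) are still skipped; the clause checks IS NULL
explicitly because NULL NOT IN (...) never evaluates to true in SQL.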
QUEUED tasks are intentionally still allowed to be skipped: no work has
been done yet, so the BranchOperator's decision should take priority. The
worker pod that later picks up such a task gets a benign 409 on
PATCH /run and exits cleanly.
closes: #59378
closes: #57618
---
airflow-core/newsfragments/63266.bugfix.rst | 1 +
.../execution_api/routes/task_instances.py | 20 ++++-
.../versions/head/test_task_instances.py | 89 ++++++++++++++++++++++
3 files changed, 109 insertions(+), 1 deletion(-)
diff --git a/airflow-core/newsfragments/63266.bugfix.rst
b/airflow-core/newsfragments/63266.bugfix.rst
new file mode 100644
index 00000000000..a8e1ff44aec
--- /dev/null
+++ b/airflow-core/newsfragments/63266.bugfix.rst
@@ -0,0 +1 @@
+Fix ``ti_skip_downstream`` overwriting RUNNING tasks to SKIPPED in HA deployments.
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 60dd868c2e3..96d8f3a6c86 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -596,9 +596,27 @@ def ti_skip_downstream(
task_ids = [task if isinstance(task, tuple) else (task, -1) for task in
tasks]
log.debug("Prepared task IDs for skipping", task_ids=task_ids)
+ # Don't overwrite tasks that are already executing or finished.
+ # See: https://github.com/apache/airflow/issues/59378
+ # Note: SQL NULL NOT IN (...) is falsy, so we need an explicit IS NULL
check.
+ skippable_state_clause = or_(
+ TI.state.is_(None),
+ TI.state.not_in(
+ [
+ TaskInstanceState.RUNNING,
+ TaskInstanceState.SUCCESS,
+ TaskInstanceState.FAILED,
+ ]
+ ),
+ )
query = (
update(TI)
- .where(TI.dag_id == dag_id, TI.run_id == run_id, tuple_(TI.task_id,
TI.map_index).in_(task_ids))
+ .where(
+ TI.dag_id == dag_id,
+ TI.run_id == run_id,
+ tuple_(TI.task_id, TI.map_index).in_(task_ids),
+ skippable_state_clause,
+ )
.values(state=TaskInstanceState.SKIPPED, start_date=now, end_date=now)
.execution_options(synchronize_session=False)
)
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 835a8c46139..7cc7aa85594 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1633,6 +1633,95 @@ class TestTISkipDownstream:
assert ti1.state == State.SKIPPED
+class TestTISkipDownstreamRaceCondition:
+ """Regression tests for #59378: ti_skip_downstream() must not clobber RUNNING/SUCCESS/FAILED TIs."""
+
+ def setup_method(self):
+ clear_db_runs()
+
+ def teardown_method(self):
+ clear_db_runs()
+
+ @pytest.mark.parametrize(
+ "initial_state",
+ [
+ State.RUNNING,
+ State.SUCCESS,
+ State.FAILED,  # NOTE(review): other states (e.g. DEFERRED) stay skippable - confirm intended
+ ],
+ )
+ def test_skip_downstream_does_not_overwrite_terminal_or_running_ti(
+ self, client, session, dag_maker, initial_state
+ ):
+ with dag_maker(f"skip_race_dag_{initial_state}", session=session):
+ branch = EmptyOperator(task_id="branch")
+ downstream = EmptyOperator(task_id="downstream")
+ branch >> downstream
+ dr = dag_maker.create_dagrun(run_id="run")
+
+ ti_branch = dr.get_task_instance("branch")
+ ti_branch.set_state(State.SUCCESS)  # branch TI is the caller of skip-downstream below
+
+ ti_downstream = dr.get_task_instance("downstream")
+ ti_downstream.set_state(initial_state)  # simulate the race: downstream already running/finished
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti_branch.id}/skip-downstream",
+ json={"tasks": ["downstream"]},
+ )
+ assert response.status_code == 204  # request succeeds even though the guard matches no row
+
+ session.expire_all()  # discard cached ORM state so the re-read reflects the DB
+ ti_downstream = dr.get_task_instance("downstream")
+ assert ti_downstream.state == initial_state  # state guard left the TI untouched
+
+ def test_skip_downstream_does_skip_queued_ti(self, client, session,
dag_maker):
+ with dag_maker("skip_race_dag_queued", session=session):
+ branch = EmptyOperator(task_id="branch")
+ downstream = EmptyOperator(task_id="downstream")
+ branch >> downstream
+ dr = dag_maker.create_dagrun(run_id="run")
+
+ ti_branch = dr.get_task_instance("branch")
+ ti_branch.set_state(State.SUCCESS)
+
+ ti_downstream = dr.get_task_instance("downstream")
+ ti_downstream.set_state(TaskInstanceState.QUEUED)  # QUEUED is deliberately still skippable
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti_branch.id}/skip-downstream",
+ json={"tasks": ["downstream"]},
+ )
+ assert response.status_code == 204
+
+ session.expire_all()
+ ti_downstream = dr.get_task_instance("downstream")
+ assert ti_downstream.state == State.SKIPPED  # the branch decision wins over QUEUED
+
+ def test_skip_downstream_still_skips_none_state_ti(self, client, session,
dag_maker):
+ with dag_maker("skip_race_dag_normal", session=session):
+ branch = EmptyOperator(task_id="branch")
+ downstream = EmptyOperator(task_id="downstream")
+ branch >> downstream
+ dr = dag_maker.create_dagrun(run_id="run")
+
+ ti_branch = dr.get_task_instance("branch")
+ ti_branch.set_state(State.SUCCESS)
+ session.commit()  # downstream TI keeps the state create_dagrun gave it (expected None)
+
+ response = client.patch(
+ f"/execution/task-instances/{ti_branch.id}/skip-downstream",
+ json={"tasks": ["downstream"]},
+ )
+ assert response.status_code == 204
+
+ session.expire_all()
+ ti_downstream = dr.get_task_instance("downstream")
+ assert ti_downstream.state == State.SKIPPED  # IS NULL branch of the guard still allows the skip
+
+
class TestTIHealthEndpoint:
def setup_method(self):
clear_db_runs()