kaxil commented on code in PR #64181:
URL: https://github.com/apache/airflow/pull/64181#discussion_r3018538938
##########
airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -627,5 +676,9 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
                if not status.passed and changed:
                    # no need to evaluate trigger rule; we've already marked as skipped or failed
                    return
-
-        yield from _evaluate_direct_relatives()
+            yield from _evaluate_direct_relatives()
+        else:
+            statuses = list(_evaluate_direct_relatives())
+            yield from statuses
+            if not statuses:
+                yield from _evaluate_teardown_scope()
Review Comment:
`list()` eagerly materializes the entire generator, which means all side
effects inside `_evaluate_direct_relatives()` (including `ti.set_state()` on
line 460) execute immediately rather than lazily. Before this PR, `yield from
_evaluate_direct_relatives()` was lazy. This does not cause bugs today since
callers consume all statuses, but a sentinel-based approach avoids the
behavioral change:
```python
yielded_any = False
for status in _evaluate_direct_relatives():
    yielded_any = True
    yield status
if not yielded_any:
yield from _evaluate_teardown_scope()
```
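To make the behavioral difference concrete, here is a minimal standalone sketch (plain Python, not Airflow code; the generator stands in for `_evaluate_direct_relatives()` and the list append for `ti.set_state()`):

```python
def make_gen(side_effects):
    # Stand-in for _evaluate_direct_relatives(): each yield has a side effect,
    # like the ti.set_state() call the review comment mentions.
    def gen():
        for i in range(3):
            side_effects.append(i)
            yield i
    return gen

# Eager variant: list() runs every side effect before the first status is re-yielded.
effects_eager = []
def wrapper_eager():
    statuses = list(make_gen(effects_eager)())
    yield from statuses

it = wrapper_eager()
next(it)                           # caller consumes only the first status...
assert effects_eager == [0, 1, 2]  # ...yet all side effects have already run

# Sentinel variant: side effects run only as far as the caller consumes.
effects_lazy = []
def wrapper_lazy():
    yielded_any = False
    for status in make_gen(effects_lazy)():
        yielded_any = True
        yield status
    if not yielded_any:
        yield "teardown-scope status"  # stands in for _evaluate_teardown_scope()

it = wrapper_lazy()
next(it)
assert effects_lazy == [0]         # only the first side effect has run so far
```

As the review notes, the difference is invisible when callers drain the generator, but it matters for any caller that stops early.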
##########
airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -619,6 +619,55 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
                reason=f"No strategy to evaluate trigger rule '{trigger_rule_str}'."
            )
+    def _evaluate_teardown_scope() -> Iterator[TIDepStatus]:
+        """Ensure all tasks between setup(s) and this teardown have completed."""
+        if not task.dag:
+            return
+
+        setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup}
Review Comment:
What happens when the teardown has no setup upstreams? `setup_task_ids`
would be empty, so the `for setup_id in setup_task_ids` loop on line 636 never
executes, `in_scope_ids` stays empty, and `_evaluate_teardown_scope` yields
nothing. That's fine. But what about a teardown whose setups are not direct
upstreams? `upstream_list` only returns direct upstream tasks. If a setup is an
indirect upstream (connected through a chain rather than a direct edge), it
won't be found here. Is that a valid DAG topology, or does
`as_teardown(setups=...)` always create a direct edge?
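For reference, a toy model of the direct-vs-indirect distinction (hypothetical `Node` class, not the Airflow `Operator` API): a scan of direct upstreams misses a setup that is reachable only through a chain, while a transitive walk in the spirit of `get_flat_relative_ids(upstream=True)` finds it:

```python
class Node:
    """Hypothetical stand-in for an Airflow task with direct upstream edges."""

    def __init__(self, task_id, is_setup=False):
        self.task_id = task_id
        self.is_setup = is_setup
        self.upstream_list = []

    def flat_upstream_ids(self):
        # Transitive upstream walk, mirroring get_flat_relative_ids(upstream=True).
        seen, stack = set(), list(self.upstream_list)
        while stack:
            node = stack.pop()
            if node.task_id not in seen:
                seen.add(node.task_id)
                stack.extend(node.upstream_list)
        return seen

setup = Node("setup", is_setup=True)
middle = Node("middle")
teardown = Node("teardown")
middle.upstream_list.append(setup)     # setup >> middle
teardown.upstream_list.append(middle)  # middle >> teardown; no direct setup edge

# A scan over direct upstreams, as in the diff, finds no setup here...
direct_setup_ids = {n.task_id for n in teardown.upstream_list if n.is_setup}
assert direct_setup_ids == set()

# ...even though the setup is a transitive upstream of the teardown.
assert "setup" in teardown.flat_upstream_ids()
```

Whether such a topology can actually arise depends on whether `as_teardown(setups=...)` always adds a direct edge, which is the open question above.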
##########
airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -619,6 +619,55 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
                reason=f"No strategy to evaluate trigger rule '{trigger_rule_str}'."
            )
+    def _evaluate_teardown_scope() -> Iterator[TIDepStatus]:
+        """Ensure all tasks between setup(s) and this teardown have completed."""
+        if not task.dag:
+            return
+
+        setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup}
+
+        all_upstream_ids = task.get_flat_relative_ids(upstream=True)
+        indirect_upstream_ids = all_upstream_ids - task.upstream_task_ids
+
+        if not indirect_upstream_ids:
+            return
+
+        in_scope_ids = set()
+        for setup_id in setup_task_ids:
+            setup_obj = task.dag.get_task(setup_id)
+            in_scope_ids.update(indirect_upstream_ids & setup_obj.get_flat_relative_ids(upstream=False))
+
+        in_scope_tasks = {tid: task.dag.get_task(tid) for tid in in_scope_ids}
Review Comment:
If `in_scope_ids` ends up empty after the intersection (e.g., the setup has no indirect downstream overlap with the teardown's indirect upstreams), this still iterates all finished TIs on lines 642-646 to produce `done=0`, then `expected=0`, and exits without yielding. Adding `if not in_scope_tasks: return` here would skip that unnecessary work.
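A toy sketch of the suggested guard (hypothetical names and plain strings; the real function checks TI states): with the early return, an empty scope skips the finished-TI scan entirely:

```python
def evaluate_scope(in_scope_task_ids, finished_ti_ids):
    # Hypothetical simplification of _evaluate_teardown_scope's tail.
    if not in_scope_task_ids:
        return  # suggested guard: nothing in scope, skip the done/expected bookkeeping
    done = sum(1 for tid in finished_ti_ids if tid in in_scope_task_ids)
    if done < len(in_scope_task_ids):
        yield f"{len(in_scope_task_ids) - done} in-scope task(s) have not completed"

# Empty scope: no scanning of finished TIs, no status yielded.
assert list(evaluate_scope(set(), ["a", "b"])) == []

# Non-empty scope with one task still pending: a failing-style status is yielded.
assert list(evaluate_scope({"a", "c"}, ["a"])) == ["1 in-scope task(s) have not completed"]
```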
##########
airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py:
##########
@@ -858,6 +858,117 @@ def test_teardown_tr_not_all_done(
        expected_ti_state=exp_state if exp_state and flag_upstream_failed else None,
    )
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_waits_for_in_scope_tasks(self, session, dag_maker, flag_upstream_failed):
+        """
+        Teardown should not run until all tasks between setup and teardown are done.
+
+        Regression test for https://github.com/apache/airflow/issues/29332
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
Review Comment:
The tests all use linear chains (`setup >> t1 >> t2 >> t3 >> teardown`), but
the bug in #29332 is about parallel branches where one fails and the other is
still running. The PR description DAG (`setup >> [t_fail, t_slow] >> d >> td`)
would be the most direct regression test. Worth adding a test with that DAG
shape to verify the actual reported scenario.
Also missing: a test where an in-scope task has FAILED state. The current
tests only use SUCCESS or None. The teardown should still proceed when in-scope
tasks are in any terminal state (FAILED, UPSTREAM_FAILED, SKIPPED), not just
SUCCESS. Without that test, someone could accidentally filter on SUCCESS-only
and these tests wouldn't catch it.
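The terminal-state point can be pinned down outside Airflow too. A minimal simulation (hypothetical helper, not the Airflow API) of the intended semantics for the PR-description shape `setup >> [t_fail, t_slow] >> d >> td`: the teardown waits while any in-scope task is non-terminal, and proceeds once every in-scope task reaches any terminal state, not just SUCCESS:

```python
# Terminal states after which a teardown may proceed; SUCCESS alone is not the test.
TERMINAL_STATES = {"success", "failed", "upstream_failed", "skipped"}

def teardown_may_proceed(in_scope_states):
    """Hypothetical check: True once every in-scope task is in a terminal state."""
    return all(state in TERMINAL_STATES for state in in_scope_states.values())

# setup >> [t_fail, t_slow] >> d >> td: one branch failed, the other still running.
states = {"t_fail": "failed", "t_slow": "running", "d": None}
assert not teardown_may_proceed(states)  # td must keep waiting for t_slow

# All branches terminal (mixed FAILED / SUCCESS / UPSTREAM_FAILED): td may run.
states = {"t_fail": "failed", "t_slow": "success", "d": "upstream_failed"}
assert teardown_may_proceed(states)
```

A real regression test would build this DAG shape with `dag_maker` and set the corresponding TI states, as the existing tests in this file do.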
##########
airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py:
##########
@@ -858,6 +858,117 @@ def test_teardown_tr_not_all_done(
        expected_ti_state=exp_state if exp_state and flag_upstream_failed else None,
    )
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_waits_for_in_scope_tasks(self, session, dag_maker, flag_upstream_failed):
+        """
+        Teardown should not run until all tasks between setup and teardown are done.
+
+        Regression test for https://github.com/apache/airflow/issues/29332
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        for task_id in ("setup", "t2", "t3"):
+            tis[task_id].state = SUCCESS
+            session.merge(tis[task_id])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert len(dep_statuses) == 1
+        assert not dep_statuses[0].passed
+        assert "in-scope" in dep_statuses[0].reason
+
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_runs_when_all_in_scope_tasks_done(self, session, dag_maker, flag_upstream_failed):
+        """
+        Teardown should run when all tasks between setup and teardown are done.
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        for task_id in ("setup", "t1", "t2", "t3"):
+            tis[task_id].state = SUCCESS
+            session.merge(tis[task_id])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert not dep_statuses
+
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_waits_for_multiple_cleared_in_scope_tasks(
+        self, session, dag_maker, flag_upstream_failed
+    ):
+        """
+        Teardown should wait when multiple in-scope tasks are not done.
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        tis["setup"].state = SUCCESS
+        tis["t3"].state = SUCCESS
+        session.merge(tis["setup"])
+        session.merge(tis["t3"])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert len(dep_statuses) == 1
+        assert not dep_statuses[0].passed
+        assert "2 in-scope" in dep_statuses[0].reason
Review Comment:
`assert "2 in-scope" in dep_statuses[0].reason` is fragile: it couples the test to the exact error-message wording. If someone rewords the message to "2 tasks in-scope" or "in-scope: 2", this test breaks. Asserting `not dep_statuses[0].passed` is already sufficient for correctness; if you want to verify the count, parsing it from structured data would be more durable.
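For instance, a sketch of a less brittle assertion (hypothetical `Status` stand-in for `TIDepStatus`): key the test on `passed`, and if the count matters, extract the number instead of pinning the full phrase:

```python
import re

class Status:
    """Hypothetical stand-in for TIDepStatus."""
    def __init__(self, passed, reason):
        self.passed = passed
        self.reason = reason

# The wording may change ("2 in-scope ...", "2 tasks in-scope ...", "in-scope: 2").
status = Status(passed=False, reason="2 in-scope tasks have not completed")

# Durable: correctness hinges on the passed flag, not the message text.
assert not status.passed

# If the count matters, parse it out rather than matching an exact phrase.
match = re.search(r"\d+", status.reason)
assert match is not None and int(match.group()) == 2
```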
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]