kaxil commented on code in PR #64181:
URL: https://github.com/apache/airflow/pull/64181#discussion_r3018538938


##########
airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -627,5 +676,9 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
                     if not status.passed and changed:
                         # no need to evaluate trigger rule; we've already marked as skipped or failed
                         return
-
-        yield from _evaluate_direct_relatives()
+            yield from _evaluate_direct_relatives()
+        else:
+            statuses = list(_evaluate_direct_relatives())
+            yield from statuses
+            if not statuses:
+                yield from _evaluate_teardown_scope()

Review Comment:
   `list()` eagerly materializes the entire generator, which means all side effects inside `_evaluate_direct_relatives()` (including `ti.set_state()` on line 460) execute immediately rather than lazily. Before this PR, `yield from _evaluate_direct_relatives()` was lazy. This does not cause bugs today since callers consume all statuses, but a sentinel-based approach avoids the behavioral change:
   
   ```python
   yielded_any = False
   for status in _evaluate_direct_relatives():
       yielded_any = True
       yield status
   if not yielded_any:
       yield from _evaluate_teardown_scope()
   ```
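   For illustration, a standalone (non-Airflow) sketch of why the laziness matters -- `list()` runs the generator body, and therefore its side effects, at call time:

```python
effects = []

def evaluate():
    # Hypothetical stand-in for _evaluate_direct_relatives();
    # the append models the ti.set_state() side effect.
    effects.append("set_state")
    yield "status"

gen = evaluate()                 # lazy: body not entered, no side effect yet
assert effects == []
materialized = list(evaluate())  # eager: side effect fires inside list()
assert effects == ["set_state"]
assert materialized == ["status"]
```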



##########
airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -619,6 +619,55 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
                     reason=f"No strategy to evaluate trigger rule '{trigger_rule_str}'."
                 )
 
+        def _evaluate_teardown_scope() -> Iterator[TIDepStatus]:
+            """Ensure all tasks between setup(s) and this teardown have completed."""
+            if not task.dag:
+                return
+
+            setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup}

Review Comment:
   What happens when the teardown has no setup upstreams? `setup_task_ids` would be empty, so the `for setup_id in setup_task_ids` loop on line 636 never executes, `in_scope_ids` stays empty, and `_evaluate_teardown_scope` yields nothing. That's fine. But what about a teardown whose setups are not direct upstreams? `upstream_list` only returns direct upstream tasks. If a setup is an indirect upstream (connected through a chain rather than a direct edge), it won't be found here. Is that a valid DAG topology, or does `as_teardown(setups=...)` always create a direct edge?
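   To make the concern concrete, here is a tiny non-Airflow model (plain adjacency sets, hypothetical names) of a DAG where the setup is only an indirect upstream of the teardown:

```python
# Hypothetical topology: setup >> t1 >> teardown, with no direct
# setup -> teardown edge. downstream[x] is the set of direct downstreams of x.
downstream = {"setup": {"t1"}, "t1": {"teardown"}, "teardown": set()}
is_setup = {"setup"}

def direct_upstreams(task_id):
    return {u for u, downs in downstream.items() if task_id in downs}

# Mirrors `{t.task_id for t in task.upstream_list if t.is_setup}`:
setup_task_ids = {u for u in direct_upstreams("teardown") if u in is_setup}
assert setup_task_ids == set()  # the indirect setup is never discovered
```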



##########
airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -619,6 +619,55 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
                     reason=f"No strategy to evaluate trigger rule '{trigger_rule_str}'."
                 )
 
+        def _evaluate_teardown_scope() -> Iterator[TIDepStatus]:
+            """Ensure all tasks between setup(s) and this teardown have completed."""
+            if not task.dag:
+                return
+
+            setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup}
+
+            all_upstream_ids = task.get_flat_relative_ids(upstream=True)
+            indirect_upstream_ids = all_upstream_ids - task.upstream_task_ids
+
+            if not indirect_upstream_ids:
+                return
+
+            in_scope_ids = set()
+            for setup_id in setup_task_ids:
+                setup_obj = task.dag.get_task(setup_id)
+                in_scope_ids.update(indirect_upstream_ids & setup_obj.get_flat_relative_ids(upstream=False))
+
+            in_scope_tasks = {tid: task.dag.get_task(tid) for tid in in_scope_ids}

Review Comment:
   If `in_scope_ids` ends up empty after the intersection (e.g., the setup has no indirect downstream overlap with the teardown's indirect upstreams), this still iterates all finished TIs on lines 642-646 to produce `done=0`, then `expected=0`, and exits without yielding. Adding `if not in_scope_tasks: return` here would skip that unnecessary work.
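   A sketch of the guard on a simplified (non-Airflow) model of the function, with hypothetical stand-in arguments:

```python
def evaluate_scope(in_scope_tasks, finished_tis):
    # Simplified model of _evaluate_teardown_scope with the suggested guard.
    if not in_scope_tasks:
        return  # empty scope: skip scanning finished TIs entirely
    done = sum(1 for ti in finished_tis if ti in in_scope_tasks)
    expected = len(in_scope_tasks)
    if done < expected:
        yield f"{expected - done} in-scope task(s) not done"

# With an empty scope the guard returns before touching finished_tis.
assert list(evaluate_scope(set(), ["a", "b", "c"])) == []
assert list(evaluate_scope({"a", "b"}, ["a"])) == ["1 in-scope task(s) not done"]
```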



##########
airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py:
##########
@@ -858,6 +858,117 @@ def test_teardown_tr_not_all_done(
             expected_ti_state=exp_state if exp_state and flag_upstream_failed else None,
         )
 
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_waits_for_in_scope_tasks(self, session, dag_maker, flag_upstream_failed):
+        """
+        Teardown should not run until all tasks between setup and teardown are done.
+
+        Regression test for https://github.com/apache/airflow/issues/29332
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task

Review Comment:
   The tests all use linear chains (`setup >> t1 >> t2 >> t3 >> teardown`), but the bug in #29332 is about parallel branches where one fails and the other is still running. The PR description DAG (`setup >> [t_fail, t_slow] >> d >> td`) would be the most direct regression test. Worth adding a test with that DAG shape to verify the actual reported scenario.
   
   Also missing: a test where an in-scope task has FAILED state. The current tests only use SUCCESS or None. The teardown should still proceed when in-scope tasks are in any terminal state (FAILED, UPSTREAM_FAILED, SKIPPED), not just SUCCESS. Without that test, someone could accidentally filter on SUCCESS-only and these tests wouldn't catch it.
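   As a minimal model of the missing assertion (state names as plain strings here; a real test would use the Airflow state constants and `dag_maker` as above):

```python
# Terminal states that should unblock the teardown, per the comment above.
TERMINAL = {"success", "failed", "upstream_failed", "skipped"}

def teardown_may_proceed(in_scope_states):
    # A SUCCESS-only filter here is exactly the regression this should catch.
    return all(state in TERMINAL for state in in_scope_states)

assert teardown_may_proceed(["success", "failed"])           # FAILED still unblocks
assert teardown_may_proceed(["upstream_failed", "skipped"])
assert not teardown_may_proceed(["success", None])           # still in flight
```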



##########
airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py:
##########
@@ -858,6 +858,117 @@ def test_teardown_tr_not_all_done(
             expected_ti_state=exp_state if exp_state and flag_upstream_failed else None,
         )
 
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_waits_for_in_scope_tasks(self, session, dag_maker, flag_upstream_failed):
+        """
+        Teardown should not run until all tasks between setup and teardown are done.
+
+        Regression test for https://github.com/apache/airflow/issues/29332
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        for task_id in ("setup", "t2", "t3"):
+            tis[task_id].state = SUCCESS
+            session.merge(tis[task_id])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert len(dep_statuses) == 1
+        assert not dep_statuses[0].passed
+        assert "in-scope" in dep_statuses[0].reason
+
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_runs_when_all_in_scope_tasks_done(self, session, dag_maker, flag_upstream_failed):
+        """
+        Teardown should run when all tasks between setup and teardown are done.
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        for task_id in ("setup", "t1", "t2", "t3"):
+            tis[task_id].state = SUCCESS
+            session.merge(tis[task_id])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert not dep_statuses
+
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_waits_for_multiple_cleared_in_scope_tasks(
+        self, session, dag_maker, flag_upstream_failed
+    ):
+        """
+        Teardown should wait when multiple in-scope tasks are not done.
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        tis["setup"].state = SUCCESS
+        tis["t3"].state = SUCCESS
+        session.merge(tis["setup"])
+        session.merge(tis["t3"])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert len(dep_statuses) == 1
+        assert not dep_statuses[0].passed
+        assert "2 in-scope" in dep_statuses[0].reason

Review Comment:
   `assert "2 in-scope" in dep_statuses[0].reason` is fragile -- it couples the test to the exact error message format. If someone changes the message to say "2 tasks in-scope" or "in-scope: 2", this breaks. Testing `not dep_statuses[0].passed` is already sufficient for correctness; if you want to verify the count, parsing it from structured data would be more durable.
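   Short of exposing the count as structured data, a tolerant pattern match at least survives rewording (sketch; assumes the count remains somewhere in the reason string):

```python
import re

# Hypothetical reason strings before and after a rewording:
for reason in ("2 in-scope tasks are not done", "in-scope: 2 remaining"):
    match = re.search(r"\b(\d+)\b", reason)
    assert match is not None and int(match.group(1)) == 2
```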



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
