kaxil commented on code in PR #60330:
URL: https://github.com/apache/airflow/pull/60330#discussion_r2957597793
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2063,25 +2063,30 @@ def schedule_tis(
empty_ti_ids.append(ti.id)
count = 0
-
+ # Don't only check if the TI.id is in id_chunk
+ # but also check if the TI.state is in the schedulable states.
+ # Plus, a scheduled empty operator should not be scheduled again.
+ non_null_schedulable_states = tuple(s for s in SCHEDULEABLE_STATES if s is not None)
+ schedulable_state_clause = or_(
+ TI.state.is_(None),
+ TI.state.in_(non_null_schedulable_states),
+ )
Review Comment:
Ran EXPLAIN ANALYZE on Postgres (500 TIs, 128-TI chunk):
Both queries use the same plan: `Index Scan using task_instance_pkey`. The
state clause shows up as a post-index `Filter`, evaluated in-memory on the rows
already fetched by PK. ~0.12ms overhead per chunk (1.69ms to 1.81ms avg). No
additional index scan.
```
-> Index Scan using task_instance_pkey on task_instance
Index Cond: (id = ANY ('{...}'::uuid[]))
Filter: ((state IS NULL) OR ((state)::text = ANY ('{up_for_retry,up_for_reschedule}'::text[])))
Planning Time: 0.234 ms
Execution Time: 0.970 ms
```
Re `!= SCHEDULED`: that only blocks one state. A TI already in
QUEUED/RUNNING/SUCCESS/FAILED would pass through and still get
double-scheduled. The whitelist matches exactly the states that
`task_instance_scheduling_decisions` filters on when building the schedulable
list, so the SQL guard mirrors the Python-side contract. Performance is
identical since both are row-level filters after PK lookup.
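
To make the difference between the two guards concrete, here is a minimal sketch using the standard-library `sqlite3` module rather than Airflow's models or Postgres. The table layout, the `SCHEDULABLE` tuple, and both helper functions are hypothetical stand-ins for illustration; the `!= 'scheduled'` variant is my assumed reading of the suggestion, written with an explicit `IS NULL` branch so that state-less rows are still picked up.

```python
import sqlite3

# Hypothetical stand-in for the non-null members of SCHEDULEABLE_STATES.
SCHEDULABLE = ("up_for_retry", "up_for_reschedule")


def build_db():
    """Create an in-memory table with one TI per state of interest."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, state TEXT)")
    rows = [
        (1, None),
        (2, "up_for_retry"),
        (3, "up_for_reschedule"),
        (4, "scheduled"),
        (5, "queued"),
        (6, "running"),
        (7, "success"),
        (8, "failed"),
    ]
    conn.executemany("INSERT INTO task_instance VALUES (?, ?)", rows)
    return conn


def schedule_with_allowlist(conn, ids):
    """Move TIs to 'scheduled' only if they are in a schedulable state."""
    id_marks = ",".join("?" * len(ids))
    state_marks = ",".join("?" * len(SCHEDULABLE))
    cur = conn.execute(
        f"UPDATE task_instance SET state = 'scheduled' "
        f"WHERE id IN ({id_marks}) "
        f"AND (state IS NULL OR state IN ({state_marks}))",
        (*ids, *SCHEDULABLE),
    )
    return cur.rowcount


def schedule_with_not_scheduled(conn, ids):
    """Assumed alternative guard: only exclude TIs already in 'scheduled'."""
    id_marks = ",".join("?" * len(ids))
    cur = conn.execute(
        f"UPDATE task_instance SET state = 'scheduled' "
        f"WHERE id IN ({id_marks}) "
        f"AND (state IS NULL OR state != 'scheduled')",
        tuple(ids),
    )
    return cur.rowcount


if __name__ == "__main__":
    all_ids = list(range(1, 9))
    print("allowlist updated:", schedule_with_allowlist(build_db(), all_ids))
    print("!= scheduled updated:", schedule_with_not_scheduled(build_db(), all_ids))
```

In this toy setup the allowlist guard touches only the rows with no state, `up_for_retry`, or `up_for_reschedule`, while the `!= 'scheduled'` guard also rewrites the queued, running, success, and failed rows, which is the double-scheduling risk described above.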