uranusjr commented on code in PR #60773:
URL: https://github.com/apache/airflow/pull/60773#discussion_r2887418199
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2135,26 +2135,44 @@ def _create_dag_runs_asset_triggered(
triggered_date_by_dag: dict[str, datetime],
session: Session,
) -> None:
- """For DAGs that are triggered by assets, create dag runs."""
- triggered_dates: dict[str, DateTime] = {
- dag_id: timezone.coerce_datetime(last_asset_event_time)
- for dag_id, last_asset_event_time in triggered_date_by_dag.items()
- }
-
+ """For Dags that are triggered by assets, create Dag runs."""
for dag_model in dag_models:
+ if dag_model.dag_id not in triggered_date_by_dag:
+ continue
dag = self._get_current_dag(dag_id=dag_model.dag_id,
session=session)
if not dag:
- self.log.error("DAG '%s' not found in serialized_dag table",
dag_model.dag_id)
+ self.log.error("Dag '%s' not found in serialized_dag table",
dag_model.dag_id)
continue
if not isinstance(dag.timetable, AssetTriggeredTimetable):
self.log.error(
- "DAG '%s' was asset-scheduled, but didn't have an
AssetTriggeredTimetable!",
+ "Dag '%s' was asset-scheduled, but didn't have an
AssetTriggeredTimetable!",
dag_model.dag_id,
)
continue
- triggered_date = triggered_dates[dag.dag_id]
+ queued_adrqs = list(
+ session.scalars(
+ with_row_locks(
+
select(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag.dag_id),
+ of=AssetDagRunQueue,
+ skip_locked=True,
+ key_share=False,
+ session=session,
+ )
+ )
+ )
+ # If another scheduler already locked these ADRQ rows, SKIP LOCKED
makes this scheduler skip them.
+ if not queued_adrqs:
+ self.log.debug(
+ "Skipping asset-triggered DagRun creation for Dag '%s'; no
queued assets remain.",
+ dag.dag_id,
+ )
+ continue
+
+ triggered_date: DateTime = timezone.coerce_datetime(
+ max(record.created_at for record in queued_adrqs)
Review Comment:
Instead of `max()`, maybe the preceding `select(AssetDagRunQueue)` query could
add an `order_by()` on `created_at` (descending), so this can simply take
`queued_adrqs[0].created_at`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]