This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 9e462bac5d4 [v3-1-test] Add row lock to ADRQ before Dag run creation 
(apache#60773) (#63776)
9e462bac5d4 is described below

commit 9e462bac5d421caffbb0f05be340af3226e95b69
Author: Wei Lee <[email protected]>
AuthorDate: Mon Mar 23 10:24:45 2026 +0800

    [v3-1-test] Add row lock to ADRQ before Dag run creation (apache#60773) 
(#63776)
    
    * [v3-1-test] Add row lock to ADRQ before Dag run creation (#60773)
    (cherry picked from commit b3055d7bb8cea89a77d7179b8896b84bac29de77)
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * fixup! [v3-1-test] Add row lock to ADRQ before Dag run creation (#60773) 
(cherry picked from commit b3055d7bb8cea89a77d7179b8896b84bac29de77)
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 45 ++++++++---
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 89 +++++++++++++++++++++-
 2 files changed, 119 insertions(+), 15 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index fc223d7ca80..587db7d0d2a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1592,7 +1592,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         if asset_triggered_dags:
             self._create_dag_runs_asset_triggered(
                 dag_models=asset_triggered_dags,
-                triggered_date_by_dag=triggered_date_by_dag,
                 session=session,
             )
 
@@ -1705,30 +1704,44 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
     def _create_dag_runs_asset_triggered(
         self,
+        *,
         dag_models: Collection[DagModel],
-        triggered_date_by_dag: dict[str, datetime],
         session: Session,
     ) -> None:
-        """For DAGs that are triggered by assets, create dag runs."""
-        triggered_dates: dict[str, DateTime] = {
-            dag_id: timezone.coerce_datetime(last_asset_event_time)
-            for dag_id, last_asset_event_time in triggered_date_by_dag.items()
-        }
-
+        """For Dags that are triggered by assets, create Dag runs."""
         for dag_model in dag_models:
             dag = self._get_current_dag(dag_id=dag_model.dag_id, 
session=session)
             if not dag:
-                self.log.error("DAG '%s' not found in serialized_dag table", 
dag_model.dag_id)
+                self.log.error("Dag '%s' not found in serialized_dag table", 
dag_model.dag_id)
                 continue
 
             if not isinstance(dag.timetable, AssetTriggeredTimetable):
                 self.log.error(
-                    "DAG '%s' was asset-scheduled, but didn't have an 
AssetTriggeredTimetable!",
+                    "Dag '%s' was asset-scheduled, but didn't have an 
AssetTriggeredTimetable!",
                     dag_model.dag_id,
                 )
                 continue
 
-            triggered_date = triggered_dates[dag.dag_id]
+            queued_adrqs = session.scalars(
+                with_row_locks(
+                    select(AssetDagRunQueue)
+                    .where(AssetDagRunQueue.target_dag_id == dag.dag_id)
+                    .order_by(AssetDagRunQueue.created_at.desc()),
+                    of=AssetDagRunQueue,
+                    skip_locked=True,
+                    key_share=False,
+                    session=session,
+                )
+            ).all()
+            # If another scheduler already locked these ADRQ rows, SKIP LOCKED 
makes this scheduler skip them.
+            if not queued_adrqs:
+                self.log.debug(
+                    "Skipping asset-triggered DagRun creation for Dag '%s'; no 
queued assets remain.",
+                    dag.dag_id,
+                )
+                continue
+
+            triggered_date: DateTime = 
timezone.coerce_datetime(queued_adrqs[0].created_at)
             cte = (
                 
select(func.max(DagRun.run_after).label("previous_dag_run_run_after"))
                 .where(
@@ -1775,7 +1788,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
             Stats.incr("asset.triggered_dagruns")
             dag_run.consumed_asset_events.extend(asset_events)
-            
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_run.dag_id))
+
+            # Delete only consumed ADRQ rows to avoid dropping newly queued 
events
+            # (e.g. DagRun triggered by asset A while a new event for asset B 
arrives).
+            adrq_pks = [(record.asset_id, record.target_dag_id) for record in 
queued_adrqs]
+            session.execute(
+                delete(AssetDagRunQueue).where(
+                    tuple_(AssetDagRunQueue.asset_id, 
AssetDagRunQueue.target_dag_id).in_(adrq_pks)
+                )
+            )
 
     def _should_update_dag_next_dagruns(
         self,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 0d6d2ddf360..4d91ed47cd1 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -36,7 +36,7 @@ import psutil
 import pytest
 import time_machine
 from pytest import param
-from sqlalchemy import func, select, update
+from sqlalchemy import delete, func, select, update
 from sqlalchemy.orm import joinedload
 
 from airflow import settings
@@ -214,9 +214,9 @@ class TestSchedulerJob:
     def set_instance_attrs(self) -> Generator:
         # Speed up some tests by not running the tasks, just look at what we
         # enqueue!
-        self.null_exec: MockExecutor | None = MockExecutor()
+        self.null_exec: BaseExecutor = MockExecutor()
         yield
-        self.null_exec = None
+        self.null_exec = None  # type: ignore[assignment]
 
     @pytest.fixture
     def mock_executors(self):
@@ -4587,6 +4587,89 @@ class TestSchedulerJob:
 
         assert created_run.creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self, 
session, dag_maker):
+        asset = Asset(uri="test://asset-for-stale-trigger-date", 
name="asset-for-stale-trigger-date")
+        with dag_maker(dag_id="asset-consumer-stale-trigger-date", 
schedule=[asset], session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset.uri))
+
+        queued_at = timezone.utcnow()
+        session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id, 
asset_id=asset_id, created_at=queued_at))
+        session.flush()
+
+        # Simulate another scheduler consuming ADRQ rows after we computed 
triggered_date_by_dag.
+        
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_model.dag_id))
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        self.job_runner._create_dag_runs_asset_triggered(
+            dag_models=[dag_model],
+            session=session,
+        )
+
+        # We do not create a new DagRun since the ADRQ has already been 
consumed
+        assert session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none() is None
+
+    @pytest.mark.need_serialized_dag
+    def 
test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(self, 
session, dag_maker):
+        asset_1 = Asset("ready-to-trigger-a-Dag-run")
+        asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+        with dag_maker(dag_id="asset-consumer-delete-selected", 
schedule=asset_1 | asset_2, session=session):
+            pass
+        dag_model = dag_maker.dag_model
+        asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_1.name))
+        asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri 
== asset_2.name))
+
+        session.add_all(
+            [
+                # The ADRQ that should trigger the Dag run creation
+                AssetDagRunQueue(
+                    asset_id=asset_1_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+                # The ADRQ that arrives after the Dag run creation but before 
ADRQ clean up
+                # This situation is simulated by _lock_only_selected_asset 
below
+                AssetDagRunQueue(
+                    asset_id=asset_2_id, target_dag_id=dag_model.dag_id, 
created_at=timezone.utcnow()
+                ),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        def _lock_only_selected_asset(query, **_):
+            # Simulate SKIP LOCKED behavior where this scheduler can only 
consume one ADRQ row.
+            return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+        with patch("airflow.jobs.scheduler_job_runner.with_row_locks", 
side_effect=_lock_only_selected_asset):
+            self.job_runner._create_dag_runs_asset_triggered(
+                dag_models=[dag_model],
+                session=session,
+            )
+
+        dr = session.scalars(select(DagRun).where(DagRun.dag_id == 
dag_model.dag_id)).one_or_none()
+
+        assert dr is not None
+
+        adrq_1 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_1_id,
+            )
+        ).one_or_none()
+        assert adrq_1 is None
+        adrq_2 = session.scalars(
+            select(AssetDagRunQueue).where(
+                AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+                AssetDagRunQueue.asset_id == asset_2_id,
+            )
+        ).one_or_none()
+        assert adrq_2 is not None
+
     @pytest.mark.need_serialized_dag
     def test_create_dag_runs_asset_alias_with_asset_event_attached(self, 
session, dag_maker):
         """

Reply via email to