dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r2969574055


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2074,29 +2074,31 @@ def _create_dag_runs_asset_triggered(
                     .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
                 )
             )
-
-            dag_run = dag.create_dagrun(
-                run_id=DagRun.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, 
run_after=triggered_date
-                ),
-                logical_date=None,
-                data_interval=None,
-                run_after=triggered_date,
-                run_type=DagRunType.ASSET_TRIGGERED,
-                triggered_by=DagRunTriggeredByType.ASSET,
-                state=DagRunState.QUEUED,
-                creating_job_id=self.job.id,
-                session=session,
-            )
-            Stats.incr("asset.triggered_dagruns")
-            dag_run.consumed_asset_events.extend(asset_events)
+            if asset_events:
+                dag_run = dag.create_dagrun(
+                    run_id=DagRun.generate_run_id(
+                        run_type=DagRunType.ASSET_TRIGGERED, 
logical_date=None, run_after=triggered_date
+                    ),
+                    logical_date=None,
+                    data_interval=None,
+                    run_after=triggered_date,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                )
+                Stats.incr("asset.triggered_dagruns")
+                dag_run.consumed_asset_events.extend(asset_events)
 
             # Delete only consumed ADRQ rows to avoid dropping newly queued 
events
-            # (e.g. DagRun triggered by asset A while a new event for asset B 
arrives).
+            # (e.g. 1. DagRun triggered by asset A while a new event for asset 
B arrives.
+            # 2. DagRun triggered by asset A while new event for asset A 
upsert to ADRQ)
             adrq_pks = [(record.asset_id, record.target_dag_id) for record in 
queued_adrqs]

Review Comment:
   I noticed there is already logging in the latest code. I think it might be 
better to add `self.log.warning("No DagRun created for '%s' at '%s' - asset 
events already consumed", dag.dag_id, triggered_date)` when the list of asset 
events is empty.
   
   
https://github.com/apache/airflow/blob/6e6ab0bd111c683936b35c128b981d61b4130262/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L2118-L2122



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to