kaxil commented on code in PR #61448:
URL: https://github.com/apache/airflow/pull/61448#discussion_r2895979632


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1970,30 +1970,44 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st
 
             partition_dag_ids.add(apdr.target_dag_id)
             run_after = timezone.utcnow()
-            dag_run = dag.create_dagrun(
-                run_id=DagRun.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
-                ),
-                logical_date=None,
-                data_interval=None,
-                partition_key=apdr.partition_key,
-                run_after=run_after,
-                run_type=DagRunType.ASSET_TRIGGERED,
-                triggered_by=DagRunTriggeredByType.ASSET,
-                state=DagRunState.QUEUED,
-                creating_job_id=self.job.id,
-                session=session,
-            )
-            asset_events = session.scalars(
-                select(AssetEvent).where(
-                    PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id,
-                    PartitionedAssetKeyLog.asset_event_id == AssetEvent.id,
+            try:
+                dag_run = dag.create_dagrun(
+                    run_id=DagRun.generate_run_id(
+                        run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
+                    ),
+                    logical_date=None,
+                    data_interval=None,
+                    partition_key=apdr.partition_key,
+                    run_after=run_after,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                )
+                asset_events = session.scalars(
+                    select(AssetEvent).where(
+                        PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id,
+                        PartitionedAssetKeyLog.asset_event_id == AssetEvent.id,
+                    )
+                )
+                dag_run.consumed_asset_events.extend(asset_events)
+                session.flush()
+                apdr.created_dag_run_id = dag_run.id
+                session.flush()
+            except BundleVersionUnavailable:
+                self.log.warning(
+                    "Bundle version not yet available for partitioned asset-triggered DAG %s. "
+                    "Run will be created on next evaluation cycle.",
+                    apdr.target_dag_id,
+                )
+            except OperationalError:

Review Comment:
   Thanks for the quick follow-up here. One remaining concern: re-raising only
`OperationalError` is still narrower than the retry contract for
`@retry_db_transaction`, which is based on `DBAPIError`. Other `DBAPIError`
subclasses (e.g. `IntegrityError`) can still be caught by the broad
`except Exception` and get swallowed. Could we switch this to
`except DBAPIError: raise` (or rollback + re-raise), then keep non-DB exceptions
as log-and-continue? The same applies to the analogous block in
`_create_dag_runs_asset_triggered`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to