nathadfield commented on code in PR #61448:
URL: https://github.com/apache/airflow/pull/61448#discussion_r2894610252


##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1097,6 +1099,116 @@ def get_edge_info(self, upstream_task_id: str, 
downstream_task_id: str) -> EdgeI
         return empty
 
 
+def _resolve_bundle_version(
+    dag: SerializedDAG,
+    session: Session,
+) -> str | None:
+    """
+    Resolve the bundle version for a DAG run based on the precedence hierarchy.
+
+    Precedence (highest to lowest):
+    1. DAG-level configuration (dag.run_on_latest_version)
+    2. Global configuration (core.run_on_latest_version)
+    3. System default (use original version from DagModel)
+
+    :param dag: The serialized DAG
+    :param session: Database session
+    :return: The resolved bundle version, or None if versioning is disabled
+    """
+    if dag.disable_bundle_versioning:
+        return None
+
+    # Determine whether to use latest version based on precedence hierarchy
+    use_latest = _should_use_latest_version(dag)
+    source = _get_config_source(dag)
+
+    if use_latest:
+        log.debug("Using latest bundle version", dag_id=dag.dag_id, 
source=source)
+        return _get_latest_bundle_version(dag.dag_id, session)
+
+    log.debug("Using original bundle version", dag_id=dag.dag_id, 
source=source)

Review Comment:
   This codebase uses structlog, which natively supports keyword arguments. The 
`self.log.warning("msg", dag_id=dag_id)` pattern is idiomatic so no % 
formatting needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to