This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b93d3798cf4 Fix import errors updating DAGs in other bundles (#63615)
b93d3798cf4 is described below
commit b93d3798cf4b4561c84900ae4821837294e03d07
Author: GPK <[email protected]>
AuthorDate: Sun Mar 15 09:12:45 2026 +0000
Fix import errors updating DAGs in other bundles (#63615)
---
.../src/airflow/dag_processing/collection.py | 1 +
.../tests/unit/dag_processing/test_collection.py | 36 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index 7754080ae80..14924e3e9e7 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -407,6 +407,7 @@ def _update_import_errors(
update(DagModel)
.where(
DagModel.relative_fileloc == relative_fileloc,
+ DagModel.bundle_name == bundle_name_,
)
.values(
has_import_errors=True,
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index 458f773046e..1c12371de7d 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -49,6 +49,7 @@ from airflow.models.asset import (
DagScheduleAssetUriReference,
)
from airflow.models.dag import DagTag
+from airflow.models.dagbundle import DagBundleModel
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -803,6 +804,41 @@ class TestUpdateDagParsingResults:
import_errors = set(session.execute(select(ParseImportError.filename,
ParseImportError.bundle_name)))
assert import_errors == {("other.py", bundle_name)}, "Import error for
parsed file should be cleared"
+ @pytest.mark.usefixtures("clean_db")
+ def
test_import_error_update_does_not_touch_other_bundle_with_same_relative_fileloc(self,
session):
+ relative_fileloc = "example_dag.py"
+ session.add_all([DagBundleModel(name="bundle_a"),
DagBundleModel(name="bundle_b")])
+ session.flush()
+ session.add_all(
+ [
+ DagModel(dag_id="dag_in_bundle_a",
relative_fileloc=relative_fileloc, bundle_name="bundle_a"),
+ DagModel(dag_id="dag_in_bundle_b",
relative_fileloc=relative_fileloc, bundle_name="bundle_b"),
+ ]
+ )
+ session.flush()
+
+ update_dag_parsing_results_in_db(
+ bundle_name="bundle_a",
+ bundle_version=None,
+ dags=[],
+ import_errors={("bundle_a", relative_fileloc): "Import failed in
bundle_a"},
+ parse_duration=None,
+ warnings=set(),
+ session=session,
+ files_parsed={("bundle_a", relative_fileloc)},
+ )
+ session.flush()
+
+ dag_in_bundle_a = session.get(DagModel, "dag_in_bundle_a")
+ dag_in_bundle_b = session.get(DagModel, "dag_in_bundle_b")
+
+ assert dag_in_bundle_a is not None
+ assert dag_in_bundle_b is not None
+ assert dag_in_bundle_a.bundle_name == "bundle_a"
+ assert dag_in_bundle_b.bundle_name == "bundle_b"
+ assert dag_in_bundle_a.has_import_errors is True
+ assert dag_in_bundle_b.has_import_errors is False
+
@pytest.mark.need_serialized_dag(False)
@pytest.mark.parametrize(
("attrs", "expected"),