This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6e6ab0bd111 Rectify DetachedInstanceError for airflow tasks render command (#63916)
6e6ab0bd111 is described below
commit 6e6ab0bd111c683936b35c128b981d61b4130262
Author: Amogh Desai <[email protected]>
AuthorDate: Sat Mar 21 16:47:44 2026 +0530
Rectify DetachedInstanceError for airflow tasks render command (#63916)
---
.../execution_api/datamodels/taskinstance.py | 38 ++++++++++++++--------
.../src/airflow/cli/commands/task_command.py | 3 ++
.../tests/unit/cli/commands/test_task_command.py | 24 ++++++++++++++
3 files changed, 51 insertions(+), 14 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index 890314171b8..11b6b605f0b 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -309,8 +309,13 @@ class DagRun(StrictBaseModel):
@model_validator(mode="before")
@classmethod
- def extract_dag_run_note(cls, data: Any) -> Any:
- """Extract the `note` (`str | None` from `association_proxy("dag_run_note", "content")`) relationship from `DagRun` to prevent `DetachedInstanceError` when constructing `DagRunContext` or `TIRunContext` models."""
+ def safe_extract_from_orm(cls, data: Any) -> Any:
+ """
+ Safely extract data from SQLAlchemy DagRun instances.
+
+ Handles the 'note' association proxy and provides defaults for unloaded relationships
+ to prevent DetachedInstanceError when the instance is not bound to a session.
+ """
from sqlalchemy import inspect as sa_inspect
from sqlalchemy.exc import NoInspectionAvailable
from sqlalchemy.orm.state import InstanceState
@@ -325,19 +330,24 @@ class DagRun(StrictBaseModel):
# Not a SQLAlchemy object, return as-is for Pydantic to handle
return data
+ values = {}
+
+ for field_name in cls.model_fields:
+ if field_name in insp.dict:
+ values[field_name] = insp.dict[field_name]
+ elif field_name == "state" and "_state" in insp.dict:
+ values["state"] = insp.dict["_state"]
+
+ if "consumed_asset_events" not in values:
+ values["consumed_asset_events"] = []
+
# Check if dag_run_note is already loaded (avoid lazy load on detached instance)
- if "note" in insp.dict:
- note_value: str | None = insp.dict["note"]
- else:
- note_value = None
-
- # Convert to dict to avoid further lazy loading issues
- values = {
- field_name: getattr(data, field_name, None)
- for field_name in cls.model_fields
- if field_name != "note"
- }
- values["note"] = note_value
+ if "note" not in values:
+ if "dag_run_note" in insp.dict:
+ values["note"] = data.note
+ else:
+ values["note"] = None
+
return values
diff --git a/airflow-core/src/airflow/cli/commands/task_command.py b/airflow-core/src/airflow/cli/commands/task_command.py
index cd4b0368e9a..b139459c9ee 100644
--- a/airflow-core/src/airflow/cli/commands/task_command.py
+++ b/airflow-core/src/airflow/cli/commands/task_command.py
@@ -233,6 +233,9 @@ def _get_ti(
ti.refresh_from_task(task, pool_override=pool)
ti.dag_model # we must ensure dag model is loaded eagerly for bundle info
+ # eagerly load consumed_asset_events for template rendering (needed for triggering_asset_events context)
+ if ti.dag_run is not None:
+ _ = ti.dag_run.consumed_asset_events
return ti, dr_created
diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py
index 8d96d5579ec..a725fc5c006 100644
--- a/airflow-core/tests/unit/cli/commands/test_task_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_task_command.py
@@ -283,6 +283,30 @@ class TestCliTasks:
assert 'echo "2016-01-01"' in output
assert 'echo "2016-01-08"' in output
+ @pytest.mark.db_test
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_task_render_handles_detached_dagrun(self, dag_maker, session):
+ """Test that task_render handles DagRun with unloaded consumed_asset_events relationship."""
+ from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as DagRunPydantic
+
+ with dag_maker(dag_id="test_detached", session=session):
+ pass
+
+ dr = dag_maker.create_dagrun()
+ session.commit()
+ # Detach: this would cause DetachedInstanceError before fix
+ session.expunge(dr)
+
+ # This should not raise DetachedInstanceError
+ pydantic_dr = DagRunPydantic.model_validate(dr)
+ assert pydantic_dr.consumed_asset_events == []
+ assert pydantic_dr.note is None
+
+ args = self.parser.parse_args(["tasks", "render", "tutorial", "templated", "2016-01-01"])
+
+ with redirect_stdout(io.StringIO()):
+ task_command.task_render(args)
+
@pytest.mark.usefixtures("testing_dag_bundle")
def test_mapped_task_render(self):
"""