This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new a4b2ddf6ebd fix: task_instance_mutation_hook receives a TI with run_id
set (#62999)
a4b2ddf6ebd is described below
commit a4b2ddf6ebd37eefd261a84d0900843251736800
Author: Dev-iL <[email protected]>
AuthorDate: Fri Mar 6 19:37:19 2026 +0200
fix: task_instance_mutation_hook receives a TI with run_id set (#62999)
* fix: task_instance_mutation_hook receives a TI with run_id set
* fix mypy
* remove unused variable
---------
Co-authored-by: Elad Kalif <[email protected]>
---
airflow/models/taskinstance.py | 1 +
tests/models/test_dagrun.py | 28 ++++++++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 271273f32b0..1d822d251ba 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1949,6 +1949,7 @@ class TaskInstance(Base, LoggingMixin):
super().__init__()
self.dag_id = task.dag_id
self.task_id = task.task_id
+ self.run_id = run_id
self.map_index = map_index
self.refresh_from_task(task)
if TYPE_CHECKING:
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 0117103dbbb..c39fb17e587 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -779,6 +779,34 @@ class TestDagRun:
task = dagrun.get_task_instances()[0]
assert task.queue == "queue1"
+ @mock.patch.object(settings, "task_instance_mutation_hook", autospec=True)
+ def test_task_instance_mutation_hook_has_run_id(self, mock_hook, session):
+ """Test that task_instance_mutation_hook receives a TI with run_id set
(not None).
+
+ Regression test for https://github.com/apache/airflow/issues/61945
+ """
+ observed_run_ids = []
+
+ def mutate_task_instance(task_instance):
+ observed_run_ids.append(task_instance.run_id)
+ if task_instance.run_id and
task_instance.run_id.startswith("manual__"):
+ task_instance.pool = "manual_pool"
+
+ mock_hook.side_effect = mutate_task_instance
+
+ dag = DAG(
+ "test_mutation_hook_run_id",
+ schedule=datetime.timedelta(days=1),
+ start_date=DEFAULT_DATE,
+ )
+ dag.add_task(EmptyOperator(task_id="mutated_task", owner="test"))
+
+ self.create_dag_run(dag, session=session)
+ # The hook should have been called during TI creation with run_id set
+ assert any(
+ rid is not None for rid in observed_run_ids
+ ), f"task_instance_mutation_hook was called with run_id=None. Observed
run_ids: {observed_run_ids}"
+
@pytest.mark.parametrize(
"prev_ti_state, is_ti_success",
[