This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch fix-task-instance-mutation-no-id in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 99fb706798c631f0b9819800d14e806bdd1b60c1
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Mar 7 19:44:05 2026 +0100

    Fix task_instance_mutation_hook receiving run_id=None during TaskInstance creation

    Also add a regression test verifying the hook sees the correct run_id.

    In airflow/models/taskinstance.py, TI.__init__() calls
    self.refresh_from_task(task), which invokes
    task_instance_mutation_hook(task_instance) via _refresh_from_task().
    However, self.run_id is not assigned until after this call, so any
    mutation hook that depends on run_id sees None.

    ```python
    self.refresh_from_task(task)  # calls hook — self.run_id is None here
    ...
    self.run_id = run_id  # too late
    ```

    This means a hook like the one in the issue:

    ```python
    def task_instance_mutation_hook(task_instance):
        if task_instance.run_id.startswith("manual__"):
            task_instance.pool = "manual_pool"
    ```

    fails with AttributeError: 'NoneType' object has no attribute 'startswith'
    (or silently does nothing if guarded with getattr).

    Move self.run_id = run_id before self.refresh_from_task(task) in TI.__init__():

        self.run_id = run_id  # set BEFORE refresh_from_task
        self.map_index = map_index
        self.refresh_from_task(task)  # hook now sees correct run_id

    Co-authored-by: Dev-iL <[email protected]>
    Co-authored-by: Elad Kalif <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py | 4 ++--
 airflow-core/tests/unit/models/test_dagrun.py | 28 +++++++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 8b12db483af..8112d955582 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -566,6 +566,8 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload): self.dag_id = task.dag_id self.task_id = task.task_id self.map_index = map_index + if run_id is not None: + self.run_id = run_id self.refresh_from_task(task) if 
TYPE_CHECKING: @@ -573,8 +575,6 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload): # init_on_load will config the log self.init_on_load() - if run_id is not None: - self.run_id = run_id self.try_number = 0 self.max_tries = self.task.retries if not self.id: diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index bac07ffe1f2..3e62554901c 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -51,6 +51,7 @@ from airflow.sdk.definitions.callback import AsyncCallback from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference from airflow.serialization.definitions.deadline import SerializedReferenceModels from airflow.serialization.serialized_objects import LazyDeserializedDAG +from airflow.settings import get_policy_plugin_manager from airflow.task.trigger_rule import TriggerRule from airflow.triggers.base import StartTriggerArgs from airflow.utils.span_status import SpanStatus @@ -943,6 +944,33 @@ class TestDagRun: task = dagrun.get_task_instances()[0] assert task.queue == "queue1" + def test_task_instance_mutation_hook_has_run_id(self, dag_maker, session): + """Test that task_instance_mutation_hook receives a TI with run_id set (not None). 
+ + Regression test for https://github.com/apache/airflow/issues/61945 + """ + observed_run_ids = [] + + def mutate_task_instance(task_instance): + observed_run_ids.append(task_instance.run_id) + if task_instance.run_id and task_instance.run_id.startswith("manual__"): + task_instance.pool = "manual_pool" + + with mock.patch.object( + get_policy_plugin_manager().hook, "task_instance_mutation_hook", autospec=True + ) as mock_hook: + mock_hook.side_effect = mutate_task_instance + with dag_maker( + dag_id="test_mutation_hook_run_id", schedule=datetime.timedelta(days=1), session=session + ) as dag: + EmptyOperator(task_id="mutated_task", owner="test") + + self.create_dag_run(dag, session=session) + # The hook should have been called during TI creation with run_id set + assert any(rid is not None for rid in observed_run_ids), ( + f"task_instance_mutation_hook was called with run_id=None. Observed run_ids: {observed_run_ids}" + ) + @pytest.mark.parametrize( ("prev_ti_state", "is_ti_schedulable"), [
