This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ced319fe95 Commit the session between writing and deletion of RTIF 
(#42928)
ced319fe95 is described below

commit ced319fe95a731b745801fe9b15ca7b24ef0e82f
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Mon Oct 14 17:00:07 2024 +0100

    Commit the session between writing and deletion of RTIF (#42928)
    
    * Use different sessions in writing and deletion of RTIF
    
    Previously, separate sessions were used for the writing and the deletion
    of RTIF, but a later change used a single session for both, which we
    suspect caused StaleDataError. The related PR:
    https://github.com/apache/airflow/pull/38565
    
    This PR restores the old behaviour of using different sessions for
    writing and deleting RTIFs.
    
    * fixup! Use different sessions in writing and deletion of RTIF
    
    * add test and use flush
---
 airflow/models/taskinstance.py        |  3 ++-
 tests/models/test_renderedtifields.py | 50 ++++++++++++++++++++++++++++++++++-
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5b51bb0d24..c1373e5d6a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1632,11 +1632,12 @@ def _get_previous_ti(
 
 @internal_api_call
 @provide_session
-def _update_rtif(ti, rendered_fields, session: Session | None = None):
+def _update_rtif(ti, rendered_fields, session: Session = NEW_SESSION):
     from airflow.models.renderedtifields import RenderedTaskInstanceFields
 
     rtif = RenderedTaskInstanceFields(ti=ti, render_templates=False, 
rendered_fields=rendered_fields)
     RenderedTaskInstanceFields.write(rtif, session=session)
+    session.flush()
     RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, 
session=session)
 
 
diff --git a/tests/models/test_renderedtifields.py 
b/tests/models/test_renderedtifields.py
index ea22d31871..6ff87b28a8 100644
--- a/tests/models/test_renderedtifields.py
+++ b/tests/models/test_renderedtifields.py
@@ -24,13 +24,16 @@ from collections import Counter
 from datetime import date, timedelta
 from unittest import mock
 
+import pendulum
 import pytest
+from sqlalchemy import select
 
 from airflow import settings
 from airflow.configuration import conf
 from airflow.decorators import task as task_decorator
-from airflow.models import Variable
+from airflow.models import DagRun, Variable
 from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
+from airflow.operators.python import PythonOperator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.utils.task_instance_session import 
set_current_task_instance_session
 from airflow.utils.timezone import datetime
@@ -386,3 +389,48 @@ class TestRenderedTaskInstanceFields:
             "env": "val 2",
             "cwd": "val 3",
         }
+
+    @pytest.mark.skip_if_database_isolation_mode
+    def test_rtif_deletion_stale_data_error(self, dag_maker, session):
+        """
+        Here we verify bad behavior.  When we rerun a task whose RTIF
+        will get removed, we get a stale data error.
+        """
+        with dag_maker(dag_id="test_retry_handling"):
+            task = PythonOperator(
+                task_id="test_retry_handling_op",
+                python_callable=lambda a, b: print(f"{a}\n{b}\n"),
+                op_args=[
+                    "dag {{dag.dag_id}};",
+                    "try_number {{ti.try_number}};yo",
+                ],
+            )
+
+        def run_task(date):
+            run_id = f"abc_{date.to_date_string()}"
+            dr = session.scalar(select(DagRun).where(DagRun.execution_date == 
date, DagRun.run_id == run_id))
+            if not dr:
+                dr = dag_maker.create_dagrun(execution_date=date, 
run_id=run_id)
+            ti = dr.task_instances[0]
+            ti.state = None
+            ti.try_number += 1
+            session.commit()
+            ti.task = task
+            ti.run()
+            return dr
+
+        base_date = pendulum.datetime(2021, 1, 1)
+        exec_dates = [base_date.add(days=x) for x in range(40)]
+        for date_ in exec_dates:
+            run_task(date=date_)
+
+        session.commit()
+        session.expunge_all()
+
+        # find oldest date
+        date = session.scalar(
+            
select(DagRun.execution_date).join(RTIF.dag_run).order_by(DagRun.execution_date).limit(1)
+        )
+        date = pendulum.instance(date)
+        # rerun the old date. this will fail
+        run_task(date=date)

Reply via email to