This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f094e6f917447fed4fbd986175432176f448fe93
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Oct 9 22:36:53 2021 +0200

    Fix occassional deadloc on MSSQL test DagMaker cleanup (#18857)
    
    Occasionally our tests in CI for MsSQL failed with deadlock on
    cleaning SerilizedDag table. After closer inspection, the
    deadlock happened in the test dag_maker cleanup() code.
    
    This PR fixes it by attempting to retry the cleaning in case
    of deadlock.
    
    (cherry picked from commit fd45f5f3e38b80993d5624480a793be381194f04)
---
 tests/conftest.py | 49 +++++++++++++++++++++++++++++--------------------
 1 file changed, 29 insertions(+), 20 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 36e88b3..22770c5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -473,7 +473,9 @@ def dag_maker(request):
     if serialized_marker:
         (want_serialized,) = serialized_marker.args or (True,)
 
-    class DagFactory:
+    from airflow.utils.log.logging_mixin import LoggingMixin
+
+    class DagFactory(LoggingMixin):
         def __init__(self):
             from airflow.models import DagBag
 
@@ -601,25 +603,32 @@ def dag_maker(request):
         def cleanup(self):
             from airflow.models import DagModel, DagRun, TaskInstance, XCom
             from airflow.models.serialized_dag import SerializedDagModel
-
-            dag_ids = list(self.dagbag.dag_ids)
-            if not dag_ids:
-                return
-            # To isolate problems here with problems from elsewhere on the 
session object
-            self.session.flush()
-
-            
self.session.query(SerializedDagModel).filter(SerializedDagModel.dag_id.in_(dag_ids)).delete(
-                synchronize_session=False
-            )
-            
self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete(synchronize_session=False)
-            
self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete(
-                synchronize_session=False
-            )
-            
self.session.query(XCom).filter(XCom.dag_id.in_(dag_ids)).delete(synchronize_session=False)
-            
self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete(
-                synchronize_session=False
-            )
-            self.session.commit()
+            from airflow.utils.retries import run_with_db_retries
+
+            for attempt in run_with_db_retries(logger=self.log):
+                with attempt:
+                    dag_ids = list(self.dagbag.dag_ids)
+                    if not dag_ids:
+                        return
+                    # To isolate problems here with problems from elsewhere on 
the session object
+                    self.session.flush()
+
+                    self.session.query(SerializedDagModel).filter(
+                        SerializedDagModel.dag_id.in_(dag_ids)
+                    ).delete(synchronize_session=False)
+                    
self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete(
+                        synchronize_session=False
+                    )
+                    
self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete(
+                        synchronize_session=False
+                    )
+                    
self.session.query(XCom).filter(XCom.dag_id.in_(dag_ids)).delete(
+                        synchronize_session=False
+                    )
+                    
self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete(
+                        synchronize_session=False
+                    )
+                    self.session.commit()
 
     factory = DagFactory()
 

Reply via email to