This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c951d9723f Fix/FabAuthManager race condition on startup with multiple 
workers (#62737)
5c951d9723f is described below

commit 5c951d9723f11f87da2a72697e5b4c6fbfaf4b8d
Author: Piyush Mudgal <[email protected]>
AuthorDate: Thu Mar 5 16:38:06 2026 +0530

    Fix/FabAuthManager race condition on startup with multiple workers (#62737)
    
    * fix: Gracefully handle `IntegrityError` in `add_permission_to_role` to 
prevent erroneous logging for concurrent duplicate permission assignments and 
add a helper method to check for existing permissions
    
    * fix: Correctly handle IntegrityError on session commit when adding 
permissions to roles and improve permission check query.
    
    * refactor: optimize permission role existence check query using 
`sqlalchemy.exists()`
    
    * use  for assertions
    
    * changing error log message
    
    * change log from debug to info
---
 .../fab/auth_manager/security_manager/override.py  | 29 ++++++++++++-
 .../auth_manager/security_manager/test_override.py | 49 +++++++++++++++++++++-
 2 files changed, 75 insertions(+), 3 deletions(-)

diff --git 
a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
 
b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
index 903a64fc3ca..92764a59a15 100644
--- 
a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
+++ 
b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -63,8 +63,8 @@ from flask_login import LoginManager
 from itsdangerous import want_bytes
 from markupsafe import Markup, escape
 from packaging.version import Version
-from sqlalchemy import delete, func, inspect, or_, select
-from sqlalchemy.exc import MultipleResultsFound
+from sqlalchemy import delete, exists, func, inspect, or_, select
+from sqlalchemy.exc import IntegrityError, MultipleResultsFound
 from sqlalchemy.orm import joinedload
 from werkzeug.security import check_password_hash, generate_password_hash
 
@@ -77,6 +77,7 @@ from airflow.providers.fab.auth_manager.models import (
     Resource,
     Role,
     User,
+    assoc_permission_role,
 )
 from airflow.providers.fab.auth_manager.models.anonymous_user import 
AnonymousUser
 from airflow.providers.fab.auth_manager.security_manager.constants import 
EXISTING_ROLES
@@ -1739,10 +1740,34 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
                 self.session.merge(role)
                 self.session.commit()
                 log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE, permission, 
role.name)
+            except IntegrityError as e:
+                self.session.rollback()
+                if self._is_permission_assigned_to_role(role_id=role.id, 
permission_view_id=permission.id):
+                    log.info("Permission '%s' already assigned to role '%s'", 
permission, role.name)
+                else:
+                    log.error(
+                        const.LOGMSG_ERR_SEC_ADD_PERMROLE,
+                        f"Failed to add '{permission}' permission to the 
'{role}' role Error: {e}",
+                    )
             except Exception as e:
                 log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE, e)
                 self.session.rollback()
 
+    def _is_permission_assigned_to_role(self, role_id: int | None, 
permission_view_id: int | None) -> bool:
+        """Check if the permission is already assigned to the role."""
+        if role_id is None or permission_view_id is None:
+            return False
+        return bool(
+            self.session.scalar(
+                select(
+                    exists().where(
+                        assoc_permission_role.c.role_id == role_id,
+                        assoc_permission_role.c.permission_view_id == 
permission_view_id,
+                    )
+                )
+            )
+        )
+
     def remove_permission_from_role(self, role: Role, permission: Permission) 
-> None:
         """
         Remove a permission pair from a role.
diff --git 
a/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py 
b/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
index f72a093938a..f8a03f54bfe 100644
--- 
a/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
+++ 
b/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
@@ -17,10 +17,17 @@
 from __future__ import annotations
 
 from unittest import mock
-from unittest.mock import Mock
+from unittest.mock import Mock, call
 
 import pytest
+from flask_appbuilder import const
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import Session
 
+from airflow.providers.fab.auth_manager.models import (
+    Permission,
+    Role,
+)
 from airflow.providers.fab.auth_manager.security_manager.override import 
FabAirflowSecurityManagerOverride
 
 
@@ -32,6 +39,46 @@ class 
EmptySecurityManager(FabAirflowSecurityManagerOverride):
 
 
 class TestFabAirflowSecurityManagerOverride:
+    
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+    def 
test_add_permission_to_role_ignores_duplicate_from_concurrent_worker(self, 
mock_log):
+        sm = EmptySecurityManager()
+        role = Mock(spec=Role, id=1, name="test_admin", permissions=[])
+        permission = Mock(spec=Permission, id=2)
+
+        mock_session = Mock(spec=Session)
+        mock_session.commit.side_effect = IntegrityError("stmt", {}, 
Exception("Duplicate entry"))
+
+        sm._is_permission_assigned_to_role = Mock(return_value=True)
+
+        with mock.patch.object(EmptySecurityManager, "session", mock_session):
+            sm.add_permission_to_role(role, permission)
+
+        assert mock_session.rollback.mock_calls == [call()]
+        assert sm._is_permission_assigned_to_role.mock_calls == 
[call(role_id=1, permission_view_id=2)]
+        assert mock_log.error.mock_calls == []
+
+    
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+    def 
test_add_permission_to_role_logs_error_when_duplicate_not_persisted(self, 
mock_log):
+        sm = EmptySecurityManager()
+        role = Mock(spec=Role, id=1, name="Admin", permissions=[])
+        permission = Mock(spec=Permission, id=2)
+
+        mock_session = Mock(spec=Session)
+        mock_error = IntegrityError("stmt", {}, Exception("duplicate key"))
+        mock_session.commit.side_effect = mock_error
+
+        sm._is_permission_assigned_to_role = Mock(return_value=False)
+
+        with mock.patch.object(EmptySecurityManager, "session", mock_session):
+            sm.add_permission_to_role(role, permission)
+
+        mock_session.rollback.assert_called_once_with()
+        sm._is_permission_assigned_to_role.assert_called_once_with(role_id=1, 
permission_view_id=2)
+        mock_log.error.assert_called_once_with(
+            const.LOGMSG_ERR_SEC_ADD_PERMROLE,
+            f"Failed to add '{permission}' permission to the '{role}' role 
Error: {mock_error}",
+        )
+
     def test_load_user(self):
         sm = EmptySecurityManager()
         sm.get_user_by_id = Mock()

Reply via email to