This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c951d9723f Fix/FabAuthManager race condition on startup with multiple
workers (#62737)
5c951d9723f is described below
commit 5c951d9723f11f87da2a72697e5b4c6fbfaf4b8d
Author: Piyush Mudgal <[email protected]>
AuthorDate: Thu Mar 5 16:38:06 2026 +0530
Fix/FabAuthManager race condition on startup with multiple workers (#62737)
* fix: Gracefully handle `IntegrityError` in `add_permission_to_role` to
prevent erroneous logging for concurrent duplicate permission assignments and
add a helper method to check for existing permissions
* fix: Correctly handle IntegrityError on session commit when adding
permissions to roles and improve permission check query.
* refactor: optimize permission role existence check query using
`sqlalchemy.exists()`
* use `mock_calls` comparisons for assertions in the tests
* change error log message
* change log level from debug to info for the already-assigned case
---
.../fab/auth_manager/security_manager/override.py | 29 ++++++++++++-
.../auth_manager/security_manager/test_override.py | 49 +++++++++++++++++++++-
2 files changed, 75 insertions(+), 3 deletions(-)
diff --git
a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
index 903a64fc3ca..92764a59a15 100644
---
a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
+++
b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -63,8 +63,8 @@ from flask_login import LoginManager
from itsdangerous import want_bytes
from markupsafe import Markup, escape
from packaging.version import Version
-from sqlalchemy import delete, func, inspect, or_, select
-from sqlalchemy.exc import MultipleResultsFound
+from sqlalchemy import delete, exists, func, inspect, or_, select
+from sqlalchemy.exc import IntegrityError, MultipleResultsFound
from sqlalchemy.orm import joinedload
from werkzeug.security import check_password_hash, generate_password_hash
@@ -77,6 +77,7 @@ from airflow.providers.fab.auth_manager.models import (
Resource,
Role,
User,
+ assoc_permission_role,
)
from airflow.providers.fab.auth_manager.models.anonymous_user import
AnonymousUser
from airflow.providers.fab.auth_manager.security_manager.constants import
EXISTING_ROLES
@@ -1739,10 +1740,34 @@ class
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
self.session.merge(role)
self.session.commit()
log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE, permission,
role.name)
+ except IntegrityError as e:
+ self.session.rollback()
+ if self._is_permission_assigned_to_role(role_id=role.id,
permission_view_id=permission.id):
+ log.info("Permission '%s' already assigned to role '%s'",
permission, role.name)
+ else:
+ log.error(
+ const.LOGMSG_ERR_SEC_ADD_PERMROLE,
+ f"Failed to add '{permission}' permission to the
'{role}' role Error: {e}",
+ )
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE, e)
self.session.rollback()
+ def _is_permission_assigned_to_role(self, role_id: int | None,
permission_view_id: int | None) -> bool:
+ """Check if the permission is already assigned to the role."""
+ if role_id is None or permission_view_id is None:
+ return False
+ return bool(
+ self.session.scalar(
+ select(
+ exists().where(
+ assoc_permission_role.c.role_id == role_id,
+ assoc_permission_role.c.permission_view_id ==
permission_view_id,
+ )
+ )
+ )
+ )
+
def remove_permission_from_role(self, role: Role, permission: Permission)
-> None:
"""
Remove a permission pair from a role.
diff --git
a/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
b/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
index f72a093938a..f8a03f54bfe 100644
---
a/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
+++
b/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
@@ -17,10 +17,17 @@
from __future__ import annotations
from unittest import mock
-from unittest.mock import Mock
+from unittest.mock import Mock, call
import pytest
+from flask_appbuilder import const
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import Session
+from airflow.providers.fab.auth_manager.models import (
+ Permission,
+ Role,
+)
from airflow.providers.fab.auth_manager.security_manager.override import
FabAirflowSecurityManagerOverride
@@ -32,6 +39,46 @@ class
EmptySecurityManager(FabAirflowSecurityManagerOverride):
class TestFabAirflowSecurityManagerOverride:
+
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+ def
test_add_permission_to_role_ignores_duplicate_from_concurrent_worker(self,
mock_log):
+ sm = EmptySecurityManager()
+ role = Mock(spec=Role, id=1, name="test_admin", permissions=[])
+ permission = Mock(spec=Permission, id=2)
+
+ mock_session = Mock(spec=Session)
+ mock_session.commit.side_effect = IntegrityError("stmt", {},
Exception("Duplicate entry"))
+
+ sm._is_permission_assigned_to_role = Mock(return_value=True)
+
+ with mock.patch.object(EmptySecurityManager, "session", mock_session):
+ sm.add_permission_to_role(role, permission)
+
+ assert mock_session.rollback.mock_calls == [call()]
+ assert sm._is_permission_assigned_to_role.mock_calls ==
[call(role_id=1, permission_view_id=2)]
+ assert mock_log.error.mock_calls == []
+
+
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+ def
test_add_permission_to_role_logs_error_when_duplicate_not_persisted(self,
mock_log):
+ sm = EmptySecurityManager()
+ role = Mock(spec=Role, id=1, name="Admin", permissions=[])
+ permission = Mock(spec=Permission, id=2)
+
+ mock_session = Mock(spec=Session)
+ mock_error = IntegrityError("stmt", {}, Exception("duplicate key"))
+ mock_session.commit.side_effect = mock_error
+
+ sm._is_permission_assigned_to_role = Mock(return_value=False)
+
+ with mock.patch.object(EmptySecurityManager, "session", mock_session):
+ sm.add_permission_to_role(role, permission)
+
+ mock_session.rollback.assert_called_once_with()
+ sm._is_permission_assigned_to_role.assert_called_once_with(role_id=1,
permission_view_id=2)
+ mock_log.error.assert_called_once_with(
+ const.LOGMSG_ERR_SEC_ADD_PERMROLE,
+ f"Failed to add '{permission}' permission to the '{role}' role
Error: {mock_error}",
+ )
+
def test_load_user(self):
sm = EmptySecurityManager()
sm.get_user_by_id = Mock()