jason810496 commented on code in PR #55298:
URL: https://github.com/apache/airflow/pull/55298#discussion_r2325715790


##########
airflow-core/src/airflow/api_fastapi/core_api/security.py:
##########
@@ -233,6 +233,37 @@ def inner(
     return inner
 
 
+class PermittedPoolFilter(OrmClause[set[str]]):
+    """A parameter that filters the permitted pools for the user."""
+
+    def to_orm(self, select: Select) -> Select:
+        return select.where(Pool.pool.in_(self.value))
+
+
+def permitted_pool_filter_factory(
+    method: ResourceMethod, filter_class=PermittedPoolFilter
+) -> Callable[[Request, BaseUser], PermittedPoolFilter]:
+    """
+    Create a callable for Depends in FastAPI that returns a filter of the 
permitted pools for the user.
+
+    :param method: whether filter readable or writable.
+    :param filter_class: filter class to apply
+    """
+
+    def depends_permitted_pools_filter(
+        request: Request,
+        user: GetUserDep,
+    ) -> PermittedPoolFilter:
+        auth_manager: BaseAuthManager = request.app.state.auth_manager
+        authorized_pools: set[str] = 
auth_manager.get_authorized_pools(user=user, method=method)
+        return filter_class(authorized_pools)

Review Comment:
   It seems the factory is entity-specific; would it be better to remove the 
`filter_class` parameter?
   ```suggestion
       method: ResourceMethod
   ) -> Callable[[Request, BaseUser], PermittedPoolFilter]:
       """
       Create a callable for Depends in FastAPI that returns a filter of the 
permitted pools for the user.
   
       :param method: whether filter readable or writable.
       :param filter_class: filter class to apply
       """
   
       def depends_permitted_pools_filter(
           request: Request,
           user: GetUserDep,
       ) -> PermittedPoolFilter:
           auth_manager: BaseAuthManager = request.app.state.auth_manager
           authorized_pools: set[str] = 
auth_manager.get_authorized_pools(user=user, method=method)
           return PermittedPoolFilter(authorized_pools)
   ```



##########
airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py:
##########
@@ -427,36 +433,243 @@ def get_authorized_dag_ids(
         """
         Get DAGs the user has access to.
 
-        By default, reads all the DAGs and check individually if the user has 
permissions to access the DAG.
-        Can lead to some poor performance. It is recommended to override this 
method in the auth manager
-        implementation to provide a more efficient implementation.
-
         :param user: the user
         :param method: the method to filter on
         :param session: the session
         """
-        dag_ids = {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
-        return self.filter_authorized_dag_ids(dag_ids=dag_ids, method=method, 
user=user)
+        stmt = (
+            select(DagModel.dag_id, Team.name)
+            .join(DagBundleModel, DagModel.bundle_name == DagBundleModel.name)
+            .join(
+                dag_bundle_team_association_table,
+                DagBundleModel.name == 
dag_bundle_team_association_table.c.dag_bundle_name,
+                isouter=True,
+            )
+            .join(Team, Team.id == 
dag_bundle_team_association_table.c.team_id, isouter=True)
+        )
+        rows = session.execute(stmt).all()
+        dags_by_team: dict[str | None, set[str]] = defaultdict(set)
+        for dag_id, team_name in rows:
+            dags_by_team[team_name].add(dag_id)
+
+        dag_ids: set[str] = set()
+        for team_name, team_dag_ids in dags_by_team.items():
+            dag_ids.update(
+                self.filter_authorized_dag_ids(
+                    dag_ids=team_dag_ids, user=user, method=method, 
team_name=team_name
+                )
+            )
+
+        return dag_ids
 
     def filter_authorized_dag_ids(
         self,
         *,
         dag_ids: set[str],
         user: T,
         method: ResourceMethod = "GET",
+        team_name: str | None = None,
     ) -> set[str]:
         """
         Filter DAGs the user has access to.
 
-        :param dag_ids: the list of DAG ids
+        By default, check individually if the user has permissions to access 
the DAG.
+        Can lead to some poor performance. It is recommended to override this 
method in the auth manager
+        implementation to provide a more efficient implementation.
+
+        :param dag_ids: the set of DAG ids
+        :param user: the user
+        :param method: the method to filter on
+        :param team_name: the name of the team associated to the Dags if 
Airflow environment runs in
+            multi-team mode
+        """
+
+        def _is_authorized_dag_id(dag_id: str):
+            return self.is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id, 
team_name=team_name), user=user
+            )
+
+        return {dag_id for dag_id in dag_ids if _is_authorized_dag_id(dag_id)}
+
+    @provide_session
+    def get_authorized_connections(
+        self,
+        *,
+        user: T,
+        method: ResourceMethod = "GET",
+        session: Session = NEW_SESSION,
+    ) -> set[str]:
+        """
+        Get connection ids (``conn_id``) the user has access to.
+
+        :param user: the user
+        :param method: the method to filter on
+        :param session: the session
+        """
+        stmt = select(Connection.conn_id, Team.name).join(Team, 
Connection.team_id == Team.id, isouter=True)
+        rows = session.execute(stmt).all()
+        connections_by_team: dict[str | None, set[str]] = defaultdict(set)
+        for conn_id, team_name in rows:
+            connections_by_team[team_name].add(conn_id)
+
+        conn_ids: set[str] = set()
+        for team_name, team_conn_ids in connections_by_team.items():
+            conn_ids.update(
+                self.filter_authorized_connections(
+                    conn_ids=team_conn_ids, user=user, method=method, 
team_name=team_name
+                )
+            )
+
+        return conn_ids
+
+    def filter_authorized_connections(
+        self,
+        *,
+        conn_ids: set[str],
+        user: T,
+        method: ResourceMethod = "GET",
+        team_name: str | None = None,
+    ) -> set[str]:
+        """
+        Filter connections the user has access to.
+
+        By default, check individually if the user has permissions to access 
the connection.
+        Can lead to some poor performance. It is recommended to override this 
method in the auth manager
+        implementation to provide a more efficient implementation.
+
+        :param conn_ids: the set of connection ids (``conn_id``)
         :param user: the user
         :param method: the method to filter on
+        :param team_name: the name of the team associated to the connections 
if Airflow environment runs in
+            multi-team mode
+        """
+
+        def _is_authorized_connection(conn_id: str):
+            return self.is_authorized_connection(
+                method=method, details=ConnectionDetails(conn_id=conn_id, 
team_name=team_name), user=user
+            )
+
+        return {conn_id for conn_id in conn_ids if 
_is_authorized_connection(conn_id)}
+
+    @provide_session
+    def get_authorized_variables(
+        self,
+        *,
+        user: T,
+        method: ResourceMethod = "GET",
+        session: Session = NEW_SESSION,
+    ) -> set[str]:
         """
+        Get variable keys the user has access to.
 
-        def _is_authorized_dag_id(method: ResourceMethod, dag_id: str):
-            return self.is_authorized_dag(method=method, 
details=DagDetails(id=dag_id), user=user)
+        :param user: the user
+        :param method: the method to filter on
+        :param session: the session
+        """
+        stmt = select(Variable.key, Team.name).join(Team, Variable.team_id == 
Team.id, isouter=True)
+        rows = session.execute(stmt).all()
+        variables_by_team: dict[str | None, set[str]] = defaultdict(set)
+        for var_key, team_name in rows:
+            variables_by_team[team_name].add(var_key)
+
+        var_keys: set[str] = set()
+        for team_name, team_var_keys in variables_by_team.items():
+            var_keys.update(
+                self.filter_authorized_variables(
+                    variable_keys=team_var_keys, user=user, method=method, 
team_name=team_name
+                )
+            )
+
+        return var_keys
+
+    def filter_authorized_variables(
+        self,
+        *,
+        variable_keys: set[str],
+        user: T,
+        method: ResourceMethod = "GET",
+        team_name: str | None = None,
+    ) -> set[str]:
+        """
+        Filter variables the user has access to.
+
+        By default, check individually if the user has permissions to access 
the variable.
+        Can lead to some poor performance. It is recommended to override this 
method in the auth manager
+        implementation to provide a more efficient implementation.
+
+        :param variable_keys: the set of variable keys
+        :param user: the user
+        :param method: the method to filter on
+        :param team_name: the name of the team associated to the connections 
if Airflow environment runs in
+            multi-team mode
+        """
+
+        def _is_authorized_variable(var_key: str):
+            return self.is_authorized_variable(
+                method=method, details=VariableDetails(key=var_key, 
team_name=team_name), user=user
+            )
+
+        return {var_key for var_key in variable_keys if 
_is_authorized_variable(var_key)}
+
+    @provide_session
+    def get_authorized_pools(
+        self,
+        *,
+        user: T,
+        method: ResourceMethod = "GET",
+        session: Session = NEW_SESSION,
+    ) -> set[str]:
+        """
+        Get pools the user has access to.
+
+        :param user: the user
+        :param method: the method to filter on
+        :param session: the session
+        """
+        stmt = select(Pool.pool, Team.name).join(Team, Pool.team_id == 
Team.id, isouter=True)
+        rows = session.execute(stmt).all()
+        pools_by_team: dict[str | None, set[str]] = defaultdict(set)
+        for pool_name, team_name in rows:
+            pools_by_team[team_name].add(pool_name)
+
+        pool_names: set[str] = set()
+        for team_name, team_pool_names in pools_by_team.items():
+            pool_names.update(
+                self.filter_authorized_pools(
+                    pool_names=team_pool_names, user=user, method=method, 
team_name=team_name
+                )
+            )
+
+        return pool_names

Review Comment:
   If I understand correctly, it seems the `filter_authorized_<entities>` can be 
consolidated with `get_authorized_<entities>` as:
   
   ```suggestion        
           for team_name, team_pool_names in pools_by_team.items():
               # extend the set with new team
               pool_names.update(team_pool_names)
   
               # filter out invalid pools
               for pool_name in team_pool_names:
                   if not self.is_authorized_pool(
                       method=method,
                       details=PoolDetails(name=pool_name, team_name=team_name),
                       user=user
                   ):
                       pool_names.remove(pool_name)
   
           return pool_names
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/security.py:
##########
@@ -281,6 +312,39 @@ def inner(
     return inner
 
 
+class PermittedConnectionFilter(OrmClause[set[str]]):
+    """A parameter that filters the permitted connections for the user."""
+
+    def to_orm(self, select: Select) -> Select:
+        return select.where(Connection.conn_id.in_(self.value))
+
+
+def permitted_connection_filter_factory(
+    method: ResourceMethod, filter_class=PermittedConnectionFilter

Review Comment:
   If yes, then we can remove it for `Connection`, `Pool`, and `Variable` as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to