potiuk commented on code in PR #55298:
URL: https://github.com/apache/airflow/pull/55298#discussion_r2328831029


##########
airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py:
##########
@@ -427,36 +433,243 @@ def get_authorized_dag_ids(
         """
         Get DAGs the user has access to.
 
-        By default, reads all the DAGs and check individually if the user has 
permissions to access the DAG.
-        Can lead to some poor performance. It is recommended to override this 
method in the auth manager
-        implementation to provide a more efficient implementation.
-
         :param user: the user
         :param method: the method to filter on
         :param session: the session
         """
-        dag_ids = {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
-        return self.filter_authorized_dag_ids(dag_ids=dag_ids, method=method, 
user=user)
+        stmt = (
+            select(DagModel.dag_id, Team.name)
+            .join(DagBundleModel, DagModel.bundle_name == DagBundleModel.name)
+            .join(
+                dag_bundle_team_association_table,
+                DagBundleModel.name == 
dag_bundle_team_association_table.c.dag_bundle_name,
+                isouter=True,
+            )
+            .join(Team, Team.id == 
dag_bundle_team_association_table.c.team_id, isouter=True)
+        )
+        rows = session.execute(stmt).all()
+        dags_by_team: dict[str | None, set[str]] = defaultdict(set)
+        for dag_id, team_name in rows:
+            dags_by_team[team_name].add(dag_id)
+
+        dag_ids: set[str] = set()
+        for team_name, team_dag_ids in dags_by_team.items():
+            dag_ids.update(
+                self.filter_authorized_dag_ids(
+                    dag_ids=team_dag_ids, user=user, method=method, 
team_name=team_name
+                )
+            )
+
+        return dag_ids
 
     def filter_authorized_dag_ids(
         self,
         *,
         dag_ids: set[str],
         user: T,
         method: ResourceMethod = "GET",
+        team_name: str | None = None,
     ) -> set[str]:
         """
         Filter DAGs the user has access to.
 
-        :param dag_ids: the list of DAG ids
+        By default, check individually if the user has permissions to access 
the DAG.
+        Can lead to some poor performance. It is recommended to override this 
method in the auth manager
+        implementation to provide a more efficient implementation.
+
+        :param dag_ids: the set of DAG ids
+        :param user: the user
+        :param method: the method to filter on
+        :param team_name: the name of the team associated to the Dags if 
Airflow environment runs in
+            multi-team mode
+        """
+
+        def _is_authorized_dag_id(dag_id: str):
+            return self.is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id, 
team_name=team_name), user=user
+            )
+
+        return {dag_id for dag_id in dag_ids if _is_authorized_dag_id(dag_id)}
+
+    @provide_session
+    def get_authorized_connections(
+        self,
+        *,
+        user: T,
+        method: ResourceMethod = "GET",
+        session: Session = NEW_SESSION,
+    ) -> set[str]:
+        """
+        Get connection ids (``conn_id``) the user has access to.
+
+        :param user: the user
+        :param method: the method to filter on
+        :param session: the session
+        """
+        stmt = select(Connection.conn_id, Team.name).join(Team, 
Connection.team_id == Team.id, isouter=True)

Review Comment:
   @jedcunningham -> the DB connection is important - because in DB it will 
have optional team_id. For all the other cases, (env var, command, secrets 
manager) it's really the case of worker "workload identity". The question 
"whichc" connections and variables is determined by "workload identity".  
   
   There are few cases:
   
   * team Dag Processor following 
[AIP-92](https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-92+Isolate+DAG+processor%2C+Callback+processor%2C+and+Triggerer+from+core+services)
 
   * worker (per team)
   * K8s pod launched by the "team" executor 
   * edge executor configured per "team"
   
   Each of those will have a distinct set of env variables and possibly 
configurations in whatever configuration is suppoerted (non-accessible for 
other teams) that will determine access those workloads will have. For example 
if you are using AWS secrets manager to retrieve connections/variables + K8S 
executor, the executor that will be configured for the team will have the right 
pod template, or the right permissions to the k8s namespace that will have the 
credentials that will have access to secrets available to your team.
   
   Basically - in Airflow we **only** care about per-team access to database 
(because this is what Airflow does internally) while we **outsource** which 
team has access to which data for all other ways you can retrieve secrts and 
connections.
   
   And it makes perfect sense - because this way, whoever manages the 
deployment can keep full control pn how workloads "per team" are configured - 
including sensitive credthatential configuration.
   
   We will have to definitely describe it in the docs and make sure it's 
"deployment manager's" thing to do - and it is one of the things that makes 
"multi-team" not being "multi-tenant" out-of-the-box. We do not provide "all" 
tooling and out-of-the-bo x configuation to make multi-tenancy possible - 
instead we make it "possible" for someone who wants to implement multi-team. 
Where "possible" does not mean "easty" and "out-of-the-box" - it means "you can 
do it if you really wan't - here are the isolations points, but you need to 
implement it in the way your company needs it". 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to