This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 79fda5fe3 [#4460] feat(core): Add the method call of the 
authorizationPlugin (#4461)
79fda5fe3 is described below

commit 79fda5fe3be322b68ae782fc67107425891d1c9c
Author: roryqi <ror...@apache.org>
AuthorDate: Mon Aug 19 18:24:10 2024 +0800

    [#4460] feat(core): Add the method call of the authorizationPlugin (#4461)
    
    ### What changes were proposed in this pull request?
    If we want to push the privileges down the underlying system, we need to
    call methods of the underlying system authorization plugin.
    
    ### Why are the changes needed?
    
    Fix: #4460
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Add some test cases.
---
 .../java/org/apache/gravitino/GravitinoEnv.java    |  18 +
 .../gravitino/SupportsRelationOperations.java      |   8 +-
 .../authorization/AuthorizationUtils.java          |  77 ++++
 .../authorization/FutureGrantManager.java          | 133 +++++++
 .../gravitino/authorization/PermissionManager.java | 392 ++++++++++++---------
 .../gravitino/authorization/RoleManager.java       |  19 +
 .../gravitino/hook/CatalogHookDispatcher.java      |  10 +
 .../gravitino/storage/relational/JDBCBackend.java  |  18 +
 .../relational/mapper/MetalakeMetaMapper.java      |   2 +-
 .../storage/relational/mapper/RoleMetaMapper.java  |  17 +
 .../relational/service/GroupMetaService.java       |  18 +
 .../relational/service/OwnerMetaService.java       |  19 +-
 .../relational/service/RoleMetaService.java        |  67 +++-
 .../relational/service/UserMetaService.java        |  18 +
 .../apache/gravitino/utils/NameIdentifierUtil.java |  14 +
 .../authorization/TestAccessControlManager.java    |  20 ++
 .../TestAccessControlManagerForPermissions.java    |  34 ++
 .../authorization/TestFutureGrantManager.java      | 170 +++++++++
 .../relational/service/TestRoleMetaService.java    | 100 ++++++
 .../gravitino/server/web/rest/RoleOperations.java  |  99 +++---
 20 files changed, 1001 insertions(+), 252 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 6e97ed1eb..04de93186 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -21,6 +21,7 @@ package org.apache.gravitino;
 import com.google.common.base.Preconditions;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.authorization.AccessControlManager;
+import org.apache.gravitino.authorization.FutureGrantManager;
 import org.apache.gravitino.authorization.OwnerManager;
 import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
 import org.apache.gravitino.catalog.CatalogDispatcher;
@@ -111,6 +112,7 @@ public class GravitinoEnv {
   private TagManager tagManager;
   private EventBus eventBus;
   private OwnerManager ownerManager;
+  private FutureGrantManager futureGrantManager;
 
   protected GravitinoEnv() {}
 
@@ -287,10 +289,24 @@ public class GravitinoEnv {
     return tagManager;
   }
 
+  /**
+   * Get the OwnerManager associated with the Gravitino environment.
+   *
+   * @return The OwnerManager instance.
+   */
   public OwnerManager ownerManager() {
     return ownerManager;
   }
 
+  /**
+   * Get the FutureGrantManager associated with the Gravitino environment.
+   *
+   * @return The FutureGrantManager instance.
+   */
+  public FutureGrantManager futureGrantManager() {
+    return futureGrantManager;
+  }
+
   public void start() {
     auxServiceManager.serviceStart();
     metricsSystem.start();
@@ -410,9 +426,11 @@ public class GravitinoEnv {
 
       this.accessControlDispatcher = accessControlHookDispatcher;
       this.ownerManager = new OwnerManager(entityStore);
+      this.futureGrantManager = new FutureGrantManager(entityStore);
     } else {
       this.accessControlDispatcher = null;
       this.ownerManager = null;
+      this.futureGrantManager = null;
     }
 
     this.auxServiceManager = new AuxiliaryServiceManager();
diff --git 
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java 
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index 5a63ceee0..617f72ab9 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -30,7 +30,13 @@ public interface SupportsRelationOperations {
   /** Relation is an abstraction which connects two entities. */
   enum Type {
     /** The owner relationship */
-    OWNER_REL
+    OWNER_REL,
+    /** Metadata objet and role relationship */
+    METADATA_OBJECT_ROLE_REL,
+    /** Role and user relationship */
+    ROLE_USER_REL,
+    /** Role and group relationship */
+    ROLE_GROUP_REL
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index 5e16c5bcb..6dd42e628 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -18,13 +18,24 @@
  */
 package org.apache.gravitino.authorization;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +48,15 @@ public class AuthorizationUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(AuthorizationUtils.class);
   private static final String METALAKE_DOES_NOT_EXIST_MSG = "Metalake %s does 
not exist";
 
+  private static final List<Privilege.Name> pluginNotSupportsPrivileges =
+      Lists.newArrayList(
+          Privilege.Name.CREATE_CATALOG,
+          Privilege.Name.USE_CATALOG,
+          Privilege.Name.MANAGE_GRANTS,
+          Privilege.Name.MANAGE_USERS,
+          Privilege.Name.MANAGE_GROUPS,
+          Privilege.Name.CREATE_ROLE);
+
   private AuthorizationUtils() {}
 
   static void checkMetalakeExists(String metalake) throws 
NoSuchMetalakeException {
@@ -116,4 +136,61 @@ public class AuthorizationUtils {
         "Role namespace must have 3 levels, the input namespace is %s",
         namespace);
   }
+
+  // Every catalog has one authorization plugin, we should avoid calling
+  // underlying authorization repeatedly. So we use a set to record which
+  // catalog has been called the authorization plugin.
+  public static void callAuthorizationPlugin(
+      String metalake,
+      List<SecurableObject> securableObjects,
+      Set<String> catalogsAlreadySet,
+      Consumer<AuthorizationPlugin> consumer) {
+    CatalogManager catalogManager = 
GravitinoEnv.getInstance().catalogManager();
+    for (SecurableObject securableObject : securableObjects) {
+      if (needApplyAllAuthorizationPlugin(securableObject)) {
+        Catalog[] catalogs = 
catalogManager.listCatalogsInfo(Namespace.of(metalake));
+        for (Catalog catalog : catalogs) {
+          callAuthorizationPluginImpl(catalogsAlreadySet, consumer, catalog);
+        }
+
+      } else if (supportsSingleAuthorizationPlugin(securableObject.type())) {
+        NameIdentifier catalogIdent =
+            NameIdentifierUtil.getCatalogIdentifier(
+                MetadataObjectUtil.toEntityIdent(metalake, securableObject));
+        Catalog catalog = catalogManager.loadCatalog(catalogIdent);
+        callAuthorizationPluginImpl(catalogsAlreadySet, consumer, catalog);
+      }
+    }
+  }
+
+  private static void callAuthorizationPluginImpl(
+      Set<String> catalogsAlreadySet, Consumer<AuthorizationPlugin> consumer, 
Catalog catalog) {
+    if (!catalogsAlreadySet.contains(catalog.name())) {
+      catalogsAlreadySet.add(catalog.name());
+
+      if (catalog instanceof BaseCatalog) {
+        BaseCatalog baseCatalog = (BaseCatalog) catalog;
+        if (baseCatalog.getAuthorizationPlugin() != null) {
+          consumer.accept(baseCatalog.getAuthorizationPlugin());
+        }
+      }
+    }
+  }
+
+  public static boolean needApplyAllAuthorizationPlugin(SecurableObject 
securableObject) {
+    // TODO: Add supportsSecurableObjects method for every privilege to 
simplify this code
+    if (securableObject.type() == MetadataObject.Type.METALAKE) {
+      List<Privilege> privileges = securableObject.privileges();
+      for (Privilege privilege : privileges) {
+        if (!pluginNotSupportsPrivileges.contains(privilege.name())) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private static boolean supportsSingleAuthorizationPlugin(MetadataObject.Type 
type) {
+    return type != MetadataObject.Type.ROLE && type != 
MetadataObject.Type.METALAKE;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/FutureGrantManager.java 
b/core/src/main/java/org/apache/gravitino/authorization/FutureGrantManager.java
new file mode 100644
index 000000000..1745d53d9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/FutureGrantManager.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.glassfish.jersey.internal.guava.Sets;
+
+/**
+ * FutureGrantManager is responsible for granting privileges to future object. 
When you grant a
+ * privilege which authorization supports to a metalake, the future creating 
catalog should apply
+ * the privilege to underlying authorization plugin, too. FutureGrantManager 
selects the roles
+ * contains the metalake securable object and filter unnecessary roles. Then, 
it selects the users
+ * and groups by roles. Finally, it apply the information to the authorization 
plugins.
+ */
+public class FutureGrantManager {
+  EntityStore entityStore;
+
+  public FutureGrantManager(EntityStore entityStore) {
+    this.entityStore = entityStore;
+  }
+
+  public void grantNewlyCreatedCatalog(String metalake, BaseCatalog catalog) {
+    try {
+
+      Map<UserEntity, Set<RoleEntity>> userGrantRoles = Maps.newHashMap();
+      Map<GroupEntity, Set<RoleEntity>> groupGrantRoles = Maps.newHashMap();
+      List<RoleEntity> roles =
+          entityStore.relationOperations()
+              .listEntitiesByRelation(
+                  SupportsRelationOperations.Type.METADATA_OBJECT_ROLE_REL,
+                  NameIdentifier.of(metalake),
+                  Entity.EntityType.METALAKE)
+              .stream()
+              .map(entity -> (RoleEntity) entity)
+              .collect(Collectors.toList());
+
+      for (RoleEntity role : roles) {
+
+        boolean supportsFutureGrant = false;
+        for (SecurableObject object : role.securableObjects()) {
+          if (AuthorizationUtils.needApplyAllAuthorizationPlugin(object)) {
+            supportsFutureGrant = true;
+            break;
+          }
+        }
+
+        if (!supportsFutureGrant) {
+          continue;
+        }
+
+        List<UserEntity> users =
+            entityStore.relationOperations()
+                .listEntitiesByRelation(
+                    SupportsRelationOperations.Type.ROLE_USER_REL,
+                    role.nameIdentifier(),
+                    Entity.EntityType.ROLE)
+                .stream()
+                .map(entity -> (UserEntity) entity)
+                .collect(Collectors.toList());
+
+        for (UserEntity user : users) {
+          Set<RoleEntity> roleSet = userGrantRoles.computeIfAbsent(user, k -> 
Sets.newHashSet());
+          roleSet.add(role);
+        }
+
+        List<GroupEntity> groups =
+            entityStore.relationOperations()
+                .listEntitiesByRelation(
+                    SupportsRelationOperations.Type.ROLE_GROUP_REL,
+                    role.nameIdentifier(),
+                    Entity.EntityType.ROLE)
+                .stream()
+                .map(entity -> (GroupEntity) entity)
+                .collect(Collectors.toList());
+
+        for (GroupEntity group : groups) {
+          Set<RoleEntity> roleSet = groupGrantRoles.computeIfAbsent(group, k 
-> Sets.newHashSet());
+          roleSet.add(role);
+        }
+      }
+
+      for (Map.Entry<UserEntity, Set<RoleEntity>> entry : 
userGrantRoles.entrySet()) {
+        AuthorizationPlugin authorizationPlugin = 
catalog.getAuthorizationPlugin();
+        if (authorizationPlugin != null) {
+          authorizationPlugin.onGrantedRolesToUser(
+              Lists.newArrayList(entry.getValue()), entry.getKey());
+        }
+      }
+
+      for (Map.Entry<GroupEntity, Set<RoleEntity>> entry : 
groupGrantRoles.entrySet()) {
+        AuthorizationPlugin authorizationPlugin = 
catalog.getAuthorizationPlugin();
+
+        if (authorizationPlugin != null) {
+          authorizationPlugin.onGrantedRolesToGroup(
+              Lists.newArrayList(entry.getValue()), entry.getKey());
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/PermissionManager.java 
b/core/src/main/java/org/apache/gravitino/authorization/PermissionManager.java
index 7453d304a..01c87d030 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/PermissionManager.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/PermissionManager.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Entity;
@@ -37,6 +38,7 @@ import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.glassfish.jersey.internal.guava.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,50 +64,64 @@ class PermissionManager {
         roleEntitiesToGrant.add(roleManager.getRole(metalake, role));
       }
 
-      return store.update(
-          AuthorizationUtils.ofUser(metalake, user),
-          UserEntity.class,
-          Entity.EntityType.USER,
-          userEntity -> {
-            List<RoleEntity> roleEntities = Lists.newArrayList();
-            if (userEntity.roleNames() != null) {
-              for (String role : userEntity.roleNames()) {
-                roleEntities.add(roleManager.getRole(metalake, role));
-              }
-            }
-            List<String> roleNames = 
Lists.newArrayList(toRoleNames(roleEntities));
-            List<Long> roleIds = Lists.newArrayList(toRoleIds(roleEntities));
-
-            for (RoleEntity roleEntityToGrant : roleEntitiesToGrant) {
-              if (roleIds.contains(roleEntityToGrant.id())) {
-                LOG.warn(
-                    "Failed to grant, role {} already exists in the user {} of 
metalake {}",
-                    roleEntityToGrant.name(),
-                    user,
-                    metalake);
-              } else {
-                roleNames.add(roleEntityToGrant.name());
-                roleIds.add(roleEntityToGrant.id());
-              }
-            }
-
-            AuditInfo auditInfo =
-                AuditInfo.builder()
-                    .withCreator(userEntity.auditInfo().creator())
-                    .withCreateTime(userEntity.auditInfo().createTime())
-                    
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withLastModifiedTime(Instant.now())
+      User updatedUser =
+          store.update(
+              AuthorizationUtils.ofUser(metalake, user),
+              UserEntity.class,
+              Entity.EntityType.USER,
+              userEntity -> {
+                List<RoleEntity> roleEntities = Lists.newArrayList();
+                if (userEntity.roleNames() != null) {
+                  for (String role : userEntity.roleNames()) {
+                    roleEntities.add(roleManager.getRole(metalake, role));
+                  }
+                }
+                List<String> roleNames = 
Lists.newArrayList(toRoleNames(roleEntities));
+                List<Long> roleIds = 
Lists.newArrayList(toRoleIds(roleEntities));
+
+                for (RoleEntity roleEntityToGrant : roleEntitiesToGrant) {
+                  if (roleIds.contains(roleEntityToGrant.id())) {
+                    LOG.warn(
+                        "Failed to grant, role {} already exists in the user 
{} of metalake {}",
+                        roleEntityToGrant.name(),
+                        user,
+                        metalake);
+                  } else {
+                    roleNames.add(roleEntityToGrant.name());
+                    roleIds.add(roleEntityToGrant.id());
+                  }
+                }
+
+                AuditInfo auditInfo =
+                    AuditInfo.builder()
+                        .withCreator(userEntity.auditInfo().creator())
+                        .withCreateTime(userEntity.auditInfo().createTime())
+                        
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withLastModifiedTime(Instant.now())
+                        .build();
+
+                return UserEntity.builder()
+                    .withNamespace(userEntity.namespace())
+                    .withId(userEntity.id())
+                    .withName(userEntity.name())
+                    .withRoleNames(roleNames)
+                    .withRoleIds(roleIds)
+                    .withAuditInfo(auditInfo)
                     .build();
+              });
 
-            return UserEntity.builder()
-                .withNamespace(userEntity.namespace())
-                .withId(userEntity.id())
-                .withName(userEntity.name())
-                .withRoleNames(roleNames)
-                .withRoleIds(roleIds)
-                .withAuditInfo(auditInfo)
-                .build();
-          });
+      Set<String> catalogs = Sets.newHashSet();
+      for (Role grantedRole : roleEntitiesToGrant) {
+        AuthorizationUtils.callAuthorizationPlugin(
+            metalake,
+            grantedRole.securableObjects(),
+            catalogs,
+            authorizationPlugin ->
+                authorizationPlugin.onGrantedRolesToUser(
+                    Lists.newArrayList(roleEntitiesToGrant), updatedUser));
+      }
+
+      return updatedUser;
     } catch (NoSuchEntityException nse) {
       LOG.warn("Failed to grant, user {} does not exist in the metalake {}", 
user, metalake, nse);
       throw new NoSuchUserException(USER_DOES_NOT_EXIST_MSG, user, metalake);
@@ -127,50 +143,64 @@ class PermissionManager {
         roleEntitiesToGrant.add(roleManager.getRole(metalake, role));
       }
 
-      return store.update(
-          AuthorizationUtils.ofGroup(metalake, group),
-          GroupEntity.class,
-          Entity.EntityType.GROUP,
-          groupEntity -> {
-            List<RoleEntity> roleEntities = Lists.newArrayList();
-            if (groupEntity.roleNames() != null) {
-              for (String role : groupEntity.roleNames()) {
-                roleEntities.add(roleManager.getRole(metalake, role));
-              }
-            }
-            List<String> roleNames = 
Lists.newArrayList(toRoleNames(roleEntities));
-            List<Long> roleIds = Lists.newArrayList(toRoleIds(roleEntities));
-
-            for (RoleEntity roleEntityToGrant : roleEntitiesToGrant) {
-              if (roleIds.contains(roleEntityToGrant.id())) {
-                LOG.warn(
-                    "Failed to grant, role {} already exists in the group {} 
of metalake {}",
-                    roleEntityToGrant.name(),
-                    group,
-                    metalake);
-              } else {
-                roleNames.add(roleEntityToGrant.name());
-                roleIds.add(roleEntityToGrant.id());
-              }
-            }
-
-            AuditInfo auditInfo =
-                AuditInfo.builder()
-                    .withCreator(groupEntity.auditInfo().creator())
-                    .withCreateTime(groupEntity.auditInfo().createTime())
-                    
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withLastModifiedTime(Instant.now())
+      Group updatedGroup =
+          store.update(
+              AuthorizationUtils.ofGroup(metalake, group),
+              GroupEntity.class,
+              Entity.EntityType.GROUP,
+              groupEntity -> {
+                List<RoleEntity> roleEntities = Lists.newArrayList();
+                if (groupEntity.roleNames() != null) {
+                  for (String role : groupEntity.roleNames()) {
+                    roleEntities.add(roleManager.getRole(metalake, role));
+                  }
+                }
+                List<String> roleNames = 
Lists.newArrayList(toRoleNames(roleEntities));
+                List<Long> roleIds = 
Lists.newArrayList(toRoleIds(roleEntities));
+
+                for (RoleEntity roleEntityToGrant : roleEntitiesToGrant) {
+                  if (roleIds.contains(roleEntityToGrant.id())) {
+                    LOG.warn(
+                        "Failed to grant, role {} already exists in the group 
{} of metalake {}",
+                        roleEntityToGrant.name(),
+                        group,
+                        metalake);
+                  } else {
+                    roleNames.add(roleEntityToGrant.name());
+                    roleIds.add(roleEntityToGrant.id());
+                  }
+                }
+
+                AuditInfo auditInfo =
+                    AuditInfo.builder()
+                        .withCreator(groupEntity.auditInfo().creator())
+                        .withCreateTime(groupEntity.auditInfo().createTime())
+                        
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withLastModifiedTime(Instant.now())
+                        .build();
+
+                return GroupEntity.builder()
+                    .withId(groupEntity.id())
+                    .withNamespace(groupEntity.namespace())
+                    .withName(groupEntity.name())
+                    .withRoleNames(roleNames)
+                    .withRoleIds(roleIds)
+                    .withAuditInfo(auditInfo)
                     .build();
+              });
 
-            return GroupEntity.builder()
-                .withId(groupEntity.id())
-                .withNamespace(groupEntity.namespace())
-                .withName(groupEntity.name())
-                .withRoleNames(roleNames)
-                .withRoleIds(roleIds)
-                .withAuditInfo(auditInfo)
-                .build();
-          });
+      Set<String> catalogs = Sets.newHashSet();
+      for (Role grantedRole : roleEntitiesToGrant) {
+        AuthorizationUtils.callAuthorizationPlugin(
+            metalake,
+            grantedRole.securableObjects(),
+            catalogs,
+            authorizationPlugin ->
+                authorizationPlugin.onGrantedRolesToGroup(
+                    Lists.newArrayList(roleEntitiesToGrant), updatedGroup));
+      }
+
+      return updatedGroup;
     } catch (NoSuchEntityException nse) {
       LOG.warn("Failed to grant, group {} does not exist in the metalake {}", 
group, metalake, nse);
       throw new NoSuchGroupException(GROUP_DOES_NOT_EXIST_MSG, group, 
metalake);
@@ -192,49 +222,63 @@ class PermissionManager {
         roleEntitiesToRevoke.add(roleManager.getRole(metalake, role));
       }
 
-      return store.update(
-          AuthorizationUtils.ofGroup(metalake, group),
-          GroupEntity.class,
-          Entity.EntityType.GROUP,
-          groupEntity -> {
-            List<RoleEntity> roleEntities = Lists.newArrayList();
-            if (groupEntity.roleNames() != null) {
-              for (String role : groupEntity.roleNames()) {
-                roleEntities.add(roleManager.getRole(metalake, role));
-              }
-            }
-            List<String> roleNames = 
Lists.newArrayList(toRoleNames(roleEntities));
-            List<Long> roleIds = Lists.newArrayList(toRoleIds(roleEntities));
-
-            for (RoleEntity roleEntityToRevoke : roleEntitiesToRevoke) {
-              roleNames.remove(roleEntityToRevoke.name());
-              boolean removed = roleIds.remove(roleEntityToRevoke.id());
-              if (!removed) {
-                LOG.warn(
-                    "Failed to revoke, role {} does not exist in the group {} 
of metalake {}",
-                    roleEntityToRevoke.name(),
-                    group,
-                    metalake);
-              }
-            }
-
-            AuditInfo auditInfo =
-                AuditInfo.builder()
-                    .withCreator(groupEntity.auditInfo().creator())
-                    .withCreateTime(groupEntity.auditInfo().createTime())
-                    
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withLastModifiedTime(Instant.now())
+      Group updatedGroup =
+          store.update(
+              AuthorizationUtils.ofGroup(metalake, group),
+              GroupEntity.class,
+              Entity.EntityType.GROUP,
+              groupEntity -> {
+                List<RoleEntity> roleEntities = Lists.newArrayList();
+                if (groupEntity.roleNames() != null) {
+                  for (String role : groupEntity.roleNames()) {
+                    roleEntities.add(roleManager.getRole(metalake, role));
+                  }
+                }
+                List<String> roleNames = 
Lists.newArrayList(toRoleNames(roleEntities));
+                List<Long> roleIds = 
Lists.newArrayList(toRoleIds(roleEntities));
+
+                for (RoleEntity roleEntityToRevoke : roleEntitiesToRevoke) {
+                  roleNames.remove(roleEntityToRevoke.name());
+                  boolean removed = roleIds.remove(roleEntityToRevoke.id());
+                  if (!removed) {
+                    LOG.warn(
+                        "Failed to revoke, role {} does not exist in the group 
{} of metalake {}",
+                        roleEntityToRevoke.name(),
+                        group,
+                        metalake);
+                  }
+                }
+
+                AuditInfo auditInfo =
+                    AuditInfo.builder()
+                        .withCreator(groupEntity.auditInfo().creator())
+                        .withCreateTime(groupEntity.auditInfo().createTime())
+                        
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withLastModifiedTime(Instant.now())
+                        .build();
+
+                return GroupEntity.builder()
+                    .withNamespace(groupEntity.namespace())
+                    .withId(groupEntity.id())
+                    .withName(groupEntity.name())
+                    .withRoleNames(roleNames)
+                    .withRoleIds(roleIds)
+                    .withAuditInfo(auditInfo)
                     .build();
+              });
 
-            return GroupEntity.builder()
-                .withNamespace(groupEntity.namespace())
-                .withId(groupEntity.id())
-                .withName(groupEntity.name())
-                .withRoleNames(roleNames)
-                .withRoleIds(roleIds)
-                .withAuditInfo(auditInfo)
-                .build();
-          });
+      Set<String> catalogs = Sets.newHashSet();
+      for (Role grantedRole : roleEntitiesToRevoke) {
+        AuthorizationUtils.callAuthorizationPlugin(
+            metalake,
+            grantedRole.securableObjects(),
+            catalogs,
+            authorizationPlugin ->
+                authorizationPlugin.onRevokedRolesFromGroup(
+                    Lists.newArrayList(roleEntitiesToRevoke), updatedGroup));
+      }
+
+      return updatedGroup;
 
     } catch (NoSuchEntityException nse) {
       LOG.warn(
@@ -258,49 +302,63 @@ class PermissionManager {
         roleEntitiesToRevoke.add(roleManager.getRole(metalake, role));
       }
 
-      return store.update(
-          AuthorizationUtils.ofUser(metalake, user),
-          UserEntity.class,
-          Entity.EntityType.USER,
-          userEntity -> {
-            List<RoleEntity> roleEntities = Lists.newArrayList();
-            if (userEntity.roleNames() != null) {
-              for (String role : userEntity.roleNames()) {
-                roleEntities.add(roleManager.getRole(metalake, role));
-              }
-            }
-
-            List<String> roleNames = 
Lists.newArrayList(toRoleNames(roleEntities));
-            List<Long> roleIds = Lists.newArrayList(toRoleIds(roleEntities));
-
-            for (RoleEntity roleEntityToRevoke : roleEntitiesToRevoke) {
-              roleNames.remove(roleEntityToRevoke.name());
-              boolean removed = roleIds.remove(roleEntityToRevoke.id());
-              if (!removed) {
-                LOG.warn(
-                    "Failed to revoke, role {} doesn't exist in the user {} of 
metalake {}",
-                    roleEntityToRevoke.name(),
-                    user,
-                    metalake);
-              }
-            }
-
-            AuditInfo auditInfo =
-                AuditInfo.builder()
-                    .withCreator(userEntity.auditInfo().creator())
-                    .withCreateTime(userEntity.auditInfo().createTime())
-                    
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withLastModifiedTime(Instant.now())
+      User updatedUser =
+          store.update(
+              AuthorizationUtils.ofUser(metalake, user),
+              UserEntity.class,
+              Entity.EntityType.USER,
+              userEntity -> {
+                List<RoleEntity> roleEntities = Lists.newArrayList();
+                if (userEntity.roleNames() != null) {
+                  for (String role : userEntity.roleNames()) {
+                    roleEntities.add(roleManager.getRole(metalake, role));
+                  }
+                }
+
+                List<String> roleNames = 
Lists.newArrayList(toRoleNames(roleEntities));
+                List<Long> roleIds = 
Lists.newArrayList(toRoleIds(roleEntities));
+
+                for (RoleEntity roleEntityToRevoke : roleEntitiesToRevoke) {
+                  roleNames.remove(roleEntityToRevoke.name());
+                  boolean removed = roleIds.remove(roleEntityToRevoke.id());
+                  if (!removed) {
+                    LOG.warn(
+                        "Failed to revoke, role {} doesn't exist in the user 
{} of metalake {}",
+                        roleEntityToRevoke.name(),
+                        user,
+                        metalake);
+                  }
+                }
+
+                AuditInfo auditInfo =
+                    AuditInfo.builder()
+                        .withCreator(userEntity.auditInfo().creator())
+                        .withCreateTime(userEntity.auditInfo().createTime())
+                        
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withLastModifiedTime(Instant.now())
+                        .build();
+                return UserEntity.builder()
+                    .withId(userEntity.id())
+                    .withNamespace(userEntity.namespace())
+                    .withName(userEntity.name())
+                    .withRoleNames(roleNames)
+                    .withRoleIds(roleIds)
+                    .withAuditInfo(auditInfo)
                     .build();
-            return UserEntity.builder()
-                .withId(userEntity.id())
-                .withNamespace(userEntity.namespace())
-                .withName(userEntity.name())
-                .withRoleNames(roleNames)
-                .withRoleIds(roleIds)
-                .withAuditInfo(auditInfo)
-                .build();
-          });
+              });
+
+      Set<String> catalogs = Sets.newHashSet();
+      for (Role grantedRole : roleEntitiesToRevoke) {
+        AuthorizationUtils.callAuthorizationPlugin(
+            metalake,
+            grantedRole.securableObjects(),
+            catalogs,
+            authorizationPlugin ->
+                authorizationPlugin.onRevokedRolesFromUser(
+                    Lists.newArrayList(roleEntitiesToRevoke), updatedUser));
+      }
+
+      return updatedUser;
     } catch (NoSuchEntityException nse) {
       LOG.warn("Failed to revoke, user {} does not exist in the metalake {}", 
user, metalake, nse);
       throw new NoSuchUserException(USER_DOES_NOT_EXIST_MSG, user, metalake);
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/RoleManager.java 
b/core/src/main/java/org/apache/gravitino/authorization/RoleManager.java
index ad0e9a768..b1539d019 100644
--- a/core/src/main/java/org/apache/gravitino/authorization/RoleManager.java
+++ b/core/src/main/java/org/apache/gravitino/authorization/RoleManager.java
@@ -43,6 +43,7 @@ import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.glassfish.jersey.internal.guava.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,6 +108,13 @@ class RoleManager {
     try {
       store.put(roleEntity, false /* overwritten */);
       cache.put(roleEntity.nameIdentifier(), roleEntity);
+
+      AuthorizationUtils.callAuthorizationPlugin(
+          metalake,
+          roleEntity.securableObjects(),
+          Sets.newHashSet(),
+          authorizationPlugin -> 
authorizationPlugin.onRoleCreated(roleEntity));
+
       return roleEntity;
     } catch (EntityAlreadyExistsException e) {
       LOG.warn("Role {} in the metalake {} already exists", role, metalake, e);
@@ -135,6 +143,17 @@ class RoleManager {
       NameIdentifier ident = AuthorizationUtils.ofRole(metalake, role);
       cache.invalidate(ident);
 
+      try {
+        RoleEntity roleEntity = store.get(ident, Entity.EntityType.ROLE, 
RoleEntity.class);
+        AuthorizationUtils.callAuthorizationPlugin(
+            metalake,
+            roleEntity.securableObjects(),
+            Sets.newHashSet(),
+            authorizationPlugin -> 
authorizationPlugin.onRoleDeleted(roleEntity));
+      } catch (NoSuchEntityException nse) {
+        // ignore, because the role may have been deleted.
+      }
+
       return store.delete(ident, Entity.EntityType.ROLE);
     } catch (IOException ioe) {
       LOG.error(
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
index 4b6067de1..86b42dea3 100644
--- a/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
@@ -25,9 +25,11 @@ import org.apache.gravitino.Entity;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.FutureGrantManager;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerManager;
 import org.apache.gravitino.catalog.CatalogDispatcher;
+import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -80,6 +82,14 @@ public class CatalogHookDispatcher implements 
CatalogDispatcher {
           PrincipalUtils.getCurrentUserName(),
           Owner.Type.USER);
     }
+
+    // Apply the metalake securable object privileges to authorization plugin
+    FutureGrantManager futureGrantManager = 
GravitinoEnv.getInstance().futureGrantManager();
+    if (futureGrantManager != null && catalog instanceof BaseCatalog) {
+      futureGrantManager.grantNewlyCreatedCatalog(
+          ident.namespace().level(0), (BaseCatalog) catalog);
+    }
+
     return catalog;
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 641b3be25..81e111a28 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -375,6 +375,24 @@ public class JDBCBackend implements RelationalBackend {
             .getOwner(nameIdentifier, identType)
             .ifPresent(e -> list.add((E) e));
         return list;
+      case METADATA_OBJECT_ROLE_REL:
+        return (List<E>)
+            RoleMetaService.getInstance()
+                .listRolesByMetadataObjectIdentAndType(nameIdentifier, 
identType);
+      case ROLE_GROUP_REL:
+        if (identType == Entity.EntityType.ROLE) {
+          return (List<E>) 
GroupMetaService.getInstance().listGroupsByRoleIdent(nameIdentifier);
+        } else {
+          throw new IllegalArgumentException(
+              String.format("ROLE_GROUP_REL doesn't support type %s", 
identType.name()));
+        }
+      case ROLE_USER_REL:
+        if (identType == Entity.EntityType.ROLE) {
+          return (List<E>) 
UserMetaService.getInstance().listUsersByRoleIdent(nameIdentifier);
+        } else {
+          throw new IllegalArgumentException(
+              String.format("ROLE_USER_REL doesn't support type %s", 
identType.name()));
+        }
       default:
         throw new IllegalArgumentException(
             String.format("Doesn't support the relation type %s", relType));
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
index 361600968..4c041a993 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
@@ -68,7 +68,7 @@ public interface MetalakeMetaMapper {
           + " deleted_at as deletedAt"
           + " FROM "
           + TABLE_NAME
-          + " WHERE metalake_id = #{metalaId} and deleted_at = 0")
+          + " WHERE metalake_id = #{metalakeId} and deleted_at = 0")
   MetalakePO selectMetalakeMetaById(@Param("metalakeId") Long metalakeId);
 
   @Select(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaMapper.java
index c6cf136ad..999eaeac5 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/RoleMetaMapper.java
@@ -88,6 +88,23 @@ public interface RoleMetaMapper {
           + " AND ro.deleted_at = 0 AND ge.deleted_at = 0")
   List<RolePO> listRolesByGroupId(Long groupId);
 
+  @Select(
+      "SELECT DISTINCT ro.role_id as roleId, ro.role_name as roleName,"
+          + " ro.metalake_id as metalakeId, ro.properties as properties,"
+          + " ro.audit_info as auditInfo, ro.current_version as 
currentVersion,"
+          + " ro.last_version as lastVersion, ro.deleted_at as deletedAt"
+          + " FROM "
+          + ROLE_TABLE_NAME
+          + " ro JOIN "
+          + SecurableObjectMapper.SECURABLE_OBJECT_TABLE_NAME
+          + " se ON ro.role_id = se.role_id"
+          + " WHERE se.entity_id = #{metadataObjectId}"
+          + " AND se.type = #{metadataObjectType}"
+          + " AND ro.deleted_at = 0 AND se.deleted_at = 0")
+  List<RolePO> listRolesByMetadataObjectIdAndType(
+      @Param("metadataObjectId") Long metadataObjectId,
+      @Param("metadataObjectType") String metadataObjectType);
+
   @Insert(
       "INSERT INTO "
           + ROLE_TABLE_NAME
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/GroupMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/GroupMetaService.java
index 81dbda4f8..2ffc10dac 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/GroupMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/GroupMetaService.java
@@ -22,17 +22,20 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
@@ -94,6 +97,21 @@ public class GroupMetaService {
     return POConverters.fromGroupPO(groupPO, rolePOs, identifier.namespace());
   }
 
+  public List<GroupEntity> listGroupsByRoleIdent(NameIdentifier roleIdent) {
+    RoleEntity roleEntity = 
RoleMetaService.getInstance().getRoleByIdentifier(roleIdent);
+    List<GroupPO> groupPOs =
+        SessionUtils.getWithoutCommit(
+            GroupMetaMapper.class, mapper -> 
mapper.listGroupsByRoleId(roleEntity.id()));
+    return groupPOs.stream()
+        .map(
+            po ->
+                POConverters.fromGroupPO(
+                    po,
+                    Collections.emptyList(),
+                    
AuthorizationUtils.ofGroupNamespace(roleIdent.namespace().level(0))))
+        .collect(Collectors.toList());
+  }
+
   public void insertGroup(GroupEntity groupEntity, boolean overwritten) throws 
IOException {
     try {
       AuthorizationUtils.checkGroup(groupEntity.nameIdentifier());
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
index c25c9997c..1118467b0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
@@ -45,7 +45,8 @@ public class OwnerMetaService {
 
   public Optional<Entity> getOwner(NameIdentifier identifier, 
Entity.EntityType type) {
     long metalakeId =
-        
MetalakeMetaService.getInstance().getMetalakeIdByName(getMetalake(identifier));
+        MetalakeMetaService.getInstance()
+            .getMetalakeIdByName(NameIdentifierUtil.getMetalake(identifier));
     Long entityId = getEntityId(metalakeId, identifier, type);
 
     UserPO userPO =
@@ -58,7 +59,7 @@ public class OwnerMetaService {
           POConverters.fromUserPO(
               userPO,
               Collections.emptyList(),
-              AuthorizationUtils.ofUserNamespace(getMetalake(identifier))));
+              
AuthorizationUtils.ofUserNamespace(NameIdentifierUtil.getMetalake(identifier))));
     }
 
     GroupPO groupPO =
@@ -71,7 +72,7 @@ public class OwnerMetaService {
           POConverters.fromGroupPO(
               groupPO,
               Collections.emptyList(),
-              AuthorizationUtils.ofGroupNamespace(getMetalake(identifier))));
+              
AuthorizationUtils.ofGroupNamespace(NameIdentifierUtil.getMetalake(identifier))));
     }
 
     return Optional.empty();
@@ -82,7 +83,9 @@ public class OwnerMetaService {
       Entity.EntityType entityType,
       NameIdentifier owner,
       Entity.EntityType ownerType) {
-    long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(getMetalake(entity));
+    long metalakeId =
+        MetalakeMetaService.getInstance()
+            .getMetalakeIdByName(NameIdentifierUtil.getMetalake(entity));
 
     Long entityId = getEntityId(metalakeId, entity, entityType);
     Long ownerId = getEntityId(metalakeId, owner, ownerType);
@@ -118,12 +121,4 @@ public class OwnerMetaService {
             metalakeId, object.fullName(), object.type());
     }
   }
-
-  private static String getMetalake(NameIdentifier identifier) {
-    if (identifier.hasNamespace()) {
-      return identifier.namespace().level(0);
-    } else {
-      return identifier.name();
-    }
-  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
index cbb1fdfb0..d4f38e63c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.storage.relational.service;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
@@ -38,6 +39,7 @@ import 
org.apache.gravitino.storage.relational.po.SecurableObjectPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,6 +90,52 @@ public class RoleMetaService {
         RoleMetaMapper.class, mapper -> mapper.listRolesByUserId(userId));
   }
 
+  public List<RoleEntity> listRolesByMetadataObjectIdentAndType(
+      NameIdentifier metadataObjectIdent, Entity.EntityType 
metadataObjectType) {
+    String metalake = NameIdentifierUtil.getMetalake(metadataObjectIdent);
+    long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+    MetadataObject metadataObject =
+        NameIdentifierUtil.toMetadataObject(metadataObjectIdent, 
metadataObjectType);
+    long metadataObjectId =
+        MetadataObjectService.getMetadataObjectId(
+            metalakeId, metadataObject.fullName(), metadataObject.type());
+    List<RolePO> rolePOs =
+        SessionUtils.getWithoutCommit(
+            RoleMetaMapper.class,
+            mapper ->
+                mapper.listRolesByMetadataObjectIdAndType(
+                    metadataObjectId, metadataObject.type().name()));
+    return rolePOs.stream()
+        .map(
+            po ->
+                POConverters.fromRolePO(
+                    po, listSecurableObjects(po), 
AuthorizationUtils.ofRoleNamespace(metalake)))
+        .collect(Collectors.toList());
+  }
+
+  private List<SecurableObject> listSecurableObjects(RolePO po) {
+    List<SecurableObjectPO> securableObjectPOs = 
listSecurableObjectsByRoleId(po.getRoleId());
+    List<SecurableObject> securableObjects = Lists.newArrayList();
+
+    for (SecurableObjectPO securableObjectPO : securableObjectPOs) {
+      String fullName =
+          MetadataObjectService.getMetadataObjectFullName(
+              securableObjectPO.getType(), securableObjectPO.getEntityId());
+      if (fullName != null) {
+        securableObjects.add(
+            POConverters.fromSecurableObjectPO(
+                fullName, securableObjectPO, 
getType(securableObjectPO.getType())));
+      } else {
+        LOG.info(
+            "The securable object {} {} may be deleted",
+            securableObjectPO.getEntityId(),
+            securableObjectPO.getType());
+      }
+    }
+
+    return securableObjects;
+  }
+
   public List<RolePO> listRolesByGroupId(Long groupId) {
     return SessionUtils.getWithoutCommit(
         RoleMetaMapper.class, mapper -> mapper.listRolesByGroupId(groupId));
@@ -149,24 +197,7 @@ public class RoleMetaService {
         
MetalakeMetaService.getInstance().getMetalakeIdByName(identifier.namespace().level(0));
     RolePO rolePO = getRolePOByMetalakeIdAndName(metalakeId, 
identifier.name());
 
-    List<SecurableObjectPO> securableObjectPOs = 
listSecurableObjectsByRoleId(rolePO.getRoleId());
-    List<SecurableObject> securableObjects = Lists.newArrayList();
-
-    for (SecurableObjectPO securableObjectPO : securableObjectPOs) {
-      String fullName =
-          MetadataObjectService.getMetadataObjectFullName(
-              securableObjectPO.getType(), securableObjectPO.getEntityId());
-      if (fullName != null) {
-        securableObjects.add(
-            POConverters.fromSecurableObjectPO(
-                fullName, securableObjectPO, 
getType(securableObjectPO.getType())));
-      } else {
-        LOG.info(
-            "The securable object {} {} may be deleted",
-            securableObjectPO.getEntityId(),
-            securableObjectPO.getType());
-      }
-    }
+    List<SecurableObject> securableObjects = listSecurableObjects(rolePO);
 
     return POConverters.fromRolePO(rolePO, securableObjects, 
identifier.namespace());
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/UserMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/UserMetaService.java
index f39386175..e7d0a435a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/UserMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/UserMetaService.java
@@ -22,16 +22,19 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.UserMetaMapper;
@@ -94,6 +97,21 @@ public class UserMetaService {
     return POConverters.fromUserPO(userPO, rolePOs, identifier.namespace());
   }
 
+  public List<UserEntity> listUsersByRoleIdent(NameIdentifier roleIdent) {
+    RoleEntity roleEntity = 
RoleMetaService.getInstance().getRoleByIdentifier(roleIdent);
+    List<UserPO> userPOs =
+        SessionUtils.getWithoutCommit(
+            UserMetaMapper.class, mapper -> 
mapper.listUsersByRoleId(roleEntity.id()));
+    return userPOs.stream()
+        .map(
+            po ->
+                POConverters.fromUserPO(
+                    po,
+                    Collections.emptyList(),
+                    
AuthorizationUtils.ofUserNamespace(roleIdent.namespace().level(0))))
+        .collect(Collectors.toList());
+  }
+
   public void insertUser(UserEntity userEntity, boolean overwritten) throws 
IOException {
     try {
       AuthorizationUtils.checkUser(userEntity.nameIdentifier());
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index ceae3797a..30f560102 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -285,4 +285,18 @@ public class NameIdentifierUtil {
             "Entity type " + entityType + " is not supported to convert to 
MetadataObject");
     }
   }
+
+  /**
+   * Get the metalake name of the given {@link NameIdentifier}.
+   *
+   * @param identifier The name identifier of the entity
+   * @return metalake name
+   */
+  public static String getMetalake(NameIdentifier identifier) {
+    if (identifier.hasNamespace()) {
+      return identifier.namespace().level(0);
+    } else {
+      return identifier.name();
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
 
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
index 06d397f3b..27e5e667c 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManager.java
@@ -19,6 +19,11 @@
 package org.apache.gravitino.authorization;
 
 import static org.apache.gravitino.Configs.SERVICE_ADMINS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -30,6 +35,9 @@ import org.apache.gravitino.Config;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -52,10 +60,12 @@ public class TestAccessControlManager {
   private static AccessControlManager accessControlManager;
 
   private static EntityStore entityStore;
+  private static CatalogManager catalogManager = mock(CatalogManager.class);
 
   private static Config config;
 
   private static String METALAKE = "metalake";
+  private static AuthorizationPlugin authorizationPlugin;
 
   private static BaseMetalake metalakeEntity =
       BaseMetalake.builder()
@@ -81,6 +91,11 @@ public class TestAccessControlManager {
     FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", 
entityStore, true);
     FieldUtils.writeField(
         GravitinoEnv.getInstance(), "accessControlDispatcher", 
accessControlManager, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
+    BaseCatalog catalog = mock(BaseCatalog.class);
+    when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    authorizationPlugin = mock(AuthorizationPlugin.class);
+    when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
   }
 
   @AfterAll
@@ -213,6 +228,7 @@ public class TestAccessControlManager {
 
   @Test
   public void testCreateRole() {
+    reset(authorizationPlugin);
     Map<String, String> props = ImmutableMap.of("key1", "value1");
 
     Role role =
@@ -225,6 +241,7 @@ public class TestAccessControlManager {
                     "catalog", 
Lists.newArrayList(Privileges.UseCatalog.allow()))));
     Assertions.assertEquals("create", role.name());
     testProperties(props, role.properties());
+    verify(authorizationPlugin).onRoleCreated(any());
 
     // Test with RoleAlreadyExistsException
     Assertions.assertThrows(
@@ -281,9 +298,12 @@ public class TestAccessControlManager {
                 "catalog", 
Lists.newArrayList(Privileges.UseCatalog.allow()))));
 
     // Test drop role
+    reset(authorizationPlugin);
     boolean dropped = accessControlManager.deleteRole("metalake", "testDrop");
     Assertions.assertTrue(dropped);
 
+    verify(authorizationPlugin).onRoleDeleted(any());
+
     // Test drop non-existed role
     boolean dropped1 = accessControlManager.deleteRole("metalake", "no-exist");
     Assertions.assertFalse(dropped1);
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManagerForPermissions.java
 
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManagerForPermissions.java
index 0ef4b6a8c..ce3c4e90c 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManagerForPermissions.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestAccessControlManagerForPermissions.java
@@ -18,6 +18,10 @@
  */
 package org.apache.gravitino.authorization;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
@@ -30,6 +34,9 @@ import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchRoleException;
@@ -46,12 +53,15 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 public class TestAccessControlManagerForPermissions {
 
   private static AccessControlManager accessControlManager;
 
   private static EntityStore entityStore;
+  private static CatalogManager catalogManager = 
Mockito.mock(CatalogManager.class);
+  private static AuthorizationPlugin authorizationPlugin;
 
   private static Config config;
 
@@ -126,6 +136,11 @@ public class TestAccessControlManagerForPermissions {
     FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", 
entityStore, true);
     FieldUtils.writeField(
         GravitinoEnv.getInstance(), "accessControlDispatcher", 
accessControlManager, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
+    BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
+    Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
+    
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
   }
 
   @AfterAll
@@ -138,14 +153,20 @@ public class TestAccessControlManagerForPermissions {
 
   @Test
   public void testGrantRoleToUser() {
+    reset(authorizationPlugin);
     String notExist = "not-exist";
 
     User user = accessControlManager.getUser(METALAKE, USER);
     Assertions.assertNull(user.roles());
 
+    reset(authorizationPlugin);
+
     user = accessControlManager.grantRolesToUser(METALAKE, ROLE, USER);
     Assertions.assertFalse(user.roles().isEmpty());
 
+    // Test authorization plugin
+    Mockito.verify(authorizationPlugin).onGrantedRolesToUser(any(), any());
+
     user = accessControlManager.getUser(METALAKE, USER);
     Assertions.assertEquals(1, user.roles().size());
     Assertions.assertEquals(ROLE, user.roles());
@@ -181,9 +202,13 @@ public class TestAccessControlManagerForPermissions {
     User user = accessControlManager.grantRolesToUser(METALAKE, ROLE, USER);
     Assertions.assertFalse(user.roles().isEmpty());
 
+    reset(authorizationPlugin);
     user = accessControlManager.revokeRolesFromUser(METALAKE, ROLE, USER);
     Assertions.assertTrue(user.roles().isEmpty());
 
+    // Test authorization plugin
+    Mockito.verify(authorizationPlugin).onRevokedRolesFromUser(any(), any());
+
     // Throw NoSuchMetalakeException
     Assertions.assertThrows(
         NoSuchMetalakeException.class,
@@ -212,9 +237,14 @@ public class TestAccessControlManagerForPermissions {
     Group group = accessControlManager.getGroup(METALAKE, GROUP);
     Assertions.assertTrue(group.roles().isEmpty());
 
+    reset(authorizationPlugin);
+
     group = accessControlManager.grantRolesToGroup(METALAKE, ROLE, GROUP);
     Assertions.assertFalse(group.roles().isEmpty());
 
+    // Test authorization plugin
+    verify(authorizationPlugin).onGrantedRolesToGroup(any(), any());
+
     group = accessControlManager.getGroup(METALAKE, GROUP);
     Assertions.assertEquals(1, group.roles().size());
     Assertions.assertEquals(ROLE, group.roles());
@@ -251,9 +281,13 @@ public class TestAccessControlManagerForPermissions {
     Group group = accessControlManager.grantRolesToGroup(METALAKE, ROLE, 
GROUP);
     Assertions.assertFalse(group.roles().isEmpty());
 
+    reset(authorizationPlugin);
     group = accessControlManager.revokeRolesFromGroup(METALAKE, ROLE, GROUP);
     Assertions.assertTrue(group.roles().isEmpty());
 
+    // Test authorization plugin
+    verify(authorizationPlugin).onRevokedRolesFromGroup(any(), any());
+
     // Throw NoSuchMetalakeException
     Assertions.assertThrows(
         NoSuchMetalakeException.class,
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestFutureGrantManager.java
 
b/core/src/test/java/org/apache/gravitino/authorization/TestFutureGrantManager.java
new file mode 100644
index 000000000..87acd0223
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestFutureGrantManager.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.meta.UserEntity;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestFutureGrantManager {
+  private static EntityStore entityStore = mock(EntityStore.class);
+  private static String METALAKE = "metalake";
+  private static AuthorizationPlugin authorizationPlugin;
+  private static BaseMetalake metalakeEntity =
+      BaseMetalake.builder()
+          .withId(1L)
+          .withName(METALAKE)
+          .withAuditInfo(
+              
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+          .withVersion(SchemaVersion.V_0_1)
+          .build();
+  private static BaseCatalog catalog;
+
+  @BeforeAll
+  public static void setUp() throws Exception {
+    entityStore.put(metalakeEntity, true);
+
+    catalog = mock(BaseCatalog.class);
+    authorizationPlugin = mock(AuthorizationPlugin.class);
+    when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
+  }
+
+  @Test
+  void testGrantNormally() throws IOException {
+    FutureGrantManager manager = new FutureGrantManager(entityStore);
+
+    SupportsRelationOperations relationOperations = 
mock(SupportsRelationOperations.class);
+    when(entityStore.relationOperations()).thenReturn(relationOperations);
+
+    // test no securable objects
+    RoleEntity roleEntity = mock(RoleEntity.class);
+    when(relationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.METADATA_OBJECT_ROLE_REL,
+            NameIdentifier.of(METALAKE),
+            Entity.EntityType.METALAKE))
+        .thenReturn(Lists.newArrayList(roleEntity));
+    UserEntity userEntity = mock(UserEntity.class);
+    when(relationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.ROLE_USER_REL,
+            AuthorizationUtils.ofRole(METALAKE, "role1"),
+            Entity.EntityType.ROLE))
+        .thenReturn(Lists.newArrayList(userEntity));
+    when(relationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.ROLE_GROUP_REL,
+            AuthorizationUtils.ofRole(METALAKE, "role1"),
+            Entity.EntityType.ROLE))
+        .thenReturn(Collections.emptyList());
+    when(roleEntity.securableObjects()).thenReturn(Collections.emptyList());
+
+    manager.grantNewlyCreatedCatalog(METALAKE, catalog);
+    verify(authorizationPlugin, never()).onGrantedRolesToUser(any(), any());
+    verify(authorizationPlugin, never()).onGrantedRolesToGroup(any(), any());
+
+    // test only grant users
+    reset(authorizationPlugin);
+    SecurableObject securableObject = mock(SecurableObject.class);
+    when(securableObject.type()).thenReturn(MetadataObject.Type.METALAKE);
+    when(securableObject.privileges())
+        .thenReturn(Lists.newArrayList(Privileges.CreateTable.allow()));
+    
when(roleEntity.securableObjects()).thenReturn(Lists.newArrayList(securableObject));
+    
when(roleEntity.nameIdentifier()).thenReturn(AuthorizationUtils.ofRole(METALAKE,
 "role1"));
+
+    manager.grantNewlyCreatedCatalog(METALAKE, catalog);
+    verify(authorizationPlugin).onGrantedRolesToUser(any(), any());
+    verify(authorizationPlugin, never()).onGrantedRolesToGroup(any(), any());
+
+    // test only grant groups
+    reset(authorizationPlugin);
+    GroupEntity groupEntity = mock(GroupEntity.class);
+    when(relationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.ROLE_USER_REL,
+            AuthorizationUtils.ofRole(METALAKE, "role1"),
+            Entity.EntityType.ROLE))
+        .thenReturn(Collections.emptyList());
+    when(relationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.ROLE_GROUP_REL,
+            AuthorizationUtils.ofRole(METALAKE, "role1"),
+            Entity.EntityType.ROLE))
+        .thenReturn(Lists.newArrayList(groupEntity));
+    manager.grantNewlyCreatedCatalog(METALAKE, catalog);
+    verify(authorizationPlugin, never()).onGrantedRolesToUser(any(), any());
+    verify(authorizationPlugin).onGrantedRolesToGroup(any(), any());
+
+    // test users and groups
+    reset(authorizationPlugin);
+    when(relationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.ROLE_USER_REL,
+            AuthorizationUtils.ofRole(METALAKE, "role1"),
+            Entity.EntityType.ROLE))
+        .thenReturn(Lists.newArrayList(userEntity));
+    when(relationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.ROLE_GROUP_REL,
+            AuthorizationUtils.ofRole(METALAKE, "role1"),
+            Entity.EntityType.ROLE))
+        .thenReturn(Lists.newArrayList(groupEntity));
+    manager.grantNewlyCreatedCatalog(METALAKE, catalog);
+    verify(authorizationPlugin).onGrantedRolesToUser(any(), any());
+    verify(authorizationPlugin).onGrantedRolesToGroup(any(), any());
+
+    // test to skip unnecessary roles
+    reset(authorizationPlugin);
+    when(securableObject.privileges())
+        .thenReturn(Lists.newArrayList(Privileges.CreateCatalog.allow()));
+    manager.grantNewlyCreatedCatalog(METALAKE, catalog);
+    verify(authorizationPlugin, never()).onGrantedRolesToUser(any(), any());
+    verify(authorizationPlugin, never()).onGrantedRolesToGroup(any(), any());
+  }
+
+  @Test
+  void testGrantWithException() throws IOException {
+    FutureGrantManager manager = new FutureGrantManager(entityStore);
+    SupportsRelationOperations relationOperations = 
mock(SupportsRelationOperations.class);
+    when(entityStore.relationOperations()).thenReturn(relationOperations);
+    doThrow(new IOException("mock error"))
+        .when(relationOperations)
+        .listEntitiesByRelation(any(), any(), any());
+    Assertions.assertThrows(
+        RuntimeException.class, () -> 
manager.grantNewlyCreatedCatalog(METALAKE, catalog));
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
index 65489a2c4..d811b8b59 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
@@ -27,6 +27,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Instant;
+import java.util.Comparator;
 import java.util.List;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.Namespace;
@@ -38,8 +39,12 @@ import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.TestJDBCBackend;
@@ -127,12 +132,62 @@ class TestRoleMetaService extends TestJDBCBackend {
             auditInfo);
     backend.insert(overwriteCatalog, false);
 
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog"),
+            "schema",
+            auditInfo);
+    backend.insert(schema, false);
+
+    TopicEntity topic =
+        createTopicEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "topic",
+            auditInfo);
+    backend.insert(topic, false);
+
+    FilesetEntity fileset =
+        createFilesetEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "fileset",
+            auditInfo);
+    backend.insert(fileset, false);
+
+    TableEntity table =
+        createTableEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "table",
+            auditInfo);
+    backend.insert(table, false);
+
     RoleMetaService roleMetaService = RoleMetaService.getInstance();
 
+    // Test with different securable objects
+    SecurableObject metalakeObject =
+        SecurableObjects.ofMetalake(
+            "metalake", Lists.newArrayList(Privileges.CreateCatalog.allow()));
     SecurableObject catalogObject =
         SecurableObjects.ofCatalog(
             "catalog",
             Lists.newArrayList(Privileges.UseCatalog.allow(), 
Privileges.CreateSchema.deny()));
+    SecurableObject schemaObject =
+        SecurableObjects.ofSchema(
+            catalogObject,
+            "schema",
+            Lists.newArrayList(Privileges.UseSchema.allow(), 
Privileges.CreateTable.allow()));
+    SecurableObject topicObject =
+        SecurableObjects.ofTopic(
+            schemaObject, "topic", 
Lists.newArrayList(Privileges.ConsumeTopic.allow()));
+    SecurableObject filesetObject =
+        SecurableObjects.ofFileset(
+            schemaObject, "fileset", 
Lists.newArrayList(Privileges.ReadFileset.allow()));
+    SecurableObject tableObject =
+        SecurableObjects.ofTable(
+            schemaObject, "table", 
Lists.newArrayList(Privileges.SelectTable.allow()));
 
     // insert role
     RoleEntity role1 =
@@ -143,6 +198,11 @@ class TestRoleMetaService extends TestJDBCBackend {
             auditInfo,
             Lists.newArrayList(
                 catalogObject,
+                metalakeObject,
+                schemaObject,
+                filesetObject,
+                topicObject,
+                tableObject,
                 SecurableObjects.ofCatalog(
                     "anotherCatalog", 
Lists.newArrayList(Privileges.UseCatalog.allow()))),
             ImmutableMap.of("k1", "v1"));
@@ -297,6 +357,46 @@ class TestRoleMetaService extends TestJDBCBackend {
     Assertions.assertTrue(groupRoleRels.isEmpty());
   }
 
+  @Test
+  void listRolesBySecurableObject() throws IOException {
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    CatalogEntity catalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(), Namespace.of("metalake"), 
"catalog", auditInfo);
+    backend.insert(catalog, false);
+
+    RoleEntity role1 =
+        createRoleEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            AuthorizationUtils.ofRoleNamespace(metalakeName),
+            "role1",
+            auditInfo,
+            "catalog");
+
+    RoleEntity role2 =
+        createRoleEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            AuthorizationUtils.ofRoleNamespace(metalakeName),
+            "role2",
+            auditInfo,
+            "catalog");
+
+    RoleMetaService roleMetaService = RoleMetaService.getInstance();
+    roleMetaService.insertRole(role1, false);
+    roleMetaService.insertRole(role2, false);
+
+    List<RoleEntity> roleEntities =
+        roleMetaService.listRolesByMetadataObjectIdentAndType(
+            catalog.nameIdentifier(), catalog.type());
+    roleEntities.sort(Comparator.comparing(RoleEntity::name));
+    Assertions.assertEquals(Lists.newArrayList(role1, role2), roleEntities);
+  }
+
   @Test
   void deleteMetalake() throws IOException {
     AuditInfo auditInfo =
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
index a3699c0f9..c67b4bd2b 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java
@@ -182,58 +182,51 @@ public class RoleOperations {
 
     String existErrMsg = "Securable object %s doesn't exist";
 
-    TreeLockUtils.doWithTreeLock(
-        identifier,
-        LockType.READ,
-        () -> {
-          switch (object.type()) {
-            case METALAKE:
-              if 
(!GravitinoEnv.getInstance().metalakeDispatcher().metalakeExists(identifier)) {
-                throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
-              }
-
-              break;
-
-            case CATALOG:
-              if 
(!GravitinoEnv.getInstance().catalogDispatcher().catalogExists(identifier)) {
-                throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
-              }
-
-              break;
-
-            case SCHEMA:
-              if 
(!GravitinoEnv.getInstance().schemaDispatcher().schemaExists(identifier)) {
-                throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
-              }
-
-              break;
-
-            case FILESET:
-              if 
(!GravitinoEnv.getInstance().filesetDispatcher().filesetExists(identifier)) {
-                throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
-              }
-
-              break;
-            case TABLE:
-              if 
(!GravitinoEnv.getInstance().tableDispatcher().tableExists(identifier)) {
-                throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
-              }
-
-              break;
-
-            case TOPIC:
-              if 
(!GravitinoEnv.getInstance().topicDispatcher().topicExists(identifier)) {
-                throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
-              }
-
-              break;
-
-            default:
-              throw new IllegalArgumentException(
-                  String.format("Doesn't support the type %s", object.type()));
-          }
-
-          return null;
-        });
+    switch (object.type()) {
+      case METALAKE:
+        if 
(!GravitinoEnv.getInstance().metalakeDispatcher().metalakeExists(identifier)) {
+          throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
+        }
+
+        break;
+
+      case CATALOG:
+        if 
(!GravitinoEnv.getInstance().catalogDispatcher().catalogExists(identifier)) {
+          throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
+        }
+
+        break;
+
+      case SCHEMA:
+        if 
(!GravitinoEnv.getInstance().schemaDispatcher().schemaExists(identifier)) {
+          throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
+        }
+
+        break;
+
+      case FILESET:
+        if 
(!GravitinoEnv.getInstance().filesetDispatcher().filesetExists(identifier)) {
+          throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
+        }
+
+        break;
+      case TABLE:
+        if 
(!GravitinoEnv.getInstance().tableDispatcher().tableExists(identifier)) {
+          throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
+        }
+
+        break;
+
+      case TOPIC:
+        if 
(!GravitinoEnv.getInstance().topicDispatcher().topicExists(identifier)) {
+          throw new NoSuchMetadataObjectException(existErrMsg, 
object.fullName());
+        }
+
+        break;
+
+      default:
+        throw new IllegalArgumentException(
+            String.format("Doesn't support the type %s", object.type()));
+    }
   }
 }

Reply via email to