This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fd6ec148 [#5146] fix(core): Support to rename and delete metadata 
object in the authorization plugin (#5321)
2fd6ec148 is described below

commit 2fd6ec148380aea1124b45956609eff019ced8a8
Author: roryqi <ror...@apache.org>
AuthorDate: Thu Oct 31 18:49:15 2024 +0800

    [#5146] fix(core): Support to rename and delete metadata object in the 
authorization plugin (#5321)
    
    ### What changes were proposed in this pull request?
    
    Support to rename and delete metadata object in the authorization plugin
    
    ### Why are the changes needed?
    
    Fix: #5146
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added new integration tests (IT) and unit tests (UT).
---
 .../ranger/integration/test/RangerHiveE2EIT.java   | 487 ++++++++++++++++++++-
 .../authorization/AccessControlDispatcher.java     |   1 +
 .../authorization/AuthorizationUtils.java          |  57 ++-
 .../gravitino/hook/CatalogHookDispatcher.java      |  16 +-
 .../gravitino/hook/FilesetHookDispatcher.java      |  18 +-
 .../gravitino/hook/MetalakeHookDispatcher.java     |   9 +
 .../gravitino/hook/SchemaHookDispatcher.java       |   6 +-
 .../apache/gravitino/hook/TableHookDispatcher.java |  24 +-
 .../apache/gravitino/hook/TopicHookDispatcher.java |   4 +-
 .../gravitino/storage/relational/JDBCBackend.java  |   2 +-
 .../relational/service/RoleMetaService.java        |   2 +-
 .../catalog/TestFilesetOperationDispatcher.java    |   8 +
 .../catalog/TestTableOperationDispatcher.java      |   8 +
 .../catalog/TestTopicOperationDispatcher.java      |   8 +
 .../gravitino/hook/TestFilesetHookDispatcher.java  | 108 +++++
 .../gravitino/hook/TestTableHookDispatcher.java    | 181 ++++++++
 .../gravitino/hook/TestTopicHookDispatcher.java    |  79 ++++
 .../relational/service/TestRoleMetaService.java    |   3 +-
 18 files changed, 988 insertions(+), 33 deletions(-)

diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
index 565acb82f..409ddf48e 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
@@ -45,6 +45,7 @@ import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.Privileges;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.SecurableObjects;
@@ -55,6 +56,7 @@ import 
org.apache.gravitino.integration.test.container.HiveContainer;
 import org.apache.gravitino.integration.test.container.RangerContainer;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.TableChange;
 import org.apache.kyuubi.plugin.spark.authz.AccessControlException;
 import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.AnalysisException;
@@ -93,6 +95,8 @@ public class RangerHiveE2EIT extends BaseIT {
 
   private static final String SQL_CREATE_SCHEMA = String.format("CREATE 
DATABASE %s", schemaName);
 
+  private static final String SQL_DROP_SCHEMA = String.format("DROP DATABASE 
%s", schemaName);
+
   private static final String SQL_USE_SCHEMA = String.format("USE SCHEMA %s", 
schemaName);
 
   private static final String SQL_CREATE_TABLE =
@@ -112,6 +116,12 @@ public class RangerHiveE2EIT extends BaseIT {
   private static final String SQL_ALTER_TABLE =
       String.format("ALTER TABLE %s ADD COLUMN d string", tableName);
 
+  private static final String SQL_RENAME_TABLE =
+      String.format("ALTER TABLE %s RENAME TO new_table", tableName);
+
+  private static final String SQL_RENAME_BACK_TABLE =
+      String.format("ALTER TABLE new_table RENAME TO %s", tableName);
+
   private static final String SQL_DROP_TABLE = String.format("DROP TABLE %s", 
tableName);
 
   private static String RANGER_ADMIN_URL = null;
@@ -244,6 +254,9 @@ public class RangerHiveE2EIT extends BaseIT {
     // Third, succeed to create the schema
     sparkSession.sql(SQL_CREATE_SCHEMA);
 
+    // Fourth, fail to create the table
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+
     // Clean up
     catalog.asSchemas().dropSchema(schemaName, true);
     metalake.deleteRole(roleName);
@@ -295,7 +308,7 @@ public class RangerHiveE2EIT extends BaseIT {
   }
 
   @Test
-  void testReadWriteTable() throws InterruptedException {
+  void testReadWriteTableWithMetalakeLevelRole() throws InterruptedException {
     // First, create a role for creating a database and grant role to the user
     String readWriteRole = currentFunName();
     SecurableObject securableObject =
@@ -337,10 +350,92 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 6: Fail to drop the table
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
 
+    // case 7: Once the role is deleted, every operation below is denied.
+    metalake.deleteRole(readWriteRole);
+    waitForUpdatingPolicies();
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+
+    // Clean up
+    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
+    catalog.asSchemas().dropSchema(schemaName, true);
+  }
+
+  @Test
+  void testReadWriteTableWithTableLevelRole() throws InterruptedException {
+    // First, create a role for creating a database and grant role to the user
+    String roleName = currentFunName();
+    SecurableObject securableObject =
+        SecurableObjects.ofMetalake(
+            metalakeName,
+            Lists.newArrayList(
+                Privileges.UseSchema.allow(),
+                Privileges.CreateSchema.allow(),
+                Privileges.CreateTable.allow()));
+    String userName1 = System.getenv(HADOOP_USER_NAME);
+    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
+    waitForUpdatingPolicies();
+    // Second, create a schema
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+
+    // Third, create a table
+    sparkSession.sql(SQL_USE_SCHEMA);
+    sparkSession.sql(SQL_CREATE_TABLE);
+
+    // Fourth, revoke and grant a table level role
+    metalake.deleteRole(roleName);
+    securableObject =
+        SecurableObjects.parse(
+            String.format("%s.%s.%s", catalogName, schemaName, tableName),
+            MetadataObject.Type.TABLE,
+            Lists.newArrayList(Privileges.ModifyTable.allow(), 
Privileges.SelectTable.allow()));
+    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
+    waitForUpdatingPolicies();
+
+    // case 1: Succeed to insert data into table
+    sparkSession.sql(SQL_INSERT_TABLE);
+
+    // case 2: Succeed to select data from the table
+    sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
+
+    // case 3: Fail to update data in the table because Hive doesn't support it.
+    Assertions.assertThrows(
+        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+
+    // case 4: Fail to delete data from the table, Because Hive doesn't 
support.
+    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+
+    // case 5: Succeed to alter the table
+    sparkSession.sql(SQL_ALTER_TABLE);
+
+    // case 6: Fail to drop the table
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
+
+    // case 7: Once the role is deleted, every operation below is denied.
+    metalake.deleteRole(roleName);
+    waitForUpdatingPolicies();
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
     catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(readWriteRole);
   }
 
   @Test
@@ -385,16 +480,28 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 6: Fail to drop the table
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
 
+    // case 7: Once the role is deleted, every operation below is denied.
+    metalake.deleteRole(readOnlyRole);
+    waitForUpdatingPolicies();
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
     catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(readOnlyRole);
   }
 
   @Test
   void testWriteOnlyTable() throws InterruptedException {
     // First, create a role for creating a database and grant role to the user
-    String readOnlyRole = currentFunName();
+    String writeOnlyRole = currentFunName();
     SecurableObject securableObject =
         SecurableObjects.ofMetalake(
             metalakeName,
@@ -404,8 +511,8 @@ public class RangerHiveE2EIT extends BaseIT {
                 Privileges.CreateTable.allow(),
                 Privileges.ModifyTable.allow()));
     String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.createRole(readOnlyRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(readOnlyRole), userName1);
+    metalake.createRole(writeOnlyRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+    metalake.grantRolesToUser(Lists.newArrayList(writeOnlyRole), userName1);
     waitForUpdatingPolicies();
     // Second, create a schema
     sparkSession.sql(SQL_CREATE_SCHEMA);
@@ -434,10 +541,22 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 6: Fail to drop the table
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
 
+    // case 7: Once the role is deleted, every operation below is denied.
+    metalake.deleteRole(writeOnlyRole);
+    waitForUpdatingPolicies();
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
     catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(readOnlyRole);
   }
 
   @Test
@@ -471,10 +590,10 @@ public class RangerHiveE2EIT extends BaseIT {
 
     waitForUpdatingPolicies();
 
-    // Test to create a schema
+    // Test to create the schema
     sparkSession.sql(SQL_CREATE_SCHEMA);
 
-    // Test to creat a table
+    // Test to create a table
     sparkSession.sql(SQL_USE_SCHEMA);
     sparkSession.sql(SQL_CREATE_TABLE);
 
@@ -486,6 +605,9 @@ public class RangerHiveE2EIT extends BaseIT {
 
   @Test
   void testDeleteAndRecreateRole() throws InterruptedException {
+    // Fail to drop the schema before the role is created
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
+
     // Create a role with CREATE_SCHEMA privilege
     String roleName = currentFunName();
     SecurableObject securableObject =
@@ -527,6 +649,271 @@ public class RangerHiveE2EIT extends BaseIT {
     metalake.deleteRole(roleName);
   }
 
+  @Test
+  void testDeleteAndRecreateMetadataObject() throws InterruptedException {
+    // Create a role with CREATE_SCHEMA privilege
+    String roleName = currentFunName();
+    SecurableObject securableObject =
+        SecurableObjects.parse(
+            String.format("%s", catalogName),
+            MetadataObject.Type.CATALOG,
+            Lists.newArrayList(Privileges.CreateSchema.allow()));
+    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+
+    // Granted this role to the spark execution user `HADOOP_USER_NAME`
+    String userName1 = System.getenv(HADOOP_USER_NAME);
+    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
+    waitForUpdatingPolicies();
+
+    // Create a schema
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
+
+    // Set owner
+    MetadataObject schemaObject =
+        MetadataObjects.of(catalogName, schemaName, 
MetadataObject.Type.SCHEMA);
+    metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
+    waitForUpdatingPolicies();
+
+    // Delete a schema
+    sparkSession.sql(SQL_DROP_SCHEMA);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    waitForUpdatingPolicies();
+
+    // Recreate a schema
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
+
+    // Set owner
+    schemaObject = MetadataObjects.of(catalogName, schemaName, 
MetadataObject.Type.SCHEMA);
+    metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
+    waitForUpdatingPolicies();
+    sparkSession.sql(SQL_DROP_SCHEMA);
+
+    // Delete the role, then create the schema again (not asserted to fail; confirm intent)
+    metalake.deleteRole(roleName);
+    waitForUpdatingPolicies();
+
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+
+    // Clean up
+    catalog.asSchemas().dropSchema(schemaName, true);
+  }
+
+  @Test
+  void testRenameMetadataObject() throws InterruptedException {
+    // Create a role with USE_CATALOG, CREATE_SCHEMA, CREATE_TABLE and MODIFY_TABLE privileges
+    String roleName = currentFunName();
+    SecurableObject securableObject =
+        SecurableObjects.parse(
+            String.format("%s", catalogName),
+            MetadataObject.Type.CATALOG,
+            Lists.newArrayList(
+                Privileges.UseCatalog.allow(),
+                Privileges.CreateSchema.allow(),
+                Privileges.CreateTable.allow(),
+                Privileges.ModifyTable.allow()));
+    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+    // Granted this role to the spark execution user `HADOOP_USER_NAME`
+    String userName1 = System.getenv(HADOOP_USER_NAME);
+    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
+
+    waitForUpdatingPolicies();
+
+    // Create a schema and a table
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+    sparkSession.sql(SQL_USE_SCHEMA);
+    sparkSession.sql(SQL_CREATE_TABLE);
+
+    // Rename a table and rename back
+    sparkSession.sql(SQL_RENAME_TABLE);
+    sparkSession.sql(SQL_RENAME_BACK_TABLE);
+
+    // Clean up
+    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.deleteRole(roleName);
+  }
+
+  @Test
+  void testRenameMetadataObjectPrivilege() throws InterruptedException {
+    // Create a role with USE_CATALOG, CREATE_SCHEMA, CREATE_TABLE and MODIFY_TABLE privileges
+    String roleName = currentFunName();
+    SecurableObject securableObject =
+        SecurableObjects.parse(
+            String.format("%s", catalogName),
+            MetadataObject.Type.CATALOG,
+            Lists.newArrayList(
+                Privileges.UseCatalog.allow(),
+                Privileges.CreateSchema.allow(),
+                Privileges.CreateTable.allow(),
+                Privileges.ModifyTable.allow()));
+    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+    // Granted this role to the spark execution user `HADOOP_USER_NAME`
+    String userName1 = System.getenv(HADOOP_USER_NAME);
+    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
+
+    waitForUpdatingPolicies();
+
+    // Create a schema and a table
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+    sparkSession.sql(SQL_USE_SCHEMA);
+    sparkSession.sql(SQL_CREATE_TABLE);
+
+    // Rename the table to new_table through the catalog API
+    catalog
+        .asTableCatalog()
+        .alterTable(NameIdentifier.of(schemaName, tableName), 
TableChange.rename("new_table"));
+
+    // Succeed to insert data
+    sparkSession.sql("INSERT INTO new_table (a, b, c) VALUES (1, 'a', 'b')");
+
+    catalog
+        .asTableCatalog()
+        .alterTable(NameIdentifier.of(schemaName, "new_table"), 
TableChange.rename(tableName));
+
+    // Succeed to insert data
+    sparkSession.sql(SQL_INSERT_TABLE);
+
+    // Clean up
+    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.deleteRole(roleName);
+  }
+
+  @Test
+  void testChangeOwner() throws InterruptedException {
+    // Create a helper role that allows creating a schema and a table
+    String helperRole = currentFunName();
+    SecurableObject securableObject =
+        SecurableObjects.ofMetalake(
+            metalakeName,
+            Lists.newArrayList(
+                Privileges.UseSchema.allow(),
+                Privileges.CreateSchema.allow(),
+                Privileges.CreateTable.allow(),
+                Privileges.ModifyTable.allow()));
+    String userName1 = System.getenv(HADOOP_USER_NAME);
+    metalake.createRole(helperRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+    metalake.grantRolesToUser(Lists.newArrayList(helperRole), userName1);
+    waitForUpdatingPolicies();
+
+    // Create a schema and a table
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+    sparkSession.sql(SQL_USE_SCHEMA);
+    sparkSession.sql(SQL_CREATE_TABLE);
+    sparkSession.sql(SQL_INSERT_TABLE);
+
+    metalake.revokeRolesFromUser(Lists.newArrayList(helperRole), userName1);
+    metalake.deleteRole(helperRole);
+    waitForUpdatingPolicies();
+
+    // case 1. Have none of privileges of the table
+
+    // - a. Fail to insert data into the table
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
+
+    // - b. Fail to select data from the table
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
+
+    // - c: Fail to update data in the table
+    Assertions.assertThrows(
+        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+
+    // - d: Fail to delete data from the table
+    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+
+    // - e: Fail to alter the table
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_ALTER_TABLE));
+
+    // - f: Fail to drop the table
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
+
+    // case 2. user is the table owner
+    MetadataObject tableObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogName, schemaName, tableName), 
MetadataObject.Type.TABLE);
+    metalake.setOwner(tableObject, userName1, Owner.Type.USER);
+    waitForUpdatingPolicies();
+
+    // Owner has all the privileges except for creating table
+    checkTableAllPrivilegesExceptForCreating();
+
+    // Delete Gravitino's meta data
+    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
+    waitForUpdatingPolicies();
+
+    // Fail to create the table
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+
+    // case 3. user is the schema owner
+    MetadataObject schemaObject =
+        MetadataObjects.of(catalogName, schemaName, 
MetadataObject.Type.SCHEMA);
+    metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
+    waitForUpdatingPolicies();
+
+    // Succeed to create a table
+    sparkSession.sql(SQL_CREATE_TABLE);
+
+    // Succeed to check other table privileges
+    checkTableAllPrivilegesExceptForCreating();
+
+    // Succeed to drop schema
+    sparkSession.sql(SQL_DROP_SCHEMA);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    waitForUpdatingPolicies();
+
+    // Fail to create schema
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
+
+    // case 4. user is the catalog owner
+    MetadataObject catalogObject =
+        MetadataObjects.of(null, catalogName, MetadataObject.Type.CATALOG);
+    metalake.setOwner(catalogObject, userName1, Owner.Type.USER);
+    waitForUpdatingPolicies();
+
+    // Succeed to create a schema
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+
+    // Succeed to create a table
+    sparkSession.sql(SQL_CREATE_TABLE);
+
+    // Succeed to check other table privileges
+    checkTableAllPrivilegesExceptForCreating();
+
+    // Succeed to drop schema
+    sparkSession.sql(SQL_DROP_SCHEMA);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    waitForUpdatingPolicies();
+
+    metalake.setOwner(catalogObject, AuthConstants.ANONYMOUS_USER, 
Owner.Type.USER);
+    // case 5. user is the metalake owner
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, metalakeName, MetadataObject.Type.METALAKE);
+    metalake.setOwner(metalakeObject, userName1, Owner.Type.USER);
+    waitForUpdatingPolicies();
+
+    // Succeed to create a schema
+    sparkSession.sql(SQL_CREATE_SCHEMA);
+
+    // Succeed to create a table
+    sparkSession.sql(SQL_CREATE_TABLE);
+
+    // Succeed to check other table privileges
+    checkTableAllPrivilegesExceptForCreating();
+
+    // Succeed to drop schema
+    sparkSession.sql(SQL_DROP_SCHEMA);
+
+    // Clean up
+    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
+    catalog.asSchemas().dropSchema(schemaName, true);
+  }
+
   @Test
   void testAllowUseSchemaPrivilege() throws InterruptedException {
     // Create a role with CREATE_SCHEMA privilege
@@ -587,6 +974,67 @@ public class RangerHiveE2EIT extends BaseIT {
     metalake.deleteRole(roleName);
   }
 
+  @Test
+  void testDenyPrivileges() throws InterruptedException {
+    // Create a schema
+    catalog.asSchemas().createSchema(schemaName, "test", 
Collections.emptyMap());
+
+    // Define the allow (catalog level) and deny (schema level) securable objects
+    String roleName = currentFunName();
+    SecurableObject allowObject =
+        SecurableObjects.parse(
+            String.format("%s", catalogName),
+            MetadataObject.Type.CATALOG,
+            Lists.newArrayList(Privileges.UseSchema.allow(), 
Privileges.CreateTable.allow()));
+    SecurableObject denyObject =
+        SecurableObjects.parse(
+            String.format("%s.%s", catalogName, schemaName),
+            MetadataObject.Type.SCHEMA,
+            Lists.newArrayList(Privileges.CreateTable.deny()));
+    // Create a role, catalog allows to create a table, schema denies to 
create a table
+    metalake.createRole(
+        roleName, Collections.emptyMap(), Lists.newArrayList(allowObject, 
denyObject));
+
+    // Granted this role to the spark execution user `HADOOP_USER_NAME`
+    String userName1 = System.getenv(HADOOP_USER_NAME);
+    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
+    waitForUpdatingPolicies();
+
+    // Fail to create a table
+    sparkSession.sql(SQL_USE_SCHEMA);
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+
+    // Delete the role
+    metalake.deleteRole(roleName);
+
+    // Create another role, but catalog denies to create a table, schema 
allows to create a table
+    allowObject =
+        SecurableObjects.parse(
+            String.format("%s", catalogName),
+            MetadataObject.Type.CATALOG,
+            Lists.newArrayList(Privileges.CreateTable.deny()));
+    denyObject =
+        SecurableObjects.parse(
+            String.format("%s.%s", catalogName, schemaName),
+            MetadataObject.Type.SCHEMA,
+            Lists.newArrayList(Privileges.CreateTable.allow()));
+    metalake.createRole(
+        roleName, Collections.emptyMap(), Lists.newArrayList(allowObject, 
denyObject));
+
+    // Granted this role to the spark execution user `HADOOP_USER_NAME`
+    userName1 = System.getenv(HADOOP_USER_NAME);
+    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
+
+    waitForUpdatingPolicies();
+
+    // Fail to create a table
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+
+    // Clean up
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.deleteRole(roleName);
+  }
+
   private void createMetalake() {
     GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
     Assertions.assertEquals(0, gravitinoMetalakes.length);
@@ -623,6 +1071,27 @@ public class RangerHiveE2EIT extends BaseIT {
     LOG.info("Catalog created: {}", catalog);
   }
 
+  private void checkTableAllPrivilegesExceptForCreating() {
+    // - a. Succeed to insert data into the table
+    sparkSession.sql(SQL_INSERT_TABLE);
+
+    // - b. Succeed to select data from the table
+    sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
+
+    // - c: Fail to update data in the table. Because Hive doesn't support
+    Assertions.assertThrows(
+        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+
+    // - d: Fail to delete data from the table, Because Hive doesn't support
+    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+
+    // - e: Succeed to alter the table
+    sparkSession.sql(SQL_ALTER_TABLE);
+
+    // - f: Succeed to drop the table
+    sparkSession.sql(SQL_DROP_TABLE);
+  }
+
   private static void waitForUpdatingPolicies() throws InterruptedException {
     // After Ranger authorization, Must wait a period of time for the Ranger 
Spark plugin to update
     // the policy Sleep time must be greater than the policy update interval
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AccessControlDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/authorization/AccessControlDispatcher.java
index f5625d9d6..b75a67055 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AccessControlDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AccessControlDispatcher.java
@@ -272,6 +272,7 @@ public interface AccessControlDispatcher {
    * Lists the role names associated the metadata object.
    *
    * @param metalake The Metalake of the Role.
+   * @param object The object of the Roles.
    * @return The role list.
    * @throws NoSuchMetalakeException If the Metalake with the given name does 
not exist.
    * @throws NoSuchMetadataObjectException If the Metadata object with the 
given name does not
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
index 42dd9f830..ca5866558 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java
@@ -186,17 +186,6 @@ public class AuthorizationUtils {
     }
   }
 
-  private static void callAuthorizationPluginImpl(
-      Consumer<AuthorizationPlugin> consumer, Catalog catalog) {
-
-    if (catalog instanceof BaseCatalog) {
-      BaseCatalog baseCatalog = (BaseCatalog) catalog;
-      if (baseCatalog.getAuthorizationPlugin() != null) {
-        consumer.accept(baseCatalog.getAuthorizationPlugin());
-      }
-    }
-  }
-
   public static boolean 
needApplyAuthorizationPluginAllCatalogs(SecurableObject securableObject) {
     if (securableObject.type() == MetadataObject.Type.METALAKE) {
       List<Privilege> privileges = securableObject.privileges();
@@ -271,4 +260,50 @@ public class AuthorizationUtils {
   private static boolean needApplyAuthorization(MetadataObject.Type type) {
     return type != MetadataObject.Type.ROLE && type != 
MetadataObject.Type.METALAKE;
   }
+
+  private static void callAuthorizationPluginImpl(
+      Consumer<AuthorizationPlugin> consumer, Catalog catalog) {
+
+    if (catalog instanceof BaseCatalog) {
+      BaseCatalog baseCatalog = (BaseCatalog) catalog;
+      if (baseCatalog.getAuthorizationPlugin() != null) {
+        consumer.accept(baseCatalog.getAuthorizationPlugin());
+      }
+    }
+  }
+
+  public static void authorizationPluginRemovePrivileges(
+      NameIdentifier ident, Entity.EntityType type) {
+    // If authorization is enabled, we should also remove the entity's privileges from the
+    // authorization plugin.
+    if (GravitinoEnv.getInstance().accessControlDispatcher() != null) {
+      MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(ident, type);
+      MetadataObjectChange removeObject = 
MetadataObjectChange.remove(metadataObject);
+      callAuthorizationPluginForMetadataObject(
+          ident.namespace().level(0),
+          metadataObject,
+          authorizationPlugin -> {
+            authorizationPlugin.onMetadataUpdated(removeObject);
+          });
+    }
+  }
+
+  public static void authorizationPluginRenamePrivileges(
+      NameIdentifier ident, Entity.EntityType type, String newName) {
+    // If authorization is enabled, we should also rename the entity's privileges in the
+    // authorization plugin.
+    if (GravitinoEnv.getInstance().accessControlDispatcher() != null) {
+      MetadataObject oldMetadataObject = 
NameIdentifierUtil.toMetadataObject(ident, type);
+      MetadataObject newMetadataObject =
+          
NameIdentifierUtil.toMetadataObject(NameIdentifier.of(ident.namespace(), 
newName), type);
+      MetadataObjectChange renameObject =
+          MetadataObjectChange.rename(oldMetadataObject, newMetadataObject);
+      callAuthorizationPluginForMetadataObject(
+          ident.namespace().level(0),
+          oldMetadataObject,
+          authorizationPlugin -> {
+            authorizationPlugin.onMetadataUpdated(renameObject);
+          });
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
index 3dc2bc2bd..efc6e2f4c 100644
--- a/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/CatalogHookDispatcher.java
@@ -104,17 +104,29 @@ public class CatalogHookDispatcher implements 
CatalogDispatcher {
   @Override
   public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
       throws NoSuchCatalogException, IllegalArgumentException {
-    return dispatcher.alterCatalog(ident, changes);
+    Catalog alteredCatalog = dispatcher.alterCatalog(ident, changes);
+    CatalogChange.RenameCatalog lastRenameChange = null;
+    for (CatalogChange change : changes) {
+      if (change instanceof CatalogChange.RenameCatalog) {
+        lastRenameChange = (CatalogChange.RenameCatalog) change;
+      }
+    }
+    if (lastRenameChange != null) {
+      AuthorizationUtils.authorizationPluginRenamePrivileges(
+          ident, Entity.EntityType.CATALOG, lastRenameChange.getNewName());
+    }
+    return alteredCatalog;
   }
 
   @Override
   public boolean dropCatalog(NameIdentifier ident) {
-    return dispatcher.dropCatalog(ident);
+    return dropCatalog(ident, false /* force */);
   }
 
   @Override
   public boolean dropCatalog(NameIdentifier ident, boolean force)
       throws NonEmptyEntityException, CatalogInUseException {
+    AuthorizationUtils.authorizationPluginRemovePrivileges(ident, 
Entity.EntityType.CATALOG);
     return dispatcher.dropCatalog(ident, force);
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
index e3272846d..40d0cc5ec 100644
--- a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
@@ -86,12 +86,26 @@ public class FilesetHookDispatcher implements 
FilesetDispatcher {
   @Override
   public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
       throws NoSuchFilesetException, IllegalArgumentException {
-    return dispatcher.alterFileset(ident, changes);
+    Fileset alteredFileset = dispatcher.alterFileset(ident, changes);
+    FilesetChange.RenameFileset lastRenameChange = null;
+    for (FilesetChange change : changes) {
+      if (change instanceof FilesetChange.RenameFileset) {
+        lastRenameChange = (FilesetChange.RenameFileset) change;
+      }
+    }
+    if (lastRenameChange != null) {
+      AuthorizationUtils.authorizationPluginRenamePrivileges(
+          ident, Entity.EntityType.FILESET, lastRenameChange.getNewName());
+    }
+
+    return alteredFileset;
   }
 
   @Override
   public boolean dropFileset(NameIdentifier ident) {
-    return dispatcher.dropFileset(ident);
+    boolean dropped = dispatcher.dropFileset(ident);
+    AuthorizationUtils.authorizationPluginRemovePrivileges(ident, 
Entity.EntityType.FILESET);
+    return dropped;
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
index 95554857a..ba7dedfa5 100644
--- a/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java
@@ -85,6 +85,9 @@ public class MetalakeHookDispatcher implements 
MetalakeDispatcher {
   @Override
   public Metalake alterMetalake(NameIdentifier ident, MetalakeChange... 
changes)
       throws NoSuchMetalakeException, IllegalArgumentException {
+    // For underlying authorization plugins, the privilege information 
shouldn't
+    // contain metalake information, so metalake rename won't affect the 
privileges
+    // of the authorization plugin.
     return dispatcher.alterMetalake(ident, changes);
   }
 
@@ -104,6 +107,12 @@ public class MetalakeHookDispatcher implements 
MetalakeDispatcher {
     dispatcher.disableMetalake(ident);
   }
 
+  public boolean dropMetalake(NameIdentifier ident) {
+    // For a metalake, we don't clear all the privileges of the catalog authorization plugins;
+    // we just remove the metalake.
+    return dispatcher.dropMetalake(ident);
+  }
+
   @Override
   public boolean metalakeExists(NameIdentifier ident) {
     return dispatcher.metalakeExists(ident);
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
index 8b53f6e6d..e6e1a3736 100644
--- a/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/SchemaHookDispatcher.java
@@ -82,12 +82,16 @@ public class SchemaHookDispatcher implements 
SchemaDispatcher {
   @Override
   public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
       throws NoSuchSchemaException {
+    // Schema doesn't support the rename operation yet, so we don't need to update the
+    // authorization plugin privileges either.
     return dispatcher.alterSchema(ident, changes);
   }
 
   @Override
   public boolean dropSchema(NameIdentifier ident, boolean cascade) throws 
NonEmptySchemaException {
-    return dispatcher.dropSchema(ident, cascade);
+    boolean dropped = dispatcher.dropSchema(ident, cascade);
+    AuthorizationUtils.authorizationPluginRemovePrivileges(ident, 
Entity.EntityType.SCHEMA);
+    return dropped;
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
index c887746b4..1fe9db5d7 100644
--- a/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/TableHookDispatcher.java
@@ -96,17 +96,35 @@ public class TableHookDispatcher implements TableDispatcher 
{
   @Override
   public Table alterTable(NameIdentifier ident, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
-    return dispatcher.alterTable(ident, changes);
+
+    Table alteredTable = dispatcher.alterTable(ident, changes);
+    TableChange.RenameTable lastRenameChange = null;
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.RenameTable) {
+        lastRenameChange = (TableChange.RenameTable) change;
+      }
+    }
+
+    if (lastRenameChange != null) {
+      AuthorizationUtils.authorizationPluginRenamePrivileges(
+          ident, Entity.EntityType.TABLE, lastRenameChange.getNewName());
+    }
+
+    return alteredTable;
   }
 
   @Override
   public boolean dropTable(NameIdentifier ident) {
-    return dispatcher.dropTable(ident);
+    boolean dropped = dispatcher.dropTable(ident);
+    AuthorizationUtils.authorizationPluginRemovePrivileges(ident, 
Entity.EntityType.TABLE);
+    return dropped;
   }
 
   @Override
   public boolean purgeTable(NameIdentifier ident) throws 
UnsupportedOperationException {
-    return dispatcher.purgeTable(ident);
+    boolean purged = dispatcher.purgeTable(ident);
+    AuthorizationUtils.authorizationPluginRemovePrivileges(ident, 
Entity.EntityType.TABLE);
+    return purged;
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
index ad0ec8c58..bc0caeb3d 100644
--- a/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/TopicHookDispatcher.java
@@ -88,7 +88,9 @@ public class TopicHookDispatcher implements TopicDispatcher {
 
   @Override
   public boolean dropTopic(NameIdentifier ident) {
-    return dispatcher.dropTopic(ident);
+    boolean dropped = dispatcher.dropTopic(ident);
+    AuthorizationUtils.authorizationPluginRemovePrivileges(ident, 
Entity.EntityType.TOPIC);
+    return dropped;
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 5a6eb2e09..1534a7397 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -387,7 +387,7 @@ public class JDBCBackend implements RelationalBackend {
       case METADATA_OBJECT_ROLE_REL:
         return (List<E>)
             RoleMetaService.getInstance()
-                .listRolesByMetadataObjectIdentAndType(nameIdentifier, 
identType, allFields);
+                .listRolesByMetadataObject(nameIdentifier, identType, 
allFields);
       case ROLE_GROUP_REL:
         if (identType == Entity.EntityType.ROLE) {
           return (List<E>) 
GroupMetaService.getInstance().listGroupsByRoleIdent(nameIdentifier);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
index 0236b01fa..b08e60ab6 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
@@ -84,7 +84,7 @@ public class RoleMetaService {
         RoleMetaMapper.class, mapper -> mapper.listRolesByUserId(userId));
   }
 
-  public List<RoleEntity> listRolesByMetadataObjectIdentAndType(
+  public List<RoleEntity> listRolesByMetadataObject(
       NameIdentifier metadataObjectIdent, Entity.EntityType 
metadataObjectType, boolean allFields) {
     String metalake = NameIdentifierUtil.getMetalake(metadataObjectIdent);
     long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java
index 4fa3cecbb..b9b80b18c 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java
@@ -50,6 +50,14 @@ public class TestFilesetOperationDispatcher extends 
TestOperationDispatcher {
         new FilesetOperationDispatcher(catalogManager, entityStore, 
idGenerator);
   }
 
+  public static FilesetOperationDispatcher getFilesetOperationDispatcher() {
+    return filesetOperationDispatcher;
+  }
+
+  public static SchemaOperationDispatcher getSchemaOperationDispatcher() {
+    return schemaOperationDispatcher;
+  }
+
   @Test
   public void testCreateAndListFilesets() {
     Namespace filesetNs = Namespace.of(metalake, catalog, "schema81");
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
index 6acec229e..cbdbc4848 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
@@ -875,4 +875,12 @@ public class TestTableOperationDispatcher extends 
TestOperationDispatcher {
           Assertions.assertEquals(e.defaultValue(), 
actualColumn.defaultValue());
         });
   }
+
+  public static TableOperationDispatcher getTableOperationDispatcher() {
+    return tableOperationDispatcher;
+  }
+
+  public static SchemaOperationDispatcher getSchemaOperationDispatcher() {
+    return schemaOperationDispatcher;
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
index ac6b3bea4..7ee545e8e 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
@@ -270,4 +270,12 @@ public class TestTopicOperationDispatcher extends 
TestOperationDispatcher {
     
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), 
SCHEMA));
     Assertions.assertTrue(entityStore.exists(topicIdent, 
Entity.EntityType.TOPIC));
   }
+
+  public static SchemaOperationDispatcher getSchemaOperationDispatcher() {
+    return schemaOperationDispatcher;
+  }
+
+  public static TopicOperationDispatcher getTopicOperationDispatcher() {
+    return topicOperationDispatcher;
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestFilesetHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestFilesetHookDispatcher.java
new file mode 100644
index 000000000..63475ab05
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/hook/TestFilesetHookDispatcher.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.AccessControlManager;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.catalog.TestFilesetOperationDispatcher;
+import org.apache.gravitino.catalog.TestOperationDispatcher;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetChange;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestFilesetHookDispatcher extends TestOperationDispatcher {
+
+  private static FilesetHookDispatcher filesetHookDispatcher;
+  private static SchemaHookDispatcher schemaHookDispatcher;
+  private static AccessControlManager accessControlManager =
+      Mockito.mock(AccessControlManager.class);
+  private static AuthorizationPlugin authorizationPlugin;
+
+  @BeforeAll
+  public static void initialize() throws IOException, IllegalAccessException {
+    TestFilesetOperationDispatcher.initialize();
+
+    filesetHookDispatcher =
+        new 
FilesetHookDispatcher(TestFilesetOperationDispatcher.getFilesetOperationDispatcher());
+    schemaHookDispatcher =
+        new 
SchemaHookDispatcher(TestFilesetOperationDispatcher.getSchemaOperationDispatcher());
+
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "accessControlDispatcher", 
accessControlManager, true);
+    catalogManager = Mockito.mock(CatalogManager.class);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
+    BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
+    Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
+    
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
+  }
+
+  @Test
+  public void testDropAuthorizationPrivilege() {
+    Namespace filesetNs = Namespace.of(metalake, catalog, "schema11212");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaHookDispatcher.createSchema(NameIdentifier.of(filesetNs.levels()), 
"comment", props);
+
+    NameIdentifier filesetIdent = NameIdentifier.of(filesetNs, "filesetNAME1");
+    filesetHookDispatcher.createFileset(
+        filesetIdent, "comment", Fileset.Type.MANAGED, "fileset41", props);
+    Mockito.reset(authorizationPlugin);
+
+    filesetHookDispatcher.dropFileset(filesetIdent);
+    Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
+
+    Mockito.reset(authorizationPlugin);
+    schemaHookDispatcher.dropSchema(NameIdentifier.of(filesetNs.levels()), 
true);
+    Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
+  }
+
+  @Test
+  public void testRenameAuthorizationPrivilege() {
+    Namespace filesetNs = Namespace.of(metalake, catalog, "schema1121");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaHookDispatcher.createSchema(NameIdentifier.of(filesetNs.levels()), 
"comment", props);
+
+    NameIdentifier filesetIdent = NameIdentifier.of(filesetNs, "filesetNAME2");
+    filesetHookDispatcher.createFileset(
+        filesetIdent, "comment", Fileset.Type.MANAGED, "fileset41", props);
+
+    Mockito.reset(authorizationPlugin);
+    FilesetChange setChange = FilesetChange.setProperty("k1", "v1");
+    filesetHookDispatcher.alterFileset(filesetIdent, setChange);
+    Mockito.verify(authorizationPlugin, 
Mockito.never()).onMetadataUpdated(any());
+
+    Mockito.reset(authorizationPlugin);
+    FilesetChange renameChange = FilesetChange.rename("newName");
+    filesetHookDispatcher.alterFileset(filesetIdent, renameChange);
+    Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
new file mode 100644
index 000000000..fd1137a0e
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/hook/TestTableHookDispatcher.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestColumn;
+import org.apache.gravitino.authorization.AccessControlManager;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.catalog.TestOperationDispatcher;
+import org.apache.gravitino.catalog.TestTableOperationDispatcher;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestTableHookDispatcher extends TestOperationDispatcher {
+
+  private static TableHookDispatcher tableHookDispatcher;
+  private static SchemaHookDispatcher schemaHookDispatcher;
+  private static AccessControlManager accessControlManager =
+      Mockito.mock(AccessControlManager.class);
+  private static AuthorizationPlugin authorizationPlugin;
+
+  @BeforeAll
+  public static void initialize() throws IOException, IllegalAccessException {
+    TestTableOperationDispatcher.initialize();
+
+    tableHookDispatcher =
+        new 
TableHookDispatcher(TestTableOperationDispatcher.getTableOperationDispatcher());
+    schemaHookDispatcher =
+        new 
SchemaHookDispatcher(TestTableOperationDispatcher.getSchemaOperationDispatcher());
+
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "accessControlDispatcher", 
accessControlManager, true);
+    catalogManager = Mockito.mock(CatalogManager.class);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
+    BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
+    Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
+    
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
+  }
+
+  @Test
+  public void testDropAuthorizationPrivilege() {
+    Namespace tableNs = Namespace.of(metalake, catalog, "schema1123");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaHookDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), 
"comment", props);
+
+    NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableNAME");
+    Column[] columns =
+        new Column[] {
+          TestColumn.builder()
+              .withName("colNAME1")
+              .withPosition(0)
+              .withType(Types.StringType.get())
+              .build(),
+          TestColumn.builder()
+              .withName("colNAME2")
+              .withPosition(1)
+              .withType(Types.StringType.get())
+              .build()
+        };
+    RangePartition assignedPartition =
+        Partitions.range(
+            "partition_V1",
+            Literals.stringLiteral("value1"),
+            Literals.stringLiteral("value2"),
+            null);
+    Transform[] transforms =
+        new Transform[] {
+          Transforms.range(
+              new String[] {columns[0].name()}, new RangePartition[] 
{assignedPartition})
+        };
+    Distribution distribution =
+        Distributions.fields(Strategy.HASH, 5, new String[] 
{columns[0].name()});
+    SortOrder[] sortOrders =
+        new SortOrder[] 
{SortOrders.ascending(NamedReference.field(columns[0].name()))};
+    Index[] indexes = new Index[] {Indexes.primary("index1", new String[][] 
{{columns[0].name()}})};
+    tableHookDispatcher.createTable(
+        tableIdent, columns, "comment", props, transforms, distribution, 
sortOrders, indexes);
+
+    Mockito.reset(authorizationPlugin);
+    tableHookDispatcher.dropTable(tableIdent);
+    Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
+
+    Mockito.reset(authorizationPlugin);
+    schemaHookDispatcher.dropSchema(NameIdentifier.of(tableNs.levels()), true);
+    Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
+  }
+
+  @Test
+  public void testRenameAuthorizationPrivilege() {
+    Namespace tableNs = Namespace.of(metalake, catalog, "schema1124");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaHookDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), 
"comment", props);
+
+    NameIdentifier tableIdent = NameIdentifier.of(tableNs, "tableNAME");
+    Column[] columns =
+        new Column[] {
+          TestColumn.builder()
+              .withName("colNAME1")
+              .withPosition(0)
+              .withType(Types.StringType.get())
+              .build(),
+          TestColumn.builder()
+              .withName("colNAME2")
+              .withPosition(1)
+              .withType(Types.StringType.get())
+              .build()
+        };
+    RangePartition assignedPartition =
+        Partitions.range(
+            "partition_V1",
+            Literals.stringLiteral("value1"),
+            Literals.stringLiteral("value2"),
+            null);
+    Transform[] transforms =
+        new Transform[] {
+          Transforms.range(
+              new String[] {columns[0].name()}, new RangePartition[] 
{assignedPartition})
+        };
+    Distribution distribution =
+        Distributions.fields(Strategy.HASH, 5, new String[] 
{columns[0].name()});
+    SortOrder[] sortOrders =
+        new SortOrder[] 
{SortOrders.ascending(NamedReference.field(columns[0].name()))};
+    Index[] indexes = new Index[] {Indexes.primary("index1", new String[][] 
{{columns[0].name()}})};
+    tableHookDispatcher.createTable(
+        tableIdent, columns, "comment", props, transforms, distribution, 
sortOrders, indexes);
+
+    Mockito.reset(authorizationPlugin);
+    TableChange setChange = TableChange.setProperty("k1", "v1");
+    tableHookDispatcher.alterTable(tableIdent, setChange);
+    Mockito.verify(authorizationPlugin, 
Mockito.never()).onMetadataUpdated(any());
+
+    Mockito.reset(authorizationPlugin);
+    TableChange renameChange = TableChange.rename("newName");
+    tableHookDispatcher.alterTable(tableIdent, renameChange);
+    Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestTopicHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestTopicHookDispatcher.java
new file mode 100644
index 000000000..5e2a51547
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/hook/TestTopicHookDispatcher.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.mockito.ArgumentMatchers.any;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.AccessControlManager;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.catalog.TestOperationDispatcher;
+import org.apache.gravitino.catalog.TestTopicOperationDispatcher;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestTopicHookDispatcher extends TestOperationDispatcher {
+  private static TopicHookDispatcher topicHookDispatcher;
+  private static SchemaHookDispatcher schemaHookDispatcher;
+  private static AccessControlManager accessControlManager =
+      Mockito.mock(AccessControlManager.class);
+  private static AuthorizationPlugin authorizationPlugin;
+
+  @BeforeAll
+  public static void initialize() throws IOException, IllegalAccessException {
+    TestTopicOperationDispatcher.initialize();
+
+    topicHookDispatcher =
+        new 
TopicHookDispatcher(TestTopicOperationDispatcher.getTopicOperationDispatcher());
+    schemaHookDispatcher =
+        new 
SchemaHookDispatcher(TestTopicOperationDispatcher.getSchemaOperationDispatcher());
+
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "accessControlDispatcher", 
accessControlManager, true);
+    catalogManager = Mockito.mock(CatalogManager.class);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogManager", 
catalogManager, true);
+    BaseCatalog catalog = Mockito.mock(BaseCatalog.class);
+    Mockito.when(catalogManager.loadCatalog(any())).thenReturn(catalog);
+    authorizationPlugin = Mockito.mock(AuthorizationPlugin.class);
+    
Mockito.when(catalog.getAuthorizationPlugin()).thenReturn(authorizationPlugin);
+  }
+
+  @Test
+  public void testDropAuthorizationPrivilege() {
+    Namespace topicNs = Namespace.of(metalake, catalog, "schema1123");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaHookDispatcher.createSchema(NameIdentifier.of(topicNs.levels()), 
"comment", props);
+
+    NameIdentifier topicIdent = NameIdentifier.of(topicNs, "topicNAME");
+    topicHookDispatcher.createTopic(topicIdent, "comment", null, props);
+
+    Mockito.reset(authorizationPlugin);
+    topicHookDispatcher.dropTopic(topicIdent);
+    Mockito.verify(authorizationPlugin).onMetadataUpdated(any());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
index 14ba3254d..9d02accc5 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestRoleMetaService.java
@@ -442,8 +442,7 @@ class TestRoleMetaService extends TestJDBCBackend {
     roleMetaService.insertRole(role2, false);
 
     List<RoleEntity> roleEntities =
-        roleMetaService.listRolesByMetadataObjectIdentAndType(
-            catalog.nameIdentifier(), catalog.type(), true);
+        roleMetaService.listRolesByMetadataObject(catalog.nameIdentifier(), 
catalog.type(), true);
     roleEntities.sort(Comparator.comparing(RoleEntity::name));
     Assertions.assertEquals(Lists.newArrayList(role1, role2), roleEntities);
   }

Reply via email to