This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch branch-metadata-authz
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-metadata-authz by this 
push:
     new 06fc053d8e [#7522] feat(authz): Support schema authorization (#7536)
06fc053d8e is described below

commit 06fc053d8e2c94bd5046b7cd46049fe4493c7788
Author: yangyang zhong <[email protected]>
AuthorDate: Fri Jul 4 17:22:03 2025 +0800

    [#7522] feat(authz): Support schema authorization (#7536)
    
    ### What changes were proposed in this pull request?
    
    Support schema authorization
    
    ### Why are the changes needed?
    
    close #7522
    
    ### Does this PR introduce _any_ user-facing change?
    
    None
    
    ### How was this patch tested?
    
    
    
org.apache.gravitino.client.integration.test.authorization.SchemaAuthorizationIT
---
 .../authorization/BaseRestApiAuthorizationIT.java  |  24 ++-
 .../test/authorization/CatalogAuthorizationIT.java |   8 +-
 .../test/authorization/SchemaAuthorizationIT.java  | 238 +++++++++++++++++++++
 .../java/org/apache/gravitino/GravitinoEnv.java    |  20 ++
 .../authorization/GravitinoAuthorizer.java         |  28 ++-
 .../gravitino/authorization/OwnerManager.java      |  31 ++-
 .../hook/AccessControlHookDispatcher.java          |  33 ++-
 .../authorization/GravitinoAuthorizerProvider.java |   4 +
 .../server/authorization/MetadataFilterHelper.java |   1 +
 .../authorization/PassThroughAuthorizer.java       |  10 +
 .../AuthorizationExpressionEvaluator.java          |   2 +-
 .../authorization/jcasbin/JcasbinAuthorizer.java   |  15 +-
 .../authorization/MockGravitinoAuthorizer.java     |   1 +
 .../TestGravitinoAuthorizerProvider.java           |   1 +
 .../jcasbin/TestJcasbinAuthorizer.java             |  33 ++-
 .../web/filter/GravitinoInterceptionService.java   |   4 +-
 .../server/web/rest/SchemaOperations.java          |  64 ++++--
 .../filter/TestGravitinoInterceptionService.java   |   2 +-
 .../server/web/rest/TestOwnerOperations.java       |   3 +-
 .../server/web/rest/TestPermissionOperations.java  |   3 +-
 .../server/web/rest/TestSchemaOperations.java      |   3 +-
 21 files changed, 479 insertions(+), 49 deletions(-)

diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/BaseRestApiAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/BaseRestApiAuthorizationIT.java
index bcf2066cae..0bc18b4eab 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/BaseRestApiAuthorizationIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/BaseRestApiAuthorizationIT.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.server.authorization.jcasbin.JcasbinAuthorizer;
 import org.junit.jupiter.api.AfterAll;
@@ -35,10 +36,10 @@ public class BaseRestApiAuthorizationIT extends BaseIT {
 
   protected static final String USER = "tester";
 
-  protected static final String USER_WITH_AUTHORIZATION = "tester2";
+  protected static final String NORMAL_USER = "tester2";
 
-  /** Mock a user without permissions. */
-  protected static GravitinoAdminClient clientWithNoAuthorization;
+  /** Mock a normal user without permissions. */
+  protected static GravitinoAdminClient normalUserClient;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseRestApiAuthorizationIT.class);
 
@@ -53,12 +54,15 @@ public class BaseRestApiAuthorizationIT extends BaseIT {
             Configs.ENABLE_AUTHORIZATION.getKey(),
             "true",
             Configs.AUTHORIZATION_IMPL.getKey(),
-            JcasbinAuthorizer.class.getCanonicalName()));
+            JcasbinAuthorizer.class.getCanonicalName(),
+            Configs.CACHE_ENABLED.getKey(),
+            "false"));
     super.startIntegrationTest();
     client.createMetalake(METALAKE, "", new HashMap<>());
-    client.loadMetalake(METALAKE).addUser(USER_WITH_AUTHORIZATION);
-    clientWithNoAuthorization =
-        
GravitinoAdminClient.builder(serverUri).withSimpleAuth(USER_WITH_AUTHORIZATION).build();
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.addUser(USER);
+    gravitinoMetalake.addUser(NORMAL_USER);
+    normalUserClient = 
GravitinoAdminClient.builder(serverUri).withSimpleAuth(NORMAL_USER).build();
   }
 
   @AfterAll
@@ -66,9 +70,9 @@ public class BaseRestApiAuthorizationIT extends BaseIT {
   public void stopIntegrationTest() throws IOException, InterruptedException {
     client.dropMetalake(METALAKE, true);
 
-    if (clientWithNoAuthorization != null) {
-      clientWithNoAuthorization.close();
-      clientWithNoAuthorization = null;
+    if (normalUserClient != null) {
+      normalUserClient.close();
+      normalUserClient = null;
     }
 
     try {
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/CatalogAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/CatalogAuthorizationIT.java
index 105f7cef66..0fcf7569b4 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/CatalogAuthorizationIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/CatalogAuthorizationIT.java
@@ -65,7 +65,7 @@ public class CatalogAuthorizationIT extends 
BaseRestApiAuthorizationIT {
         "Can not access metadata {" + catalog1 + "}.",
         RuntimeException.class,
         () -> {
-          clientWithNoAuthorization
+          normalUserClient
               .loadMetalake(METALAKE)
               .createCatalog(catalog1, Catalog.Type.RELATIONAL, "hive", 
"comment", properties);
         });
@@ -79,7 +79,7 @@ public class CatalogAuthorizationIT extends 
BaseRestApiAuthorizationIT {
         "Can not access metadata {" + catalog1 + "}.",
         RuntimeException.class,
         () -> {
-          clientWithNoAuthorization
+          normalUserClient
               .loadMetalake(METALAKE)
               .createCatalog(catalog1, Catalog.Type.RELATIONAL, "hive", 
"comment", properties);
         });
@@ -88,7 +88,7 @@ public class CatalogAuthorizationIT extends 
BaseRestApiAuthorizationIT {
   @Test
   @Order(2)
   public void testListCatalog() {
-    String[] catalogs = 
clientWithNoAuthorization.loadMetalake(METALAKE).listCatalogs();
+    String[] catalogs = normalUserClient.loadMetalake(METALAKE).listCatalogs();
     assertEquals(0, catalogs.length);
     catalogs = client.loadMetalake(METALAKE).listCatalogs();
     assertEquals(2, catalogs.length);
@@ -105,7 +105,7 @@ public class CatalogAuthorizationIT extends 
BaseRestApiAuthorizationIT {
         "Can not access metadata {" + catalog1 + "}.",
         RuntimeException.class,
         () -> {
-          
clientWithNoAuthorization.loadMetalake(METALAKE).dropCatalog(catalog1, true);
+          normalUserClient.loadMetalake(METALAKE).dropCatalog(catalog1, true);
         });
     client.loadMetalake(METALAKE).dropCatalog(catalog1, true);
     catalogs = client.loadMetalake(METALAKE).listCatalogs();
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/SchemaAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/SchemaAuthorizationIT.java
new file mode 100644
index 0000000000..e6cae76f7c
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/SchemaAuthorizationIT.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.client.integration.test.authorization;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class SchemaAuthorizationIT extends BaseRestApiAuthorizationIT {
+
+  private static final String CATALOG = "catalog";
+
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+
+  private static String hmsUri;
+
+  private static String role = "role";
+
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    containerSuite.startHiveContainer();
+    super.startIntegrationTest();
+    hmsUri =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metastore.uris", hmsUri);
+    client
+        .loadMetalake(METALAKE)
+        .createCatalog(CATALOG, Catalog.Type.RELATIONAL, "hive", "comment", 
properties);
+    assertThrows(
+        "Can not access metadata {" + CATALOG + "}.",
+        RuntimeException.class,
+        () -> {
+          normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+        });
+    // grant tester load catalog privilege
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    Catalog catalogEntity = gravitinoMetalake.loadCatalog(CATALOG);
+    List<Privilege> privileges = new ArrayList<>();
+    privileges.add(Privileges.UseCatalog.allow());
+    SecurableObject securableObject = 
SecurableObjects.ofCatalog(catalogEntity.name(), privileges);
+    securableObjects.add(securableObject);
+    gravitinoMetalake.createRole(role, new HashMap<>(), securableObjects);
+    gravitinoMetalake.grantRolesToUser(ImmutableList.of(role), NORMAL_USER);
+    Catalog catalogLoadByTester2 = 
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    assertEquals(CATALOG, catalogLoadByTester2.name());
+  }
+
+  @Test
+  @Order(1)
+  public void testCreateSchema() {
+    // test catalog owner privilege
+    Catalog catalogEntityLoadByTester1 = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    catalogEntityLoadByTester1.asSchemas().createSchema("schema1", "test", new 
HashMap<>());
+    Catalog catalogEntityLoadByTester2 =
+        normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    assertThrows(
+        "Can not access metadata {" + CATALOG + "}.",
+        RuntimeException.class,
+        () -> {
+          catalogEntityLoadByTester2.asSchemas().createSchema("schema2", 
"test2", new HashMap<>());
+        });
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    // test grant create schema privilege
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(null, CATALOG, MetadataObject.Type.CATALOG),
+        ImmutableList.of(Privileges.UseCatalog.allow(), 
Privileges.CreateSchema.allow()));
+    catalogEntityLoadByTester2.asSchemas().createSchema("schema2", "test2", 
new HashMap<>());
+    catalogEntityLoadByTester2.asSchemas().createSchema("schema3", "test2", 
new HashMap<>());
+  }
+
+  @Test
+  @Order(2)
+  public void testListSchema() {
+    Catalog catalogEntityLoadByTester1 = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    String[] schemasLoadByTester1 = 
catalogEntityLoadByTester1.asSchemas().listSchemas();
+    assertArrayEquals(
+        new String[] {"default", "schema1", "schema2", "schema3"}, 
schemasLoadByTester1);
+    Catalog catalogEntityLoadByTester2 =
+        normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    String[] schemasLoadByTester2 = 
catalogEntityLoadByTester2.asSchemas().listSchemas();
+    assertArrayEquals(new String[] {"schema2", "schema3"}, 
schemasLoadByTester2);
+  }
+
+  @Test
+  @Order(3)
+  public void testLoadSchema() {
+    Catalog catalogEntityLoadByTester2 =
+        normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    SupportsSchemas schemas = catalogEntityLoadByTester2.asSchemas();
+    String[] schemasLoadByTester2 = schemas.listSchemas();
+    assertArrayEquals(new String[] {"schema2", "schema3"}, 
schemasLoadByTester2);
+    assertThrows(
+        String.format("Can not access metadata {%s.%s}.", CATALOG, "schema1"),
+        RuntimeException.class,
+        () -> {
+          catalogEntityLoadByTester2.asSchemas().loadSchema("schema1");
+        });
+    // test grant use schema
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(CATALOG, "schema1", MetadataObject.Type.SCHEMA),
+        ImmutableList.of(Privileges.UseSchema.allow(), 
Privileges.UseSchema.allow()));
+    schemasLoadByTester2 = schemas.listSchemas();
+    assertArrayEquals(new String[] {"schema1", "schema2", "schema3"}, 
schemasLoadByTester2);
+  }
+
+  @Test
+  @Order(4)
+  public void testAlterSchema() {
+    // test catalog owner privilege
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    Catalog catalogEntityLoadByTester1 = 
gravitinoMetalake.loadCatalog(CATALOG);
+    catalogEntityLoadByTester1
+        .asSchemas()
+        .alterSchema("schema1", SchemaChange.setProperty("key", "value1"));
+    catalogEntityLoadByTester1
+        .asSchemas()
+        .alterSchema("schema2", SchemaChange.setProperty("key2", "value2"));
+    catalogEntityLoadByTester1
+        .asSchemas()
+        .alterSchema("schema3", SchemaChange.setProperty("key3", "value3"));
+    Catalog catalogEntityLoadByTester2 =
+        normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    assertThrows(
+        String.format("Can not access metadata {%s.%s}.", CATALOG, "schema1"),
+        RuntimeException.class,
+        () -> {
+          catalogEntityLoadByTester2
+              .asSchemas()
+              .alterSchema("schema1", SchemaChange.setProperty("key", 
"value"));
+        });
+    // test set owner
+    gravitinoMetalake.setOwner(
+        MetadataObjects.of(ImmutableList.of(CATALOG, "schema1"), 
MetadataObject.Type.SCHEMA),
+        NORMAL_USER,
+        Owner.Type.USER);
+    catalogEntityLoadByTester2
+        .asSchemas()
+        .alterSchema("schema1", SchemaChange.setProperty("key", "value"));
+    catalogEntityLoadByTester2
+        .asSchemas()
+        .alterSchema("schema2", SchemaChange.setProperty("key", "value"));
+  }
+
+  @Test
+  @Order(5)
+  public void testDropSchema() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    // reset owner
+    gravitinoMetalake.setOwner(
+        MetadataObjects.of(ImmutableList.of(CATALOG, "schema1"), 
MetadataObject.Type.SCHEMA),
+        USER,
+        Owner.Type.USER);
+    Catalog catalogEntityLoadByTester2 =
+        normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    assertThrows(
+        String.format("Can not access metadata {%s.%s}.", CATALOG, "schema1"),
+        RuntimeException.class,
+        () -> {
+          catalogEntityLoadByTester2.asSchemas().dropSchema("schema1", true);
+        });
+    catalogEntityLoadByTester2.asSchemas().dropSchema("schema2", true);
+    Catalog catalogEntityLoadByTester1 = 
gravitinoMetalake.loadCatalog(CATALOG);
+    catalogEntityLoadByTester1.asSchemas().dropSchema("schema1", true);
+    catalogEntityLoadByTester1.asSchemas().dropSchema("schema3", true);
+    String[] schemasLoadByTester1 = 
catalogEntityLoadByTester1.asSchemas().listSchemas();
+    assertArrayEquals(new String[] {"default"}, schemasLoadByTester1);
+    String[] schemasLoadByTester2 = 
catalogEntityLoadByTester2.asSchemas().listSchemas();
+    assertArrayEquals(new String[] {}, schemasLoadByTester2);
+    catalogEntityLoadByTester1.asSchemas().createSchema("schema1", "test", new 
HashMap<>());
+    // test catalog owner
+    assertThrows(
+        String.format("Can not access metadata {%s.%s}.", CATALOG, "schema1"),
+        RuntimeException.class,
+        () -> {
+          catalogEntityLoadByTester2.asSchemas().dropSchema("schema1", true);
+        });
+    gravitinoMetalake.setOwner(
+        MetadataObjects.of(ImmutableList.of(CATALOG, "schema1"), 
MetadataObject.Type.SCHEMA),
+        NORMAL_USER,
+        Owner.Type.USER);
+    catalogEntityLoadByTester2.asSchemas().dropSchema("schema1", true);
+    schemasLoadByTester1 = 
catalogEntityLoadByTester1.asSchemas().listSchemas();
+    assertArrayEquals(new String[] {"default"}, schemasLoadByTester1);
+    schemasLoadByTester2 = 
catalogEntityLoadByTester2.asSchemas().listSchemas();
+    assertArrayEquals(new String[] {}, schemasLoadByTester2);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 91febb4220..c210336e47 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -23,6 +23,7 @@ import org.apache.gravitino.audit.AuditLogManager;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.authorization.AccessControlManager;
 import org.apache.gravitino.authorization.FutureGrantManager;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.OwnerManager;
 import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
 import org.apache.gravitino.catalog.CatalogDispatcher;
@@ -134,6 +135,7 @@ public class GravitinoEnv {
   private EventBus eventBus;
   private OwnerManager ownerManager;
   private FutureGrantManager futureGrantManager;
+  private GravitinoAuthorizer gravitinoAuthorizer;
 
   protected GravitinoEnv() {}
 
@@ -364,6 +366,24 @@ public class GravitinoEnv {
     return eventListenerManager;
   }
 
+  /**
+   * Set GravitinoAuthorizer to GravitinoEnv
+   *
+   * @param gravitinoAuthorizer the GravitinoAuthorizer instance
+   */
+  public void setGravitinoAuthorizer(GravitinoAuthorizer gravitinoAuthorizer) {
+    this.gravitinoAuthorizer = gravitinoAuthorizer;
+  }
+
+  /**
+   * Get The GravitinoAuthorizer
+   *
+   * @return the GravitinoAuthorizer instance
+   */
+  public GravitinoAuthorizer gravitinoAuthorizer() {
+    return gravitinoAuthorizer;
+  }
+
   public void start() {
     metricsSystem.start();
     eventListenerManager.start();
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/GravitinoAuthorizer.java
 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
similarity index 75%
rename from 
server-common/src/main/java/org/apache/gravitino/server/authorization/GravitinoAuthorizer.java
rename to 
core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
index 43ffcc7fbe..37370e4c6b 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/GravitinoAuthorizer.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
@@ -15,14 +15,16 @@
  * under the License.
  */
 
-package org.apache.gravitino.server.authorization;
+package org.apache.gravitino.authorization;
 
 import java.io.Closeable;
 import java.security.Principal;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 
 /** Used for metadata authorization. */
 public interface GravitinoAuthorizer extends Closeable {
@@ -66,6 +68,28 @@ public interface GravitinoAuthorizer extends Closeable {
    */
   default void handleRolePrivilegeChange(Long roleId) {};
 
+  /**
+   * When the permissions of a role change, it is necessary to notify the 
GravitinoAuthorizer in
+   * order to clear the cache.
+   *
+   * @param metalake The metalake name;
+   * @param roleName The role name;
+   */
+  default void handleRolePrivilegeChange(String metalake, String roleName) {
+    try {
+      RoleEntity entity =
+          GravitinoEnv.getInstance()
+              .entityStore()
+              .get(
+                  NameIdentifierUtil.ofRole(metalake, roleName),
+                  Entity.EntityType.ROLE,
+                  RoleEntity.class);
+      handleRolePrivilegeChange(entity.id());
+    } catch (Exception e) {
+      throw new RuntimeException("Can not get Role Entity", e);
+    }
+  }
+
   /**
    * This method is called to clear the owner relationship in jcasbin when the 
owner of the metadata
    * changes.
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java 
b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
index 2847bbabe3..e27e8162a8 100644
--- a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
+++ b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Optional;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.SupportsRelationOperations;
@@ -34,6 +35,7 @@ import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +62,6 @@ public class OwnerManager {
 
   public void setOwner(
       String metalake, MetadataObject metadataObject, String ownerName, 
Owner.Type ownerType) {
-
     NameIdentifier objectIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
     try {
       Optional<Owner> originOwner = getOwner(metalake, metadataObject);
@@ -113,6 +114,7 @@ public class OwnerManager {
           metadataObject,
           authorizationPlugin ->
               authorizationPlugin.onOwnerSet(metadataObject, 
originOwner.orElse(null), newOwner));
+      originOwner.ifPresent(owner -> notifyOwnerChange(owner, metalake, 
metadataObject));
     } catch (NoSuchEntityException nse) {
       LOG.warn(
           "Metadata object {} or owner {} is not found", 
metadataObject.fullName(), ownerName, nse);
@@ -128,6 +130,33 @@ public class OwnerManager {
     }
   }
 
+  private void notifyOwnerChange(Owner oldOwner, String metalake, 
MetadataObject metadataObject) {
+    GravitinoAuthorizer gravitinoAuthorizer = 
GravitinoEnv.getInstance().gravitinoAuthorizer();
+    if (gravitinoAuthorizer != null) {
+      if (oldOwner.type() == Owner.Type.USER) {
+        try {
+          UserEntity userEntity =
+              GravitinoEnv.getInstance()
+                  .entityStore()
+                  .get(
+                      NameIdentifierUtil.ofUser(metalake, oldOwner.name()),
+                      Entity.EntityType.USER,
+                      UserEntity.class);
+          gravitinoAuthorizer.handleMetadataOwnerChange(
+              metalake,
+              userEntity.id(),
+              MetadataObjectUtil.toEntityIdent(metalake, metadataObject),
+              Entity.EntityType.valueOf(metadataObject.type().name()));
+        } catch (IOException e) {
+          LOG.warn(e.getMessage(), e);
+        }
+      } else {
+        throw new UnsupportedOperationException(
+            "Notification for Group Owner is not supported yet.");
+      }
+    }
+  }
+
   public Optional<Owner> getOwner(String metalake, MetadataObject 
metadataObject) {
     NameIdentifier ident = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
     OwnerImpl owner = new OwnerImpl();
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
index dba0177ca1..71dea1a9c9 100644
--- 
a/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/hook/AccessControlHookDispatcher.java
@@ -26,6 +26,7 @@ import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.authorization.AuthorizationUtils;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Group;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.OwnerManager;
@@ -114,7 +115,9 @@ public class AccessControlHookDispatcher implements 
AccessControlDispatcher {
   @Override
   public User grantRolesToUser(String metalake, List<String> roles, String 
user)
       throws NoSuchUserException, IllegalRoleException, 
NoSuchMetalakeException {
-    return dispatcher.grantRolesToUser(metalake, roles, user);
+    User grantedUser = dispatcher.grantRolesToUser(metalake, roles, user);
+    notifyRoleUserRelChange(metalake, roles);
+    return grantedUser;
   }
 
   @Override
@@ -132,7 +135,9 @@ public class AccessControlHookDispatcher implements 
AccessControlDispatcher {
   @Override
   public User revokeRolesFromUser(String metalake, List<String> roles, String 
user)
       throws NoSuchUserException, IllegalRoleException, 
NoSuchMetalakeException {
-    return dispatcher.revokeRolesFromUser(metalake, roles, user);
+    User revokedUser = dispatcher.revokeRolesFromUser(metalake, roles, user);
+    notifyRoleUserRelChange(metalake, roles);
+    return revokedUser;
   }
 
   @Override
@@ -191,13 +196,33 @@ public class AccessControlHookDispatcher implements 
AccessControlDispatcher {
   public Role grantPrivilegeToRole(
       String metalake, String role, MetadataObject object, Set<Privilege> 
privileges)
       throws NoSuchMetalakeException, NoSuchRoleException {
-    return dispatcher.grantPrivilegeToRole(metalake, role, object, privileges);
+    Role grantedRole = dispatcher.grantPrivilegeToRole(metalake, role, object, 
privileges);
+    notifyRoleUserRelChange(metalake, role);
+    return grantedRole;
   }
 
   @Override
   public Role revokePrivilegesFromRole(
       String metalake, String role, MetadataObject object, Set<Privilege> 
privileges)
       throws NoSuchMetalakeException, NoSuchRoleException {
-    return dispatcher.revokePrivilegesFromRole(metalake, role, object, 
privileges);
+    Role revokedRole = dispatcher.revokePrivilegesFromRole(metalake, role, 
object, privileges);
+    notifyRoleUserRelChange(metalake, role);
+    return revokedRole;
+  }
+
+  private static void notifyRoleUserRelChange(String metalake, List<String> 
roles) {
+    GravitinoAuthorizer gravitinoAuthorizer = 
GravitinoEnv.getInstance().gravitinoAuthorizer();
+    if (gravitinoAuthorizer != null) {
+      for (String role : roles) {
+        gravitinoAuthorizer.handleRolePrivilegeChange(metalake, role);
+      }
+    }
+  }
+
+  private static void notifyRoleUserRelChange(String metalake, String role) {
+    GravitinoAuthorizer gravitinoAuthorizer = 
GravitinoEnv.getInstance().gravitinoAuthorizer();
+    if (gravitinoAuthorizer != null) {
+      gravitinoAuthorizer.handleRolePrivilegeChange(metalake, role);
+    }
   }
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/GravitinoAuthorizerProvider.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/GravitinoAuthorizerProvider.java
index e909ae30a5..6480b990e9 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/GravitinoAuthorizerProvider.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/GravitinoAuthorizerProvider.java
@@ -20,6 +20,8 @@ package org.apache.gravitino.server.authorization;
 import java.io.Closeable;
 import java.io.IOException;
 import org.apache.gravitino.Configs;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.server.ServerConfig;
 
 /**
@@ -60,6 +62,7 @@ public class GravitinoAuthorizerProvider implements Closeable 
{
             gravitinoAuthorizer = new PassThroughAuthorizer();
           }
           gravitinoAuthorizer.initialize();
+          
GravitinoEnv.getInstance().setGravitinoAuthorizer(gravitinoAuthorizer);
         }
       }
     }
@@ -80,6 +83,7 @@ public class GravitinoAuthorizerProvider implements Closeable 
{
 
   @Override
   public void close() throws IOException {
+    GravitinoEnv.getInstance().setGravitinoAuthorizer(null);
     if (gravitinoAuthorizer != null) {
       gravitinoAuthorizer.close();
     }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
index 907532af96..9d508b84b2 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
 import org.apache.gravitino.utils.NameIdentifierUtil;
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
index 72e3b810a3..f785168a35 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/PassThroughAuthorizer.java
@@ -19,7 +19,10 @@ package org.apache.gravitino.server.authorization;
 
 import java.io.IOException;
 import java.security.Principal;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 
 /**
@@ -48,6 +51,13 @@ public class PassThroughAuthorizer implements 
GravitinoAuthorizer {
   @Override
   public void handleRolePrivilegeChange(Long roleId) {}
 
+  @Override
+  public void handleRolePrivilegeChange(String metalake, String roleName) {}
+
+  @Override
+  public void handleMetadataOwnerChange(
+      String metalake, Long oldOwnerId, NameIdentifier nameIdentifier, 
Entity.EntityType type) {}
+
   @Override
   public void close() throws IOException {}
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
index f0d4c6bf5b..f9bb05c0b6 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java
@@ -25,7 +25,7 @@ import ognl.OgnlException;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.server.authorization.GravitinoAuthorizer;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index 0dda942d2e..0666593635 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -34,12 +34,12 @@ import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.UserEntity;
-import org.apache.gravitino.server.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.server.authorization.MetadataIdConverter;
 import org.apache.gravitino.storage.relational.service.UserMetaService;
 import org.apache.gravitino.utils.MetadataObjectUtil;
@@ -149,7 +149,13 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       String username, String metalake, MetadataObject metadataObject, String 
privilege) {
     Long metalakeId = getMetalakeId(metalake);
     Long userId = 
UserMetaService.getInstance().getUserIdByMetalakeIdAndName(metalakeId, 
username);
-    Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+    Long metadataId;
+    try {
+      metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+    } catch (Exception e) {
+      LOG.debug("Can not get entity id", e);
+      return false;
+    }
     loadPrivilege(metalake, username, userId, metadataObject, metadataId);
     return authorizeByJcasbin(userId, metadataObject, metadataId, privilege);
   }
@@ -173,6 +179,11 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
 
       for (RoleEntity role : entities) {
         Long roleId = role.id();
+        role =
+            entityStore.get(
+                NameIdentifierUtil.ofRole(metalake, role.name()),
+                Entity.EntityType.ROLE,
+                RoleEntity.class);
         if (loadedRoles.contains(roleId)) {
           continue;
         }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
index 38a6c16d17..4360b02f70 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/MockGravitinoAuthorizer.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.server.authorization;
 import java.security.Principal;
 import java.util.Objects;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 
 /** Mock GravitinoAuthorizer */
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestGravitinoAuthorizerProvider.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestGravitinoAuthorizerProvider.java
index 888080ced3..9b331a5150 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestGravitinoAuthorizerProvider.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestGravitinoAuthorizerProvider.java
@@ -18,6 +18,7 @@
 package org.apache.gravitino.server.authorization;
 
 import org.apache.gravitino.Configs;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.server.ServerConfig;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
index 3f73f38d1f..4c6ab83845 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
@@ -159,7 +159,12 @@ public class TestJcasbinAuthorizer {
     Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
     assertFalse(doAuthorize(currentPrincipal));
     RoleEntity allowRole =
-        getRoleEntity(ALLOW_ROLE_ID, 
ImmutableList.of(getAllowSecurableObject()));
+        getRoleEntity(ALLOW_ROLE_ID, "allowRole", 
ImmutableList.of(getAllowSecurableObject()));
+    when(entityStore.get(
+            eq(NameIdentifierUtil.ofRole(METALAKE, allowRole.name())),
+            eq(Entity.EntityType.ROLE),
+            eq(RoleEntity.class)))
+        .thenReturn(allowRole);
     NameIdentifier userNameIdentifier = NameIdentifierUtil.ofUser(METALAKE, 
USERNAME);
     // Mock adds roles to users.
     when(supportsRelationOperations.listEntitiesByRelation(
@@ -172,7 +177,12 @@ public class TestJcasbinAuthorizer {
     // When permissions are changed but handleRolePrivilegeChange is not 
executed, the system will
     // use the cached permissions in JCasbin, so authorize can succeed.
     Long newRoleId = -1L;
-    RoleEntity tempNewRole = getRoleEntity(newRoleId, ImmutableList.of());
+    RoleEntity tempNewRole = getRoleEntity(newRoleId, "tempNewRole", 
ImmutableList.of());
+    when(entityStore.get(
+            eq(NameIdentifierUtil.ofRole(METALAKE, tempNewRole.name())),
+            eq(Entity.EntityType.ROLE),
+            eq(RoleEntity.class)))
+        .thenReturn(tempNewRole);
     when(supportsRelationOperations.listEntitiesByRelation(
             eq(SupportsRelationOperations.Type.ROLE_USER_REL),
             eq(userNameIdentifier),
@@ -188,14 +198,26 @@ public class TestJcasbinAuthorizer {
             eq(userNameIdentifier),
             eq(Entity.EntityType.USER)))
         .thenReturn(ImmutableList.of(allowRole));
+    when(entityStore.get(
+            eq(NameIdentifierUtil.ofRole(METALAKE, allowRole.name())),
+            eq(Entity.EntityType.ROLE),
+            eq(RoleEntity.class)))
+        .thenReturn(allowRole);
     assertTrue(doAuthorize(currentPrincipal));
     // Test deny
-    RoleEntity denyRole = getRoleEntity(DENY_ROLE_ID, 
ImmutableList.of(getDenySecurableObject()));
+    RoleEntity denyRole =
+        getRoleEntity(DENY_ROLE_ID, "denyRole", 
ImmutableList.of(getDenySecurableObject()));
+    when(entityStore.get(
+            eq(NameIdentifierUtil.ofRole(METALAKE, denyRole.name())),
+            eq(Entity.EntityType.ROLE),
+            eq(RoleEntity.class)))
+        .thenReturn(denyRole);
     when(supportsRelationOperations.listEntitiesByRelation(
             eq(SupportsRelationOperations.Type.ROLE_USER_REL),
             eq(userNameIdentifier),
             eq(Entity.EntityType.USER)))
         .thenReturn(ImmutableList.of(allowRole, denyRole));
+
     assertFalse(doAuthorize(currentPrincipal));
   }
 
@@ -243,12 +265,13 @@ public class TestJcasbinAuthorizer {
         .build();
   }
 
-  private static RoleEntity getRoleEntity(Long roleId, List<SecurableObject> 
securableObjects) {
+  private static RoleEntity getRoleEntity(
+      Long roleId, String roleName, List<SecurableObject> securableObjects) {
     Namespace namespace = NamespaceUtil.ofRole(METALAKE);
     return RoleEntity.builder()
         .withNamespace(namespace)
         .withId(roleId)
-        .withName("roleName")
+        .withName(roleName)
         .withAuditInfo(AuditInfo.EMPTY)
         .withSecurableObjects(securableObjects)
         .build();
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index 412b78f4f6..a54b84eebd 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -40,6 +40,7 @@ import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetada
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionEvaluator;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.server.web.rest.CatalogOperations;
+import org.apache.gravitino.server.web.rest.SchemaOperations;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.glassfish.hk2.api.Descriptor;
 import org.glassfish.hk2.api.Filter;
@@ -54,7 +55,8 @@ public class GravitinoInterceptionService implements 
InterceptionService {
 
   @Override
   public Filter getDescriptorFilter() {
-    return new 
ClassListFilter(ImmutableSet.of(CatalogOperations.class.getName()));
+    return new ClassListFilter(
+        ImmutableSet.of(CatalogOperations.class.getName(), 
SchemaOperations.class.getName()));
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
index 9fda549ebf..603b2e2c75 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java
@@ -35,6 +35,8 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
@@ -48,6 +50,9 @@ import org.apache.gravitino.dto.responses.EntityListResponse;
 import org.apache.gravitino.dto.responses.SchemaResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
@@ -83,6 +88,14 @@ public class SchemaOperations {
           () -> {
             Namespace schemaNS = NamespaceUtil.ofSchema(metalake, catalog);
             NameIdentifier[] idents = dispatcher.listSchemas(schemaNS);
+            idents =
+                MetadataFilterHelper.filterByExpression(
+                    metalake,
+                    " ((METALAKE::USE_CATALOG||CATALOG::USE_CATALOG) && "
+                        + "(SCHEMA::OWNER || METALAKE::USE_SCHEMA || 
CATALOG::USE_SCHEMA || SCHEMA::USE_SCHEMA)) "
+                        + "|| METALAKE::OWNER || CATALOG::OWNER",
+                    Entity.EntityType.SCHEMA,
+                    idents);
             Response response = Utils.ok(new EntityListResponse(idents));
             LOG.info("List {} schemas in catalog {}.{}", idents.length, 
metalake, catalog);
             return response;
@@ -96,9 +109,16 @@ public class SchemaOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "create-schema." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "create-schema", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "( (METALAKE::USE_CATALOG || CATALOG::USE_CATALOG) && 
(METALAKE::CREATE_SCHEMA || CATALOG::CREATE_SCHEMA) )"
+              + "|| METALAKE::OWNER || CATALOG::OWNER",
+      accessMetadataType = MetadataObject.Type.CATALOG)
   public Response createSchema(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
       SchemaCreateRequest request) {
     LOG.info("Received create schema request: {}.{}.{}", metalake, catalog, 
request.getName());
     try {
@@ -126,10 +146,19 @@ public class SchemaOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "load-schema." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "load-schema", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "( (METALAKE::USE_CATALOG || CATALOG::USE_CATALOG) &&"
+              + " (METALAKE::USE_SCHEMA || CATALOG::USE_SCHEMA || 
SCHEMA::USE_SCHEMA || SCHEMA::OWNER) ) "
+              + " || METALAKE::OWNER || CATALOG::OWNER ",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response loadSchema(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA)
+          String schema) {
     LOG.info("Received load schema request for schema: {}.{}.{}", metalake, 
catalog, schema);
     try {
       return Utils.doAs(
@@ -152,10 +181,16 @@ public class SchemaOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "alter-schema." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "alter-schema", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "METALAKE::OWNER || CATALOG::OWNER || 
((METALAKE::USE_CATALOG||CATALOG::USE_CATALOG) && SCHEMA::OWNER)",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response alterSchema(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
       SchemaUpdatesRequest request) {
     LOG.info("Received alter schema request: {}.{}.{}", metalake, catalog, 
schema);
     try {
@@ -184,10 +219,16 @@ public class SchemaOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "drop-schema." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "drop-schema", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "METALAKE::OWNER || CATALOG::OWNER || 
((METALAKE::USE_CATALOG||CATALOG::USE_CATALOG) && SCHEMA::OWNER)",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response dropSchema(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
       @DefaultValue("false") @QueryParam("cascade") boolean cascade) {
     LOG.info("Received drop schema request: {}.{}.{}", metalake, catalog, 
schema);
     try {
@@ -199,7 +240,6 @@ public class SchemaOperations {
             if (!dropped) {
               LOG.warn("Fail to drop schema {} under namespace {}", schema, 
ident.namespace());
             }
-
             Response response = Utils.ok(new DropResponse(dropped));
             LOG.info("Schema dropped: {}.{}.{}", metalake, catalog, schema);
             return response;
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
 
b/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
index 41a8c8a1d4..7e803cfa26 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/filter/TestGravitinoInterceptionService.java
@@ -31,9 +31,9 @@ import org.aopalliance.intercept.MethodInterceptor;
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.UserPrincipal;
+import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.dto.responses.ErrorResponse;
-import org.apache.gravitino.server.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestOwnerOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestOwnerOperations.java
index dc7451a538..06b19611e8 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestOwnerOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestOwnerOperations.java
@@ -57,14 +57,13 @@ import org.apache.gravitino.rest.RESTUtils;
 import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-class TestOwnerOperations extends JerseyTest {
+class TestOwnerOperations extends BaseOperationsTest {
   private static final OwnerManager manager = mock(OwnerManager.class);
   private static final MetalakeDispatcher metalakeDispatcher = 
mock(MetalakeDispatcher.class);
   private static final AccessControlDispatcher accessControlDispatcher =
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPermissionOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPermissionOperations.java
index 1f507cbbcc..fdc4c2e2f0 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPermissionOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPermissionOperations.java
@@ -67,14 +67,13 @@ import org.apache.gravitino.metalake.MetalakeDispatcher;
 import org.apache.gravitino.rest.RESTUtils;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-public class TestPermissionOperations extends JerseyTest {
+public class TestPermissionOperations extends BaseOperationsTest {
 
   private static final AccessControlManager manager = 
mock(AccessControlManager.class);
   private static final MetalakeDispatcher metalakeDispatcher = 
mock(MetalakeDispatcher.class);
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestSchemaOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestSchemaOperations.java
index 724fcee311..c99af56b89 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestSchemaOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestSchemaOperations.java
@@ -61,14 +61,13 @@ import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.rest.RESTUtils;
 import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-public class TestSchemaOperations extends JerseyTest {
+public class TestSchemaOperations extends BaseOperationsTest {
 
   private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
     @Override

Reply via email to