This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch branch-metadata-authz
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-metadata-authz by this 
push:
     new 4ae2440237 [#7570] feat(authz): Support topic authorization (#7580)
4ae2440237 is described below

commit 4ae2440237fb11f7c803f574058a3e05490a761e
Author: Kyle Lin <[email protected]>
AuthorDate: Mon Jul 14 08:16:56 2025 +0800

    [#7570] feat(authz): Support topic authorization (#7580)
    
    ### What changes were proposed in this pull request?
    
    Support topic authorization.
    
    ### Why are the changes needed?
    
    Fixes #7570
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    
    
`org.apache.gravitino.client.integration.test.authorization.TopicAuthorizationIT`
    
    ---------
    
    Co-authored-by: yangyang zhong <[email protected]>
    Co-authored-by: [email protected] <[email protected]>
---
 .../test/authorization/TopicAuthorizationIT.java   | 240 +++++++++++++++++++++
 .../server/authorization/MetadataFilterHelper.java |  21 ++
 .../AuthorizationExpressionConverter.java          |  15 +-
 .../authorization/TestMetadataFilterHelper.java    |  25 +++
 .../web/filter/GravitinoInterceptionService.java   |   4 +-
 .../gravitino/server/web/rest/TopicOperations.java |  76 +++++--
 .../server/web/rest/TestTopicOperations.java       |   3 +-
 .../TestTopicAuthorizationExpression.java          | 186 ++++++++++++++++
 8 files changed, 548 insertions(+), 22 deletions(-)

diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java
new file mode 100644
index 0000000000..5d28f8a8bc
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/TopicAuthorizationIT.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test.authorization;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.KafkaContainer;
+import org.apache.gravitino.messaging.Topic;
+import org.apache.gravitino.messaging.TopicCatalog;
+import org.apache.gravitino.messaging.TopicChange;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Tag("gravitino-docker-test")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class TopicAuthorizationIT extends BaseRestApiAuthorizationIT {
+
+  private static final String CATALOG = "catalog";
+  private static final String SCHEMA = "default";
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  private static String kafkaBootstrapServers;
+  private static String role = "role";
+
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    containerSuite.startKafkaContainer();
+    super.startIntegrationTest();
+    kafkaBootstrapServers =
+        String.format(
+            "%s:%d",
+            containerSuite.getKafkaContainer().getContainerIpAddress(),
+            KafkaContainer.DEFAULT_BROKER_PORT);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("bootstrap.servers", kafkaBootstrapServers);
+    client
+        .loadMetalake(METALAKE)
+        .createCatalog(CATALOG, Catalog.Type.MESSAGING, "kafka", "comment", 
properties);
+    // try to load the schema as normal user, expect failure
+    assertThrows(
+        "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+        RuntimeException.class,
+        () -> {
+          normalUserClient
+              .loadMetalake(METALAKE)
+              .loadCatalog(CATALOG)
+              .asSchemas()
+              .loadSchema(SCHEMA);
+        });
+    // grant tester privilege
+    List<SecurableObject> securableObjects = new ArrayList<>();
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog(CATALOG, 
ImmutableList.of(Privileges.UseCatalog.allow()));
+    securableObjects.add(catalogObject);
+    gravitinoMetalake.createRole(role, new HashMap<>(), securableObjects);
+    gravitinoMetalake.grantRolesToUser(ImmutableList.of(role), NORMAL_USER);
+    // normal user can load the catalog but not the schema
+    Catalog catalogLoadByNormalUser = 
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG);
+    assertEquals(CATALOG, catalogLoadByNormalUser.name());
+    assertThrows(
+        "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+        RuntimeException.class,
+        () -> {
+          catalogLoadByNormalUser.asSchemas().loadSchema(SCHEMA);
+        });
+  }
+
+  @Test
+  @Order(1)
+  public void testCreateTopic() {
+    // owner can create topic
+    TopicCatalog topicCatalog = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+    topicCatalog.createTopic(NameIdentifier.of(SCHEMA, "topic1"), "test", 
null, new HashMap<>());
+    // normal user cannot create topic
+    TopicCatalog topicCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+    assertThrows(
+        "Can not access metadata {" + CATALOG + "." + SCHEMA + "}.",
+        RuntimeException.class,
+        () -> {
+          topicCatalogNormalUser.createTopic(
+              NameIdentifier.of(SCHEMA, "topic2"), "test2", null, new 
HashMap<>());
+        });
+    // grant privileges
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(CATALOG, SCHEMA, MetadataObject.Type.SCHEMA),
+        ImmutableList.of(Privileges.UseSchema.allow(), 
Privileges.CreateTopic.allow()));
+    // normal user can now create topic
+    topicCatalogNormalUser.createTopic(
+        NameIdentifier.of(SCHEMA, "topic2"), "test2", null, new HashMap<>());
+    topicCatalogNormalUser.createTopic(
+        NameIdentifier.of(SCHEMA, "topic3"), "test3", null, new HashMap<>());
+  }
+
+  @Test
+  @Order(2)
+  public void testListTopic() {
+    TopicCatalog topicCatalog = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+    NameIdentifier[] topicsList = 
topicCatalog.listTopics(Namespace.of(SCHEMA));
+    assertArrayEquals(
+        new NameIdentifier[] {
+          NameIdentifier.of(SCHEMA, "topic1"),
+          NameIdentifier.of(SCHEMA, "topic2"),
+          NameIdentifier.of(SCHEMA, "topic3")
+        },
+        topicsList);
+    // normal user can only see topics they have privilege for
+    TopicCatalog topicCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+    NameIdentifier[] topicsListNormalUser = 
topicCatalogNormalUser.listTopics(Namespace.of(SCHEMA));
+    assertArrayEquals(
+        new NameIdentifier[] {
+          NameIdentifier.of(SCHEMA, "topic2"), NameIdentifier.of(SCHEMA, 
"topic3")
+        },
+        topicsListNormalUser);
+  }
+
+  @Test
+  @Order(3)
+  public void testLoadTopic() {
+    TopicCatalog topicCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+    // normal user can load topic2 and topic3, but not topic1
+    assertThrows(
+        String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, 
"topic1"),
+        RuntimeException.class,
+        () -> {
+          topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, 
"topic1"));
+        });
+    Topic topic2 = topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, 
"topic2"));
+    assertEquals("topic2", topic2.name());
+    Topic topic3 = topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, 
"topic3"));
+    assertEquals("topic3", topic3.name());
+
+    // grant normal user privilege to use topic1
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    gravitinoMetalake.grantPrivilegesToRole(
+        role,
+        MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"), 
MetadataObject.Type.TOPIC),
+        ImmutableList.of(Privileges.ConsumeTopic.allow()));
+    topicCatalogNormalUser.loadTopic(NameIdentifier.of(SCHEMA, "topic1"));
+  }
+
+  @Test
+  @Order(4)
+  public void testAlterTopic() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    TopicCatalog topicCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+
+    // normal user cannot alter topic1 (no privilege)
+    assertThrows(
+        String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, 
"topic1"),
+        RuntimeException.class,
+        () -> {
+          topicCatalogNormalUser.alterTopic(
+              NameIdentifier.of(SCHEMA, "topic1"), 
TopicChange.updateComment("new comment"));
+        });
+    // grant normal user owner privilege on topic1
+    gravitinoMetalake.setOwner(
+        MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"), 
MetadataObject.Type.TOPIC),
+        NORMAL_USER,
+        Owner.Type.USER);
+    topicCatalogNormalUser.alterTopic(
+        NameIdentifier.of(SCHEMA, "topic1"), TopicChange.updateComment("new 
comment"));
+  }
+
+  @Test
+  @Order(5)
+  public void testDropTopic() {
+    GravitinoMetalake gravitinoMetalake = client.loadMetalake(METALAKE);
+    TopicCatalog topicCatalogNormalUser =
+        
normalUserClient.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+    // reset owner
+    gravitinoMetalake.setOwner(
+        MetadataObjects.of(ImmutableList.of(CATALOG, SCHEMA, "topic1"), 
MetadataObject.Type.TOPIC),
+        USER,
+        Owner.Type.USER);
+    // normal user cannot drop topic1
+    assertThrows(
+        String.format("Can not access metadata {%s.%s.%s}.", CATALOG, SCHEMA, 
"topic1"),
+        RuntimeException.class,
+        () -> {
+          topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA, 
"topic1"));
+        });
+    // normal user can drop topic2 and topic3 (they created them)
+    topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA, "topic2"));
+    topicCatalogNormalUser.dropTopic(NameIdentifier.of(SCHEMA, "topic3"));
+
+    // owner can drop topic1
+    TopicCatalog topicCatalog = 
client.loadMetalake(METALAKE).loadCatalog(CATALOG).asTopicCatalog();
+    topicCatalog.dropTopic(NameIdentifier.of(SCHEMA, "topic1"));
+    // check topics are dropped
+    NameIdentifier[] topicsList = 
topicCatalog.listTopics(Namespace.of(SCHEMA));
+    assertArrayEquals(new NameIdentifier[] {}, topicsList);
+    NameIdentifier[] topicsListNormalUser = 
topicCatalogNormalUser.listTopics(Namespace.of(SCHEMA));
+    assertArrayEquals(new NameIdentifier[] {}, topicsListNormalUser);
+  }
+}
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
index 13fed7c5a6..2655b98a05 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java
@@ -21,7 +21,10 @@ import java.security.Principal;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
@@ -51,6 +54,9 @@ public class MetadataFilterHelper {
       Entity.EntityType entityType,
       String privilege,
       NameIdentifier[] metadataList) {
+    if (!enableAuthorization()) {
+      return metadataList;
+    }
     GravitinoAuthorizer gravitinoAuthorizer =
         GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer();
     Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
@@ -81,6 +87,9 @@ public class MetadataFilterHelper {
       NameIdentifier[] nameIdentifiers) {
     AuthorizationExpressionEvaluator authorizationExpressionEvaluator =
         new AuthorizationExpressionEvaluator(expression);
+    if (!enableAuthorization()) {
+      return nameIdentifiers;
+    }
     return Arrays.stream(nameIdentifiers)
         .filter(
             metaDataName -> {
@@ -128,9 +137,21 @@ public class MetadataFilterHelper {
         nameIdentifierMap.put(
             Entity.EntityType.CATALOG, 
NameIdentifierUtil.getCatalogIdentifier(nameIdentifier));
         break;
+      case TOPIC:
+        nameIdentifierMap.put(Entity.EntityType.TOPIC, nameIdentifier);
+        nameIdentifierMap.put(
+            Entity.EntityType.SCHEMA, 
NameIdentifierUtil.getSchemaIdentifier(nameIdentifier));
+        nameIdentifierMap.put(
+            Entity.EntityType.CATALOG, 
NameIdentifierUtil.getCatalogIdentifier(nameIdentifier));
+        break;
       default:
         break;
     }
     return nameIdentifierMap;
   }
+
+  private static boolean enableAuthorization() {
+    Config config = GravitinoEnv.getInstance().config();
+    return config != null && config.get(Configs.ENABLE_AUTHORIZATION);
+  }
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
index f21ef43824..0ec8ef18a1 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionConverter.java
@@ -54,12 +54,12 @@ public class AuthorizationExpressionConverter {
    * @return an OGNL expression used to call GravitinoAuthorizer
    */
   public static String convertToOgnlExpression(String authorizationExpression) 
{
-    authorizationExpression = replaceAnyPrivilege(authorizationExpression);
-    authorizationExpression = replaceAnyExpressions(authorizationExpression);
     return EXPRESSION_CACHE.computeIfAbsent(
         authorizationExpression,
         (expression) -> {
-          Matcher matcher = PATTERN.matcher(expression);
+          String replacedExpression = 
replaceAnyPrivilege(authorizationExpression);
+          replacedExpression = replaceAnyExpressions(replacedExpression);
+          Matcher matcher = PATTERN.matcher(replacedExpression);
           StringBuffer result = new StringBuffer();
 
           while (matcher.find()) {
@@ -154,6 +154,15 @@ public class AuthorizationExpressionConverter {
             "(ANY(CREATE_MODEL_VERSION, METALAKE, CATALOG, SCHEMA, MODEL))");
     expression =
         expression.replaceAll("ANY_CREATE_MODEL", "(ANY(CREATE_MODEL, 
METALAKE, CATALOG, SCHEMA))");
+    expression =
+        expression.replaceAll(
+            "ANY_CREATE_TOPIC", "(ANY(CREATE_TOPIC, METALAKE, CATALOG, SCHEMA, 
TOPIC))");
+    expression =
+        expression.replaceAll(
+            "ANY_PRODUCE_TOPIC", "(ANY(PRODUCE_TOPIC, METALAKE, CATALOG, 
SCHEMA, TOPIC))");
+    expression =
+        expression.replaceAll(
+            "ANY_CONSUME_TOPIC", "(ANY(CONSUME_TOPIC, METALAKE, CATALOG, 
SCHEMA, TOPIC))");
     return expression;
   }
 }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
index 6bb38f2de7..2714845b3d 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/TestMetadataFilterHelper.java
@@ -17,23 +17,48 @@
 
 package org.apache.gravitino.server.authorization;
 
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 
 /** Test of {@link MetadataFilterHelper} */
 public class TestMetadataFilterHelper {
 
+  private static MockedStatic<GravitinoEnv> mockedStaticGravitinoEnv;
+
+  @BeforeAll
+  public static void setup() {
+    mockedStaticGravitinoEnv = mockStatic(GravitinoEnv.class);
+    GravitinoEnv gravitinoEnv = mock(GravitinoEnv.class);
+    
mockedStaticGravitinoEnv.when(GravitinoEnv::getInstance).thenReturn(gravitinoEnv);
+    Config configMock = mock(Config.class);
+    when(gravitinoEnv.config()).thenReturn(configMock);
+    when(configMock.get(eq(Configs.ENABLE_AUTHORIZATION))).thenReturn(true);
+  }
+
+  @AfterAll
+  public static void stop() {
+    if (mockedStaticGravitinoEnv != null) {
+      mockedStaticGravitinoEnv.close();
+    }
+  }
+
   @Test
   public void testFilter() {
     try (MockedStatic<PrincipalUtils> principalUtilsMocked = 
mockStatic(PrincipalUtils.class);
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
index 9f799d2776..d39faf0ad8 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/filter/GravitinoInterceptionService.java
@@ -43,6 +43,7 @@ import org.apache.gravitino.server.web.rest.CatalogOperations;
 import org.apache.gravitino.server.web.rest.ModelOperations;
 import org.apache.gravitino.server.web.rest.SchemaOperations;
 import org.apache.gravitino.server.web.rest.TableOperations;
+import org.apache.gravitino.server.web.rest.TopicOperations;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.glassfish.hk2.api.Descriptor;
 import org.glassfish.hk2.api.Filter;
@@ -62,7 +63,8 @@ public class GravitinoInterceptionService implements 
InterceptionService {
             CatalogOperations.class.getName(),
             SchemaOperations.class.getName(),
             TableOperations.class.getName(),
-            ModelOperations.class.getName()));
+            ModelOperations.class.getName(),
+            TopicOperations.class.getName()));
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
index 471a194a4f..c3dd0e4c62 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
@@ -31,6 +31,8 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.TopicDispatcher;
@@ -44,6 +46,9 @@ import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.messaging.Topic;
 import org.apache.gravitino.messaging.TopicChange;
 import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.authorization.MetadataFilterHelper;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import org.apache.gravitino.server.web.Utils;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
@@ -54,6 +59,11 @@ import org.slf4j.LoggerFactory;
 public class TopicOperations {
   private static final Logger LOG = 
LoggerFactory.getLogger(TopicOperations.class);
 
+  private static final String loadTopicsAuthorizationExpression =
+      "ANY(OWNER, METALAKE, CATALOG) || "
+          + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+          + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TOPIC::OWNER || 
ANY_CONSUME_TOPIC || ANY_PRODUCE_TOPIC)";
+
   private final TopicDispatcher dispatcher;
 
   @Context private HttpServletRequest httpRequest;
@@ -66,6 +76,7 @@ public class TopicOperations {
   @GET
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "list-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
+  @ResponseMetered(name = "list-topic", absolute = true)
   public Response listTopics(
       @PathParam("metalake") String metalake,
       @PathParam("catalog") String catalog,
@@ -78,13 +89,17 @@ public class TopicOperations {
             LOG.info("Listing topics under schema: {}.{}.{}", metalake, 
catalog, schema);
             Namespace topicNS = NamespaceUtil.ofTopic(metalake, catalog, 
schema);
             NameIdentifier[] topics = dispatcher.listTopics(topicNS);
+            topics = topics == null ? new NameIdentifier[0] : topics;
+            topics =
+                MetadataFilterHelper.filterByExpression(
+                    metalake, loadTopicsAuthorizationExpression, 
Entity.EntityType.TOPIC, topics);
             Response response = Utils.ok(new EntityListResponse(topics));
             LOG.info(
                 "List {} topics under schema: {}.{}.{}", topics.length, 
metalake, catalog, schema);
             return response;
           });
     } catch (Exception e) {
-      return ExceptionHandlers.handleFilesetException(OperationType.LIST, "", 
schema, e);
+      return ExceptionHandlers.handleTopicException(OperationType.LIST, "", 
schema, e);
     }
   }
 
@@ -92,10 +107,18 @@ public class TopicOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "create-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "create-topic", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER,METALAKE,CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA && ANY_CREATE_TOPIC",
+      accessMetadataType = MetadataObject.Type.SCHEMA)
   public Response createTopic(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
       TopicCreateRequest request) {
     LOG.info("Received create topic request: {}.{}.{}", metalake, catalog, 
schema);
     try {
@@ -133,11 +156,16 @@ public class TopicOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "load-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "load-topic", absolute = true)
+  @AuthorizationExpression(
+      expression = loadTopicsAuthorizationExpression,
+      accessMetadataType = MetadataObject.Type.TOPIC)
   public Response loadTopic(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
-      @PathParam("topic") String topic) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
+      @PathParam("topic") @AuthorizationMetadata(type = 
MetadataObject.Type.TOPIC) String topic) {
     LOG.info(
         "Received load topic request for topic: {}.{}.{}.{}", metalake, 
catalog, schema, topic);
     try {
@@ -161,11 +189,19 @@ public class TopicOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "alter-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "alter-topic", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER,METALAKE,CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA && (TOPIC::OWNER || 
ANY_PRODUCE_TOPIC)",
+      accessMetadataType = MetadataObject.Type.TOPIC)
   public Response alterTopic(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
-      @PathParam("topic") String topic,
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
+      @PathParam("topic") @AuthorizationMetadata(type = 
MetadataObject.Type.TOPIC) String topic,
       TopicUpdatesRequest request) {
     LOG.info("Received alter topic request: {}.{}.{}.{}", metalake, catalog, 
schema, topic);
     try {
@@ -195,11 +231,19 @@ public class TopicOperations {
   @Produces("application/vnd.gravitino.v1+json")
   @Timed(name = "drop-topic." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "drop-topic", absolute = true)
+  @AuthorizationExpression(
+      expression =
+          "ANY(OWNER,METALAKE,CATALOG) || "
+              + "SCHEMA_OWNER_WITH_USE_CATALOG || "
+              + "ANY_USE_CATALOG && ANY_USE_SCHEMA && TOPIC::OWNER",
+      accessMetadataType = MetadataObject.Type.TOPIC)
   public Response dropTopic(
-      @PathParam("metalake") String metalake,
-      @PathParam("catalog") String catalog,
-      @PathParam("schema") String schema,
-      @PathParam("topic") String topic) {
+      @PathParam("metalake") @AuthorizationMetadata(type = 
MetadataObject.Type.METALAKE)
+          String metalake,
+      @PathParam("catalog") @AuthorizationMetadata(type = 
MetadataObject.Type.CATALOG)
+          String catalog,
+      @PathParam("schema") @AuthorizationMetadata(type = 
MetadataObject.Type.SCHEMA) String schema,
+      @PathParam("topic") @AuthorizationMetadata(type = 
MetadataObject.Type.TOPIC) String topic) {
     LOG.info("Received drop topic request: {}.{}.{}.{}", metalake, catalog, 
schema, topic);
     try {
       return Utils.doAs(
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
index d5be6662f3..b838d62744 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
@@ -60,14 +60,13 @@ import org.apache.gravitino.messaging.TopicChange;
 import org.apache.gravitino.rest.RESTUtils;
 import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.test.JerseyTest;
 import org.glassfish.jersey.test.TestProperties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
-public class TestTopicOperations extends JerseyTest {
+public class TestTopicOperations extends BaseOperationsTest {
 
   private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
     @Override
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java
new file mode 100644
index 0000000000..8e4b88aeb6
--- /dev/null
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/authorization/TestTopicAuthorizationExpression.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web.rest.authorization;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import ognl.OgnlException;
+import org.apache.gravitino.dto.requests.TopicCreateRequest;
+import org.apache.gravitino.dto.requests.TopicUpdatesRequest;
+import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpression;
+import org.apache.gravitino.server.web.rest.TopicOperations;
+import org.junit.jupiter.api.Test;
+
+public class TestTopicAuthorizationExpression {
+
+  @Test
+  public void testCreateTopic() throws NoSuchMethodException, OgnlException {
+    Method method =
+        TopicOperations.class.getMethod(
+            "createTopic", String.class, String.class, String.class, 
TopicCreateRequest.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("CATALOG::OWNER")));
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TOPIC", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+  }
+
+  @Test
+  public void testLoadTopics() throws OgnlException, NoSuchFieldException, 
IllegalAccessException {
+    Field loadTopicsAuthorizationExpressionField =
+        
TopicOperations.class.getDeclaredField("loadTopicsAuthorizationExpression");
+    loadTopicsAuthorizationExpressionField.setAccessible(true);
+    String loadTopicsAuthorizationExpression =
+        (String) loadTopicsAuthorizationExpressionField.get(null);
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new 
MockAuthorizationExpressionEvaluator(loadTopicsAuthorizationExpression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TOPIC", 
"CATALOG::CREATE_CATALOG")));
+
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::CONSUME_TOPIC")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CONSUME_TOPIC")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CONSUME_TOPIC")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CONSUME_TOPIC", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "SCHEMA::CONSUME_TOPIC", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+  }
+
+  @Test
+  public void testAlterFileset() throws NoSuchMethodException, OgnlException {
+    Method method =
+        TopicOperations.class.getMethod(
+            "alterTopic",
+            String.class,
+            String.class,
+            String.class,
+            String.class,
+            TopicUpdatesRequest.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TOPIC", 
"CATALOG::CREATE_CATALOG")));
+
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+  }
+
+  @Test
+  public void testDropFileset() throws NoSuchMethodException, OgnlException {
+    Method method =
+        TopicOperations.class.getMethod(
+            "dropTopic", String.class, String.class, String.class, 
String.class);
+    AuthorizationExpression authorizationExpressionAnnotation =
+        method.getAnnotation(AuthorizationExpression.class);
+    String expression = authorizationExpressionAnnotation.expression();
+    MockAuthorizationExpressionEvaluator mockEvaluator =
+        new MockAuthorizationExpressionEvaluator(expression);
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of()));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_CATALOG")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("METALAKE::OWNER")));
+
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::USE_METALAKE")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::CREATE_CATALOG")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::CREATE_TOPIC")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::CREATE_TOPIC", 
"CATALOG::CREATE_CATALOG")));
+
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("METALAKE::PRODUCE_TOPIC")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("CATALOG::PRODUCE_TOPIC")));
+    
assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC")));
+    assertFalse(
+        mockEvaluator.getResult(ImmutableSet.of("SCHEMA::PRODUCE_TOPIC", 
"CATALOG::USE_CATALOG")));
+    assertFalse(
+        mockEvaluator.getResult(
+            ImmutableSet.of(
+                "SCHEMA::PRODUCE_TOPIC", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+
+    assertFalse(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER")));
+    assertTrue(mockEvaluator.getResult(ImmutableSet.of("SCHEMA::OWNER", 
"CATALOG::USE_CATALOG")));
+    assertTrue(
+        mockEvaluator.getResult(
+            ImmutableSet.of("SCHEMA::OWNER", "SCHEMA::USE_SCHEMA", 
"CATALOG::USE_CATALOG")));
+  }
+}


Reply via email to