This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

The following commit(s) were added to refs/heads/main by this push:
     new 0eec636655 [#6474] improvement(storage): batch listing securable 
objects in RoleMetaService (#6601)
0eec636655 is described below

commit 0eec636655211953c4dad941f5e3106ab8a3298b
Author: Eric Chang <e850...@gmail.com>
AuthorDate: Tue Mar 11 15:14:31 2025 +0800

    [#6474] improvement(storage): batch listing securable objects in 
RoleMetaService (#6601)
    
    <!--
    1. Title: [#<issue>] <type>(<scope>): <subject>
       Examples:
         - "[#123] feat(operator): support xxx"
         - "[#233] fix: check null before access result in xxx"
         - "[MINOR] refactor: fix typo in variable name"
         - "[MINOR] docs: fix typo in README"
         - "[#255] test: fix flaky test NameOfTheTest"
       Reference: https://www.conventionalcommits.org/en/v1.0.0/
    2. If the PR is unfinished, please mark this PR as draft.
    -->
    
    ### What changes were proposed in this pull request?
    
    Batch retrieving securable objects for `metalake`, `catalog`, `schema`,
    `table`, `topic`, `fileset` (fileset has been implemented at #6455).
    
    ### Why are the changes needed?
    
    Improve performance when role is bounded to lots of `SecurableObject`s.
    
    Fix: #6474
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    In `TestSecurableObjects.testAllTypeSecurableObjects`.
    
    Related Issue and PR: #6238 #6455
---
 .../gravitino/authorization/SecurableObjects.java  |   2 +-
 .../relational/mapper/MetalakeMetaMapper.java      |   5 +
 .../mapper/MetalakeMetaSQLProviderFactory.java     |   5 +
 .../storage/relational/mapper/ModelMetaMapper.java |   3 +
 .../mapper/ModelMetaSQLProviderFactory.java        |   5 +
 .../storage/relational/mapper/TableMetaMapper.java |   3 +
 .../mapper/TableMetaSQLProviderFactory.java        |   5 +
 .../storage/relational/mapper/TopicMetaMapper.java |   3 +
 .../mapper/TopicMetaSQLProviderFactory.java        |   5 +
 .../provider/base/MetalakeMetaBaseSQLProvider.java |  19 ++
 .../provider/base/ModelMetaBaseSQLProvider.java    |  18 ++
 .../provider/base/TableMetaBaseSQLProvider.java    |  19 ++
 .../provider/base/TopicMetaBaseSQLProvider.java    |  19 ++
 .../relational/service/MetadataObjectService.java  | 286 ++++++++++++++++++++-
 .../relational/service/RoleMetaService.java        | 158 ++++--------
 .../relational/service/TestSecurableObjects.java   |  28 +-
 16 files changed, 468 insertions(+), 115 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java 
b/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
index e6ca3a851c..b926964e15 100644
--- a/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
+++ b/api/src/main/java/org/apache/gravitino/authorization/SecurableObjects.java
@@ -105,7 +105,7 @@ public class SecurableObjects {
   }
 
   /**
-   * Create the table {@link SecurableObject} with the given securable schema 
object, fileset name
+   * Create the fileset {@link SecurableObject} with the given securable 
schema object, fileset name
    * and privileges.
    *
    * @param schema The schema securable object
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
index d5dc809bfe..90027ae151 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java
@@ -47,6 +47,11 @@ public interface MetalakeMetaMapper {
   @SelectProvider(type = MetalakeMetaSQLProviderFactory.class, method = 
"selectMetalakeMetaById")
   MetalakePO selectMetalakeMetaById(@Param("metalakeId") Long metalakeId);
 
+  @SelectProvider(
+      type = MetalakeMetaSQLProviderFactory.class,
+      method = "listMetalakePOsByMetalakeIds")
+  List<MetalakePO> listMetalakePOsByMetalakeIds(@Param("metalakeIds") 
List<Long> metalakeIds);
+
   @SelectProvider(
       type = MetalakeMetaSQLProviderFactory.class,
       method = "selectMetalakeIdMetaByName")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
index 5b3ab58fbb..ee2c52a7dc 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java
@@ -20,6 +20,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.MetalakeMetaBaseSQLProvider;
@@ -68,6 +69,10 @@ public class MetalakeMetaSQLProviderFactory {
     return getProvider().selectMetalakeIdMetaByName(metalakeName);
   }
 
+  public static String listMetalakePOsByMetalakeIds(@Param("metalakeIds") 
List<Long> metalakeIds) {
+    return getProvider().listMetalakePOsByMetalakeIds(metalakeIds);
+  }
+
   public static String insertMetalakeMeta(@Param("metalakeMeta") MetalakePO 
metalakePO) {
     return getProvider().insertMetalakeMeta(metalakePO);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
index 53aba8353d..b16c556c71 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
@@ -40,6 +40,9 @@ public interface ModelMetaMapper {
   @SelectProvider(type = ModelMetaSQLProviderFactory.class, method = 
"listModelPOsBySchemaId")
   List<ModelPO> listModelPOsBySchemaId(@Param("schemaId") Long schemaId);
 
+  @SelectProvider(type = ModelMetaSQLProviderFactory.class, method = 
"listModelPOsByModelIds")
+  List<ModelPO> listModelPOsByModelIds(@Param("modelIds") List<Long> modelIds);
+
   @SelectProvider(
       type = ModelMetaSQLProviderFactory.class,
       method = "selectModelMetaBySchemaIdAndModelName")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
index 71c2050831..796009737b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.ModelMetaBaseSQLProvider;
@@ -62,6 +63,10 @@ public class ModelMetaSQLProviderFactory {
     return getProvider().listModelPOsBySchemaId(schemaId);
   }
 
+  public static String listModelPOsByModelIds(@Param("modelIds") List<Long> 
modelIds) {
+    return getProvider().listModelPOsByModelIds(modelIds);
+  }
+
   public static String selectModelMetaBySchemaIdAndModelName(
       @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
     return getProvider().selectModelMetaBySchemaIdAndModelName(schemaId, 
modelName);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
index a522459376..41621027bf 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java
@@ -41,6 +41,9 @@ public interface TableMetaMapper {
   @SelectProvider(type = TableMetaSQLProviderFactory.class, method = 
"listTablePOsBySchemaId")
   List<TablePO> listTablePOsBySchemaId(@Param("schemaId") Long schemaId);
 
+  @SelectProvider(type = TableMetaSQLProviderFactory.class, method = 
"listTablePOsByTableIds")
+  List<TablePO> listTablePOsByTableIds(@Param("tableIds") List<Long> tableIds);
+
   @SelectProvider(
       type = TableMetaSQLProviderFactory.class,
       method = "selectTableIdBySchemaIdAndName")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
index c7152acd66..664b2423f8 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.TableMetaBaseSQLProvider;
@@ -54,6 +55,10 @@ public class TableMetaSQLProviderFactory {
     return getProvider().listTablePOsBySchemaId(schemaId);
   }
 
+  public static String listTablePOsByTableIds(@Param("tableIds") List<Long> 
tableIds) {
+    return getProvider().listTablePOsByTableIds(tableIds);
+  }
+
   public static String selectTableIdBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("tableName") String name) {
     return getProvider().selectTableIdBySchemaIdAndName(schemaId, name);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
index 8c194caff4..014ef229ee 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java
@@ -40,6 +40,9 @@ public interface TopicMetaMapper {
   @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = 
"listTopicPOsBySchemaId")
   List<TopicPO> listTopicPOsBySchemaId(@Param("schemaId") Long schemaId);
 
+  @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = 
"listTopicPOsByTopicIds")
+  List<TopicPO> listTopicPOsByTopicIds(@Param("topicIds") List<Long> topicIds);
+
   @SelectProvider(
       type = TopicMetaSQLProviderFactory.class,
       method = "selectTopicMetaBySchemaIdAndName")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
index 39258c58dd..dbbfc21604 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java
@@ -20,6 +20,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.TopicMetaBaseSQLProvider;
@@ -63,6 +64,10 @@ public class TopicMetaSQLProviderFactory {
     return getProvider().listTopicPOsBySchemaId(schemaId);
   }
 
+  public static String listTopicPOsByTopicIds(@Param("topicIds") List<Long> 
topicIds) {
+    return getProvider().listTopicPOsByTopicIds(topicIds);
+  }
+
   public static String selectTopicMetaBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("topicName") String topicName) {
     return getProvider().selectTopicMetaBySchemaIdAndName(schemaId, topicName);
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/MetalakeMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/MetalakeMetaBaseSQLProvider.java
index b03a228a6c..5a041a3649 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/MetalakeMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/MetalakeMetaBaseSQLProvider.java
@@ -20,6 +20,7 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 
 import static 
org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper.TABLE_NAME;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
 import org.apache.ibatis.annotations.Param;
 
@@ -65,6 +66,24 @@ public class MetalakeMetaBaseSQLProvider {
         + " WHERE metalake_name = #{metalakeName} and deleted_at = 0";
   }
 
+  public String listMetalakePOsByMetalakeIds(@Param("metalakeIds") List<Long> 
metalakeIds) {
+    return "<script>"
+        + " SELECT metalake_id as metalakeId, metalake_name as metalakeName,"
+        + " metalake_comment as metalakeComment, properties,"
+        + " audit_info as auditInfo, schema_version as schemaVersion,"
+        + " current_version as currentVersion, last_version as lastVersion,"
+        + " deleted_at as deletedAt"
+        + " FROM "
+        + TABLE_NAME
+        + " WHERE deleted_at = 0"
+        + " AND metalake_id in ("
+        + "<foreach collection='metalakeIds' item='metalakeId' separator=','>"
+        + "#{metalakeId}"
+        + "</foreach>"
+        + ") "
+        + "</script>";
+  }
+
   public String insertMetalakeMeta(@Param("metalakeMeta") MetalakePO 
metalakePO) {
     return "INSERT INTO "
         + TABLE_NAME
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
index 0a78de9d09..aa0c42fe57 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.storage.relational.mapper.provider.base;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.ibatis.annotations.Param;
@@ -66,6 +67,23 @@ public class ModelMetaBaseSQLProvider {
         + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
   }
 
+  public String listModelPOsByModelIds(List<Long> modelIds) {
+    return "<script>"
+        + " SELECT model_id AS modelId, model_name AS modelName, metalake_id 
AS metalakeId,"
+        + " catalog_id AS catalogId, schema_id AS schemaId, model_comment AS 
modelComment,"
+        + " model_properties AS modelProperties, model_latest_version AS"
+        + " modelLatestVersion, audit_info AS auditInfo, deleted_at AS 
deletedAt"
+        + " FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE deleted_at = 0"
+        + " AND model_id in ("
+        + "<foreach collection='modelIds' item='modelId' separator=','>"
+        + "#{modelId}"
+        + "</foreach>"
+        + ") "
+        + "</script>";
+  }
+
   public String selectModelMetaBySchemaIdAndModelName(
       @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
     return "SELECT model_id AS modelId, model_name AS modelName, metalake_id 
AS metalakeId,"
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 6169cc65a5..41f26e2340 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -20,6 +20,7 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 
 import static 
org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.ibatis.annotations.Param;
 
@@ -36,6 +37,24 @@ public class TableMetaBaseSQLProvider {
         + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
   }
 
+  public String listTablePOsByTableIds(List<Long> tableIds) {
+    return "<script>"
+        + " SELECT table_id as tableId, table_name as tableName,"
+        + " metalake_id as metalakeId, catalog_id as catalogId,"
+        + " schema_id as schemaId, audit_info as auditInfo,"
+        + " current_version as currentVersion, last_version as lastVersion,"
+        + " deleted_at as deletedAt"
+        + " FROM "
+        + TABLE_NAME
+        + " WHERE deleted_at = 0"
+        + " AND table_id in ("
+        + "<foreach collection='tableIds' item='tableId' separator=','>"
+        + "#{tableId}"
+        + "</foreach>"
+        + ") "
+        + "</script>";
+  }
+
   public String selectTableIdBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("tableName") String name) {
     return "SELECT table_id as tableId FROM "
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
index 34394780e4..f6a9ef0d61 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TopicMetaBaseSQLProvider.java
@@ -21,6 +21,7 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 
 import static 
org.apache.gravitino.storage.relational.mapper.TopicMetaMapper.TABLE_NAME;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.po.TopicPO;
 import org.apache.ibatis.annotations.Param;
 
@@ -90,6 +91,24 @@ public class TopicMetaBaseSQLProvider {
         + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
   }
 
+  public String listTopicPOsByTopicIds(@Param("topicIds") List<Long> topicIds) 
{
+    return "<script>"
+        + " SELECT topic_id as topicId, topic_name as topicName, metalake_id 
as metalakeId,"
+        + " catalog_id as catalogId, schema_id as schemaId,"
+        + " comment as comment, properties as properties, audit_info as 
auditInfo,"
+        + " current_version as currentVersion, last_version as lastVersion,"
+        + " deleted_at as deletedAt"
+        + " FROM "
+        + TABLE_NAME
+        + " WHERE deleted_at = 0"
+        + " AND topic_id in ("
+        + "<foreach collection='topicIds' item='topicId' separator=','>"
+        + "#{topicId}"
+        + "</foreach>"
+        + ") "
+        + "</script>";
+  }
+
   public String selectTopicMetaBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("topicName") String topicName) {
     return "SELECT topic_id as topicId, topic_name as topicName,"
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index e6790a602c..188c06ac23 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -20,9 +20,19 @@ package org.apache.gravitino.storage.relational.service;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
 import org.apache.gravitino.storage.relational.po.CatalogPO;
 import org.apache.gravitino.storage.relational.po.ColumnPO;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
@@ -31,6 +41,9 @@ import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.gravitino.storage.relational.po.TopicPO;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * MetadataObjectService is used for converting full name to entity id and 
converting entity id to
@@ -42,6 +55,8 @@ public class MetadataObjectService {
   private static final Joiner DOT_JOINER = Joiner.on(DOT);
   private static final Splitter DOT_SPLITTER = Splitter.on(DOT);
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetadataObjectService.class);
+
   private MetadataObjectService() {}
 
   public static long getMetadataObjectId(
@@ -90,7 +105,8 @@ public class MetadataObjectService {
     throw new IllegalArgumentException(String.format("Doesn't support the type 
%s", type));
   }
 
-  // Metadata object may be null because the metadata object can be deleted 
asynchronously.
+  // Metadata object may be null because the metadata object can be deleted
+  // asynchronously.
   @Nullable
   public static String getMetadataObjectFullName(String type, long 
metadataObjectId) {
     MetadataObject.Type metadataType = MetadataObject.Type.valueOf(type);
@@ -138,6 +154,8 @@ public class MetadataObjectService {
 
         case TABLE:
           TablePO tablePO = 
TableMetaService.getInstance().getTablePOById(objectId);
+          // if fullName is null:
+          // fullName = catalogPO.getSchemaName(),schemaPO.getTableName()
           if (tablePO != null) {
             fullName =
                 fullName != null
@@ -214,4 +232,270 @@ public class MetadataObjectService {
 
     return fullName;
   }
+
+  /**
+   * Retrieves a map of Metalake object IDs to their full names.
+   *
+   * @param metalakeIds A list of Metalake object IDs to fetch names for.
+   * @return A Map where the key is the Metalake ID and the value is the 
Metalake full name. The map
+   *     may contain null values for the names if its parent object is 
deleted. Returns an empty map
+   *     if no Metalake objects are found for the given IDs. {@code @example} 
value of metalake full
+   *     name: "metalake1.catalog1.schema1.table1"
+   */
+  public static Map<Long, String> getMetalakeObjectsFullName(List<Long> 
metalakeIds) {
+    List<MetalakePO> metalakePOs =
+        SessionUtils.getWithoutCommit(
+            MetalakeMetaMapper.class, mapper -> 
mapper.listMetalakePOsByMetalakeIds(metalakeIds));
+
+    if (metalakePOs == null || metalakePOs.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    HashMap<Long, String> metalakeIdAndNameMap = new HashMap<>();
+
+    metalakePOs.forEach(
+        metalakePO ->
+            metalakeIdAndNameMap.put(metalakePO.getMetalakeId(), 
metalakePO.getMetalakeName()));
+
+    return metalakeIdAndNameMap;
+  }
+
+  /**
+   * Retrieves a map of Fileset object IDs to their full names.
+   *
+   * @param filesetIds A list of Fileset object IDs to fetch names for.
+   * @return A Map where the key is the Fileset ID and the value is the 
Fileset full name. The map
+   *     may contain null values for the names if its parent object is 
deleted. Returns an empty map
+   *     if no Fileset objects are found for the given IDs. {@code @example} 
value of fileset full
+   *     name: "catalog1.schema1.fileset1"
+   */
+  public static Map<Long, String> getFilesetObjectsFullName(List<Long> 
filesetIds) {
+    List<FilesetPO> filesetPOs =
+        SessionUtils.getWithoutCommit(
+            FilesetMetaMapper.class, mapper -> 
mapper.listFilesetPOsByFilesetIds(filesetIds));
+
+    if (filesetPOs == null || filesetPOs.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    List<Long> schemaIds =
+        
filesetPOs.stream().map(FilesetPO::getSchemaId).collect(Collectors.toList());
+
+    Map<Long, String> schemaIdAndNameMap = getSchemaObjectsFullName(schemaIds);
+
+    HashMap<Long, String> filesetIdAndNameMap = new HashMap<>();
+
+    filesetPOs.forEach(
+        filesetPO -> {
+          // since the schema can be deleted, we need to check the null value,
+          // and when schema is deleted, we will set fullName of filesetPO to 
null.
+          String schemaName = 
schemaIdAndNameMap.getOrDefault(filesetPO.getSchemaId(), null);
+          if (schemaName == null) {
+            LOG.warn("The schema of fileset {} may be deleted", 
filesetPO.getFilesetId());
+            filesetIdAndNameMap.put(filesetPO.getFilesetId(), null);
+            return;
+          }
+
+          String fullName = DOT_JOINER.join(schemaName, 
filesetPO.getFilesetName());
+          filesetIdAndNameMap.put(filesetPO.getFilesetId(), fullName);
+        });
+
+    return filesetIdAndNameMap;
+  }
+
+  /**
+   * Retrieves a map of Model object IDs to their full names.
+   *
+   * @param modelIds A list of Model object IDs to fetch names for.
+   * @return A Map where the key is the Model ID and the value is the Model 
full name. The map may
+   *     contain null values for the names if its parent object is deleted. 
Returns an empty map if
+   *     no Model objects are found for the given IDs. {@code @example} value 
of model full name:
+   *     "catalog1.schema1.model1"
+   */
+  public static Map<Long, String> getModelObjectsFullName(List<Long> modelIds) 
{
+    List<ModelPO> modelPOs =
+        SessionUtils.getWithoutCommit(
+            ModelMetaMapper.class, mapper -> 
mapper.listModelPOsByModelIds(modelIds));
+
+    if (modelPOs == null || modelPOs.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    List<Long> schemaIds = 
modelPOs.stream().map(ModelPO::getSchemaId).collect(Collectors.toList());
+
+    Map<Long, String> schemaIdAndNameMap = getSchemaObjectsFullName(schemaIds);
+
+    HashMap<Long, String> modelIdAndNameMap = new HashMap<>();
+
+    modelPOs.forEach(
+        modelPO -> {
+          // since the schema can be deleted, we need to check the null value,
+          // and when schema is deleted, we will set fullName of modelPO to 
null.
+          String schemaName = 
schemaIdAndNameMap.getOrDefault(modelPO.getSchemaId(), null);
+          if (schemaName == null) {
+            LOG.warn("The schema of model {} may be deleted", 
modelPO.getModelId());
+            modelIdAndNameMap.put(modelPO.getModelId(), null);
+            return;
+          }
+
+          String fullName = DOT_JOINER.join(schemaName, 
modelPO.getModelName());
+          modelIdAndNameMap.put(modelPO.getModelId(), fullName);
+        });
+
+    return modelIdAndNameMap;
+  }
+
+  /**
+   * Retrieves a map of Table object IDs to their full names.
+   *
+   * @param tableIds A list of Table object IDs to fetch names for.
+   * @return A Map where the key is the Table ID and the value is the Table 
full name. The map may
+   *     contain null values for the names if its parent object is deleted. 
Returns an empty map if
+   *     no Table objects are found for the given IDs. {@code @example} value 
of table full name:
+   *     "catalog1.schema1.table1"
+   */
+  public static Map<Long, String> getTableObjectsFullName(List<Long> tableIds) 
{
+    List<TablePO> tablePOs =
+        SessionUtils.getWithoutCommit(
+            TableMetaMapper.class, mapper -> 
mapper.listTablePOsByTableIds(tableIds));
+
+    if (tablePOs == null || tablePOs.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    tablePOs.stream().map(TablePO::getCatalogId).collect(Collectors.toList());
+    List<Long> schemaIds = 
tablePOs.stream().map(TablePO::getSchemaId).collect(Collectors.toList());
+
+    Map<Long, String> schemaIdAndNameMap = getSchemaObjectsFullName(schemaIds);
+
+    HashMap<Long, String> tableIdAndNameMap = new HashMap<>();
+
+    tablePOs.forEach(
+        tablePO -> {
+          // since the schema can be deleted, we need to check the null value,
+          // and when schema is deleted, we will set fullName of tablePO to
+          // null
+          String schemaName = 
schemaIdAndNameMap.getOrDefault(tablePO.getSchemaId(), null);
+          if (schemaName == null) {
+            LOG.warn("The schema of table {} may be deleted", 
tablePO.getTableId());
+            tableIdAndNameMap.put(tablePO.getTableId(), null);
+            return;
+          }
+
+          String fullName = DOT_JOINER.join(schemaName, 
tablePO.getTableName());
+          tableIdAndNameMap.put(tablePO.getTableId(), fullName);
+        });
+
+    return tableIdAndNameMap;
+  }
+
+  /**
+   * Retrieves a map of Topic object IDs to their full names.
+   *
+   * @param topicIds A list of Topic object IDs to fetch names for.
+   * @return A Map where the key is the Topic ID and the value is the Topic 
full name. The map may
+   *     contain null values for the names if its parent object is deleted. 
Returns an empty map if
+   *     no Topic objects are found for the given IDs. {@code @example} value 
of topic full name:
+   *     "catalog1.schema1.topic1"
+   */
+  public static Map<Long, String> getTopicObjectsFullName(List<Long> topicIds) 
{
+    List<TopicPO> topicPOs =
+        SessionUtils.getWithoutCommit(
+            TopicMetaMapper.class, mapper -> 
mapper.listTopicPOsByTopicIds(topicIds));
+
+    if (topicPOs == null || topicPOs.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    List<Long> schemaIds = 
topicPOs.stream().map(TopicPO::getSchemaId).collect(Collectors.toList());
+
+    Map<Long, String> schemaIdAndNameMap = getSchemaObjectsFullName(schemaIds);
+
+    HashMap<Long, String> topicIdAndNameMap = new HashMap<>();
+
+    topicPOs.forEach(
+        topicPO -> {
+          // since the schema can be deleted, we need to check the null value,
+          // and when schema is deleted, we will set fullName of topicPO to 
null.
+          String schemaName = 
schemaIdAndNameMap.getOrDefault(topicPO.getSchemaId(), null);
+          if (schemaName == null) {
+            LOG.warn("The schema of topic {} may be deleted", 
topicPO.getTopicId());
+            topicIdAndNameMap.put(topicPO.getTopicId(), null);
+            return;
+          }
+
+          String fullName = DOT_JOINER.join(schemaName, 
topicPO.getTopicName());
+          topicIdAndNameMap.put(topicPO.getTopicId(), fullName);
+        });
+
+    return topicIdAndNameMap;
+  }
+
+  /**
+   * Retrieves a map of Catalog object IDs to their full names.
+   *
+   * @param catalogIds A list of Catalog object IDs to fetch names for.
+   * @return A Map where the key is the Catalog ID and the value is the 
Catalog full name. The map
+   *     may contain null values for the names if its parent object is 
deleted. Returns an empty map
+   *     if no Catalog objects are found for the given IDs. {@code @example} 
value of catalog full
+   *     name: "catalog1"
+   */
+  public static Map<Long, String> getCatalogObjectsFullName(List<Long> 
catalogIds) {
+    List<CatalogPO> catalogPOs =
+        SessionUtils.getWithoutCommit(
+            CatalogMetaMapper.class, mapper -> 
mapper.listCatalogPOsByCatalogIds(catalogIds));
+
+    if (catalogPOs == null || catalogPOs.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    HashMap<Long, String> catalogIdAndNameMap = new HashMap<>();
+
+    catalogPOs.forEach(
+        catalogPO -> catalogIdAndNameMap.put(catalogPO.getCatalogId(), 
catalogPO.getCatalogName()));
+
+    return catalogIdAndNameMap;
+  }
+
+  /**
+   * Retrieves a map of Schema object IDs to their full names.
+   *
+   * @param schemaIds A list of Schema object IDs to fetch names for.
+   * @return A Map where the key is the Schema ID and the value is the Schema 
full name. The map may
+   *     contain null values for the names if its parent object is deleted. 
Returns an empty map if
+   *     no Schema objects are found for the given IDs. {@code @example} value 
of schema full name:
+   *     "catalog1.schema1"
+   */
+  public static Map<Long, String> getSchemaObjectsFullName(List<Long> 
schemaIds) {
+    List<SchemaPO> schemaPOs =
+        SessionUtils.getWithoutCommit(
+            SchemaMetaMapper.class, mapper -> 
mapper.listSchemaPOsBySchemaIds(schemaIds));
+
+    if (schemaPOs == null || schemaPOs.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    List<Long> catalogIds =
+        
schemaPOs.stream().map(SchemaPO::getCatalogId).collect(Collectors.toList());
+
+    Map<Long, String> catalogIdAndNameMap = 
getCatalogObjectsFullName(catalogIds);
+
+    HashMap<Long, String> schemaIdAndNameMap = new HashMap<>();
+
+    schemaPOs.forEach(
+        schemaPO -> {
+          String catalogName = 
catalogIdAndNameMap.getOrDefault(schemaPO.getCatalogId(), null);
+          if (catalogName == null) {
+            LOG.warn("The catalog of schema {} may be deleted", 
schemaPO.getSchemaId());
+            schemaIdAndNameMap.put(schemaPO.getSchemaId(), null);
+            return;
+          }
+
+          String fullName = DOT_JOINER.join(catalogName, 
schemaPO.getSchemaName());
+
+          schemaIdAndNameMap.put(schemaPO.getSchemaId(), fullName);
+        });
+
+    return schemaIdAndNameMap;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
index 41c4c409ca..3f27902d0c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/RoleMetaService.java
@@ -18,18 +18,18 @@
  */
 package org.apache.gravitino.storage.relational.service;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -42,18 +42,12 @@ import 
org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.RoleEntity;
-import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
-import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
-import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import org.apache.gravitino.storage.relational.mapper.UserRoleRelMapper;
-import org.apache.gravitino.storage.relational.po.CatalogPO;
-import org.apache.gravitino.storage.relational.po.FilesetPO;
 import org.apache.gravitino.storage.relational.po.RolePO;
-import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
@@ -64,9 +58,6 @@ import org.slf4j.LoggerFactory;
 
 /** The service class for role metadata. It provides the basic database 
operations for role. */
 public class RoleMetaService {
-  private static final String DOT = ".";
-  private static final Joiner DOT_JOINER = Joiner.on(DOT);
-
   private static final Logger LOG = 
LoggerFactory.getLogger(RoleMetaService.class);
   private static final RoleMetaService INSTANCE = new RoleMetaService();
 
@@ -369,46 +360,51 @@ public class RoleMetaService {
         .collect(Collectors.groupingBy(SecurableObjectPO::getType))
         .forEach(
             (type, objects) -> {
-              // If the type is Fileset, use the batch retrieval interface;
-              // otherwise, use the single retrieval interface
-              if (type.equals(MetadataObject.Type.FILESET.name())) {
-                List<Long> filesetIds =
-                    objects.stream()
-                        .map(SecurableObjectPO::getMetadataObjectId)
-                        .collect(Collectors.toList());
-
-                Map<Long, String> filesetIdAndNameMap = 
getFilesetObjectFullNames(filesetIds);
-
-                for (SecurableObjectPO securableObjectPO : objects) {
-                  String fullName =
-                      
filesetIdAndNameMap.get(securableObjectPO.getMetadataObjectId());
-                  if (fullName != null) {
-                    securableObjects.add(
-                        POConverters.fromSecurableObjectPO(
-                            fullName, securableObjectPO, 
getType(securableObjectPO.getType())));
-                  } else {
-                    LOG.warn(
-                        "The securable object {} {} may be deleted",
-                        securableObjectPO.getMetadataObjectId(),
-                        securableObjectPO.getType());
-                  }
-                }
-              } else {
-                // todoļ¼što get other securable object fullNames using batch 
retrieving
-                for (SecurableObjectPO securableObjectPO : objects) {
-                  String fullName =
-                      MetadataObjectService.getMetadataObjectFullName(
-                          securableObjectPO.getType(), 
securableObjectPO.getMetadataObjectId());
-                  if (fullName != null) {
-                    securableObjects.add(
-                        POConverters.fromSecurableObjectPO(
-                            fullName, securableObjectPO, 
getType(securableObjectPO.getType())));
-                  } else {
-                    LOG.warn(
-                        "The securable object {} {} may be deleted",
-                        securableObjectPO.getMetadataObjectId(),
-                        securableObjectPO.getType());
-                  }
+              List<Long> objectIds =
+                  objects.stream()
+                      .map(SecurableObjectPO::getMetadataObjectId)
+                      .collect(Collectors.toList());
+
+              Map<MetadataObject.Type, Function<List<Long>, Map<Long, String>>>
+                  objectFullNameGetterFnMap =
+                      ImmutableMap.of(
+                          MetadataObject.Type.METALAKE,
+                          MetadataObjectService::getMetalakeObjectsFullName,
+                          MetadataObject.Type.CATALOG,
+                          MetadataObjectService::getCatalogObjectsFullName,
+                          MetadataObject.Type.SCHEMA,
+                          MetadataObjectService::getSchemaObjectsFullName,
+                          MetadataObject.Type.TABLE,
+                          MetadataObjectService::getTableObjectsFullName,
+                          MetadataObject.Type.FILESET,
+                          MetadataObjectService::getFilesetObjectsFullName,
+                          MetadataObject.Type.MODEL,
+                          MetadataObjectService::getModelObjectsFullName,
+                          MetadataObject.Type.TOPIC,
+                          MetadataObjectService::getTopicObjectsFullName);
+
+              // dynamically calling getter function based on type
+              Map<Long, String> objectIdAndNameMap =
+                  Optional.of(MetadataObject.Type.valueOf(type))
+                      .map(objectFullNameGetterFnMap::get)
+                      .map(getter -> getter.apply(objectIds))
+                      .orElseThrow(
+                          () ->
+                              // for example: MetadataObject.Type.COLUMN
+                              new IllegalArgumentException(
+                                  "Unsupported metadata object type: " + 
type));
+
+              for (SecurableObjectPO securableObjectPO : objects) {
+                String fullName = 
objectIdAndNameMap.get(securableObjectPO.getMetadataObjectId());
+                if (fullName != null) {
+                  securableObjects.add(
+                      POConverters.fromSecurableObjectPO(
+                          fullName, securableObjectPO, 
getType(securableObjectPO.getType())));
+                } else {
+                  LOG.warn(
+                      "The securable object {} {} may be deleted",
+                      securableObjectPO.getMetadataObjectId(),
+                      securableObjectPO.getType());
                 }
               }
             });
@@ -443,64 +439,4 @@ public class RoleMetaService {
   private static String getEntityType(SecurableObject securableObject) {
     return securableObject.type().name();
   }
-
-  public static Map<Long, String> getFilesetObjectFullNames(List<Long> ids) {
-    List<FilesetPO> filesetPOs =
-        SessionUtils.getWithoutCommit(
-            FilesetMetaMapper.class, mapper -> 
mapper.listFilesetPOsByFilesetIds(ids));
-
-    if (filesetPOs == null || filesetPOs.isEmpty()) {
-      return new HashMap<>();
-    }
-
-    List<Long> catalogIds =
-        
filesetPOs.stream().map(FilesetPO::getCatalogId).collect(Collectors.toList());
-    List<Long> schemaIds =
-        
filesetPOs.stream().map(FilesetPO::getSchemaId).collect(Collectors.toList());
-
-    Map<Long, String> catalogIdAndNameMap = getCatalogIdAndNameMap(catalogIds);
-    Map<Long, String> schemaIdAndNameMap = getSchemaIdAndNameMap(schemaIds);
-
-    HashMap<Long, String> filesetIdAndNameMap = new HashMap<>();
-
-    filesetPOs.forEach(
-        filesetPO -> {
-          // since the catalog or schema can be deleted, we need to check the 
null value,
-          // and when catalog or schema is deleted, we will set catalogName or 
schemaName to null
-          String catalogName = 
catalogIdAndNameMap.getOrDefault(filesetPO.getCatalogId(), null);
-          if (catalogName == null) {
-            LOG.warn("The catalog of fileset {} may be deleted", 
filesetPO.getFilesetId());
-            filesetIdAndNameMap.put(filesetPO.getFilesetId(), null);
-            return;
-          }
-
-          String schemaName = 
schemaIdAndNameMap.getOrDefault(filesetPO.getSchemaId(), null);
-          if (schemaName == null) {
-            LOG.warn("The schema of fileset {} may be deleted", 
filesetPO.getFilesetId());
-            filesetIdAndNameMap.put(filesetPO.getFilesetId(), null);
-            return;
-          }
-
-          String fullName = DOT_JOINER.join(catalogName, schemaName, 
filesetPO.getFilesetName());
-          filesetIdAndNameMap.put(filesetPO.getFilesetId(), fullName);
-        });
-
-    return filesetIdAndNameMap;
-  }
-
-  public static Map<Long, String> getSchemaIdAndNameMap(List<Long> schemaIds) {
-    List<SchemaPO> schemaPOS =
-        SessionUtils.getWithoutCommit(
-            SchemaMetaMapper.class, mapper -> 
mapper.listSchemaPOsBySchemaIds(schemaIds));
-    return schemaPOS.stream()
-        .collect(Collectors.toMap(SchemaPO::getSchemaId, 
SchemaPO::getSchemaName));
-  }
-
-  public static Map<Long, String> getCatalogIdAndNameMap(List<Long> 
catalogIds) {
-    List<CatalogPO> catalogPOs =
-        SessionUtils.getWithoutCommit(
-            CatalogMetaMapper.class, mapper -> 
mapper.listCatalogPOsByCatalogIds(catalogIds));
-    return catalogPOs.stream()
-        .collect(Collectors.toMap(CatalogPO::getCatalogId, 
CatalogPO::getCatalogName));
-  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
index 7d18b45b46..b5cc85112a 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
@@ -52,9 +52,13 @@ public class TestSecurableObjects extends TestJDBCBackend {
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
     BaseMetalake metalake =
-        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName + 
"2", auditInfo);
     backend.insert(metalake, false);
 
+    BaseMetalake metalake2 =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake2, false);
+
     CatalogEntity catalog =
         createCatalog(
             RandomIdGenerator.INSTANCE.nextId(), Namespace.of("metalake"), 
"catalog", auditInfo);
@@ -75,6 +79,7 @@ public class TestSecurableObjects extends TestJDBCBackend {
             "fileset",
             auditInfo);
     backend.insert(fileset, false);
+
     TableEntity table =
         createTableEntity(
             RandomIdGenerator.INSTANCE.nextId(),
@@ -82,6 +87,7 @@ public class TestSecurableObjects extends TestJDBCBackend {
             "table",
             auditInfo);
     backend.insert(table, false);
+
     TopicEntity topic =
         createTopicEntity(
             RandomIdGenerator.INSTANCE.nextId(),
@@ -90,6 +96,14 @@ public class TestSecurableObjects extends TestJDBCBackend {
             auditInfo);
     backend.insert(topic, false);
 
+    SecurableObject metalakeObject =
+        SecurableObjects.ofMetalake(
+            metalake.name(), 
Lists.newArrayList(Privileges.UseCatalog.allow()));
+
+    SecurableObject metalakeObject2 =
+        SecurableObjects.ofMetalake(
+            metalake2.name(), 
Lists.newArrayList(Privileges.UseCatalog.allow()));
+
     SecurableObject catalogObject =
         SecurableObjects.ofCatalog(
             "catalog",
@@ -98,18 +112,28 @@ public class TestSecurableObjects extends TestJDBCBackend {
     SecurableObject schemaObject =
         SecurableObjects.ofSchema(
             catalogObject, "schema", 
Lists.newArrayList(Privileges.UseSchema.allow()));
+
     SecurableObject tableObject =
         SecurableObjects.ofTable(
             schemaObject, "table", 
Lists.newArrayList(Privileges.SelectTable.allow()));
+
     SecurableObject filesetObject =
         SecurableObjects.ofFileset(
             schemaObject, "fileset", 
Lists.newArrayList(Privileges.ReadFileset.allow()));
+
     SecurableObject topicObject =
         SecurableObjects.ofTopic(
             schemaObject, "topic", 
Lists.newArrayList(Privileges.ConsumeTopic.deny()));
 
     ArrayList<SecurableObject> securableObjects =
-        Lists.newArrayList(catalogObject, schemaObject, tableObject, 
filesetObject, topicObject);
+        Lists.newArrayList(
+            metalakeObject,
+            metalakeObject2,
+            catalogObject,
+            schemaObject,
+            tableObject,
+            filesetObject,
+            topicObject);
     securableObjects.sort(Comparator.comparing(MetadataObject::fullName));
 
     RoleEntity role1 =


Reply via email to