This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a880b3f58 [#7624] feat(policy): Add core logic implementation for 
policy management (part-2) (#7796)
8a880b3f58 is described below

commit 8a880b3f5841b30a0105fe338c98bbf1ff712524
Author: mchades <[email protected]>
AuthorDate: Wed Aug 6 14:49:32 2025 +0800

    [#7624] feat(policy): Add core logic implementation for policy management 
(part-2) (#7796)
    
    ### What changes were proposed in this pull request?
    
    Add core logic implementation for policy relation management
    
    ### Why are the changes needed?
    
    Fix: #7624
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    tests added
---
 .../gravitino/SupportsRelationOperations.java      |  53 +++
 .../org/apache/gravitino/meta/GenericEntity.java   | 161 ++++++++
 .../apache/gravitino/policy/PolicyDispatcher.java  |   7 +-
 .../org/apache/gravitino/policy/PolicyManager.java | 177 ++++++++-
 .../gravitino/storage/relational/JDBCBackend.java  |  50 ++-
 .../storage/relational/RelationalEntityStore.java  |  24 ++
 .../relational/service/MetadataObjectService.java  |  36 ++
 .../relational/service/PolicyMetaService.java      |  61 ++-
 .../java/org/apache/gravitino/tag/TagManager.java  |   2 +
 .../apache/gravitino/policy/TestPolicyManager.java | 426 +++++++++++++++++++++
 .../relational/service/TestPolicyMetaService.java  | 102 +++--
 11 files changed, 1000 insertions(+), 99 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java 
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index 8827c10665..8aea5b2384 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -20,6 +20,7 @@ package org.apache.gravitino;
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 
 /**
  * This is an extended interface. This is mainly used for strengthen the 
ability of querying
@@ -76,6 +77,30 @@ public interface SupportsRelationOperations {
       Type relType, NameIdentifier nameIdentifier, Entity.EntityType 
identType, boolean allFields)
       throws IOException;
 
+  /**
+   * Gets a specific entity that is related to a given source entity.
+   *
+   * <p>For example, this can be used to get a specific policy that is directly associated with a
+   * metadata object.
+   *
+   * @param <E> The type of the entity to be returned.
+   * @param relType The type of relation.
+   * @param srcIdentifier The identifier of the source entity in the relation (e.g., a metadata
+   *     object).
+   * @param srcType The type of the source entity.
+   * @param destEntityIdent The identifier of the target entity to retrieve (e.g., a policy).
+   * @return The specific entity that is related to the source entity.
+   * @throws IOException If a storage-related error occurs.
+   * @throws NoSuchEntityException If the source entity or the target related entity does not exist,
+   *     or if the relation does not exist.
+   */
+  <E extends Entity & HasIdentifier> E getEntityByRelation(
+      Type relType,
+      NameIdentifier srcIdentifier,
+      Entity.EntityType srcType,
+      NameIdentifier destEntityIdent)
+      throws IOException, NoSuchEntityException;
+
   /**
    * insert a relation between two entities
    *
@@ -95,4 +120,32 @@ public interface SupportsRelationOperations {
       Entity.EntityType dstType,
       boolean override)
       throws IOException;
+
+  /**
+   * Updates the relations for a given entity by adding a set of new relations and removing another
+   * set of relations.
+   *
+   * <p>The default implementation performs no update and always throws {@link
+   * UnsupportedOperationException}; implementations that support relation updates must override
+   * this method.
+   *
+   * @param <E> The type of the entity returned in the list, which represents the final state of
+   *     related entities.
+   * @param relType The type of relation to update.
+   * @param srcEntityIdent The identifier of the source entity whose relations are being updated.
+   * @param srcEntityType The type of the source entity, which is the entity whose relations are
+   *     being updated.
+   * @param destEntitiesToAdd An array of identifiers for entities to be associated with.
+   * @param destEntitiesToRemove An array of identifiers for entities to be disassociated from.
+   * @return A list of entities that are related to the given entity after the update.
+   * @throws IOException If a storage-related error occurs.
+   * @throws NoSuchEntityException If any of the specified entities does not exist.
+   * @throws EntityAlreadyExistsException If a relation to be added already exists.
+   * @throws UnsupportedOperationException If the implementation does not override this default.
+   */
+  default <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
+      Type relType,
+      NameIdentifier srcEntityIdent,
+      Entity.EntityType srcEntityType,
+      NameIdentifier[] destEntitiesToAdd,
+      NameIdentifier[] destEntitiesToRemove)
+      throws IOException, NoSuchEntityException, EntityAlreadyExistsException {
+    throw new UnsupportedOperationException(
+        "updateEntityRelations is not supported by this implementation");
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/meta/GenericEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/GenericEntity.java
new file mode 100644
index 0000000000..0f8c899800
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/GenericEntity.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.meta;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.policy.PolicyManager;
+
+/**
+ * A generic entity that is mainly used for temporary transactions or internal operations.
+ *
+ * <p>For example, it can represent entities of different types in the intermediate result of
+ * {@link PolicyManager#listMetadataObjectsForPolicy(String, String)}, which is then converted to
+ * metadata objects.
+ */
+@EqualsAndHashCode
+@ToString
+public class GenericEntity implements Entity, HasIdentifier {
+
+  public static final Field ID = Field.required("id", Long.class, "The unique 
identifier");
+  public static final Field ENTITY_TYPE =
+      Field.required("entityType", EntityType.class, "The entity's type");
+
+  private Long id;
+  private EntityType entityType;
+  private String name;
+
+  private GenericEntity() {}
+
+  /**
+   * Returns the fields of this entity and their corresponding values.
+   *
+   * <p>Only {@link #ID} and {@link #ENTITY_TYPE} are included; the name is not part of the map.
+   *
+   * @return An unmodifiable map containing the entity's fields and values.
+   */
+  @Override
+  public Map<Field, Object> fields() {
+    Map<Field, Object> fields = new HashMap<>();
+    fields.put(ID, id);
+    fields.put(ENTITY_TYPE, type());
+
+    return Collections.unmodifiableMap(fields);
+  }
+
+  /**
+   * Returns the name of the entity. May be null if it was not set on the builder.
+   *
+   * @return The name of the entity.
+   */
+  @Override
+  public String name() {
+    return name;
+  }
+
+  /**
+   * Returns the unique id of the entity.
+   *
+   * @return The unique id of the entity.
+   */
+  @Override
+  public Long id() {
+    return id;
+  }
+
+  /**
+   * Retrieves the type of the entity.
+   *
+   * @return the type of the entity.
+   */
+  @Override
+  public EntityType type() {
+    return entityType;
+  }
+
+  /** Builder class for creating instances of {@link GenericEntity}. */
+  public static class Builder {
+    private final GenericEntity entity;
+
+    /** Constructs a new {@link Builder}; use {@link GenericEntity#builder()} to obtain one. */
+    private Builder() {
+      entity = new GenericEntity();
+    }
+
+    /**
+     * Sets the unique identifier of the entity.
+     *
+     * @param id the unique identifier of the entity.
+     * @return the builder instance.
+     */
+    public Builder withId(Long id) {
+      entity.id = id;
+      return this;
+    }
+
+    /**
+     * Sets the name of the entity.
+     *
+     * @param name the name of the entity.
+     * @return the builder instance.
+     */
+    public Builder withName(String name) {
+      entity.name = name;
+      return this;
+    }
+
+    /**
+     * Sets the type of the entity.
+     *
+     * @param type the type of the entity.
+     * @return the builder instance.
+     */
+    public Builder withEntityType(EntityType type) {
+      entity.entityType = type;
+      return this;
+    }
+
+    /**
+     * Builds a new {@link GenericEntity} from the values set on this builder.
+     *
+     * <p>The values are copied into a fresh instance and {@code validate()} is invoked on that
+     * instance before it is returned.
+     *
+     * @return the constructed and validated {@link GenericEntity} instance.
+     */
+    public GenericEntity build() {
+      GenericEntity genericEntity = new GenericEntity();
+      genericEntity.id = entity.id;
+      genericEntity.name = entity.name;
+      genericEntity.entityType = entity.entityType;
+      genericEntity.validate();
+      return genericEntity;
+    }
+  }
+
+  /**
+   * Creates a new instance of {@link Builder}.
+   *
+   * @return The new instance.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/policy/PolicyDispatcher.java 
b/core/src/main/java/org/apache/gravitino/policy/PolicyDispatcher.java
index e9e449a770..571dc25e09 100644
--- a/core/src/main/java/org/apache/gravitino/policy/PolicyDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/policy/PolicyDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.policy;
 
+import java.util.Arrays;
 import java.util.Set;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.annotation.Evolving;
@@ -171,7 +172,11 @@ public interface PolicyDispatcher {
    * @param metadataObject the metadata object for which associated policies
    * @return The array of policy names associated with the specified metadata 
object.
    */
-  String[] listPoliciesForMetadataObject(String metalake, MetadataObject 
metadataObject);
+  default String[] listPoliciesForMetadataObject(String metalake, 
MetadataObject metadataObject) {
+    // Derived from the detailed listing so that implementations only have to provide
+    // listPolicyInfosForMetadataObject.
+    return Arrays.stream(listPolicyInfosForMetadataObject(metalake, 
metadataObject))
+        .map(Policy::name)
+        .toArray(String[]::new);
+  }
 
   /**
    * List all the policies with detailed information associated with a 
metadata object under a
diff --git a/core/src/main/java/org/apache/gravitino/policy/PolicyManager.java 
b/core/src/main/java/org/apache/gravitino/policy/PolicyManager.java
index 6c1ed5794f..2e7485c5fa 100644
--- a/core/src/main/java/org/apache/gravitino/policy/PolicyManager.java
+++ b/core/src/main/java/org/apache/gravitino/policy/PolicyManager.java
@@ -21,9 +21,12 @@ package org.apache.gravitino.policy;
 
 import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
@@ -32,20 +35,24 @@ import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchPolicyException;
+import org.apache.gravitino.exceptions.PolicyAlreadyAssociatedException;
 import org.apache.gravitino.exceptions.PolicyAlreadyExistsException;
 import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.relational.service.MetadataObjectService;
+import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("unused") // todo: remove this when all the methods are 
implemented
 public class PolicyManager implements PolicyDispatcher {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(PolicyManager.class);
@@ -222,17 +229,71 @@ public class PolicyManager implements PolicyDispatcher {
 
   @Override
   public MetadataObject[] listMetadataObjectsForPolicy(String metalake, String 
policyName) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
+    NameIdentifier policyIdent = NameIdentifierUtil.ofPolicy(metalake, 
policyName);
+    checkMetalake(NameIdentifier.of(metalake), entityStore);
 
-  @Override
-  public String[] listPoliciesForMetadataObject(String metalake, 
MetadataObject metadataObject) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // The lookup runs under a READ lock on the policy identifier.
+    return TreeLockUtils.doWithTreeLock(
+        policyIdent,
+        LockType.READ,
+        () -> {
+          try {
+            // Report a missing policy explicitly instead of returning an empty result for it.
+            if (!entityStore.exists(policyIdent, Entity.EntityType.POLICY)) {
+              throw new NoSuchPolicyException(
+                  "Policy with name %s under metalake %s does not exist", 
policyName, metalake);
+            }
+
+            List<GenericEntity> entities =
+                entityStore
+                    .relationOperations()
+                    .listEntitiesByRelation(
+                        
SupportsRelationOperations.Type.POLICY_METADATA_OBJECT_REL,
+                        policyIdent,
+                        Entity.EntityType.POLICY);
+            // The relation query yields generic entities; convert them to metadata objects.
+            return MetadataObjectService.fromGenericEntities(entities)
+                .toArray(new MetadataObject[0]);
+          } catch (IOException e) {
+            LOG.error("Failed to list metadata objects for policy {}", 
policyName, e);
+            throw new RuntimeException(e);
+          }
+        });
   }
 
   @Override
   public Policy[] listPolicyInfosForMetadataObject(String metalake, 
MetadataObject metadataObject) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+    Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(metadataObject);
+
+    MetadataObjectUtil.checkMetadataObject(metalake, metadataObject);
+    // Reject metadata object types that policies do not support before touching the store.
+    Preconditions.checkArgument(
+        Policy.SUPPORTS_ALL_OBJECT_TYPES.contains(metadataObject.type()),
+        "Cannot list policies for unsupported metadata object type %s",
+        metadataObject.type());
+    checkMetalake(NameIdentifier.of(metalake), entityStore);
+
+    // The lookup runs under a READ lock on the metadata object's identifier.
+    return TreeLockUtils.doWithTreeLock(
+        entityIdent,
+        LockType.READ,
+        () -> {
+          try {
+            // The results are cast to PolicyEntity, so the relation query is expected to return
+            // policy entities for a non-policy source type.
+            return entityStore.relationOperations()
+                .listEntitiesByRelation(
+                    SupportsRelationOperations.Type.POLICY_METADATA_OBJECT_REL,
+                    entityIdent,
+                    entityType,
+                    true /* allFields */)
+                .stream()
+                .map(entity -> (PolicyEntity) entity)
+                .toArray(PolicyEntity[]::new);
+          } catch (NoSuchEntityException e) {
+            // A missing entity is surfaced to callers as a missing metadata object.
+            throw new NoSuchMetadataObjectException(
+                e,
+                "Failed to list policies for metadata object %s due to not 
found",
+                metadataObject);
+          } catch (IOException e) {
+            LOG.error("Failed to list policies for metadata object {}", 
metadataObject, e);
+            throw new RuntimeException(e);
+          }
+        });
   }
 
   @Override
@@ -241,13 +302,111 @@ public class PolicyManager implements PolicyDispatcher {
       MetadataObject metadataObject,
       String[] policiesToAdd,
       String[] policiesToRemove) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Reject metadata object types that policies do not support before doing any other work.
+    Preconditions.checkArgument(
+        Policy.SUPPORTS_ALL_OBJECT_TYPES.contains(metadataObject.type()),
+        "Cannot associate policies for unsupported metadata object type %s",
+        metadataObject.type());
+
+    NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+    Entity.EntityType entityType = 
MetadataObjectUtil.toEntityType(metadataObject);
+
+    MetadataObjectUtil.checkMetadataObject(metalake, metadataObject);
+
+    // Drop the policies that are requested to be both added and removed. A null array is treated
+    // as empty. immutableCopy() snapshots the intersection before the two sets are mutated.
+    Set<String> policiesToAddSet =
+        policiesToAdd == null ? Sets.newHashSet() : 
Sets.newHashSet(policiesToAdd);
+    Set<String> policiesToRemoveSet =
+        policiesToRemove == null ? Sets.newHashSet() : 
Sets.newHashSet(policiesToRemove);
+    Set<String> common = Sets.intersection(policiesToAddSet, 
policiesToRemoveSet).immutableCopy();
+    policiesToAddSet.removeAll(common);
+    policiesToRemoveSet.removeAll(common);
+
+    NameIdentifier[] policiesToAddIdent =
+        policiesToAddSet.stream()
+            .map(p -> NameIdentifierUtil.ofPolicy(metalake, p))
+            .toArray(NameIdentifier[]::new);
+    NameIdentifier[] policiesToRemoveIdent =
+        policiesToRemoveSet.stream()
+            .map(p -> NameIdentifierUtil.ofPolicy(metalake, p))
+            .toArray(NameIdentifier[]::new);
+
+    checkMetalake(NameIdentifier.of(metalake), entityStore);
+    // The update runs under a READ lock on the metadata object and, nested inside it, a WRITE lock
+    // on the identifier built from the metalake's policy namespace.
+    return TreeLockUtils.doWithTreeLock(
+        entityIdent,
+        LockType.READ,
+        () ->
+            TreeLockUtils.doWithTreeLock(
+                NameIdentifier.of(NamespaceUtil.ofPolicy(metalake).levels()),
+                LockType.WRITE,
+                () -> {
+                  try {
+                    List<PolicyEntity> updatedPolicies =
+                        entityStore
+                            .relationOperations()
+                            .updateEntityRelations(
+                                
SupportsRelationOperations.Type.POLICY_METADATA_OBJECT_REL,
+                                entityIdent,
+                                entityType,
+                                policiesToAddIdent,
+                                policiesToRemoveIdent);
+                    return 
updatedPolicies.stream().map(PolicyEntity::name).toArray(String[]::new);
+                  } catch (NoSuchEntityException e) {
+                    // Every missing entity is surfaced to callers as a missing metadata object.
+                    throw new NoSuchMetadataObjectException(
+                        e,
+                        "Failed to associate policies for metadata object %s 
due to not found",
+                        metadataObject);
+                  } catch (EntityAlreadyExistsException e) {
+                    // NOTE(review): the message prints the caller's original policiesToAdd array,
+                    // not the de-duplicated policiesToAddSet that was actually submitted.
+                    throw new PolicyAlreadyAssociatedException(
+                        e,
+                        "Failed to associate policies for metadata object due 
to some policies %s already "
+                            + "associated to the metadata object %s",
+                        Arrays.toString(policiesToAdd),
+                        metadataObject);
+                  } catch (IOException e) {
+                    LOG.error(
+                        "Failed to associate policies for metadata object {}", 
metadataObject, e);
+                    throw new RuntimeException(e);
+                  }
+                }));
   }
 
   @Override
   public Policy getPolicyForMetadataObject(
       String metalake, MetadataObject metadataObject, String policyName) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, metadataObject);
+    Entity.EntityType entityType = MetadataObjectUtil.toEntityType(metadataObject);
+    NameIdentifier policyIdent = NameIdentifierUtil.ofPolicy(metalake, policyName);
+
+    MetadataObjectUtil.checkMetadataObject(metalake, metadataObject);
+    checkMetalake(NameIdentifier.of(metalake), entityStore);
+
+    // The lookup runs under a READ lock on the metadata object's identifier.
+    return TreeLockUtils.doWithTreeLock(
+        entityIdent,
+        LockType.READ,
+        () -> {
+          try {
+            return entityStore
+                .relationOperations()
+                .getEntityByRelation(
+                    SupportsRelationOperations.Type.POLICY_METADATA_OBJECT_REL,
+                    entityIdent,
+                    entityType,
+                    policyIdent);
+          } catch (NoSuchEntityException e) {
+            // A missing policy and a missing metadata object arrive as the same exception type
+            // and are told apart by the message text. getMessage() may return null, so guard the
+            // check: an NPE thrown here would hide the original not-found error from the caller.
+            String message = e.getMessage();
+            if (message != null && message.contains("No such policy entity")) {
+              throw new NoSuchPolicyException(
+                  e, "Policy %s does not exist for metadata object %s", policyName, metadataObject);
+            } else {
+              throw new NoSuchMetadataObjectException(
+                  e,
+                  "Failed to get policy for metadata object %s due to not found",
+                  metadataObject);
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to get policy for metadata object {}", metadataObject, e);
+            throw new RuntimeException(e);
+          }
+        });
   }
 
   private void changePolicyEnabledState(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index b9440c9ae9..d512ce29b4 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -470,7 +470,8 @@ public class JDBCBackend implements RelationalBackend {
 
   @Override
   public <E extends Entity & HasIdentifier> List<E> listEntitiesByRelation(
-      Type relType, NameIdentifier nameIdentifier, Entity.EntityType 
identType, boolean allFields) {
+      Type relType, NameIdentifier nameIdentifier, Entity.EntityType 
identType, boolean allFields)
+      throws IOException {
     switch (relType) {
       case OWNER_REL:
         List<E> list = Lists.newArrayList();
@@ -507,6 +508,15 @@ public class JDBCBackend implements RelationalBackend {
               String.format("JOB_TEMPLATE_JOB_REL doesn't support type %s", 
identType.name()));
         }
 
+      case POLICY_METADATA_OBJECT_REL:
+        if (identType == Entity.EntityType.POLICY) {
+          return (List<E>)
+              
PolicyMetaService.getInstance().listAssociatedEntitiesForPolicy(nameIdentifier);
+        } else {
+          return (List<E>)
+              PolicyMetaService.getInstance()
+                  .listPoliciesForMetadataObject(nameIdentifier, identType);
+        }
       default:
         throw new IllegalArgumentException(
             String.format("Doesn't support the relation type %s", relType));
@@ -531,6 +541,44 @@ public class JDBCBackend implements RelationalBackend {
     }
   }
 
+  @Override
+  public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
+      Type relType,
+      NameIdentifier srcEntityIdent,
+      Entity.EntityType srcEntityType,
+      NameIdentifier[] destEntitiesToAdd,
+      NameIdentifier[] destEntitiesToRemove)
+      throws IOException, NoSuchEntityException, EntityAlreadyExistsException {
+    // Only the policy <-> metadata object relation can be updated here; every other relation
+    // type is rejected with an IllegalArgumentException.
+    switch (relType) {
+      case POLICY_METADATA_OBJECT_REL:
+        // Unchecked cast: the service result is handed back as List<E>, so the caller must ask
+        // for an entity type compatible with what the policy service returns.
+        return (List<E>)
+            PolicyMetaService.getInstance()
+                .associatePoliciesWithMetadataObject(
+                    srcEntityIdent, srcEntityType, destEntitiesToAdd, 
destEntitiesToRemove);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Doesn't support the relation type %s", relType));
+    }
+  }
+
+  @Override
+  public <E extends Entity & HasIdentifier> E getEntityByRelation(
+      Type relType,
+      NameIdentifier srcIdentifier,
+      Entity.EntityType srcType,
+      NameIdentifier destEntityIdent)
+      throws IOException, NoSuchEntityException {
+    // Only the policy <-> metadata object relation is supported; every other relation type is
+    // rejected with an IllegalArgumentException.
+    switch (relType) {
+      case POLICY_METADATA_OBJECT_REL:
+        // Unchecked cast: the caller must ask for an entity type compatible with the result of
+        // the policy service lookup.
+        return (E)
+            PolicyMetaService.getInstance()
+                .getPolicyForMetadataObject(srcIdentifier, srcType, 
destEntityIdent);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Doesn't support the relation type %s", relType));
+    }
+  }
+
   public enum JDBCBackendType {
     H2(true),
     MYSQL(false),
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 296629e518..9ef62d656a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -224,6 +224,17 @@ public class RelationalEntityStore
         });
   }
 
+  @Override
+  public <E extends Entity & HasIdentifier> E getEntityByRelation(
+      Type relType,
+      NameIdentifier srcIdentifier,
+      Entity.EntityType srcType,
+      NameIdentifier destEntityIdent)
+      throws IOException, NoSuchEntityException {
+    // TODO: support cache. For now the call is delegated straight to the backend.
+    return backend.getEntityByRelation(relType, srcIdentifier, srcType, 
destEntityIdent);
+  }
+
   @Override
   public void insertRelation(
       SupportsRelationOperations.Type relType,
@@ -236,4 +247,17 @@ public class RelationalEntityStore
     cache.invalidate(srcIdentifier, srcType, relType);
     backend.insertRelation(relType, srcIdentifier, srcType, dstIdentifier, 
dstType, override);
   }
+
+  @Override
+  public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
+      Type relType,
+      NameIdentifier srcEntityIdent,
+      Entity.EntityType srcEntityType,
+      NameIdentifier[] destEntitiesToAdd,
+      NameIdentifier[] destEntitiesToRemove)
+      throws IOException, NoSuchEntityException, EntityAlreadyExistsException {
+    // Invalidate the cached relation for the source entity before the backend is updated.
+    // NOTE(review): only the entry keyed by the source entity is invalidated; whether relation
+    // results keyed by the destination entities are cached is not visible here - confirm that
+    // they cannot go stale.
+    cache.invalidate(srcEntityIdent, srcEntityType, relType);
+    return backend.updateEntityRelations(
+        relType, srcEntityIdent, srcEntityType, destEntitiesToAdd, 
destEntitiesToRemove);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index 834b04199b..a373495987 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -21,12 +21,16 @@ package org.apache.gravitino.storage.relational.service;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
@@ -73,6 +77,38 @@ public class MetadataObjectService {
 
   private MetadataObjectService() {}
 
+  /**
+   * Converts generic entities into metadata objects.
+   *
+   * <p>The entity ids are grouped by entity type, the full names for each group are looked up
+   * through {@code TYPE_TO_FULLNAME_FUNCTION_MAP}, and every non-null full name is parsed into a
+   * {@link MetadataObject}. Because the ids are grouped by type first, the result does not
+   * necessarily follow the order of the input list.
+   *
+   * <p>NOTE(review): the entity type name must also be a {@link MetadataObject.Type} constant
+   * (otherwise {@code valueOf} throws {@link IllegalArgumentException}), and the lookup map is
+   * presumably expected to contain a function for every such type - confirm against callers.
+   *
+   * @param entities the generic entities to convert; may be null or empty.
+   * @return the metadata objects that could be resolved; an empty list for null or empty input.
+   */
+  public static List<MetadataObject> fromGenericEntities(List<GenericEntity> 
entities) {
+    if (entities == null || entities.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    Map<Entity.EntityType, List<Long>> groupIdsByType =
+        entities.stream()
+            .collect(
+                Collectors.groupingBy(
+                    GenericEntity::type,
+                    Collectors.mapping(GenericEntity::id, 
Collectors.toList())));
+
+    List<MetadataObject> metadataObjects = Lists.newArrayList();
+    for (Map.Entry<Entity.EntityType, List<Long>> entry : 
groupIdsByType.entrySet()) {
+      MetadataObject.Type objectType = 
MetadataObject.Type.valueOf(entry.getKey().name());
+      Map<Long, String> metadataObjectNames =
+          
TYPE_TO_FULLNAME_FUNCTION_MAP.get(objectType).apply(entry.getValue());
+
+      for (Map.Entry<Long, String> metadataObjectName : 
metadataObjectNames.entrySet()) {
+        String fullName = metadataObjectName.getValue();
+
+        // The metadata object may be deleted asynchronously while its name is being queried, in
+        // which case the name is null and the object is skipped.
+        if (fullName != null) {
+          metadataObjects.add(MetadataObjects.parse(fullName, objectType));
+        }
+      }
+    }
+
+    return metadataObjects;
+  }
+
   public static long getMetadataObjectId(
       long metalakeId, String fullName, MetadataObject.Type type) {
     if (type == MetadataObject.Type.METALAKE) {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
index cc59db2400..f3c1b4d877 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
@@ -19,12 +19,10 @@
 package org.apache.gravitino.storage.relational.service;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -32,10 +30,10 @@ import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.MetadataObject;
-import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
@@ -46,6 +44,7 @@ import org.apache.gravitino.storage.relational.po.PolicyPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.slf4j.Logger;
@@ -253,7 +252,7 @@ public class PolicyMetaService {
     return POConverters.fromPolicyPO(policyPO, 
NamespaceUtil.ofPolicy(metalake));
   }
 
-  public List<MetadataObject> 
listAssociatedMetadataObjectsForPolicy(NameIdentifier policyIdent)
+  public List<GenericEntity> listAssociatedEntitiesForPolicy(NameIdentifier 
policyIdent)
       throws IOException {
     String metalakeName = policyIdent.namespace().level(0);
     String policyName = policyIdent.name();
@@ -266,38 +265,17 @@ public class PolicyMetaService {
                   mapper.listPolicyMetadataObjectRelsByMetalakeAndPolicyName(
                       metalakeName, policyName));
 
-      List<MetadataObject> metadataObjects = Lists.newArrayList();
-      Map<String, List<PolicyMetadataObjectRelPO>> 
policyMetadataObjectRelPOsByType =
-          policyMetadataObjectRelPOs.stream()
-              
.collect(Collectors.groupingBy(PolicyMetadataObjectRelPO::getMetadataObjectType));
-
-      for (Map.Entry<String, List<PolicyMetadataObjectRelPO>> entry :
-          policyMetadataObjectRelPOsByType.entrySet()) {
-        String metadataObjectType = entry.getKey();
-        List<PolicyMetadataObjectRelPO> rels = entry.getValue();
-
-        List<Long> metadataObjectIds =
-            rels.stream()
-                .map(PolicyMetadataObjectRelPO::getMetadataObjectId)
-                .collect(Collectors.toList());
-        Map<Long, String> metadataObjectNames =
-            MetadataObjectService.TYPE_TO_FULLNAME_FUNCTION_MAP
-                .get(MetadataObject.Type.valueOf(metadataObjectType))
-                .apply(metadataObjectIds);
-
-        for (Map.Entry<Long, String> metadataObjectName : 
metadataObjectNames.entrySet()) {
-          String fullName = metadataObjectName.getValue();
-
-          // Metadata object may be deleted asynchronously when we query the 
name, so it will
-          // return null, we should skip this metadata object.
-          if (fullName != null) {
-            metadataObjects.add(
-                MetadataObjects.parse(fullName, 
MetadataObject.Type.valueOf(metadataObjectType)));
-          }
-        }
-      }
+      return policyMetadataObjectRelPOs.stream()
+          .map(
+              r ->
+                  GenericEntity.builder()
+                      .withId(r.getMetadataObjectId())
+                      .withEntityType(
+                          MetadataObjectUtil.toEntityType(
+                              
MetadataObject.Type.valueOf(r.getMetadataObjectType())))
+                      .build())
+          .collect(Collectors.toList());
 
-      return metadataObjects;
     } catch (RuntimeException e) {
       ExceptionUtils.checkSQLException(e, Entity.EntityType.POLICY, 
policyIdent.toString());
       throw e;
@@ -327,6 +305,19 @@ public class PolicyMetaService {
               ? Collections.emptyList()
               : getPolicyPOsByMetalakeAndNames(metalake, policyNamesToAdd);
 
+      // Check if the policies to add all support the metadata object type.
+      policyPOsToAdd.forEach(
+          policyPO -> {
+            PolicyEntity policy =
+                POConverters.fromPolicyPO(policyPO, 
NamespaceUtil.ofPolicy(metalake));
+            if 
(!policy.supportedObjectTypes().contains(metadataObject.type())) {
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Cannot associate policies for unsupported metadata 
object type %s, expected: %s.",
+                      objectType, policy.supportedObjectTypes()));
+            }
+          });
+
       // Fetch all the policies need to remove from the metadata object.
       List<String> policyNamesToRemove =
           
Arrays.stream(policiesToRemove).map(NameIdentifier::name).collect(Collectors.toList());
diff --git a/core/src/main/java/org/apache/gravitino/tag/TagManager.java 
b/core/src/main/java/org/apache/gravitino/tag/TagManager.java
index 5a4819accf..9cfed65027 100644
--- a/core/src/main/java/org/apache/gravitino/tag/TagManager.java
+++ b/core/src/main/java/org/apache/gravitino/tag/TagManager.java
@@ -241,6 +241,7 @@ public class TagManager implements TagDispatcher {
         LockType.READ,
         () -> {
           try {
+            checkMetalake(NameIdentifier.of(metalake), entityStore);
             return supportsTagOperations
                 .listAssociatedTagsForMetadataObject(entityIdent, entityType)
                 .toArray(new Tag[0]);
@@ -267,6 +268,7 @@ public class TagManager implements TagDispatcher {
         LockType.READ,
         () -> {
           try {
+            checkMetalake(NameIdentifier.of(metalake), entityStore);
             return supportsTagOperations.getTagForMetadataObject(entityIdent, 
entityType, tagIdent);
           } catch (NoSuchEntityException e) {
             if (e.getMessage().contains("No such tag entity")) {
diff --git 
a/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java 
b/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
index 61db34f3c6..7cd95b88fa 100644
--- a/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
+++ b/core/src/test/java/org/apache/gravitino/policy/TestPolicyManager.java
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
@@ -51,21 +52,36 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.CatalogDispatcher;
+import org.apache.gravitino.catalog.SchemaDispatcher;
+import org.apache.gravitino.catalog.TableDispatcher;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchPolicyException;
+import org.apache.gravitino.exceptions.NotFoundException;
+import org.apache.gravitino.exceptions.PolicyAlreadyAssociatedException;
 import org.apache.gravitino.exceptions.PolicyAlreadyExistsException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.metalake.MetalakeDispatcher;
+import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -75,7 +91,14 @@ import org.mockito.Mockito;
 
 public class TestPolicyManager {
   private static final String METALAKE = "metalake_for_policy_test";
+  private static final String CATALOG = "catalog_for_policy_test";
+  private static final String SCHEMA = "schema_for_policy_test";
+  private static final String TABLE = "table_for_policy_test";
+  private static final String COLUMN = "column_for_policy_test";
   private static final MetalakeDispatcher metalakeDispatcher = 
mock(MetalakeDispatcher.class);
+  private static final CatalogDispatcher catalogDispatcher = 
mock(CatalogDispatcher.class);
+  private static final SchemaDispatcher schemaDispatcher = 
mock(SchemaDispatcher.class);
+  private static final TableDispatcher tableDispatcher = 
mock(TableDispatcher.class);
   private static final String JDBC_STORE_PATH =
       "/tmp/gravitino_jdbc_entityStore_" + 
UUID.randomUUID().toString().replace("-", "");
   private static final String DB_DIR = JDBC_STORE_PATH + "/testdb";
@@ -93,6 +116,9 @@ public class TestPolicyManager {
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
     FieldUtils.writeField(
         GravitinoEnv.getInstance(), "metalakeDispatcher", metalakeDispatcher, 
true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogDispatcher", 
catalogDispatcher, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "schemaDispatcher", 
schemaDispatcher, true);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "tableDispatcher", 
tableDispatcher, true);
 
     AuditInfo audit = 
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
     BaseMetalake metalake =
@@ -105,6 +131,53 @@ public class TestPolicyManager {
             .build();
     entityStore.put(metalake, false /* overwritten */);
     when(metalakeDispatcher.metalakeExists(any())).thenReturn(true);
+
+    CatalogEntity catalog =
+        CatalogEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(CATALOG)
+            .withNamespace(Namespace.of(METALAKE))
+            .withType(Catalog.Type.RELATIONAL)
+            .withProvider("test")
+            .withComment("Test catalog")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(catalog, false /* overwritten */);
+    when(catalogDispatcher.catalogExists(any())).thenReturn(true);
+
+    SchemaEntity schema =
+        SchemaEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(SCHEMA)
+            .withNamespace(Namespace.of(METALAKE, CATALOG))
+            .withComment("Test schema")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(schema, false /* overwritten */);
+    when(schemaDispatcher.schemaExists(any())).thenReturn(true);
+
+    ColumnEntity column =
+        ColumnEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(COLUMN)
+            .withPosition(0)
+            .withComment("Test column")
+            .withDataType(Types.IntegerType.get())
+            .withNullable(true)
+            .withAutoIncrement(false)
+            .withAuditInfo(audit)
+            .build();
+
+    TableEntity table =
+        TableEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(TABLE)
+            .withColumns(Lists.newArrayList(column))
+            .withNamespace(Namespace.of(METALAKE, CATALOG, SCHEMA))
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(table, false /* overwritten */);
+    when(tableDispatcher.tableExists(any())).thenReturn(true);
   }
 
   private static Config mockConfig() {
@@ -302,6 +375,359 @@ public class TestPolicyManager {
     Assertions.assertFalse(() -> policyManager.deletePolicy(METALAKE, 
"non_existent_policy"));
   }
 
+  @Test
+  public void testAssociatePoliciesForMetadataObject() {
+    Map<String, Object> customRules = ImmutableMap.of("rule1", 1, "rule2", 
"value2");
+    PolicyContent content = PolicyContents.custom(customRules, null);
+    String policyName1 = "policy1" + UUID.randomUUID().toString().replace("-", 
"");
+    createCustomPolicy(METALAKE, policyName1, content);
+    String policyName2 = "policy2" + UUID.randomUUID().toString().replace("-", 
"");
+    createCustomPolicy(METALAKE, policyName2, content);
+    String policyName3 = "policy3" + UUID.randomUUID().toString().replace("-", 
"");
+    createCustomPolicy(METALAKE, policyName3, content);
+
+    // Associate three policies with the catalog; all three are returned.
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    String[] policiesToAdd = new String[] {policyName1, policyName2, 
policyName3};
+
+    String[] policies =
+        policyManager.associatePoliciesForMetadataObject(
+            METALAKE, catalogObject, policiesToAdd, null);
+
+    Assertions.assertEquals(3, policies.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(policyName1, policyName2, policyName3), 
ImmutableSet.copyOf(policies));
+
+    // Removing policy1 from the catalog leaves policy2 and policy3.
+    String[] policiesToRemove = new String[] {policyName1};
+    String[] policies1 =
+        policyManager.associatePoliciesForMetadataObject(
+            METALAKE, catalogObject, null, policiesToRemove);
+
+    Assertions.assertEquals(2, policies1.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(policyName2, policyName3), 
ImmutableSet.copyOf(policies1));
+
+    // Null add/remove lists are a no-op; current associations are returned.
+    String[] policies2 =
+        policyManager.associatePoliciesForMetadataObject(METALAKE, 
catalogObject, null, null);
+
+    Assertions.assertEquals(2, policies2.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(policyName2, policyName3), 
ImmutableSet.copyOf(policies2));
+
+    // Re-associating policy2/policy3 (still attached) must be rejected.
+    Throwable e =
+        Assertions.assertThrows(
+            PolicyAlreadyAssociatedException.class,
+            () ->
+                policyManager.associatePoliciesForMetadataObject(
+                    METALAKE, catalogObject, policiesToAdd, null));
+    Assertions.assertTrue(
+        e.getMessage().contains("Failed to associate policies for metadata 
object"));
+
+    // Unknown policy names are ignored; the associations stay unchanged.
+    String[] policies3 =
+        policyManager.associatePoliciesForMetadataObject(
+            METALAKE, catalogObject, new String[] {"policy4", "policy5"}, new 
String[] {"policy6"});
+
+    Assertions.assertEquals(2, policies3.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(policyName2, policyName3), 
ImmutableSet.copyOf(policies3));
+
+    // A metadata object that does not exist must yield NotFoundException.
+    MetadataObject nonExistentObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, "non_existent_catalog"),
+            Entity.EntityType.CATALOG);
+    Throwable e1 =
+        Assertions.assertThrows(
+            NotFoundException.class,
+            () ->
+                policyManager.associatePoliciesForMetadataObject(
+                    METALAKE, nonExistentObject, policiesToAdd, null));
+    Assertions.assertTrue(
+        e1.getMessage().contains("Failed to associate policies for metadata 
object"));
+
+    // METALAKE and COLUMN targets must throw IllegalArgumentException.
+    MetadataObject metalakeObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofMetalake(METALAKE), 
Entity.EntityType.METALAKE);
+    Throwable e2 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                policyManager.associatePoliciesForMetadataObject(
+                    METALAKE, metalakeObject, policiesToAdd, null));
+    Assertions.assertTrue(
+        e2.getMessage().contains("Cannot associate policies for unsupported 
metadata object type"),
+        "Actual message: " + e2.getMessage());
+
+    MetadataObject columnObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofColumn(METALAKE, CATALOG, SCHEMA, TABLE, 
COLUMN),
+            Entity.EntityType.COLUMN);
+    e2 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                policyManager.associatePoliciesForMetadataObject(
+                    METALAKE, columnObject, policiesToAdd, null));
+    Assertions.assertTrue(
+        e2.getMessage().contains("Cannot associate policies for unsupported 
metadata object type"),
+        "Actual message: " + e2.getMessage());
+
+    // Associate the same three policies with the schema.
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    String[] policies4 =
+        policyManager.associatePoliciesForMetadataObject(
+            METALAKE, schemaObject, policiesToAdd, null);
+
+    Assertions.assertEquals(3, policies4.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(policyName1, policyName2, policyName3), 
ImmutableSet.copyOf(policies4));
+
+    // Associate a single policy (policy1) with the table.
+    String[] policiesToAdd1 = new String[] {policyName1};
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+    String[] policies5 =
+        policyManager.associatePoliciesForMetadataObject(
+            METALAKE, tableObject, policiesToAdd1, null);
+
+    Assertions.assertEquals(1, policies5.length);
+    Assertions.assertEquals(ImmutableSet.of(policyName1), 
ImmutableSet.copyOf(policies5));
+
+    // policy2 is in both the add and remove lists and ends up unassociated.
+    String[] policiesToAdd2 = new String[] {policyName2, policyName3};
+    String[] policiesToRemove1 = new String[] {policyName2};
+    String[] policies6 =
+        policyManager.associatePoliciesForMetadataObject(
+            METALAKE, tableObject, policiesToAdd2, policiesToRemove1);
+
+    Assertions.assertEquals(2, policies6.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(policyName1, policyName3), 
ImmutableSet.copyOf(policies6));
+  }
+
+  @Test
+  public void testListMetadataObjectsForPolicy() {
+    Map<String, Object> customRules = ImmutableMap.of("rule1", 1, "rule2", 
"value2");
+    PolicyContent content = PolicyContents.custom(customRules, null);
+    String policyName1 = "policy1" + UUID.randomUUID().toString().replace("-", 
"");
+    createCustomPolicy(METALAKE, policyName1, content);
+    String policyName2 = "policy2" + UUID.randomUUID().toString().replace("-", 
"");
+    createCustomPolicy(METALAKE, policyName2, content);
+    String policyName3 = "policy3" + UUID.randomUUID().toString().replace("-", 
"");
+    createCustomPolicy(METALAKE, policyName3, content);
+
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE, catalogObject, new String[] {policyName1, policyName2, 
policyName3}, null);
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE, schemaObject, new String[] {policyName1, policyName2}, null);
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE, tableObject, new String[] {policyName1}, null);
+
+    MetadataObject[] objects = 
policyManager.listMetadataObjectsForPolicy(METALAKE, policyName1);
+    Assertions.assertEquals(3, objects.length); // catalog, schema, table
+    Assertions.assertEquals(
+        ImmutableSet.of(catalogObject, schemaObject, tableObject), 
ImmutableSet.copyOf(objects));
+
+    MetadataObject[] objects1 = 
policyManager.listMetadataObjectsForPolicy(METALAKE, policyName2);
+    Assertions.assertEquals(2, objects1.length); // catalog, schema
+    Assertions.assertEquals(
+        ImmutableSet.of(catalogObject, schemaObject), 
ImmutableSet.copyOf(objects1));
+
+    MetadataObject[] objects2 = 
policyManager.listMetadataObjectsForPolicy(METALAKE, policyName3);
+    Assertions.assertEquals(1, objects2.length); // catalog only
+    Assertions.assertEquals(ImmutableSet.of(catalogObject), 
ImmutableSet.copyOf(objects2));
+
+    // Listing objects for an unknown policy must throw NoSuchPolicyException.
+    Throwable e =
+        Assertions.assertThrows(
+            NoSuchPolicyException.class,
+            () -> policyManager.listMetadataObjectsForPolicy(METALAKE, 
"non_existent_policy"));
+    Assertions.assertTrue(
+        e.getMessage()
+            .contains(
+                "Policy with name non_existent_policy under metalake "
+                    + METALAKE
+                    + " does not exist"));
+  }
+
+  @Test
+  public void testListPoliciesForMetadataObject() {
+    Map<String, Object> customRules = ImmutableMap.of("rule1", 1, "rule2", 
"value2");
+    PolicyContent content = PolicyContents.custom(customRules, null);
+    String policyName1 = "policy1" + UUID.randomUUID().toString().replace("-", 
"");
+    Policy policy1 = createCustomPolicy(METALAKE, policyName1, content);
+    String policyName2 = "policy2" + UUID.randomUUID().toString().replace("-", 
"");
+    Policy policy2 = createCustomPolicy(METALAKE, policyName2, content);
+    String policyName3 = "policy3" + UUID.randomUUID().toString().replace("-", 
"");
+    Policy policy3 = createCustomPolicy(METALAKE, policyName3, content);
+
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE,
+        catalogObject,
+        new String[] {policy1.name(), policy2.name(), policy3.name()},
+        null);
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE, schemaObject, new String[] {policy1.name(), policy2.name()}, 
null);
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE, tableObject, new String[] {policy1.name()}, null);
+
+    String[] policies = policyManager.listPoliciesForMetadataObject(METALAKE, 
catalogObject);
+    Assertions.assertEquals(3, policies.length); // catalog has policy1..3
+    Assertions.assertEquals(
+        ImmutableSet.of(policyName1, policyName2, policyName3), 
ImmutableSet.copyOf(policies));
+
+    Policy[] policiesInfo = 
policyManager.listPolicyInfosForMetadataObject(METALAKE, catalogObject);
+    Assertions.assertEquals(3, policiesInfo.length);
+    Assertions.assertEquals(
+        ImmutableSet.of(policy1, policy2, policy3), 
ImmutableSet.copyOf(policiesInfo));
+
+    String[] policies1 = policyManager.listPoliciesForMetadataObject(METALAKE, 
schemaObject);
+    Assertions.assertEquals(2, policies1.length); // schema has policy1..2
+    Assertions.assertEquals(
+        ImmutableSet.of(policyName1, policyName2), 
ImmutableSet.copyOf(policies1));
+
+    Policy[] policiesInfo1 = 
policyManager.listPolicyInfosForMetadataObject(METALAKE, schemaObject);
+    Assertions.assertEquals(2, policiesInfo1.length);
+    Assertions.assertEquals(ImmutableSet.of(policy1, policy2), 
ImmutableSet.copyOf(policiesInfo1));
+
+    String[] policies2 = policyManager.listPoliciesForMetadataObject(METALAKE, 
tableObject);
+    Assertions.assertEquals(1, policies2.length); // table has policy1 only
+    Assertions.assertEquals(ImmutableSet.of(policyName1), 
ImmutableSet.copyOf(policies2));
+
+    Policy[] policiesInfo2 = 
policyManager.listPolicyInfosForMetadataObject(METALAKE, tableObject);
+    Assertions.assertEquals(1, policiesInfo2.length);
+    Assertions.assertEquals(ImmutableSet.of(policy1), 
ImmutableSet.copyOf(policiesInfo2));
+
+    // Listing policies for a missing catalog must throw NotFoundException.
+    MetadataObject nonExistentObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, "non_existent_catalog"),
+            Entity.EntityType.CATALOG);
+    Throwable e =
+        Assertions.assertThrows(
+            NotFoundException.class,
+            () -> policyManager.listPoliciesForMetadataObject(METALAKE, 
nonExistentObject));
+    Assertions.assertTrue(
+        e.getMessage()
+            .contains("Failed to list policies for metadata object " + 
nonExistentObject));
+  }
+
+  @Test
+  public void testGetPolicyForMetadataObject() {
+    Map<String, Object> customRules = ImmutableMap.of("rule1", 1, "rule2", 
"value2");
+    PolicyContent content = PolicyContents.custom(customRules, null);
+    String policyName1 = "policy1" + UUID.randomUUID().toString().replace("-", 
"");
+    Policy policy1 = createCustomPolicy(METALAKE, policyName1, content);
+    String policyName2 = "policy2" + UUID.randomUUID().toString().replace("-", 
"");
+    Policy policy2 = createCustomPolicy(METALAKE, policyName2, content);
+    String policyName3 = "policy3" + UUID.randomUUID().toString().replace("-", 
"");
+    Policy policy3 = createCustomPolicy(METALAKE, policyName3, content);
+
+    MetadataObject catalogObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, CATALOG), 
Entity.EntityType.CATALOG);
+    MetadataObject schemaObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofSchema(METALAKE, CATALOG, SCHEMA), 
Entity.EntityType.SCHEMA);
+    MetadataObject tableObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofTable(METALAKE, CATALOG, SCHEMA, TABLE), 
Entity.EntityType.TABLE);
+
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE, catalogObject, new String[] {policyName1, policyName2, 
policyName3}, null);
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE, schemaObject, new String[] {policyName1, policyName2}, null);
+    policyManager.associatePoliciesForMetadataObject(
+        METALAKE, tableObject, new String[] {policyName1}, null);
+
+    Policy result = policyManager.getPolicyForMetadataObject(METALAKE, 
catalogObject, policyName1);
+    Assertions.assertEquals(policy1, result);
+
+    Policy result1 = policyManager.getPolicyForMetadataObject(METALAKE, 
schemaObject, policyName1);
+    Assertions.assertEquals(policy1, result1);
+
+    Policy result2 =
+        policyManager.getPolicyForMetadataObject(METALAKE, schemaObject, 
policy2.name());
+    Assertions.assertEquals(policy2, result2);
+
+    Policy result3 =
+        policyManager.getPolicyForMetadataObject(METALAKE, catalogObject, 
policy3.name());
+    Assertions.assertEquals(policy3, result3);
+
+    Policy result4 =
+        policyManager.getPolicyForMetadataObject(METALAKE, tableObject, 
policy1.name());
+    Assertions.assertEquals(policy1, result4);
+
+    // A policy name that was never created must throw NoSuchPolicyException.
+    Throwable e =
+        Assertions.assertThrows(
+            NoSuchPolicyException.class,
+            () ->
+                policyManager.getPolicyForMetadataObject(
+                    METALAKE, catalogObject, "non_existent_policy"));
+    Assertions.assertTrue(e.getMessage().contains("Policy non_existent_policy 
does not exist"));
+
+    Throwable e1 = // policy3 exists but was never attached to the schema
+        Assertions.assertThrows(
+            NoSuchPolicyException.class,
+            () -> policyManager.getPolicyForMetadataObject(METALAKE, 
schemaObject, policy3.name()));
+    Assertions.assertTrue(
+        e1.getMessage().contains("Policy " + policyName3 + " does not exist"),
+        "Actual message: " + e1.getMessage());
+
+    Throwable e2 = // policy2 exists but was never attached to the table
+        Assertions.assertThrows(
+            NoSuchPolicyException.class,
+            () -> policyManager.getPolicyForMetadataObject(METALAKE, 
tableObject, policy2.name()));
+    Assertions.assertTrue(
+        e2.getMessage().contains("Policy " + policyName2 + " does not exist"),
+        "Actual message: " + e2.getMessage());
+
+    // Looking up a policy on a missing catalog must throw NotFoundException.
+    MetadataObject nonExistentObject =
+        NameIdentifierUtil.toMetadataObject(
+            NameIdentifierUtil.ofCatalog(METALAKE, "non_existent_catalog"),
+            Entity.EntityType.CATALOG);
+    Throwable e3 =
+        Assertions.assertThrows(
+            NotFoundException.class,
+            () ->
+                policyManager.getPolicyForMetadataObject(
+                    METALAKE, nonExistentObject, policy1.name()));
+    Assertions.assertTrue(
+        e3.getMessage().contains("Failed to get policy for metadata object " + 
nonExistentObject));
+  }
+
   private Policy createCustomPolicy(
       String metalakeName, String policyName, PolicyContent policyContent) {
     return policyManager.createPolicy(
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
index 5735ef1728..56e694004d 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
@@ -19,19 +19,19 @@
 package org.apache.gravitino.storage.relational.service;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Instant;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.MetadataObject;
-import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -39,6 +39,7 @@ import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.meta.SchemaEntity;
@@ -64,12 +65,13 @@ public class TestPolicyMetaService extends TestJDBCBackend {
   private final PolicyContent content = 
PolicyContents.custom(ImmutableMap.of("filed1", 123), null);
 
   private final Set<MetadataObject.Type> supportedObjectTypes =
-      new HashSet<MetadataObject.Type>() {
-        {
-          add(MetadataObject.Type.CATALOG);
-          add(MetadataObject.Type.SCHEMA);
-        }
-      };
+      ImmutableSet.of(
+          MetadataObject.Type.CATALOG,
+          MetadataObject.Type.SCHEMA,
+          MetadataObject.Type.TABLE,
+          MetadataObject.Type.FILESET,
+          MetadataObject.Type.MODEL,
+          MetadataObject.Type.TOPIC);
 
   @Test
   public void testInsertAndGetPolicyByIdentifier() throws IOException {
@@ -652,46 +654,40 @@ public class TestPolicyMetaService extends TestJDBCBackend {
   }
 
   @Test
-  public void testListAssociatedMetadataObjectsForPolicy() throws IOException {
+  public void testListAssociatedEntitiesForPolicy() throws IOException {
     testAssociateAndDisassociatePoliciesWithMetadataObject();
 
     PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
 
-    // Test list associated metadata objects for policy2
-    List<MetadataObject> metadataObjects =
-        policyMetaService.listAssociatedMetadataObjectsForPolicy(
+    // Test list associated dummy entities for policy2
+    List<GenericEntity> entities =
+        policyMetaService.listAssociatedEntitiesForPolicy(
             NameIdentifierUtil.ofPolicy(metalakeName, "policy2"));
 
-    Assertions.assertEquals(3, metadataObjects.size());
-    Assertions.assertTrue(
-        metadataObjects.contains(MetadataObjects.parse("catalog1", MetadataObject.Type.CATALOG)));
-    Assertions.assertTrue(
-        metadataObjects.contains(
-            MetadataObjects.parse("catalog1.schema1", MetadataObject.Type.SCHEMA)));
-    Assertions.assertTrue(
-        metadataObjects.contains(
-            MetadataObjects.parse("catalog1.schema1.table1", MetadataObject.Type.TABLE)));
+    Assertions.assertEquals(3, entities.size());
+    Set<Entity.EntityType> actualTypes =
+        entities.stream().map(GenericEntity::type).collect(Collectors.toSet());
+    Assertions.assertTrue(actualTypes.contains(Entity.EntityType.CATALOG));
+    Assertions.assertTrue(actualTypes.contains(Entity.EntityType.SCHEMA));
+    Assertions.assertTrue(actualTypes.contains(Entity.EntityType.TABLE));
 
-    // Test list associated metadata objects for policy3
-    List<MetadataObject> metadataObjects1 =
-        policyMetaService.listAssociatedMetadataObjectsForPolicy(
+    // Test list associated dummy entities for policy3
+    List<GenericEntity> entities1 =
+        policyMetaService.listAssociatedEntitiesForPolicy(
             NameIdentifierUtil.ofPolicy(metalakeName, "policy3"));
 
-    Assertions.assertEquals(3, metadataObjects1.size());
-    Assertions.assertTrue(
-        metadataObjects1.contains(MetadataObjects.parse("catalog1", MetadataObject.Type.CATALOG)));
-    Assertions.assertTrue(
-        metadataObjects1.contains(
-            MetadataObjects.parse("catalog1.schema1", MetadataObject.Type.SCHEMA)));
-    Assertions.assertTrue(
-        metadataObjects1.contains(
-            MetadataObjects.parse("catalog1.schema1.table1", MetadataObject.Type.TABLE)));
+    Assertions.assertEquals(3, entities1.size());
+    Set<Entity.EntityType> actualTypes1 =
+        entities1.stream().map(GenericEntity::type).collect(Collectors.toSet());
+    Assertions.assertTrue(actualTypes1.contains(Entity.EntityType.CATALOG));
+    Assertions.assertTrue(actualTypes1.contains(Entity.EntityType.SCHEMA));
+    Assertions.assertTrue(actualTypes1.contains(Entity.EntityType.TABLE));
 
-    // Test list associated metadata objects for non-existent policy
-    List<MetadataObject> metadataObjects2 =
-        policyMetaService.listAssociatedMetadataObjectsForPolicy(
+    // Test list associated dummy entities for non-existent policy
+    List<GenericEntity> entities2 =
+        policyMetaService.listAssociatedEntitiesForPolicy(
             NameIdentifierUtil.ofPolicy(metalakeName, "policy4"));
-    Assertions.assertEquals(0, metadataObjects2.size());
+    Assertions.assertEquals(0, entities2.size());
 
     // Test metadata object non-exist scenario.
     backend.delete(
@@ -699,35 +695,35 @@ public class TestPolicyMetaService extends TestJDBCBackend {
         Entity.EntityType.TABLE,
         false);
 
-    List<MetadataObject> metadataObjects3 =
-        policyMetaService.listAssociatedMetadataObjectsForPolicy(
+    List<GenericEntity> entities3 =
+        policyMetaService.listAssociatedEntitiesForPolicy(
             NameIdentifierUtil.ofPolicy(metalakeName, "policy2"));
 
-    Assertions.assertEquals(2, metadataObjects3.size());
-    Assertions.assertTrue(
-        metadataObjects3.contains(MetadataObjects.parse("catalog1", MetadataObject.Type.CATALOG)));
-    Assertions.assertTrue(
-        metadataObjects3.contains(
-            MetadataObjects.parse("catalog1.schema1", MetadataObject.Type.SCHEMA)));
+    Assertions.assertEquals(2, entities3.size());
+    Set<Entity.EntityType> actualTypes3 =
+        entities3.stream().map(GenericEntity::type).collect(Collectors.toSet());
+    Assertions.assertTrue(actualTypes3.contains(Entity.EntityType.CATALOG));
+    Assertions.assertTrue(actualTypes3.contains(Entity.EntityType.SCHEMA));
 
     backend.delete(
         NameIdentifier.of(metalakeName, "catalog1", "schema1"), Entity.EntityType.SCHEMA, false);
 
-    List<MetadataObject> metadataObjects4 =
-        policyMetaService.listAssociatedMetadataObjectsForPolicy(
+    List<GenericEntity> entities4 =
+        policyMetaService.listAssociatedEntitiesForPolicy(
             NameIdentifierUtil.ofPolicy(metalakeName, "policy2"));
 
-    Assertions.assertEquals(1, metadataObjects4.size());
-    Assertions.assertTrue(
-        metadataObjects4.contains(MetadataObjects.parse("catalog1", MetadataObject.Type.CATALOG)));
+    Assertions.assertEquals(1, entities4.size());
+    Set<Entity.EntityType> actualTypes4 =
+        entities4.stream().map(GenericEntity::type).collect(Collectors.toSet());
+    Assertions.assertTrue(actualTypes4.contains(Entity.EntityType.CATALOG));
 
     backend.delete(NameIdentifier.of(metalakeName, "catalog1"), Entity.EntityType.CATALOG, false);
 
-    List<MetadataObject> metadataObjects5 =
-        policyMetaService.listAssociatedMetadataObjectsForPolicy(
+    List<GenericEntity> entities5 =
+        policyMetaService.listAssociatedEntitiesForPolicy(
             NameIdentifierUtil.ofPolicy(metalakeName, "policy2"));
 
-    Assertions.assertEquals(0, metadataObjects5.size());
+    Assertions.assertEquals(0, entities5.size());
   }
 
   @Test

Reply via email to