This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch cherry-pick-20e2ae1e-to-branch-1.2
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 8822f88767ed69d83f8eab2b1c90b3341b79375f
Author: yangyang zhong <[email protected]>
AuthorDate: Fri Mar 20 19:31:24 2026 +0800

    [#9518] improvement(authz): support batch get owner (#9914)
    
    ### What changes were proposed in this pull request?
    
    Support batch retrieval of owners: fetch the owners of multiple metadata objects in a single call.
    
    ### Why are the changes needed?
    
    Fix: #9518
    
    ### Does this PR introduce _any_ user-facing change?
    
    None
    
    ### How was this patch tested?
    
    Existing integration tests (IT).
    
    ---------
    
    Co-authored-by: Rory <[email protected]>
---
 .../java/org/apache/gravitino/GravitinoEnv.java    |   2 +-
 .../org/apache/gravitino/RelationalEntity.java     |  87 ++++++++++
 .../gravitino/SupportsRelationOperations.java      |  29 ++++
 .../gravitino/cache/CaffeineEntityCache.java       |  10 ++
 .../org/apache/gravitino/cache/EntityCache.java    |  15 ++
 .../org/apache/gravitino/cache/NoOpsCache.java     |   7 +
 .../org/apache/gravitino/cache/SegmentedLock.java  | 124 ++++++++++++--
 .../gravitino/storage/relational/JDBCBackend.java  |  39 +++++
 .../storage/relational/RelationalEntityStore.java  |  85 ++++++++++
 .../storage/relational/mapper/OwnerMetaMapper.java |   9 ++
 .../mapper/OwnerMetaSQLProviderFactory.java        |   8 +
 .../provider/base/OwnerMetaBaseSQLProvider.java    |  29 ++++
 .../storage/relational/po/UserOwnerRelPO.java      |  30 ++++
 .../relational/service/OwnerMetaService.java       |  65 ++++++++
 .../relational/service/TestOwnerMetaService.java   | 180 +++++++++++++++++++++
 .../server/authorization/MetadataAuthzHelper.java  |  20 +++
 16 files changed, 722 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index b0a58554d4..4918457bd0 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -456,7 +456,7 @@ public class GravitinoEnv {
   }
 
   public boolean cacheEnabled() {
-    return config.get(Configs.CACHE_ENABLED);
+    return config == null || config.get(Configs.CACHE_ENABLED);
   }
 
   public void start() {
diff --git a/core/src/main/java/org/apache/gravitino/RelationalEntity.java b/core/src/main/java/org/apache/gravitino/RelationalEntity.java
new file mode 100644
index 0000000000..4c9cd20bcc
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/RelationalEntity.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino;
+
+/**
+ * Represents a directed relation between two entities. The source is identified by a {@link
+ * NameIdentifier} and an {@link Entity.EntityType}; the target is the actual resolved entity,
+ * avoiding additional database round-trips to look it up.
+ *
+ * @param <T> the type of the target entity
+ */
+public class RelationalEntity<T extends Entity & HasIdentifier> {
+  private final SupportsRelationOperations.Type type;
+  private final NameIdentifier source;
+  private final Entity.EntityType sourceType;
+  private final T targetEntity;
+
+  /**
+   * Constructs a RelationalEntity.
+   *
+   * @param type the relation type
+   * @param source the source identifier
+   * @param sourceType the entity type of the source
+   * @param targetEntity the resolved target entity
+   */
+  public RelationalEntity(
+      SupportsRelationOperations.Type type,
+      NameIdentifier source,
+      Entity.EntityType sourceType,
+      T targetEntity) {
+    this.type = type;
+    this.source = source;
+    this.sourceType = sourceType;
+    this.targetEntity = targetEntity;
+  }
+
+  /**
+   * Gets the relation type.
+   *
+   * @return the type of the relation
+   */
+  public SupportsRelationOperations.Type type() {
+    return type;
+  }
+
+  /**
+   * Gets the source identifier.
+   *
+   * @return the source identifier
+   */
+  public NameIdentifier source() {
+    return source;
+  }
+
+  /**
+   * Gets the entity type of the source.
+   *
+   * @return the source entity type
+   */
+  public Entity.EntityType sourceType() {
+    return sourceType;
+  }
+
+  /**
+   * Gets the resolved target entity.
+   *
+   * @return the target entity
+   */
+  public T targetEntity() {
+    return targetEntity;
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index 7d8aca914e..854760060d 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -92,6 +92,35 @@ public interface SupportsRelationOperations {
       Type relType, NameIdentifier nameIdentifier, Entity.EntityType 
identType, boolean allFields)
       throws IOException;
 
+  /**
+   * Retrieves the relations for a batch of source entities in a single call.
+   *
+   * <p>This is the batch counterpart of {@link #listEntitiesByRelation}. Instead of issuing one
+   * query per source entity, callers can supply a list of identifiers and receive all matching
+   * {@link RelationalEntity} objects in one round-trip to the backend.
+   *
+   * <p>For example, to fetch the owners for a collection of tables in one 
call:
+   *
+   * <pre>
+   *   batchListEntitiesByRelation(OWNER_REL, tableIdentifiers, 
EntityType.TABLE);
+   * </pre>
+   *
+   * <p>Each returned {@link RelationalEntity} carries the source identifier, source entity type,
+   * and the resolved target entity, allowing callers to correlate results back to their inputs.
+   * Source identifiers that have no related entity will produce no {@link RelationalEntity} entry
+   * in the result; they will not cause an error.
+   *
+   * @param relType The type of relation to query.
+   * @param nameIdentifiers The list of source entity identifiers to look up 
relations for.
+   * @param identType The entity type that each element in {@code 
nameIdentifiers} represents.
+   * @return A list of {@link RelationalEntity} objects, one per (source, 
target) pair found. May be
+   *     empty but never null.
+   * @throws IOException If a storage-related error occurs during the batch 
query.
+   */
+  List<RelationalEntity<?>> batchListEntitiesByRelation(
+      Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType 
identType)
+      throws IOException;
+
   /**
    * Get a specific entity that is related to a given source entity.
    *
diff --git a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
index ede392c996..952bb1e193 100644
--- a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java
@@ -340,6 +340,16 @@ public class CaffeineEntityCache extends BaseEntityCache {
     return segmentedLock.withLockAndThrow(key, action);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public <T, E extends Exception> T withMultipleKeyCacheLock(
+      List<EntityCacheKey> keys, EntityCache.ThrowingSupplier<T, E> action) 
throws E {
+    Preconditions.checkArgument(keys != null, "Keys cannot be null");
+    Preconditions.checkArgument(action != null, "Action cannot be null");
+
+    return segmentedLock.withMultipleKeyLockAndThrow(keys, action);
+  }
+
   /**
    * Removes the expired entity from the cache. This method is a hook method 
for the Cache, when an
    * entry expires, it will call this method.
diff --git a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
index 3ad6de1e8a..218c5099d0 100644
--- a/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/EntityCache.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.cache;
 
+import java.util.List;
 import org.apache.gravitino.Entity;
 
 /**
@@ -61,6 +62,20 @@ public interface EntityCache extends 
SupportsEntityStoreCache, SupportsRelationE
   <T, E extends Exception> T withCacheLock(EntityCacheKey key, 
ThrowingSupplier<T, E> action)
       throws E;
 
+  /**
+   * Acquires locks for multiple cache keys and executes the action, returning 
the result. Keys are
+   * locked in a consistent order to avoid deadlocks.
+   *
+   * @param keys The cache keys to lock
+   * @param action The action to execute while holding all locks
+   * @param <T> The type of the result
+   * @param <E> The type of exception that may be thrown
+   * @return The result of the action
+   * @throws E if the action throws an exception of type E
+   */
+  <T, E extends Exception> T withMultipleKeyCacheLock(
+      List<EntityCacheKey> keys, ThrowingSupplier<T, E> action) throws E;
+
   /**
    * A functional interface that represents a supplier that may throw an 
exception.
    *
diff --git a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
index 74babb75a5..a9c9939bc1 100644
--- a/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
+++ b/core/src/main/java/org/apache/gravitino/cache/NoOpsCache.java
@@ -71,6 +71,13 @@ public class NoOpsCache extends BaseEntityCache {
     return opLock.withLockAndThrow(key, action);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public <T, E extends Exception> T withMultipleKeyCacheLock(
+      List<EntityCacheKey> keys, ThrowingSupplier<T, E> action) throws E {
+    return opLock.withMultipleKeyLockAndThrow(keys, action);
+  }
+
   /** {@inheritDoc} */
   @Override
   public <E extends Entity & HasIdentifier> Optional<E> getIfPresent(
diff --git a/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
index dce8eb63ce..c620125900 100644
--- a/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
+++ b/core/src/main/java/org/apache/gravitino/cache/SegmentedLock.java
@@ -21,18 +21,30 @@ package org.apache.gravitino.cache;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Striped;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Segmented lock for improved concurrency. Divides locks into segments to 
reduce contention.
  * Supports global clearing operations that require exclusive access to all 
segments.
  */
 public class SegmentedLock {
-  private final Striped<Lock> stripedLocks;
   private static final Object NULL_KEY = new Object();
 
+  private final Striped<Lock> stripedLocks;
+
+  /**
+   * Stable index for each stripe lock, built once at construction. Used to 
sort locks in {@link
+   * #getDistinctSortedLocks} with a guaranteed total order (no hash-collision 
ties).
+   */
+  private final Map<Lock, Integer> lockIndices;
+
   /** CountDownLatch for global operations - null when no operation is in 
progress */
   private final AtomicReference<CountDownLatch> globalOperationLatch = new 
AtomicReference<>();
 
@@ -50,6 +62,10 @@ public class SegmentedLock {
     }
 
     this.stripedLocks = Striped.lock(numSegments);
+    this.lockIndices =
+        IntStream.range(0, this.stripedLocks.size())
+            .boxed()
+            .collect(Collectors.toMap(this.stripedLocks::getAt, i -> i));
   }
 
   /**
@@ -176,6 +192,65 @@ public class SegmentedLock {
     }
   }
 
+  /**
+   * Acquires locks for multiple keys in a consistent order (sorted by stripe 
index) to avoid
+   * deadlocks, then executes the action.
+   *
+   * @param keys The keys to lock
+   * @param action Action to run
+   * @param <E> Exception type
+   * @throws E Exception from the action
+   */
+  public <E extends Exception> void withMultipleKeyLockAndThrow(
+      List<? extends Object> keys, EntityCache.ThrowingRunnable<E> action) 
throws E {
+    waitForGlobalComplete();
+    List<Lock> sortedLocks = getDistinctSortedLocks(keys);
+    acquireAllLocks(sortedLocks);
+    try {
+      action.run();
+    } finally {
+      releaseAllLocks(sortedLocks);
+    }
+  }
+
+  /**
+   * Acquires locks for multiple keys in a consistent order (sorted by stripe 
index) to avoid
+   * deadlocks, then executes the action and returns the result.
+   *
+   * @param keys The keys to lock
+   * @param action Action to run
+   * @param <T> Result type
+   * @param <E> Exception type
+   * @return Action result
+   * @throws E Exception from the action
+   */
+  public <T, E extends Exception> T withMultipleKeyLockAndThrow(
+      List<? extends Object> keys, EntityCache.ThrowingSupplier<T, E> action) 
throws E {
+    waitForGlobalComplete();
+    List<Lock> sortedLocks = getDistinctSortedLocks(keys);
+    acquireAllLocks(sortedLocks);
+    try {
+      return action.get();
+    } finally {
+      releaseAllLocks(sortedLocks);
+    }
+  }
+
+  /** Checks if a global operation is currently in progress. */
+  @VisibleForTesting
+  public boolean isClearing() {
+    return globalOperationLatch.get() != null;
+  }
+
+  /**
+   * Returns number of lock segments.
+   *
+   * @return Number of segments
+   */
+  public int getNumSegments() {
+    return stripedLocks.size();
+  }
+
   /**
    * Executes a global clearing operation with exclusive access to all 
segments. This method sets
    * the clearing flag and ensures no other operations can proceed until the 
clearing is complete.
@@ -202,6 +277,38 @@ public class SegmentedLock {
     }
   }
 
+  private List<Lock> getDistinctSortedLocks(List<? extends Object> keys) {
+    return keys.stream()
+        .map(this::getSegmentLock)
+        .distinct()
+        .sorted(Comparator.comparingInt(lockIndices::get))
+        .collect(Collectors.toList());
+  }
+
+  private void acquireAllLocks(List<Lock> locks) {
+    int lockedCount = 0;
+    try {
+      for (Lock lock : locks) {
+        lock.lockInterruptibly();
+        lockedCount++;
+      }
+    } catch (InterruptedException e) {
+      releaseLocks(locks, lockedCount);
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Thread was interrupted while acquiring 
batch lock", e);
+    }
+  }
+
+  private static void releaseLocks(List<Lock> locks, int lockedCount) {
+    for (int i = lockedCount - 1; i >= 0; i--) {
+      locks.get(i).unlock();
+    }
+  }
+
+  private void releaseAllLocks(List<Lock> locks) {
+    releaseLocks(locks, locks.size());
+  }
+
   /**
    * Waits for any ongoing global operation to complete. This method is called 
by regular operations
    * to ensure they don't interfere with global operations.
@@ -218,19 +325,4 @@ public class SegmentedLock {
       }
     }
   }
-
-  /** Checks if a global operation is currently in progress. */
-  @VisibleForTesting
-  public boolean isClearing() {
-    return globalOperationLatch.get() != null;
-  }
-
-  /**
-   * Returns number of lock segments.
-   *
-   * @return Number of segments
-   */
-  public int getNumSegments() {
-    return stripedLocks.size();
-  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 2f4becd77e..900065f87a 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -38,6 +38,7 @@ import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.RelationalEntity;
 import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.UnsupportedEntityTypeException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -350,6 +351,31 @@ public class JDBCBackend implements RelationalBackend {
       case JOB_TEMPLATE:
         return (List<E>)
             
JobTemplateMetaService.getInstance().batchGetJobTemplateByIdentifier(identifiers);
+      case USER:
+        // TODO: I will add batch operations for users, groups and views
+        List<E> users = Lists.newArrayList();
+        for (NameIdentifier identifier : identifiers) {
+          users.add((E) 
UserMetaService.getInstance().getUserByIdentifier(identifier));
+        }
+        return users;
+      case GROUP:
+        List<E> groups = Lists.newArrayList();
+        for (NameIdentifier identifier : identifiers) {
+          groups.add((E) 
GroupMetaService.getInstance().getGroupByIdentifier(identifier));
+        }
+        return groups;
+      case ROLE:
+        List<E> roles = Lists.newArrayList();
+        for (NameIdentifier identifier : identifiers) {
+          roles.add((E) 
RoleMetaService.getInstance().getRoleByIdentifier(identifier));
+        }
+        return roles;
+      case VIEW:
+        List<E> views = Lists.newArrayList();
+        for (NameIdentifier identifier : identifiers) {
+          views.add((E) 
ViewMetaService.getInstance().getViewByIdentifier(identifier));
+        }
+        return views;
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for batch get operation", entityType);
@@ -669,6 +695,19 @@ public class JDBCBackend implements RelationalBackend {
     }
   }
 
+  @Override
+  public List<RelationalEntity<?>> batchListEntitiesByRelation(
+      Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType 
identType)
+      throws IOException {
+    switch (relType) {
+      case OWNER_REL:
+        return OwnerMetaService.getInstance().batchGetOwner(nameIdentifiers, 
identType);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Doesn't support the relation type %s", relType));
+    }
+  }
+
   @Override
   public void insertRelation(
       SupportsRelationOperations.Type relType,
diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 31183f2649..9fdd2c7886 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 import org.apache.commons.lang3.tuple.Pair;
@@ -36,10 +38,12 @@ import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.HasIdentifier;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.RelationalEntity;
 import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.cache.CacheFactory;
 import org.apache.gravitino.cache.CachedEntityIdResolver;
 import org.apache.gravitino.cache.EntityCache;
+import org.apache.gravitino.cache.EntityCacheKey;
 import org.apache.gravitino.cache.EntityCacheRelationKey;
 import org.apache.gravitino.cache.NoOpsCache;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -225,6 +229,47 @@ public class RelationalEntityStore implements EntityStore, 
SupportsRelationOpera
         });
   }
 
+  @Override
+  public List<RelationalEntity<?>> batchListEntitiesByRelation(
+      Type relType, List<NameIdentifier> nameIdentifiers, Entity.EntityType 
identType)
+      throws IOException {
+    if (nameIdentifiers == null || nameIdentifiers.isEmpty()) {
+      return new ArrayList<>();
+    }
+
+    List<EntityCacheKey> lockKeys = new ArrayList<>();
+    for (NameIdentifier id : nameIdentifiers) {
+      lockKeys.add(EntityCacheRelationKey.of(id, identType, relType));
+    }
+
+    return cache.withMultipleKeyCacheLock(
+        lockKeys,
+        () -> {
+          List<RelationalEntity<?>> result = new ArrayList<>();
+          List<NameIdentifier> uncachedIdentifiers = new ArrayList<>();
+
+          for (NameIdentifier nameIdentifier : nameIdentifiers) {
+            Optional<List<RelationalEntity<?>>> cachedRelations =
+                getCachedRelations(relType, nameIdentifier, identType);
+            if (cachedRelations.isPresent()) {
+              result.addAll(cachedRelations.get());
+            } else {
+              uncachedIdentifiers.add(nameIdentifier);
+            }
+          }
+
+          if (!uncachedIdentifiers.isEmpty()) {
+            List<RelationalEntity<?>> backendRelations =
+                backend.batchListEntitiesByRelation(relType, 
uncachedIdentifiers, identType);
+            result.addAll(backendRelations);
+
+            batchPopulateRelationCache(relType, identType, 
uncachedIdentifiers, backendRelations);
+          }
+
+          return result;
+        });
+  }
+
   @Override
   public <E extends Entity & HasIdentifier> E getEntityByRelation(
       Type relType,
@@ -318,4 +363,44 @@ public class RelationalEntityStore implements EntityStore, 
SupportsRelationOpera
       throws IOException, EntityAlreadyExistsException {
     backend.batchPut(entities, overwritten);
   }
+
+  private <E extends Entity & HasIdentifier> 
Optional<List<RelationalEntity<?>>> getCachedRelations(
+      SupportsRelationOperations.Type relType,
+      NameIdentifier nameIdentifier,
+      Entity.EntityType identType) {
+    Optional<List<E>> entitiesOpt = cache.getIfPresent(relType, 
nameIdentifier, identType);
+    if (entitiesOpt.isPresent()) {
+      List<RelationalEntity<?>> cachedRelations = new ArrayList<>();
+      for (E entity : entitiesOpt.get()) {
+        cachedRelations.add(new RelationalEntity<>(relType, nameIdentifier, 
identType, entity));
+      }
+      return Optional.of(cachedRelations);
+    }
+    return Optional.empty();
+  }
+
+  private <E extends Entity & HasIdentifier> void batchPopulateRelationCache(
+      SupportsRelationOperations.Type relType,
+      Entity.EntityType identType,
+      List<NameIdentifier> uncachedIdentifiers,
+      List<RelationalEntity<?>> backendRelations) {
+    Map<NameIdentifier, List<RelationalEntity<?>>> relationsBySource = new 
HashMap<>();
+    for (RelationalEntity<?> relation : backendRelations) {
+      relationsBySource.computeIfAbsent(relation.source(), k -> new 
ArrayList<>()).add(relation);
+    }
+
+    for (NameIdentifier sourceId : uncachedIdentifiers) {
+      List<RelationalEntity<?>> sourceRelations = 
relationsBySource.get(sourceId);
+      List<E> entityList = new ArrayList<>();
+      if (sourceRelations != null) {
+        for (RelationalEntity<?> rel : sourceRelations) {
+          @SuppressWarnings("unchecked")
+          E entity = (E) rel.targetEntity();
+          entityList.add(entity);
+        }
+      }
+
+      cache.put(sourceId, identType, relType, entityList);
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java
index 455d78fb59..17247163e2 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaMapper.java
@@ -18,8 +18,10 @@
  */
 package org.apache.gravitino.storage.relational.mapper;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.po.GroupPO;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
+import org.apache.gravitino.storage.relational.po.UserOwnerRelPO;
 import org.apache.gravitino.storage.relational.po.UserPO;
 import org.apache.ibatis.annotations.InsertProvider;
 import org.apache.ibatis.annotations.Param;
@@ -45,6 +47,13 @@ public interface OwnerMetaMapper {
       @Param("metadataObjectId") Long metadataObjectId,
       @Param("metadataObjectType") String metadataObjectType);
 
+  @SelectProvider(
+      type = OwnerMetaSQLProviderFactory.class,
+      method = "batchSelectUserOwnerMetaByMetadataObjectIdAndType")
+  List<UserOwnerRelPO> batchSelectUserOwnerMetaByMetadataObjectIdAndType(
+      @Param("metadataObjectIds") List<Long> metadataObjectIds,
+      @Param("metadataObjectType") String metadataObjectType);
+
   @SelectProvider(
       type = OwnerMetaSQLProviderFactory.class,
       method = "selectGroupOwnerMetaByMetadataObjectIdAndType")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
index b7b528a300..0dfcdc5bed 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/OwnerMetaSQLProviderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.OwnerMetaBaseSQLProvider;
@@ -96,4 +97,11 @@ public class OwnerMetaSQLProviderFactory {
       @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
     return getProvider().deleteOwnerMetasByLegacyTimeline(legacyTimeline, 
limit);
   }
+
+  public static String batchSelectUserOwnerMetaByMetadataObjectIdAndType(
+      @Param("metadataObjectIds") List<Long> metadataObjectIds,
+      @Param("metadataObjectType") String metadataObjectType) {
+    return getProvider()
+        .batchSelectUserOwnerMetaByMetadataObjectIdAndType(metadataObjectIds, 
metadataObjectType);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
index e300c5bb44..ece7bfc22a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
@@ -20,6 +20,7 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 
 import static 
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper.OWNER_TABLE_NAME;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
@@ -53,6 +54,34 @@ public class OwnerMetaBaseSQLProvider {
         + " ot.deleted_at = 0 AND ut.deleted_at = 0";
   }
 
+  public String batchSelectUserOwnerMetaByMetadataObjectIdAndType(
+      @Param("metadataObjectIds") List<Long> metadataObjectIds,
+      @Param("metadataObjectType") String metadataObjectType) {
+    return "<script>"
+        + "SELECT ot.metadata_object_id as metadataObjectId,"
+        + "ut.user_id as userId, "
+        + "ut.user_name as userName, "
+        + "ut.metalake_id as metalakeId, "
+        + "ut.audit_info as auditInfo, "
+        + "ut.current_version as currentVersion, "
+        + "ut.last_version as lastVersion, "
+        + "ut.deleted_at as deletedAt "
+        + "FROM "
+        + OWNER_TABLE_NAME
+        + " ot LEFT JOIN "
+        + UserMetaMapper.USER_TABLE_NAME
+        + " ut ON ut.user_id = ot.owner_id "
+        + "WHERE "
+        + "ot.metadata_object_type = #{metadataObjectType} "
+        + "AND ot.owner_type = 'USER' "
+        + "AND ot.metadata_object_id IN "
+        + "<foreach collection='metadataObjectIds' item='itemId' open='(' 
separator=',' close=')'>"
+        + "#{itemId}"
+        + "</foreach> "
+        + "AND ot.deleted_at = 0 AND ut.deleted_at = 0 "
+        + "</script>";
+  }
+
   public String selectGroupOwnerMetaByMetadataObjectIdAndType(
       @Param("metadataObjectId") Long metadataObjectId,
       @Param("metadataObjectType") String metadataObjectType) {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java
new file mode 100644
index 0000000000..969b49c1c1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/UserOwnerRelPO.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.po;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class UserOwnerRelPO extends UserPO {
+
+  private Long metadataObjectId;
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
index a68b2ebb3e..2a209e654f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/OwnerMetaService.java
@@ -20,15 +20,27 @@ package org.apache.gravitino.storage.relational.service;
 
 import static 
org.apache.gravitino.metrics.source.MetricsSource.GRAVITINO_RELATIONAL_STORE_METRIC_NAME;
 
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.RelationalEntity;
+import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.authorization.AuthorizationUtils;
+import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.metrics.Monitored;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.po.GroupPO;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
+import org.apache.gravitino.storage.relational.po.UserOwnerRelPO;
 import org.apache.gravitino.storage.relational.po.UserPO;
 import org.apache.gravitino.storage.relational.utils.POConverters;
 import org.apache.gravitino.storage.relational.utils.SessionUtils;
@@ -79,6 +91,59 @@ public class OwnerMetaService {
     return Optional.empty();
   }
 
+  /**
+   * Looks up the user owners of several metadata objects of one entity type.
+   *
+   * <p>Each identifier is first resolved to its entity id; the owners are then fetched with a single
+   * batch mapper call. Objects for which that call returns no row are absent from the result. Only
+   * user owners are returned; group owners are not handled by this method yet.
+   *
+   * @param identifiers identifiers of the owned objects; may be null or empty
+   * @param type the entity type used to resolve every identifier
+   * @return a new mutable list holding one {@code OWNER_REL} relation per user owner row found;
+   *     empty when {@code identifiers} is null or empty
+   * @throws IllegalArgumentException if the identifiers do not all belong to the same metalake
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "batchGetOwner")
+  public List<RelationalEntity<?>> batchGetOwner(
+      List<NameIdentifier> identifiers, Entity.EntityType type) {
+    if (CollectionUtils.isEmpty(identifiers)) {
+      return new ArrayList<>();
+    }
+
+    // The first identifier defines the metalake every other identifier has to match.
+    String metalake = NameIdentifierUtil.getMetalake(identifiers.get(0));
+    Preconditions.checkArgument(
+        identifiers.stream()
+            .allMatch(ident -> Objects.equals(NameIdentifierUtil.getMetalake(ident), metalake)),
+        "identifiers should in one metalake");
+
+    // Resolve the ids in input order, then index them so that each returned row can be mapped back
+    // to the identifier it was requested for.
+    List<Long> entityIds =
+        identifiers.stream()
+            .map(ident -> EntityIdService.getEntityId(ident, type))
+            .collect(Collectors.toList());
+    Map<Long, NameIdentifier> identifierById = new HashMap<>();
+    for (int i = 0; i < entityIds.size(); i++) {
+      identifierById.put(entityIds.get(i), identifiers.get(i));
+    }
+
+    // Get user owners
+    List<UserOwnerRelPO> userOwners =
+        SessionUtils.getWithoutCommit(
+            OwnerMetaMapper.class,
+            mapper ->
+                mapper.batchSelectUserOwnerMetaByMetadataObjectIdAndType(entityIds, type.name()));
+
+    List<RelationalEntity<?>> owners = new ArrayList<>();
+    if (CollectionUtils.isNotEmpty(userOwners)) {
+      for (UserOwnerRelPO ownerPO : userOwners) {
+        UserEntity owner =
+            POConverters.fromUserPO(
+                ownerPO, Collections.emptyList(), AuthorizationUtils.ofUserNamespace(metalake));
+        NameIdentifier ownedObject = identifierById.get(ownerPO.getMetadataObjectId());
+        owners.add(
+            new RelationalEntity<>(
+                SupportsRelationOperations.Type.OWNER_REL, ownedObject, type, owner));
+      }
+    }
+
+    // TODO: Add batch support for group owners when GroupOwnerRelPO and batch method are available
+    // For now, we only handle user owners in batch mode
+
+    return owners;
+  }
+
+
   @Monitored(metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME, 
baseMetricName = "setOwner")
   public void setOwner(
       NameIdentifier entity,
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
index 8f42bb55c6..8d1ad4ca93 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
@@ -23,8 +23,15 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.RelationalEntity;
+import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
@@ -491,6 +498,179 @@ class TestOwnerMetaService extends TestJDBCBackend {
     Assertions.assertEquals(0, countActiveOwnerRel(user.id()));
   }
 
+  /**
+   * Sets one user as owner of a catalog, schema, table and topic, then checks what {@code
+   * batchGetOwner} returns when each entity type is queried on its own.
+   */
+  @TestTemplate
+  void testBatchGetOwnerWithUserOwners() throws IOException {
+    createAndInsertMakeLake(METALAKE_NAME);
+    CatalogEntity catalog = createAndInsertCatalog(METALAKE_NAME, CATALOG_NAME);
+    SchemaEntity schema = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME);
+    TableEntity table =
+        createTableEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME),
+            TABLE_NAME,
+            AUDIT_INFO);
+    backend.insert(table, false);
+    TopicEntity topic =
+        createTopicEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME),
+            TOPIC_NAME,
+            AUDIT_INFO);
+    backend.insert(topic, false);
+
+    UserEntity user =
+        createUserEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            AuthorizationUtils.ofUserNamespace(METALAKE_NAME),
+            "user",
+            AUDIT_INFO);
+    backend.insert(user, false);
+
+    OwnerMetaService ownerService = OwnerMetaService.getInstance();
+    NameIdentifier userIdent = user.nameIdentifier();
+    NameIdentifier catalogIdent = catalog.nameIdentifier();
+    NameIdentifier schemaIdent = schema.nameIdentifier();
+    NameIdentifier tableIdent = table.nameIdentifier();
+    NameIdentifier topicIdent = topic.nameIdentifier();
+
+    // The same user owns all four objects.
+    ownerService.setOwner(catalogIdent, catalog.type(), userIdent, user.type());
+    ownerService.setOwner(schemaIdent, schema.type(), userIdent, user.type());
+    ownerService.setOwner(tableIdent, table.type(), userIdent, user.type());
+    ownerService.setOwner(topicIdent, topic.type(), userIdent, user.type());
+
+    // catalog is CATALOG type; schema/table/topic share the same owner but different types.
+    // batchGetOwner queries by a single identType, so a mixed batch queried as CATALOG is expected
+    // to yield one relation only.
+    List<NameIdentifier> mixedIdentifiers =
+        List.of(catalogIdent, schemaIdent, tableIdent, topicIdent);
+    List<RelationalEntity<?>> relations =
+        ownerService.batchGetOwner(mixedIdentifiers, Entity.EntityType.CATALOG);
+    Assertions.assertEquals(1, relations.size());
+
+    // Query per type instead.
+    List<RelationalEntity<?>> catalogRelations =
+        ownerService.batchGetOwner(List.of(catalogIdent), Entity.EntityType.CATALOG);
+    List<RelationalEntity<?>> schemaRelations =
+        ownerService.batchGetOwner(List.of(schemaIdent), Entity.EntityType.SCHEMA);
+    List<RelationalEntity<?>> tableRelations =
+        ownerService.batchGetOwner(List.of(tableIdent), Entity.EntityType.TABLE);
+    List<RelationalEntity<?>> topicRelations =
+        ownerService.batchGetOwner(List.of(topicIdent), Entity.EntityType.TOPIC);
+
+    Assertions.assertEquals(1, catalogRelations.size());
+    RelationalEntity<?> catalogRel = catalogRelations.get(0);
+    Assertions.assertEquals(catalogIdent, catalogRel.source());
+    Assertions.assertEquals(Entity.EntityType.CATALOG, catalogRel.sourceType());
+    Assertions.assertEquals(Entity.EntityType.USER, catalogRel.targetEntity().type());
+    Assertions.assertEquals(SupportsRelationOperations.Type.OWNER_REL, catalogRel.type());
+
+    Assertions.assertEquals(1, schemaRelations.size());
+    RelationalEntity<?> schemaRel = schemaRelations.get(0);
+    Assertions.assertEquals(schemaIdent, schemaRel.source());
+
+    Assertions.assertEquals(1, tableRelations.size());
+    RelationalEntity<?> tableRel = tableRelations.get(0);
+    Assertions.assertEquals(tableIdent, tableRel.source());
+
+    Assertions.assertEquals(1, topicRelations.size());
+    RelationalEntity<?> topicRel = topicRelations.get(0);
+    Assertions.assertEquals(topicIdent, topicRel.source());
+
+    // All targets should be the same user
+    for (RelationalEntity<?> rel : List.of(catalogRel, schemaRel, tableRel, topicRel)) {
+      Assertions.assertEquals(userIdent, rel.targetEntity().nameIdentifier());
+    }
+  }
+
+  /**
+   * Queries three schemas in one batch where two have distinct user owners and the third has none,
+   * and checks that only the two owned schemas appear in the result.
+   */
+  @TestTemplate
+  void testBatchGetOwnerWithMultipleEntitiesOfSameType() throws IOException {
+    createAndInsertMakeLake(METALAKE_NAME);
+    createAndInsertCatalog(METALAKE_NAME, CATALOG_NAME);
+    SchemaEntity schema1 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME);
+    SchemaEntity schema2 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME + "_2");
+    SchemaEntity schema3 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME + "_3");
+
+    UserEntity user1 =
+        createUserEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            AuthorizationUtils.ofUserNamespace(METALAKE_NAME),
+            "user1",
+            AUDIT_INFO);
+    backend.insert(user1, false);
+    UserEntity user2 =
+        createUserEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            AuthorizationUtils.ofUserNamespace(METALAKE_NAME),
+            "user2",
+            AUDIT_INFO);
+    backend.insert(user2, false);
+
+    OwnerMetaService ownerService = OwnerMetaService.getInstance();
+    NameIdentifier schema1Ident = schema1.nameIdentifier();
+    NameIdentifier schema2Ident = schema2.nameIdentifier();
+    NameIdentifier schema3Ident = schema3.nameIdentifier();
+    NameIdentifier user1Ident = user1.nameIdentifier();
+    NameIdentifier user2Ident = user2.nameIdentifier();
+
+    // schema1 -> user1, schema2 -> user2, schema3 has no owner
+    ownerService.setOwner(schema1Ident, schema1.type(), user1Ident, user1.type());
+    ownerService.setOwner(schema2Ident, schema2.type(), user2Ident, user2.type());
+
+    List<RelationalEntity<?>> relations =
+        ownerService.batchGetOwner(
+            List.of(schema1Ident, schema2Ident, schema3Ident), Entity.EntityType.SCHEMA);
+
+    Assertions.assertEquals(2, relations.size());
+
+    Map<NameIdentifier, NameIdentifier> ownerBySource =
+        relations.stream()
+            .collect(
+                Collectors.toMap(
+                    RelationalEntity::source, rel -> rel.targetEntity().nameIdentifier()));
+
+    Assertions.assertEquals(user1Ident, ownerBySource.get(schema1Ident));
+    Assertions.assertEquals(user2Ident, ownerBySource.get(schema2Ident));
+    // schema3 has no owner — not present in results
+    Assertions.assertFalse(ownerBySource.containsKey(schema3Ident));
+
+    // Verify RelationalEntity metadata
+    relations.forEach(
+        rel -> {
+          Assertions.assertEquals(SupportsRelationOperations.Type.OWNER_REL, rel.type());
+          Assertions.assertEquals(Entity.EntityType.SCHEMA, rel.sourceType());
+          Assertions.assertEquals(Entity.EntityType.USER, rel.targetEntity().type());
+        });
+  }
+
+  /** Checks that an empty identifier list produces a non-null, empty result. */
+  @TestTemplate
+  void testBatchGetOwnerWithEmptyInput() {
+    List<NameIdentifier> noIdentifiers = Collections.emptyList();
+
+    List<RelationalEntity<?>> relations =
+        OwnerMetaService.getInstance().batchGetOwner(noIdentifiers, Entity.EntityType.TABLE);
+
+    Assertions.assertNotNull(relations);
+    Assertions.assertTrue(relations.isEmpty());
+  }
+
+  /** Checks that querying schemas for which no owner was set produces a non-null, empty result. */
+  @TestTemplate
+  void testBatchGetOwnerNoneHaveOwners() throws IOException {
+    createAndInsertMakeLake(METALAKE_NAME);
+    createAndInsertCatalog(METALAKE_NAME, CATALOG_NAME);
+    SchemaEntity schema1 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME);
+    SchemaEntity schema2 = createAndInsertSchema(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME + "_2");
+
+    // No setOwner call is made for either schema.
+    List<NameIdentifier> unownedSchemas =
+        List.of(schema1.nameIdentifier(), schema2.nameIdentifier());
+    List<RelationalEntity<?>> relations =
+        OwnerMetaService.getInstance().batchGetOwner(unownedSchemas, Entity.EntityType.SCHEMA);
+
+    Assertions.assertNotNull(relations);
+    Assertions.assertTrue(relations.isEmpty());
+  }
+
   private Integer countAllOwnerRel(Long ownerId) {
     try (SqlSession sqlSession =
             
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
index 5a0f1ee8bf..b63251fd96 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataAuthzHelper.java
@@ -33,11 +33,13 @@ import java.util.function.Function;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.Metalake;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.authorization.AuthorizationRequestContext;
 import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.dto.tag.MetadataObjectDTO;
@@ -166,6 +168,7 @@ public class MetadataAuthzHelper {
       Entity.EntityType entityType,
       NameIdentifier[] nameIdentifiers) {
     preloadToCache(entityType, nameIdentifiers);
+    preloadOwner(entityType, nameIdentifiers);
     return filterByExpression(metalake, expression, entityType, 
nameIdentifiers, e -> e);
   }
 
@@ -390,4 +393,21 @@ public class MetadataAuthzHelper {
             entityType,
             EntityClassMapper.getEntityClass(entityType));
   }
+
+  /**
+   * Best-effort owner preload: asks the entity store for the {@code OWNER_REL} relations of all
+   * given identifiers in one batch call and discards the returned value.
+   *
+   * <p>Does nothing when the cache is disabled. Any exception raised by the batch call is logged at
+   * WARN level and swallowed, so a failed preload never fails the caller.
+   *
+   * <p>NOTE(review): presumably the store populates the entity cache as a side effect of this call;
+   * confirm against the store implementation.
+   *
+   * @param entityType the entity type of every identifier in {@code nameIdentifiers}
+   * @param nameIdentifiers the identifiers whose owners should be loaded
+   */
+  private static void preloadOwner(Entity.EntityType entityType, NameIdentifier[] nameIdentifiers) {
+    GravitinoEnv env = GravitinoEnv.getInstance();
+    if (!env.cacheEnabled()) {
+      return;
+    }
+
+    // Obtained outside the try block: only the relation lookup itself is treated as best-effort.
+    EntityStore store = env.entityStore();
+    try {
+      store
+          .relationOperations()
+          .batchListEntitiesByRelation(
+              SupportsRelationOperations.Type.OWNER_REL,
+              Arrays.stream(nameIdentifiers).toList(),
+              entityType);
+    } catch (Exception e) {
+      LOG.warn("Ignore preloadOwner error:{}", e.getMessage(), e);
+    }
+  }
 }


Reply via email to