This is an automated email from the ASF dual-hosted git repository. liuxun pushed a commit to branch fix-cache in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit b7ff9feff15a088ea682259189bab9aaaee2fcc8 Author: Xun <[email protected]> AuthorDate: Mon Aug 25 16:30:54 2025 +0800 fixed 5 issues --- .../java/org/apache/gravitino/EntityStore.java | 8 +- .../gravitino/cache/CaffeineEntityCache.java | 213 ++++++++++++--------- .../storage/relational/RelationalEntityStore.java | 20 +- .../gravitino/cache/TestCaffeineEntityCache.java | 4 +- .../storage/memory/TestMemoryEntityStore.java | 36 ++-- .../java/org/apache/gravitino/utils/TestUtil.java | 2 - 6 files changed, 163 insertions(+), 120 deletions(-) diff --git a/core/src/main/java/org/apache/gravitino/EntityStore.java b/core/src/main/java/org/apache/gravitino/EntityStore.java index 7bd77d274b..f305733192 100644 --- a/core/src/main/java/org/apache/gravitino/EntityStore.java +++ b/core/src/main/java/org/apache/gravitino/EntityStore.java @@ -150,8 +150,12 @@ public interface EntityStore extends Closeable { throws IOException, NoSuchEntityException, EntityAlreadyExistsException; <E extends Entity & HasIdentifier> E update( - NameIdentifier ident, Class<E> type, EntityType entityType, List<String> roles, Function<E, E> updater) - throws IOException, NoSuchEntityException, EntityAlreadyExistsException; + NameIdentifier ident, + Class<E> type, + EntityType entityType, + List<String> roles, + Function<E, E> updater) + throws IOException, NoSuchEntityException, EntityAlreadyExistsException; /** * Get the entity from the underlying storage. diff --git a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java index cb8e6dd756..b0cc0bcd5a 100644 --- a/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java +++ b/core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java @@ -45,16 +45,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.stream.Collectors; - import org.apache.commons.lang3.ArrayUtils; import org.apache.gravitino.Config; import org.apache.gravitino.Configs; import org.apache.gravitino.Entity; import org.apache.gravitino.HasIdentifier; -import org.apache.gravitino.MetadataObject; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; import org.apache.gravitino.SupportsRelationOperations; +import org.apache.gravitino.hook.CatalogHookDispatcher; import org.apache.gravitino.meta.GroupEntity; import org.apache.gravitino.meta.ModelVersionEntity; import org.apache.gravitino.meta.RoleEntity; @@ -316,13 +315,17 @@ public class CaffeineEntityCache extends BaseEntityCache { } List<String> allCacheIndexKeys1 = Lists.newArrayList(); - cacheIndex.getKeysStartingWith("").forEach( + cacheIndex + .getKeysStartingWith("") + .forEach( k -> { allCacheIndexKeys1.add(k.toString()); }); List<String> allReverseIndexKeys1 = Lists.newArrayList(); - reverseIndex.getKeysStartingWith("").forEach( + reverseIndex + .getKeysStartingWith("") + .forEach( k -> { allReverseIndexKeys1.add(k.toString()); }); @@ -333,60 +336,74 @@ public class CaffeineEntityCache extends BaseEntityCache { cacheData.put(key, newEntities); for (Entity entity : newEntities) { - if (entity instanceof UserEntity) { - // UserEntity is not supported in the cache, skip it. - UserEntity userEntity = (UserEntity) entity; - if (userEntity.roleNames() != null) { - userEntity.roleNames().forEach( - role -> { - Namespace ns = NamespaceUtil.ofUser(userEntity.namespace().level(0)); - NameIdentifier nameIdentifier = NameIdentifier.of(ns, role); - putReverseIndex(nameIdentifier, Entity.EntityType.ROLE, key); - }); - } - } else if (entity instanceof GroupEntity) { - // UserEntity is not supported in the cache, skip it. - GroupEntity groupEntity = (GroupEntity) entity; - if (groupEntity.roleNames() != null) { - groupEntity.roleNames().forEach( - role -> { - Namespace ns = NamespaceUtil.ofGroup(groupEntity.namespace().level(0)); - NameIdentifier nameIdentifier = NameIdentifier.of(ns, role); - putReverseIndex(nameIdentifier, Entity.EntityType.ROLE, key); - }); - } - } else if (entity instanceof RoleEntity) { - // UserEntity is not supported in the cache, skip it. - RoleEntity roleEntity = (RoleEntity) entity; - if (roleEntity.securableObjects() != null) { - roleEntity.securableObjects().forEach( - securableObject -> { - Namespace namespace = Namespace.empty(); -// Namespace nsParent = Namespace.fromString(securableObject.parent()); - Entity.EntityType entityType = Entity.EntityType.METALAKE; - switch (securableObject.type()){ - case CATALOG: - entityType = Entity.EntityType.CATALOG; - namespace = NamespaceUtil.ofCatalog(roleEntity.namespace().level(0)); - break; - case FILESET: - entityType = Entity.EntityType.FILESET; - Namespace nsParent = Namespace.fromString(securableObject.parent()); - namespace = NamespaceUtil.ofFileset(roleEntity.namespace().level(0), nsParent.level(0), nsParent.level(1)); - break; - default: - throw new IllegalStateException("Unprocessed securable object type: " + securableObject.type()); - } - Namespace so_namespace = Namespace.of(ArrayUtils.add(namespace.levels(), securableObject.name())); - NameIdentifier nameIdentifier = NameIdentifier.of(so_namespace, securableObject.name()); - putReverseIndex(nameIdentifier, entityType, key); - }); - } + if (entity instanceof UserEntity) { + // UserEntity is not supported in the cache, skip it. + UserEntity userEntity = (UserEntity) entity; + if (userEntity.roleNames() != null) { + userEntity + .roleNames() + .forEach( + role -> { + Namespace ns = NamespaceUtil.ofUser(userEntity.namespace().level(0)); + NameIdentifier nameIdentifier = NameIdentifier.of(ns, role); + putReverseIndex(nameIdentifier, Entity.EntityType.ROLE, key); + }); } + } else if (entity instanceof GroupEntity) { + // UserEntity is not supported in the cache, skip it. + GroupEntity groupEntity = (GroupEntity) entity; + if (groupEntity.roleNames() != null) { + groupEntity + .roleNames() + .forEach( + role -> { + Namespace ns = NamespaceUtil.ofGroup(groupEntity.namespace().level(0)); + NameIdentifier nameIdentifier = NameIdentifier.of(ns, role); + putReverseIndex(nameIdentifier, Entity.EntityType.ROLE, key); + }); + } + } else if (entity instanceof RoleEntity) { + // UserEntity is not supported in the cache, skip it. + RoleEntity roleEntity = (RoleEntity) entity; + if (roleEntity.securableObjects() != null) { + roleEntity + .securableObjects() + .forEach( + securableObject -> { + Namespace namespace = Namespace.empty(); + // Namespace nsParent = + // Namespace.fromString(securableObject.parent()); + Entity.EntityType entityType = Entity.EntityType.METALAKE; + switch (securableObject.type()) { + case CATALOG: + entityType = Entity.EntityType.CATALOG; + namespace = NamespaceUtil.ofCatalog(roleEntity.namespace().level(0)); + break; + case FILESET: + entityType = Entity.EntityType.FILESET; + Namespace nsParent = Namespace.fromString(securableObject.parent()); + namespace = + NamespaceUtil.ofFileset( + roleEntity.namespace().level(0), + nsParent.level(0), + nsParent.level(1)); + break; + default: + LOG.error("syncEntitiesToCache: Unprocessed securable object type: " + securableObject.type()); +// throw new IllegalStateException( +// "Unprocessed securable object type: " + securableObject.type()); + } + Namespace so_namespace = + Namespace.of(ArrayUtils.add(namespace.levels(), securableObject.name())); + NameIdentifier nameIdentifier = + NameIdentifier.of(so_namespace, securableObject.name()); + putReverseIndex(nameIdentifier, entityType, key); + }); + } + } - -// Namespace ns = NamespaceUtil.ofRole(ident.namespace().level(0)); -// NameIdentifier nameIdentifier = NameIdentifier.of(ns, role); + // Namespace ns = NamespaceUtil.ofRole(ident.namespace().level(0)); + // NameIdentifier nameIdentifier = NameIdentifier.of(ns, role); putReverseIndex(entity, key); } @@ -396,13 +413,17 @@ public class CaffeineEntityCache extends BaseEntityCache { } List<String> allCacheIndexKeys2 = Lists.newArrayList(); - cacheIndex.getKeysStartingWith("").forEach( + cacheIndex + .getKeysStartingWith("") + .forEach( k -> { allCacheIndexKeys2.add(k.toString()); }); List<String> allReverseIndexKeys2 = Lists.newArrayList(); - reverseIndex.getKeysStartingWith("").forEach( + reverseIndex + .getKeysStartingWith("") + .forEach( k -> { allReverseIndexKeys2.add(k.toString()); }); @@ -414,33 +435,36 @@ public class CaffeineEntityCache extends BaseEntityCache { private void check_cache() { // DEBUG code List<String> allCacheIndexKeys1 = Lists.newArrayList(); - cacheIndex.getKeysStartingWith("").forEach( + cacheIndex + .getKeysStartingWith("") + .forEach( k -> { allCacheIndexKeys1.add(k.toString()); }); List<String> allReverseIndexKeys1 = Lists.newArrayList(); - reverseIndex.getKeysStartingWith("").forEach( + reverseIndex + .getKeysStartingWith("") + .forEach( k -> { allReverseIndexKeys1.add(k.toString()); }); Set<EntityCacheRelationKey> allCacheDataKeys1 = cacheData.asMap().keySet(); - List<String> allCacheDataKeysStrings1 = allCacheDataKeys1.stream() - .map(Object::toString) - .collect(Collectors.toList()); + List<String> allCacheDataKeysStrings1 = + allCacheDataKeys1.stream().map(Object::toString).collect(Collectors.toList()); long cacheDataSize = cacheData.estimatedSize(); int cacheIndexSize = cacheIndex.size(); int reverseIndexSize = reverseIndex.size(); -// if (cacheDataSize != cacheIndexSize/* -// || cacheDataSize != reverseIndexSize -// || cacheIndexSize != reverseIndexSize*/) { -// throw new IllegalStateException( -// String.format( -// "Cache data size (%d) does not match cache index size (%d).", -// cacheDataSize, cacheIndexSize)); -// } + // if (cacheDataSize != cacheIndexSize/* + // || cacheDataSize != reverseIndexSize + // || cacheIndexSize != reverseIndexSize*/) { + // throw new IllegalStateException( + // String.format( + // "Cache data size (%d) does not match cache index size (%d).", + // cacheDataSize, cacheIndexSize)); + // } } /** @@ -486,8 +510,9 @@ public class CaffeineEntityCache extends BaseEntityCache { return entityCacheKey; } - public void putReverseIndex(NameIdentifier nameIdentifier, Entity.EntityType type, EntityCacheRelationKey key) { -// EntityCacheKey entityCacheKey = makeEntityCacheKey(entity); + public void putReverseIndex( + NameIdentifier nameIdentifier, Entity.EntityType type, EntityCacheRelationKey key) { + // EntityCacheKey entityCacheKey = makeEntityCacheKey(entity); EntityCacheKey entityCacheKey = EntityCacheKey.of(nameIdentifier, type); String strEntityCacheKey = entityCacheKey.toString(); List<EntityCacheKey> entityKeysToRemove = @@ -504,9 +529,9 @@ public class CaffeineEntityCache extends BaseEntityCache { EntityCacheKey entityCacheKey = makeEntityCacheKey(entity); String strEntityCacheKey = entityCacheKey.toString(); List<EntityCacheKey> entityKeysToRemove = - Lists.newArrayList(reverseIndex.getValuesForKeysStartingWith(strEntityCacheKey)); + Lists.newArrayList(reverseIndex.getValuesForKeysStartingWith(strEntityCacheKey)); String strEntityCacheKeyNo = - String.format("%s-%d", strEntityCacheKey, entityKeysToRemove.size()); + String.format("%s-%d", strEntityCacheKey, entityKeysToRemove.size()); reverseIndex.put(strEntityCacheKeyNo, key); } } @@ -518,27 +543,32 @@ public class CaffeineEntityCache extends BaseEntityCache { */ private boolean invalidateEntities(NameIdentifier identifier) { List<String> allCacheIndexKeys1 = Lists.newArrayList(); - cacheIndex.getKeysStartingWith("").forEach( + cacheIndex + .getKeysStartingWith("") + .forEach( k -> { allCacheIndexKeys1.add(k.toString()); }); List<String> relationCacheIndex2 = Lists.newArrayList(); - cacheIndex.getKeysStartingWith(identifier.toString()) - .forEach(key -> { + cacheIndex + .getKeysStartingWith(identifier.toString()) + .forEach( + key -> { relationCacheIndex2.add(key.toString()); }); List<String> allReverseIndexKeys1 = Lists.newArrayList(); - reverseIndex.getKeysStartingWith("").forEach( + reverseIndex + .getKeysStartingWith("") + .forEach( k -> { allReverseIndexKeys1.add(k.toString()); }); Set<EntityCacheRelationKey> allCacheDataKeys1 = cacheData.asMap().keySet(); - List<String> allCacheDataKeysStrings1 = allCacheDataKeys1.stream() - .map(Object::toString) - .collect(Collectors.toList()); + List<String> allCacheDataKeysStrings1 = + allCacheDataKeys1.stream().map(Object::toString).collect(Collectors.toList()); List<EntityCacheKey> entityKeysToRemove = Lists.newArrayList(cacheIndex.getValuesForKeysStartingWith(identifier.toString())); @@ -605,27 +635,32 @@ public class CaffeineEntityCache extends BaseEntityCache { int reverseIndexSize1 = reverseIndex.size(); List<String> allCacheIndexKeys11 = Lists.newArrayList(); - cacheIndex.getKeysStartingWith("").forEach( + cacheIndex + .getKeysStartingWith("") + .forEach( k -> { allCacheIndexKeys11.add(k.toString()); }); List<String> relationCacheIndex21 = Lists.newArrayList(); - cacheIndex.getKeysStartingWith(identifier.toString()) - .forEach(key -> { + cacheIndex + .getKeysStartingWith(identifier.toString()) + .forEach( + key -> { relationCacheIndex21.add(key.toString()); }); List<String> allReverseIndexKeys11 = Lists.newArrayList(); - reverseIndex.getKeysStartingWith("").forEach( + reverseIndex + .getKeysStartingWith("") + .forEach( k -> { allReverseIndexKeys11.add(k.toString()); }); Set<EntityCacheRelationKey> allCacheDataKeys2 = cacheData.asMap().keySet(); - List<String> allCacheDataKeysStrings2 = allCacheDataKeys1.stream() - .map(Object::toString) - .collect(Collectors.toList()); + List<String> allCacheDataKeysStrings2 = + allCacheDataKeys1.stream().map(Object::toString).collect(Collectors.toList()); // DEBUG code check_cache(); diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java index 66335b2cc0..34e3547311 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java @@ -20,7 +20,6 @@ package org.apache.gravitino.storage.relational; import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_STORE; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.List; @@ -41,7 +40,6 @@ import org.apache.gravitino.cache.CacheFactory; import org.apache.gravitino.cache.EntityCache; import org.apache.gravitino.cache.NoOpsCache; import org.apache.gravitino.exceptions.NoSuchEntityException; -import org.apache.gravitino.meta.RoleEntity; import org.apache.gravitino.meta.TagEntity; import org.apache.gravitino.tag.SupportsTagOperations; import org.apache.gravitino.utils.Executable; @@ -121,7 +119,11 @@ public class RelationalEntityStore @Override public <E extends Entity & HasIdentifier> E update( - NameIdentifier ident, Class<E> type, Entity.EntityType entityType, List<String> roles, Function<E, E> updater) + NameIdentifier ident, + Class<E> type, + Entity.EntityType entityType, + List<String> roles, + Function<E, E> updater) throws IOException, NoSuchEntityException, EntityAlreadyExistsException { cache.invalidate(ident, entityType); @@ -130,12 +132,12 @@ public class RelationalEntityStore Namespace ns = NamespaceUtil.ofRole(ident.namespace().level(0)); NameIdentifier nameIdentifier = NameIdentifier.of(ns, role); -// RoleEntity roleEntity = RoleEntity.builder() -// .withId(1L) -// .withName(role) -// .withNamespace(NamespaceUtil.ofRole(ident.namespace().level(0))) -// .build(); -// cache.invalidate(nameIdentifier, Entity.EntityType.ROLE); + // RoleEntity roleEntity = RoleEntity.builder() + // .withId(1L) + // .withName(role) + // .withNamespace(NamespaceUtil.ofRole(ident.namespace().level(0))) + // .build(); + // cache.invalidate(nameIdentifier, Entity.EntityType.ROLE); }); return backend.update(ident, entityType, updater); diff --git a/core/src/test/java/org/apache/gravitino/cache/TestCaffeineEntityCache.java b/core/src/test/java/org/apache/gravitino/cache/TestCaffeineEntityCache.java index 8518c4d292..bea1f01402 100644 --- a/core/src/test/java/org/apache/gravitino/cache/TestCaffeineEntityCache.java +++ b/core/src/test/java/org/apache/gravitino/cache/TestCaffeineEntityCache.java @@ -190,8 +190,8 @@ public class TestCaffeineEntityCache { @Test /** * SCENE[2] <br> - * CACHE1 = Role1 -> [catalog1, catalog2] <br> - * CACHE2 = catalog1 -> [tab1, tab2] <br> + * CACHE1 = Role1 -> [catalog1, catalog2] <br> + * CACHE2 = catalog1 -> [tab1, tab2] <br> * ACTIVE: INVALIDATE catalog1, then need to remove RECORD1 and RECORD2 */ void testRemoveCacheRelation2() { diff --git a/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java b/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java index 122ea39982..7652a9301b 100644 --- a/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java +++ b/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java @@ -135,23 +135,27 @@ public class TestMemoryEntityStore { @Override public <E extends Entity & HasIdentifier> E update( - NameIdentifier ident, Class<E> type, EntityType entityType, List<String> roles, Function<E, E> updater) - throws IOException, NoSuchEntityException { + NameIdentifier ident, + Class<E> type, + EntityType entityType, + List<String> roles, + Function<E, E> updater) + throws IOException, NoSuchEntityException { return executeInTransaction( - () -> { - E e = (E) entityMap.get(ident); - if (e == null) { - throw new NoSuchEntityException("Entity %s does not exist", ident); - } - - E newE = updater.apply(e); - NameIdentifier newIdent = NameIdentifier.of(newE.namespace(), newE.name()); - if (!newIdent.equals(ident)) { - delete(ident, entityType); - } - entityMap.put(newIdent, newE); - return newE; - }); + () -> { + E e = (E) entityMap.get(ident); + if (e == null) { + throw new NoSuchEntityException("Entity %s does not exist", ident); + } + + E newE = updater.apply(e); + NameIdentifier newIdent = NameIdentifier.of(newE.namespace(), newE.name()); + if (!newIdent.equals(ident)) { + delete(ident, entityType); + } + entityMap.put(newIdent, newE); + return newE; + }); } @Override diff --git a/core/src/test/java/org/apache/gravitino/utils/TestUtil.java b/core/src/test/java/org/apache/gravitino/utils/TestUtil.java index b2f0d8087f..2662a34d0b 100644 --- a/core/src/test/java/org/apache/gravitino/utils/TestUtil.java +++ b/core/src/test/java/org/apache/gravitino/utils/TestUtil.java @@ -27,8 +27,6 @@ import com.google.common.collect.ImmutableMap; import java.time.Instant; import java.util.List; import java.util.Map; - -import com.google.common.collect.Lists; import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace;
