This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 5159313d70 [#7179] improvement(core): Remove old cached System (#7459)
5159313d70 is described below
commit 5159313d700977ac677ca7a8ff34c4ebfe4fc71b
Author: Lord of Abyss <[email protected]>
AuthorDate: Mon Jun 30 09:34:46 2025 +0800
[#7179] improvement(core): Remove old cached System (#7459)
### What changes were proposed in this pull request?
Remove the legacy caching system.
To unify the caching architecture, we should remove legacy cache modules
that directly proxy the underlying EntityStore, especially those that
simply cache raw entity results. For example:
- `MetalakeManager`: its cache is a direct wrapper around EntityStore.
- `FilesetCatalogOperations`: its cache is likewise only a thin layer over
the underlying store.
In contrast, cache modules whose content is not directly sourced from
the EntityStore, and which provide additional encapsulation or are
relied on by external systems, should be preserved. For example:
- `CatalogManager`: it caches `CatalogWrapper` objects instead of raw
`Entity` instances, and other components depend on it.
In summary, the cache cleanup strategy should be based on two
principles:
- Whether the cache directly proxies the EntityStore.
- Whether other components depend on it, or it wraps entities with additional semantics.
This helps clearly distinguish between caches that should be removed and
those that should be retained.
### Why are the changes needed?
Fix: #7179
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested locally. The `testMetalakeCache` unit test was removed because the cache it verified no longer exists.
---
.../catalog/fileset/FilesetCatalogOperations.java | 95 +++++------
.../apache/gravitino/metalake/MetalakeManager.java | 174 ++++++++-------------
.../gravitino/metalake/TestMetalakeManager.java | 60 -------
3 files changed, 100 insertions(+), 229 deletions(-)
diff --git
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
index ef88bdf175..9fa992d333 100644
---
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
+++
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
@@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
@@ -130,8 +129,6 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
private boolean disableFSOps;
- private Cache<NameIdentifier, FilesetImpl> filesetCache;
-
FilesetCatalogOperations(EntityStore store) {
this.store = store;
}
@@ -194,7 +191,6 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
}
this.catalogStorageLocations = getAndCheckCatalogStorageLocations(config);
- this.filesetCache = initializeFilesetCache(config);
}
@Override
@@ -217,28 +213,24 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@Override
public Fileset loadFileset(NameIdentifier ident) throws
NoSuchFilesetException {
- return filesetCache.get(
- ident,
- k -> {
- try {
- FilesetEntity filesetEntity =
- store.get(ident, Entity.EntityType.FILESET,
FilesetEntity.class);
-
- return FilesetImpl.builder()
- .withName(ident.name())
- .withType(filesetEntity.filesetType())
- .withComment(filesetEntity.comment())
- .withStorageLocations(filesetEntity.storageLocations())
- .withProperties(filesetEntity.properties())
- .withAuditInfo(filesetEntity.auditInfo())
- .build();
-
- } catch (NoSuchEntityException exception) {
- throw new NoSuchFilesetException(exception,
FILESET_DOES_NOT_EXIST_MSG, ident);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to load fileset %s" + ident,
ioe);
- }
- });
+ try {
+ FilesetEntity filesetEntity =
+ store.get(ident, Entity.EntityType.FILESET, FilesetEntity.class);
+
+ return FilesetImpl.builder()
+ .withName(ident.name())
+ .withType(filesetEntity.filesetType())
+ .withComment(filesetEntity.comment())
+ .withStorageLocations(filesetEntity.storageLocations())
+ .withProperties(filesetEntity.properties())
+ .withAuditInfo(filesetEntity.auditInfo())
+ .build();
+
+ } catch (NoSuchEntityException exception) {
+ throw new NoSuchFilesetException(exception, FILESET_DOES_NOT_EXIST_MSG,
ident);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to load fileset %s" + ident, ioe);
+ }
}
@Override
@@ -300,12 +292,6 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
}
});
- // Check if the fileset already existed in cache first. If it does, it
means the fileset is
- // already created, so we should throw an exception.
- if (filesetCache.getIfPresent(ident) != null) {
- throw new FilesetAlreadyExistsException("Fileset %s already exists",
ident);
- }
-
try {
if (store.exists(ident, Entity.EntityType.FILESET)) {
throw new FilesetAlreadyExistsException("Fileset %s already exists",
ident);
@@ -448,17 +434,14 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
throw new RuntimeException("Failed to create fileset " + ident, ioe);
}
- FilesetImpl fileset =
- FilesetImpl.builder()
- .withName(ident.name())
- .withComment(comment)
- .withType(type)
- .withStorageLocations(formattedStorageLocations)
- .withProperties(filesetEntity.properties())
- .withAuditInfo(filesetEntity.auditInfo())
- .build();
- filesetCache.put(ident, fileset);
- return fileset;
+ return FilesetImpl.builder()
+ .withName(ident.name())
+ .withComment(comment)
+ .withType(type)
+ .withStorageLocations(formattedStorageLocations)
+ .withProperties(filesetEntity.properties())
+ .withAuditInfo(filesetEntity.auditInfo())
+ .build();
}
private Map<String, String> setDefaultLocationIfAbsent(
@@ -512,7 +495,6 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
throw new RuntimeException("Failed to load fileset " + ident, ioe);
}
- filesetCache.invalidate(ident);
try {
FilesetEntity updatedFilesetEntity =
store.update(
@@ -521,18 +503,14 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
Entity.EntityType.FILESET,
e -> updateFilesetEntity(ident, e, changes));
- FilesetImpl fileset =
- FilesetImpl.builder()
- .withName(updatedFilesetEntity.name())
- .withComment(updatedFilesetEntity.comment())
- .withType(updatedFilesetEntity.filesetType())
- .withStorageLocations(updatedFilesetEntity.storageLocations())
- .withProperties(updatedFilesetEntity.properties())
- .withAuditInfo(updatedFilesetEntity.auditInfo())
- .build();
- filesetCache.put(updatedFilesetEntity.nameIdentifier(), fileset);
- return fileset;
-
+ return FilesetImpl.builder()
+ .withName(updatedFilesetEntity.name())
+ .withComment(updatedFilesetEntity.comment())
+ .withType(updatedFilesetEntity.filesetType())
+ .withStorageLocations(updatedFilesetEntity.storageLocations())
+ .withProperties(updatedFilesetEntity.properties())
+ .withAuditInfo(updatedFilesetEntity.auditInfo())
+ .build();
} catch (IOException ioe) {
throw new RuntimeException("Failed to update fileset " + ident, ioe);
} catch (NoSuchEntityException nsee) {
@@ -589,7 +567,6 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
}
}
- filesetCache.invalidate(ident);
return store.delete(ident, Entity.EntityType.FILESET);
} catch (NoSuchEntityException ne) {
LOG.warn("Fileset {} does not exist", ident);
@@ -788,8 +765,6 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
Map<String, Path> schemaPaths = getAndCheckSchemaPaths(ident.name(),
properties);
boolean dropped = super.dropSchema(ident, cascade);
- filesetCache.invalidateAll(
-
filesets.stream().map(FilesetEntity::nameIdentifier).collect(Collectors.toList()));
if (disableFSOps) {
return dropped;
}
@@ -920,7 +895,7 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@Override
public void close() throws IOException {
- filesetCache.invalidateAll();
+ // do nothing
}
private Cache<NameIdentifier, FilesetImpl>
initializeFilesetCache(Map<String, String> config) {
diff --git
a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
index cbe1fa6cb0..9b2a968acf 100644
--- a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
+++ b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
@@ -20,12 +20,7 @@ package org.apache.gravitino.metalake;
import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.Scheduler;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
@@ -33,8 +28,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
@@ -72,26 +65,9 @@ public class MetalakeManager implements MetalakeDispatcher,
Closeable {
private final IdGenerator idGenerator;
- // Currently, there will be only one MetalakeManager instance in the system.
In this case
- // we can clear or close the cache when the instance is destroyed.
- @VisibleForTesting
- static final Cache<NameIdentifier, BaseMetalake> METALAKE_CACHE =
- Caffeine.newBuilder()
- .expireAfterAccess(24, TimeUnit.HOURS)
- .removalListener((k, v, c) -> LOG.info("Closing metalake {}.", k))
- .scheduler(
- Scheduler.forScheduledExecutorService(
- new ScheduledThreadPoolExecutor(
- 1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("metalake-cleaner-%d")
- .build())))
- .build();
-
@Override
public void close() {
- METALAKE_CACHE.invalidateAll();
+ // do nothing
}
/**
@@ -104,11 +80,11 @@ public class MetalakeManager implements
MetalakeDispatcher, Closeable {
this.store = store;
this.idGenerator = idGenerator;
- // pre-load all metalakes and put them into cache, this is useful when
user load schema/table
+ // preload all metalakes and put them into cache, this is useful when user
load schema/table
// directly without list/get metalake first.
- BaseMetalake[] metalakes = listMetalakes();
- for (BaseMetalake metalake : metalakes) {
- METALAKE_CACHE.put(metalake.nameIdentifier(), metalake);
+ BaseMetalake[] baseMetalakes = listMetalakes();
+ for (BaseMetalake baseMetalake : baseMetalakes) {
+ loadMetalake(baseMetalake.nameIdentifier());
}
}
@@ -140,10 +116,7 @@ public class MetalakeManager implements
MetalakeDispatcher, Closeable {
public static boolean metalakeInUse(EntityStore store, NameIdentifier ident)
throws NoSuchMetalakeException {
try {
- BaseMetalake metalake = METALAKE_CACHE.getIfPresent(ident);
- if (metalake == null) {
- metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class);
- }
+ BaseMetalake metalake = store.get(ident, EntityType.METALAKE,
BaseMetalake.class);
return (boolean)
metalake.propertiesMetadata().getOrDefault(metalake.properties(),
PROPERTY_IN_USE);
} catch (NoSuchEntityException e) {
@@ -191,22 +164,18 @@ public class MetalakeManager implements
MetalakeDispatcher, Closeable {
return TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
- () ->
- METALAKE_CACHE.get(
- ident,
- k -> {
- try {
- BaseMetalake baseMetalake =
- store.get(ident, EntityType.METALAKE,
BaseMetalake.class);
- return newMetalakeWithResolvedProperties(baseMetalake);
- } catch (NoSuchEntityException e) {
- LOG.warn("Metalake {} does not exist", ident, e);
- throw new
NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
- } catch (IOException ioe) {
- LOG.error("Loading Metalake {} failed due to storage
issues", ident, ioe);
- throw new RuntimeException(ioe);
- }
- }));
+ () -> {
+ try {
+ BaseMetalake baseMetalake = store.get(ident, EntityType.METALAKE,
BaseMetalake.class);
+ return newMetalakeWithResolvedProperties(baseMetalake);
+ } catch (NoSuchEntityException e) {
+ LOG.warn("Metalake {} does not exist", ident, e);
+ throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG,
ident);
+ } catch (IOException ioe) {
+ LOG.error("Loading Metalake {} failed due to storage issues",
ident, ioe);
+ throw new RuntimeException(ioe);
+ }
+ });
}
private BaseMetalake newMetalakeWithResolvedProperties(BaseMetalake
metalakeEntity) {
@@ -266,7 +235,6 @@ public class MetalakeManager implements MetalakeDispatcher,
Closeable {
() -> {
try {
store.put(metalake, false /* overwritten */);
- METALAKE_CACHE.put(ident,
newMetalakeWithResolvedProperties(metalake));
return metalake;
} catch (EntityAlreadyExistsException | AlreadyExistsException e) {
LOG.warn("Metalake {} already exists", ident, e);
@@ -298,25 +266,21 @@ public class MetalakeManager implements
MetalakeDispatcher, Closeable {
throw new MetalakeNotInUseException(
"Metalake %s is not in use, please enable it first", ident);
}
- METALAKE_CACHE.invalidate(ident);
- BaseMetalake baseMetalake =
- store.update(
- ident,
- BaseMetalake.class,
- EntityType.METALAKE,
- metalake -> {
- BaseMetalake.Builder builder =
newMetalakeBuilder(metalake);
- Map<String, String> newProps =
- metalake.properties() == null
- ? Maps.newHashMap()
- : Maps.newHashMap(metalake.properties());
- builder = updateEntity(builder, newProps, changes);
-
- return builder.build();
- });
- METALAKE_CACHE.put(
- baseMetalake.nameIdentifier(),
newMetalakeWithResolvedProperties(baseMetalake));
- return baseMetalake;
+
+ return store.update(
+ ident,
+ BaseMetalake.class,
+ EntityType.METALAKE,
+ metalake -> {
+ BaseMetalake.Builder builder = newMetalakeBuilder(metalake);
+ Map<String, String> newProps =
+ metalake.properties() == null
+ ? Maps.newHashMap()
+ : Maps.newHashMap(metalake.properties());
+ builder = updateEntity(builder, newProps, changes);
+
+ return builder.build();
+ });
} catch (NoSuchEntityException ne) {
LOG.warn("Metalake {} does not exist", ident, ne);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG,
ident);
@@ -353,8 +317,6 @@ public class MetalakeManager implements MetalakeDispatcher,
Closeable {
"Metalake %s is in use, please disable it first or use force
option", ident);
}
- METALAKE_CACHE.invalidate(ident);
-
List<CatalogEntity> catalogEntities =
store.list(Namespace.of(ident.name()), CatalogEntity.class,
EntityType.CATALOG);
if (!catalogEntities.isEmpty() && !force) {
@@ -381,25 +343,22 @@ public class MetalakeManager implements
MetalakeDispatcher, Closeable {
try {
boolean inUse = metalakeInUse(store, ident);
if (!inUse) {
- METALAKE_CACHE.invalidate(ident);
- BaseMetalake baseMetalake =
- store.update(
- ident,
- BaseMetalake.class,
- EntityType.METALAKE,
- metalake -> {
- BaseMetalake.Builder builder =
newMetalakeBuilder(metalake);
-
- Map<String, String> newProps =
- metalake.properties() == null
- ? Maps.newHashMap()
- : Maps.newHashMap(metalake.properties());
- newProps.put(PROPERTY_IN_USE, "true");
- builder.withProperties(newProps);
-
- return builder.build();
- });
- METALAKE_CACHE.put(ident,
newMetalakeWithResolvedProperties(baseMetalake));
+ store.update(
+ ident,
+ BaseMetalake.class,
+ EntityType.METALAKE,
+ metalake -> {
+ BaseMetalake.Builder builder =
newMetalakeBuilder(metalake);
+
+ Map<String, String> newProps =
+ metalake.properties() == null
+ ? Maps.newHashMap()
+ : Maps.newHashMap(metalake.properties());
+ newProps.put(PROPERTY_IN_USE, "true");
+ builder.withProperties(newProps);
+
+ return builder.build();
+ });
}
return null;
@@ -418,25 +377,22 @@ public class MetalakeManager implements
MetalakeDispatcher, Closeable {
try {
boolean inUse = metalakeInUse(store, ident);
if (inUse) {
- METALAKE_CACHE.invalidate(ident);
- BaseMetalake baseMetalake =
- store.update(
- ident,
- BaseMetalake.class,
- EntityType.METALAKE,
- metalake -> {
- BaseMetalake.Builder builder =
newMetalakeBuilder(metalake);
-
- Map<String, String> newProps =
- metalake.properties() == null
- ? Maps.newHashMap()
- : Maps.newHashMap(metalake.properties());
- newProps.put(PROPERTY_IN_USE, "false");
- builder.withProperties(newProps);
-
- return builder.build();
- });
- METALAKE_CACHE.put(ident,
newMetalakeWithResolvedProperties(baseMetalake));
+ store.update(
+ ident,
+ BaseMetalake.class,
+ EntityType.METALAKE,
+ metalake -> {
+ BaseMetalake.Builder builder =
newMetalakeBuilder(metalake);
+
+ Map<String, String> newProps =
+ metalake.properties() == null
+ ? Maps.newHashMap()
+ : Maps.newHashMap(metalake.properties());
+ newProps.put(PROPERTY_IN_USE, "false");
+ builder.withProperties(newProps);
+
+ return builder.build();
+ });
}
return null;
} catch (IOException e) {
diff --git
a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
index 4765ddc716..03624f28da 100644
--- a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
+++ b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
@@ -23,7 +23,6 @@ import static
org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
import static org.mockito.Mockito.doReturn;
-import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
@@ -210,65 +209,6 @@ public class TestMetalakeManager {
Assertions.assertFalse(dropped1, "metalake should be non-existent");
}
- @Test
- public void testMetalakeCache() {
- NameIdentifier ident = NameIdentifier.of("test51");
- Map<String, String> props = ImmutableMap.of("key1", "value1");
- BaseMetalake metalake = metalakeManager.createMetalake(ident, "comment",
props);
- Assertions.assertEquals("test51", metalake.name());
- Assertions.assertEquals("comment", metalake.comment());
-
- Cache<NameIdentifier, BaseMetalake> cache = MetalakeManager.METALAKE_CACHE;
-
- BaseMetalake baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
- Assertions.assertEquals("test51", baseMetalake.name());
- Assertions.assertEquals("comment", baseMetalake.comment());
-
- metalakeManager.disableMetalake(ident);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
- metalakeManager.dropMetalake(ident);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNull(baseMetalake);
-
- metalakeManager.createMetalake(ident, "comment", props);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
-
- metalakeManager.disableMetalake(ident);
- metalakeManager.dropMetalake(ident);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNull(baseMetalake);
-
- metalakeManager.createMetalake(ident, "comment", props);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
- metalakeManager.disableMetalake(ident);
- metalakeManager.dropMetalake(ident);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNull(baseMetalake);
-
- metalakeManager.createMetalake(ident, "comment", props);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
- metalakeManager.disableMetalake(ident);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
- Assertions.assertEquals("false", baseMetalake.properties().get("in-use"));
- metalakeManager.enableMetalake(ident);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
- Assertions.assertEquals("true", baseMetalake.properties().get("in-use"));
-
- metalakeManager.loadMetalake(ident);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
- metalakeManager.disableMetalake(ident);
- baseMetalake = cache.getIfPresent(ident);
- Assertions.assertNotNull(baseMetalake);
- }
-
private void testProperties(Map<String, String> expectedProps, Map<String,
String> testProps) {
expectedProps.forEach(
(k, v) -> {