jerryshao commented on code in PR #7200:
URL: https://github.com/apache/gravitino/pull/7200#discussion_r2115060669


##########
core/src/main/java/org/apache/gravitino/Configs.java:
##########
@@ -343,4 +343,48 @@ private Configs() {}
           .stringConf()
           .toSequence()
           .createWithDefault(Collections.emptyList());
+
+  // Maximum number of entries in the cache
+  public static final ConfigEntry<Integer> CACHE_MAX_ENTRIES =
+      new ConfigBuilder("gravitino.cache.maxEntries")
+          .doc("Maximum number of entries allowed in the cache.")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .intConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(10_000);
+
+  // Whether to enable cache expiration
+  public static final ConfigEntry<Boolean> CACHE_EXPIRATION_ENABLED =
+      new ConfigBuilder("gravitino.cache.expiration.enabled")
+          .doc("Whether to enable cache entry expiration based on time.")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .booleanConf()
+          .createWithDefault(true);
+
+  // Cache entry expiration time
+  public static final ConfigEntry<Long> CACHE_EXPIRATION_TIME =
+      new ConfigBuilder("gravitino.cache.expireTimeInMs")
+          .doc(
+              "Time in milliseconds after which a cache entry expires. Default 
is 3,600,000 ms (1 hour).")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .longConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(3_600_000L);
+
+  // Whether to enable cache status logging
+  public static final ConfigEntry<Boolean> CACHE_STATUS_ENABLED =
+      new ConfigBuilder("gravitino.cache.status.enabled")

Review Comment:
   Please use 3-level name: "gravitino.cache.enableStats". Also "status" and 
"stats" are different, if you want to log the stats, using stats.



##########
core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java:
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class implements a meta cache using Caffeine cache. */
+public class CaffeineEntityCache extends BaseEntityCache {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
+
+  /** Singleton instance */
+  private static volatile CaffeineEntityCache INSTANCE = null;
+
+  /** Cache part */
+  private final Cache<EntityCacheKey, List<Entity>> cacheData;
+
+  /** Index part */
+  private RadixTree<EntityCacheKey> cacheIndex;
+
+  private final ReentrantLock opLock = new ReentrantLock();
+  ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  @VisibleForTesting
+  static void resetForTest() {
+    INSTANCE = null;
+  }
+
+  /**
+   * Returns the instance of MetaCacheCaffeine based on the cache 
configuration and entity store.
+   *
+   * @param cacheConfig The cache configuration
+   * @param entityStore The entity store to load entities from the database
+   * @return The instance of {@link CaffeineEntityCache}
+   */
+  public static CaffeineEntityCache getInstance(Config cacheConfig, 
EntityStore entityStore) {
+    if (INSTANCE == null) {
+      synchronized (CaffeineEntityCache.class) {
+        if (INSTANCE == null) {
+          INSTANCE = new CaffeineEntityCache(cacheConfig, entityStore);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Constructs a new MetaCacheCaffeine.
+   *
+   * @param cacheConfig the cache configuration
+   * @param entityStore The entity store to load entities from the database
+   */
+  private CaffeineEntityCache(Config cacheConfig, EntityStore entityStore) {
+    super(cacheConfig, entityStore);
+    cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    /**
+     * Executor for async cache cleanup, when a cache expires then use this 
executor to sync other
+     * cache and index trees.
+     */
+    ThreadPoolExecutor cleanupExec =
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(100),
+            r -> {
+              Thread t = new Thread(r, "CaffeineMetaCache-Cleanup");

Review Comment:
   EntityCache.



##########
core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java:
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class implements a meta cache using Caffeine cache. */
+public class CaffeineEntityCache extends BaseEntityCache {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
+
+  /** Singleton instance */
+  private static volatile CaffeineEntityCache INSTANCE = null;
+
+  /** Cache part */
+  private final Cache<EntityCacheKey, List<Entity>> cacheData;
+
+  /** Index part */
+  private RadixTree<EntityCacheKey> cacheIndex;
+
+  private final ReentrantLock opLock = new ReentrantLock();
+  ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  @VisibleForTesting
+  static void resetForTest() {
+    INSTANCE = null;
+  }
+
+  /**
+   * Returns the instance of MetaCacheCaffeine based on the cache 
configuration and entity store.
+   *
+   * @param cacheConfig The cache configuration
+   * @param entityStore The entity store to load entities from the database
+   * @return The instance of {@link CaffeineEntityCache}
+   */
+  public static CaffeineEntityCache getInstance(Config cacheConfig, 
EntityStore entityStore) {
+    if (INSTANCE == null) {
+      synchronized (CaffeineEntityCache.class) {
+        if (INSTANCE == null) {
+          INSTANCE = new CaffeineEntityCache(cacheConfig, entityStore);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Constructs a new MetaCacheCaffeine.
+   *
+   * @param cacheConfig the cache configuration
+   * @param entityStore The entity store to load entities from the database
+   */
+  private CaffeineEntityCache(Config cacheConfig, EntityStore entityStore) {
+    super(cacheConfig, entityStore);
+    cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    /**
+     * Executor for async cache cleanup, when a cache expires then use this 
executor to sync other
+     * cache and index trees.
+     */
+    ThreadPoolExecutor cleanupExec =
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(100),
+            r -> {
+              Thread t = new Thread(r, "CaffeineMetaCache-Cleanup");
+              t.setDaemon(true);
+              return t;
+            },
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder = 
newBaseBuilder(cacheConfig);
+
+    cacheDataBuilder
+        .executor(cleanupExec)
+        .removalListener(
+            (key, value, cause) -> {
+              if (cause == RemovalCause.EXPLICIT || cause == 
RemovalCause.REPLACED) {
+                return;
+              }
+              try {
+                invalidateExpiredItem(key);
+              } catch (Throwable t) {
+                LOG.error(
+                    "Failed to remove entity key={} value={} from cache 
asynchronously, cause={}",
+                    key,
+                    value,
+                    cause,
+                    t);
+              }
+            });
+
+    this.cacheData = cacheDataBuilder.build();
+
+    if (cacheConfig.get(Configs.CACHE_STATUS_ENABLED)) {
+      startCacheStatsMonitor();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> List<E> getOrLoad(
+      NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType)
+      throws IOException {
+    Preconditions.checkArgument(ident != null, "NameIdentifier cannot be 
null");
+    Preconditions.checkArgument(type != null, "EntityType cannot be null");
+    Preconditions.checkArgument(relType != null, 
"SupportsRelationOperations.Type cannot be null");
+
+    return withLockAndThrow(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type, 
relType);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+
+          if (entitiesFromCache != null) {
+            return convertEntity(entitiesFromCache);
+          }
+
+          List<E> entities = entityStore.listEntitiesByRelation(relType, 
ident, type);
+          syncEntitiesToCache(
+              entityCacheKey, entities.stream().map(e -> (Entity) 
e).collect(Collectors.toList()));
+
+          return entities;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> E getOrLoad(
+      NameIdentifier ident, Entity.EntityType type) throws IOException {
+    Preconditions.checkArgument(ident != null, "NameIdentifier cannot be 
null");
+    Preconditions.checkArgument(type != null, "EntityType cannot be null");
+
+    return withLockAndThrow(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+
+          if (entitiesFromCache != null) {
+            return convertEntity(entitiesFromCache.get(0));
+          }
+
+          E entityFromStore = entityStore.get(ident, type, 
getEntityClass(type));
+          syncEntitiesToCache(entityCacheKey, 
Lists.newArrayList(entityFromStore));
+
+          return entityFromStore;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> Optional<List<E>> getIfPresent(
+      SupportsRelationOperations.Type relType,
+      NameIdentifier nameIdentifier,
+      Entity.EntityType identType) {
+    return withLock(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(nameIdentifier, 
identType, relType);
+
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+          if (entitiesFromCache != null) {
+            List<E> convertedEntities = convertEntity(entitiesFromCache);
+            return Optional.of(convertedEntities);
+          }
+
+          return Optional.empty();

Review Comment:
   Using `Optional` to simplify the code here or other places if possible 
`Optional.ofNullable(entitiesFromCache).map(xxx)`.



##########
core/src/main/java/org/apache/gravitino/cache/BaseEntityCache.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TagEntity;
+import org.apache.gravitino.meta.TopicEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.storage.relational.RelationalEntityStore;
+
+/**
+ * An abstract class that provides a basic implementation for the MetaCache 
interface. This class is

Review Comment:
   Update all the comments to reflect the correct class name. You still use 
"meta", not "entity".



##########
core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java:
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class implements a meta cache using Caffeine cache. */
+public class CaffeineEntityCache extends BaseEntityCache {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
+
+  /** Singleton instance */
+  private static volatile CaffeineEntityCache INSTANCE = null;
+
+  /** Cache part */
+  private final Cache<EntityCacheKey, List<Entity>> cacheData;
+
+  /** Index part */
+  private RadixTree<EntityCacheKey> cacheIndex;
+
+  private final ReentrantLock opLock = new ReentrantLock();
+  ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();

Review Comment:
   scheduler is only used for logging, if it is not enabled, we don't need to 
initialize it.



##########
core/build.gradle.kts:
##########
@@ -37,6 +38,7 @@ dependencies {
   implementation(libs.guava)
   implementation(libs.h2db)
   implementation(libs.mybatis)
+  implementation(libs.concurrent.trees)

Review Comment:
   Please make this alphabetically ordered.



##########
core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java:
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class implements a meta cache using Caffeine cache. */
+public class CaffeineEntityCache extends BaseEntityCache {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
+
+  /** Singleton instance */
+  private static volatile CaffeineEntityCache INSTANCE = null;
+
+  /** Cache part */
+  private final Cache<EntityCacheKey, List<Entity>> cacheData;
+
+  /** Index part */
+  private RadixTree<EntityCacheKey> cacheIndex;
+
+  private final ReentrantLock opLock = new ReentrantLock();
+  ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  @VisibleForTesting
+  static void resetForTest() {
+    INSTANCE = null;
+  }
+
+  /**
+   * Returns the instance of MetaCacheCaffeine based on the cache 
configuration and entity store.
+   *
+   * @param cacheConfig The cache configuration
+   * @param entityStore The entity store to load entities from the database
+   * @return The instance of {@link CaffeineEntityCache}
+   */
+  public static CaffeineEntityCache getInstance(Config cacheConfig, 
EntityStore entityStore) {
+    if (INSTANCE == null) {
+      synchronized (CaffeineEntityCache.class) {
+        if (INSTANCE == null) {
+          INSTANCE = new CaffeineEntityCache(cacheConfig, entityStore);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Constructs a new MetaCacheCaffeine.
+   *
+   * @param cacheConfig the cache configuration
+   * @param entityStore The entity store to load entities from the database
+   */
+  private CaffeineEntityCache(Config cacheConfig, EntityStore entityStore) {
+    super(cacheConfig, entityStore);
+    cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    /**
+     * Executor for async cache cleanup, when a cache expires then use this 
executor to sync other
+     * cache and index trees.
+     */
+    ThreadPoolExecutor cleanupExec =
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(100),
+            r -> {
+              Thread t = new Thread(r, "CaffeineMetaCache-Cleanup");
+              t.setDaemon(true);
+              return t;
+            },
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder = 
newBaseBuilder(cacheConfig);
+
+    cacheDataBuilder
+        .executor(cleanupExec)
+        .removalListener(
+            (key, value, cause) -> {
+              if (cause == RemovalCause.EXPLICIT || cause == 
RemovalCause.REPLACED) {
+                return;
+              }
+              try {
+                invalidateExpiredItem(key);
+              } catch (Throwable t) {
+                LOG.error(
+                    "Failed to remove entity key={} value={} from cache 
asynchronously, cause={}",
+                    key,
+                    value,
+                    cause,
+                    t);
+              }
+            });
+
+    this.cacheData = cacheDataBuilder.build();
+
+    if (cacheConfig.get(Configs.CACHE_STATUS_ENABLED)) {
+      startCacheStatsMonitor();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> List<E> getOrLoad(
+      NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType)
+      throws IOException {
+    Preconditions.checkArgument(ident != null, "NameIdentifier cannot be 
null");
+    Preconditions.checkArgument(type != null, "EntityType cannot be null");
+    Preconditions.checkArgument(relType != null, 
"SupportsRelationOperations.Type cannot be null");
+
+    return withLockAndThrow(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type, 
relType);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+
+          if (entitiesFromCache != null) {
+            return convertEntity(entitiesFromCache);
+          }
+
+          List<E> entities = entityStore.listEntitiesByRelation(relType, 
ident, type);
+          syncEntitiesToCache(
+              entityCacheKey, entities.stream().map(e -> (Entity) 
e).collect(Collectors.toList()));
+
+          return entities;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> E getOrLoad(
+      NameIdentifier ident, Entity.EntityType type) throws IOException {
+    Preconditions.checkArgument(ident != null, "NameIdentifier cannot be 
null");
+    Preconditions.checkArgument(type != null, "EntityType cannot be null");
+
+    return withLockAndThrow(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+
+          if (entitiesFromCache != null) {
+            return convertEntity(entitiesFromCache.get(0));
+          }
+
+          E entityFromStore = entityStore.get(ident, type, 
getEntityClass(type));
+          syncEntitiesToCache(entityCacheKey, 
Lists.newArrayList(entityFromStore));
+
+          return entityFromStore;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> Optional<List<E>> getIfPresent(
+      SupportsRelationOperations.Type relType,
+      NameIdentifier nameIdentifier,
+      Entity.EntityType identType) {
+    return withLock(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(nameIdentifier, 
identType, relType);
+
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+          if (entitiesFromCache != null) {
+            List<E> convertedEntities = convertEntity(entitiesFromCache);
+            return Optional.of(convertedEntities);
+          }
+
+          return Optional.empty();
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> Optional<E> getIfPresent(
+      NameIdentifier ident, Entity.EntityType type) {
+    return withLock(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+          if (entitiesFromCache != null) {
+            return Optional.of(convertEntity(entitiesFromCache.get(0)));
+          }
+
+          return Optional.empty();
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean invalidate(
+      NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType) {
+    return invalidate(ident, type);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean invalidate(NameIdentifier ident, Entity.EntityType type) {
+    return withLock(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type);
+
+          boolean existed = contains(ident, type);
+          if (existed) {
+            invalidateEntities(entityCacheKey.identifier());
+          }
+
+          return existed;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean contains(
+      NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType) {
+    return withLock(() -> cacheData.getIfPresent(EntityCacheKey.of(ident, 
type)) != null);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean contains(NameIdentifier ident, Entity.EntityType type) {
+    return withLock(() -> cacheData.getIfPresent(EntityCacheKey.of(ident, 
type)) != null);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long size() {
+    return cacheData.estimatedSize();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void clear() {
+    withLock(
+        () -> {
+          cacheData.invalidateAll();
+          cacheIndex = new ConcurrentRadixTree<>(new 
DefaultCharArrayNodeFactory());
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> void put(
+      NameIdentifier ident,
+      Entity.EntityType type,
+      SupportsRelationOperations.Type relType,
+      List<E> entities) {
+    syncEntitiesToCache(
+        EntityCacheKey.of(ident, type, relType),
+        entities.stream().map(e -> (Entity) e).collect(Collectors.toList()));
+  }
+
+  @Override
+  public <E extends Entity & HasIdentifier> void put(E entity) {
+    withLock(
+        () -> {
+          NameIdentifier identifier = getIdentFromEntity(entity);
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(identifier, 
entity.type());
+          syncEntitiesToCache(entityCacheKey, Lists.newArrayList(entity));
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Exception> void withCacheLock(ThrowingRunnable<E> action) 
throws E {
+    withLockAndThrow(action);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E, T extends Exception> E withCacheLock(ThrowingSupplier<E, T> 
action) throws T {
+    return withLockAndThrow(action);
+  }
+
+  /**
+   * Removes the expired entity from the cache. This method is a hook method 
for the Cache, when an
+   * entry expires, it will call this method.
+   *
+   * @param key The key of the expired entity
+   */
+  @Override
+  protected void invalidateExpiredItem(EntityCacheKey key) {
+    withLock(
+        () -> {
+          cacheIndex.remove(key.toString());
+        });
+  }
+
+  /**
+   * Syncs the entities to the cache, if cache is too big and can not put it 
to the cache, then it
+   * will remove and cacheIndex will not be updated.
+   *
+   * @param key The key of the entities.
+   * @param newEntities The new entities to sync to the cache.
+   */
+  private void syncEntitiesToCache(EntityCacheKey key, List<Entity> 
newEntities) {
+    List<Entity> existingEntities = cacheData.getIfPresent(key);
+
+    if (existingEntities != null && key.relationType() != null) {
+      List<Entity> merged = new ArrayList<>(existingEntities);
+      merged.addAll(newEntities);
+
+      cacheData.put(key, merged);
+    } else {
+      cacheData.put(key, newEntities);
+    }
+
+    if (cacheData.policy().getIfPresentQuietly(key) != null) {
+      cacheIndex.put(key.toString(), key);
+    }
+  }
+
+  /**
+   * Returns a new instance of Caffeine cache builder.
+   *
+   * @param cacheConfig The cache configuration
+   * @param <KEY> The key type
+   * @param <VALUE> The value type
+   * @return The new instance of Caffeine cache builder
+   */
+  @SuppressWarnings("unchecked")
+  private <KEY, VALUE> Caffeine<KEY, VALUE> newBaseBuilder(Config cacheConfig) 
{
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+
+    if (cacheConfig.get(Configs.CACHE_WEIGHER_ENABLED)) {
+      builder.maximumWeight(EntityCacheWeigher.getMaxWeight());
+      builder.weigher(EntityCacheWeigher.getInstance());
+    } else {
+      builder.maximumSize(cacheConfig.get(Configs.CACHE_MAX_ENTRIES));
+    }
+
+    if (cacheConfig.get(Configs.CACHE_EXPIRATION_ENABLED)) {
+      builder.expireAfterWrite(
+          cacheConfig.get(Configs.CACHE_EXPIRATION_TIME), 
TimeUnit.MILLISECONDS);
+    }
+
+    if (cacheConfig.get(Configs.CACHE_STATUS_ENABLED)) {
+      builder.recordStats();
+    }
+
+    return (Caffeine<KEY, VALUE>) builder;
+  }
+
+  /**
+   * Invalidates the entities by the given cache key.
+   *
+   * @param identifier The identifier of the entities to invalidate
+   */
+  private void invalidateEntities(NameIdentifier identifier) {
+    Iterable<EntityCacheKey> entityKeysToRemove =
+        cacheIndex.getValuesForKeysStartingWith(identifier.toString());
+
+    cacheData.invalidateAll(entityKeysToRemove);
+    entityKeysToRemove.forEach(key -> cacheIndex.remove(key.toString()));
+  }
+
+  /**
+   * Runs the given action with the lock.
+   *
+   * @param action The action to run with the lock
+   */
+  private void withLock(Runnable action) {
+    try {
+      opLock.lockInterruptibly();
+      try {
+        action.run();
+      } finally {
+        opLock.unlock();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();

Review Comment:
   Shall we interrupt the current thread, if so then this entity cache will not 
work any more, right?



##########
core/src/main/java/org/apache/gravitino/Configs.java:
##########
@@ -343,4 +343,48 @@ private Configs() {}
           .stringConf()
           .toSequence()
           .createWithDefault(Collections.emptyList());
+
+  // Maximum number of entries in the cache
+  public static final ConfigEntry<Integer> CACHE_MAX_ENTRIES =
+      new ConfigBuilder("gravitino.cache.maxEntries")
+          .doc("Maximum number of entries allowed in the cache.")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .intConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(10_000);
+
+  // Whether to enable cache expiration
+  public static final ConfigEntry<Boolean> CACHE_EXPIRATION_ENABLED =
+      new ConfigBuilder("gravitino.cache.expiration.enabled")
+          .doc("Whether to enable cache entry expiration based on time.")
+          .version(ConfigConstants.VERSION_0_10_0)
+          .booleanConf()
+          .createWithDefault(true);

Review Comment:
   I think we don't need to have this configuration explicitly, we can set the 
expiration time to 0 or -1 to disable the expiration.



##########
core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java:
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class implements a meta cache using Caffeine cache. */
+public class CaffeineEntityCache extends BaseEntityCache {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
+
+  /** Singleton instance */
+  private static volatile CaffeineEntityCache INSTANCE = null;
+
+  /** Cache part */
+  private final Cache<EntityCacheKey, List<Entity>> cacheData;
+
+  /** Index part */
+  private RadixTree<EntityCacheKey> cacheIndex;
+
+  private final ReentrantLock opLock = new ReentrantLock();
+  ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  @VisibleForTesting
+  static void resetForTest() {
+    INSTANCE = null;
+  }
+
+  /**
+   * Returns the instance of MetaCacheCaffeine based on the cache 
configuration and entity store.
+   *
+   * @param cacheConfig The cache configuration
+   * @param entityStore The entity store to load entities from the database
+   * @return The instance of {@link CaffeineEntityCache}
+   */
+  public static CaffeineEntityCache getInstance(Config cacheConfig, 
EntityStore entityStore) {
+    if (INSTANCE == null) {
+      synchronized (CaffeineEntityCache.class) {
+        if (INSTANCE == null) {
+          INSTANCE = new CaffeineEntityCache(cacheConfig, entityStore);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Constructs a new MetaCacheCaffeine.
+   *
+   * @param cacheConfig the cache configuration
+   * @param entityStore The entity store to load entities from the database
+   */
+  private CaffeineEntityCache(Config cacheConfig, EntityStore entityStore) {
+    super(cacheConfig, entityStore);
+    cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    /**
+     * Executor for async cache cleanup, when a cache expires then use this 
executor to sync other
+     * cache and index trees.
+     */
+    ThreadPoolExecutor cleanupExec =
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(100),
+            r -> {
+              Thread t = new Thread(r, "CaffeineMetaCache-Cleanup");
+              t.setDaemon(true);
+              return t;
+            },
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder = 
newBaseBuilder(cacheConfig);
+
+    cacheDataBuilder
+        .executor(cleanupExec)
+        .removalListener(
+            (key, value, cause) -> {
+              if (cause == RemovalCause.EXPLICIT || cause == 
RemovalCause.REPLACED) {
+                return;
+              }
+              try {
+                invalidateExpiredItem(key);
+              } catch (Throwable t) {
+                LOG.error(
+                    "Failed to remove entity key={} value={} from cache 
asynchronously, cause={}",
+                    key,
+                    value,
+                    cause,
+                    t);
+              }
+            });
+
+    this.cacheData = cacheDataBuilder.build();
+
+    if (cacheConfig.get(Configs.CACHE_STATUS_ENABLED)) {
+      startCacheStatsMonitor();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> List<E> getOrLoad(
+      NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType)
+      throws IOException {
+    Preconditions.checkArgument(ident != null, "NameIdentifier cannot be 
null");
+    Preconditions.checkArgument(type != null, "EntityType cannot be null");
+    Preconditions.checkArgument(relType != null, 
"SupportsRelationOperations.Type cannot be null");
+
+    return withLockAndThrow(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type, 
relType);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+
+          if (entitiesFromCache != null) {
+            return convertEntity(entitiesFromCache);
+          }
+
+          List<E> entities = entityStore.listEntitiesByRelation(relType, 
ident, type);
+          syncEntitiesToCache(
+              entityCacheKey, entities.stream().map(e -> (Entity) 
e).collect(Collectors.toList()));
+
+          return entities;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> E getOrLoad(
+      NameIdentifier ident, Entity.EntityType type) throws IOException {
+    Preconditions.checkArgument(ident != null, "NameIdentifier cannot be 
null");
+    Preconditions.checkArgument(type != null, "EntityType cannot be null");
+
+    return withLockAndThrow(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);

Review Comment:
   Why it returns a list here?



##########
core/src/main/java/org/apache/gravitino/cache/BaseEntityCache.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TagEntity;
+import org.apache.gravitino.meta.TopicEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.storage.relational.RelationalEntityStore;
+
+/**
+ * An abstract class that provides a basic implementation for the MetaCache 
interface. This class is
+ * abstract and cannot be instantiated directly, it is designed to be a base 
class for other meta
+ * cache implementations.
+ *
+ * <p>The purpose of the BaseMetaCache is to provide a unified way of 
accessing entity stores,
+ * allowing subclasses to focus on caching logic without having to deal with 
entity store
+ * management.
+ */
+public abstract class BaseEntityCache implements EntityCache {
+  private static final Map<Entity.EntityType, Class<?>> ENTITY_CLASS_MAP;
+  // The entity store used by the cache, initialized through the constructor.
+  protected final RelationalEntityStore entityStore;
+  protected final Config cacheConfig;
+
+  static {
+    Map<Entity.EntityType, Class<?>> map = new 
EnumMap<>(Entity.EntityType.class);
+    map.put(Entity.EntityType.METALAKE, BaseMetalake.class);
+    map.put(Entity.EntityType.CATALOG, CatalogEntity.class);
+    map.put(Entity.EntityType.SCHEMA, SchemaEntity.class);
+    map.put(Entity.EntityType.TABLE, TableEntity.class);
+    map.put(Entity.EntityType.FILESET, FilesetEntity.class);
+    map.put(Entity.EntityType.MODEL, ModelEntity.class);
+    map.put(Entity.EntityType.TOPIC, TopicEntity.class);
+    map.put(Entity.EntityType.TAG, TagEntity.class);
+    map.put(Entity.EntityType.MODEL_VERSION, ModelVersionEntity.class);
+    map.put(Entity.EntityType.COLUMN, ColumnEntity.class);
+    map.put(Entity.EntityType.USER, UserEntity.class);
+    map.put(Entity.EntityType.GROUP, Entity.class);
+    map.put(Entity.EntityType.ROLE, RoleEntity.class);
+    ENTITY_CLASS_MAP = Collections.unmodifiableMap(map);
+  }
+
+  /**
+   * Constructs a new {@link BaseEntityCache} instance. If the provided 
entityStore is null, it will
+   * use the entity store configured in the Gravitino environment.
+   *
+   * @param entityStore The entity store to be used by the cache, can be null.
+   */
+  public BaseEntityCache(Config config, EntityStore entityStore) {
+    this.cacheConfig = config;
+    this.entityStore = (RelationalEntityStore) entityStore;
+  }
+
+  /**
+   * Returns the class of the entity based on its type.
+   *
+   * @param type The entity type
+   * @return The class of the entity
+   * @throws IllegalArgumentException if the entity type is not supported
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Entity & HasIdentifier> Class<E> 
getEntityClass(Entity.EntityType type) {
+    Preconditions.checkArgument(type != null, "EntityType must not be null");
+
+    Class<?> clazz = ENTITY_CLASS_MAP.get(type);
+    if (clazz == null) {
+      throw new IllegalArgumentException("Unsupported EntityType: " + 
type.getShortName());
+    }

Review Comment:
   You can also use `Preconditions` here.



##########
core/src/main/java/org/apache/gravitino/cache/BaseEntityCache.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TagEntity;
+import org.apache.gravitino.meta.TopicEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.storage.relational.RelationalEntityStore;
+
+/**
+ * An abstract class that provides a basic implementation for the MetaCache 
interface. This class is
+ * abstract and cannot be instantiated directly, it is designed to be a base 
class for other meta
+ * cache implementations.
+ *
+ * <p>The purpose of the BaseMetaCache is to provide a unified way of 
accessing entity stores,
+ * allowing subclasses to focus on caching logic without having to deal with 
entity store
+ * management.
+ */
+public abstract class BaseEntityCache implements EntityCache {
+  private static final Map<Entity.EntityType, Class<?>> ENTITY_CLASS_MAP;
+  // The entity store used by the cache, initialized through the constructor.
+  protected final RelationalEntityStore entityStore;
+  protected final Config cacheConfig;
+
+  static {
+    Map<Entity.EntityType, Class<?>> map = new 
EnumMap<>(Entity.EntityType.class);
+    map.put(Entity.EntityType.METALAKE, BaseMetalake.class);
+    map.put(Entity.EntityType.CATALOG, CatalogEntity.class);
+    map.put(Entity.EntityType.SCHEMA, SchemaEntity.class);
+    map.put(Entity.EntityType.TABLE, TableEntity.class);
+    map.put(Entity.EntityType.FILESET, FilesetEntity.class);
+    map.put(Entity.EntityType.MODEL, ModelEntity.class);
+    map.put(Entity.EntityType.TOPIC, TopicEntity.class);
+    map.put(Entity.EntityType.TAG, TagEntity.class);
+    map.put(Entity.EntityType.MODEL_VERSION, ModelVersionEntity.class);
+    map.put(Entity.EntityType.COLUMN, ColumnEntity.class);
+    map.put(Entity.EntityType.USER, UserEntity.class);
+    map.put(Entity.EntityType.GROUP, Entity.class);
+    map.put(Entity.EntityType.ROLE, RoleEntity.class);
+    ENTITY_CLASS_MAP = Collections.unmodifiableMap(map);
+  }
+
+  /**
+   * Constructs a new {@link BaseEntityCache} instance. If the provided 
entityStore is null, it will
+   * use the entity store configured in the Gravitino environment.
+   *
+   * @param entityStore The entity store to be used by the cache, can be null.
+   */
+  public BaseEntityCache(Config config, EntityStore entityStore) {
+    this.cacheConfig = config;
+    this.entityStore = (RelationalEntityStore) entityStore;
+  }
+
+  /**
+   * Returns the class of the entity based on its type.
+   *
+   * @param type The entity type
+   * @return The class of the entity
+   * @throws IllegalArgumentException if the entity type is not supported
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Entity & HasIdentifier> Class<E> 
getEntityClass(Entity.EntityType type) {
+    Preconditions.checkArgument(type != null, "EntityType must not be null");
+
+    Class<?> clazz = ENTITY_CLASS_MAP.get(type);
+    if (clazz == null) {
+      throw new IllegalArgumentException("Unsupported EntityType: " + 
type.getShortName());
+    }

Review Comment:
   BTW, as I mentioned before, type short name is meaningless, please use 
`type.toString()`.



##########
core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java:
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class implements a meta cache using Caffeine cache. */
+public class CaffeineEntityCache extends BaseEntityCache {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
+
+  /** Singleton instance */
+  private static volatile CaffeineEntityCache INSTANCE = null;
+
+  /** Cache part */
+  private final Cache<EntityCacheKey, List<Entity>> cacheData;
+
+  /** Index part */
+  private RadixTree<EntityCacheKey> cacheIndex;
+
+  private final ReentrantLock opLock = new ReentrantLock();
+  ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Also why don't we make it `private`?



##########
core/src/main/java/org/apache/gravitino/cache/BaseEntityCache.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TagEntity;
+import org.apache.gravitino.meta.TopicEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.storage.relational.RelationalEntityStore;
+
+/**
+ * An abstract class that provides a basic implementation for the MetaCache 
interface. This class is
+ * abstract and cannot be instantiated directly, it is designed to be a base 
class for other meta
+ * cache implementations.
+ *
+ * <p>The purpose of the BaseMetaCache is to provide a unified way of 
accessing entity stores,
+ * allowing subclasses to focus on caching logic without having to deal with 
entity store
+ * management.
+ */
+public abstract class BaseEntityCache implements EntityCache {
+  private static final Map<Entity.EntityType, Class<?>> ENTITY_CLASS_MAP;
+  // The entity store used by the cache, initialized through the constructor.
+  protected final RelationalEntityStore entityStore;
+  protected final Config cacheConfig;
+
+  static {
+    Map<Entity.EntityType, Class<?>> map = new 
EnumMap<>(Entity.EntityType.class);
+    map.put(Entity.EntityType.METALAKE, BaseMetalake.class);
+    map.put(Entity.EntityType.CATALOG, CatalogEntity.class);
+    map.put(Entity.EntityType.SCHEMA, SchemaEntity.class);
+    map.put(Entity.EntityType.TABLE, TableEntity.class);
+    map.put(Entity.EntityType.FILESET, FilesetEntity.class);
+    map.put(Entity.EntityType.MODEL, ModelEntity.class);
+    map.put(Entity.EntityType.TOPIC, TopicEntity.class);
+    map.put(Entity.EntityType.TAG, TagEntity.class);
+    map.put(Entity.EntityType.MODEL_VERSION, ModelVersionEntity.class);
+    map.put(Entity.EntityType.COLUMN, ColumnEntity.class);
+    map.put(Entity.EntityType.USER, UserEntity.class);
+    map.put(Entity.EntityType.GROUP, Entity.class);
+    map.put(Entity.EntityType.ROLE, RoleEntity.class);
+    ENTITY_CLASS_MAP = Collections.unmodifiableMap(map);
+  }
+
+  /**
+   * Constructs a new {@link BaseEntityCache} instance. If the provided 
entityStore is null, it will
+   * use the entity store configured in the Gravitino environment.
+   *
+   * @param entityStore The entity store to be used by the cache, can be null.
+   */
+  public BaseEntityCache(Config config, EntityStore entityStore) {
+    this.cacheConfig = config;
+    this.entityStore = (RelationalEntityStore) entityStore;
+  }
+
+  /**
+   * Returns the class of the entity based on its type.
+   *
+   * @param type The entity type
+   * @return The class of the entity
+   * @throws IllegalArgumentException if the entity type is not supported
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Entity & HasIdentifier> Class<E> 
getEntityClass(Entity.EntityType type) {
+    Preconditions.checkArgument(type != null, "EntityType must not be null");
+
+    Class<?> clazz = ENTITY_CLASS_MAP.get(type);
+    if (clazz == null) {
+      throw new IllegalArgumentException("Unsupported EntityType: " + 
type.getShortName());
+    }
+
+    return (Class<E>) clazz;
+  }
+
+  /**
+   * Returns the {@link NameIdentifier} of the entity based on its type.
+   *
+   * @param entity The entity
+   * @return The {@link NameIdentifier} of the entity
+   */
+  protected static NameIdentifier getIdentFromEntity(Entity entity) {
+    validateEntityHasIdentifier(entity);
+    HasIdentifier hasIdentifier = (HasIdentifier) entity;
+
+    return hasIdentifier.nameIdentifier();
+  }
+
+  /**
+   * Checks if the entity is of type {@link HasIdentifier}.
+   *
+   * @param entity The entity to check.
+   */
+  protected static void validateEntityHasIdentifier(Entity entity) {
+    Preconditions.checkArgument(
+        entity instanceof HasIdentifier, "Unsupported EntityType: " + 
entity.type());
+  }
+
+  /**
+   * Converts a list of entities to a new list.
+   *
+   * @param entities Thr original list of entities.
+   * @return A list of converted entities.
+   * @param <E> The type of the entities in the list.
+   */
+  @SuppressWarnings("unchecked")
+  protected static <E extends Entity & HasIdentifier> List<E> 
convertEntity(List<Entity> entities) {

Review Comment:
   I guess this line is also too long, please make sure that the line length 
should not exceed 100 characters.



##########
core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java:
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class implements a meta cache using Caffeine cache. */
+public class CaffeineEntityCache extends BaseEntityCache {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
+
+  /** Singleton instance */
+  private static volatile CaffeineEntityCache INSTANCE = null;
+
+  /** Cache part */
+  private final Cache<EntityCacheKey, List<Entity>> cacheData;
+
+  /** Index part */
+  private RadixTree<EntityCacheKey> cacheIndex;
+
+  private final ReentrantLock opLock = new ReentrantLock();
+  ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  @VisibleForTesting
+  static void resetForTest() {
+    INSTANCE = null;
+  }
+
+  /**
+   * Returns the instance of MetaCacheCaffeine based on the cache 
configuration and entity store.
+   *
+   * @param cacheConfig The cache configuration
+   * @param entityStore The entity store to load entities from the database
+   * @return The instance of {@link CaffeineEntityCache}
+   */
+  public static CaffeineEntityCache getInstance(Config cacheConfig, 
EntityStore entityStore) {
+    if (INSTANCE == null) {
+      synchronized (CaffeineEntityCache.class) {
+        if (INSTANCE == null) {
+          INSTANCE = new CaffeineEntityCache(cacheConfig, entityStore);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Constructs a new MetaCacheCaffeine.
+   *
+   * @param cacheConfig the cache configuration
+   * @param entityStore The entity store to load entities from the database
+   */
+  private CaffeineEntityCache(Config cacheConfig, EntityStore entityStore) {
+    super(cacheConfig, entityStore);
+    cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    /**
+     * Executor for async cache cleanup, when a cache expires then use this 
executor to sync other
+     * cache and index trees.
+     */
+    ThreadPoolExecutor cleanupExec =
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(100),

Review Comment:
   Can you not hardcode this parameter here, I think at least we can define 
some constants.



##########
core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java:
##########
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.radix.RadixTree;
+import 
com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class implements a meta cache using Caffeine cache. */
+public class CaffeineEntityCache extends BaseEntityCache {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineEntityCache.class.getName());
+
+  /** Singleton instance */
+  private static volatile CaffeineEntityCache INSTANCE = null;
+
+  /** Cache part */
+  private final Cache<EntityCacheKey, List<Entity>> cacheData;
+
+  /** Index part */
+  private RadixTree<EntityCacheKey> cacheIndex;
+
+  private final ReentrantLock opLock = new ReentrantLock();
+  ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+
+  @VisibleForTesting
+  static void resetForTest() {
+    INSTANCE = null;
+  }
+
+  /**
+   * Returns the instance of MetaCacheCaffeine based on the cache 
configuration and entity store.
+   *
+   * @param cacheConfig The cache configuration
+   * @param entityStore The entity store to load entities from the database
+   * @return The instance of {@link CaffeineEntityCache}
+   */
+  public static CaffeineEntityCache getInstance(Config cacheConfig, 
EntityStore entityStore) {
+    if (INSTANCE == null) {
+      synchronized (CaffeineEntityCache.class) {
+        if (INSTANCE == null) {
+          INSTANCE = new CaffeineEntityCache(cacheConfig, entityStore);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Constructs a new MetaCacheCaffeine.
+   *
+   * @param cacheConfig the cache configuration
+   * @param entityStore The entity store to load entities from the database
+   */
+  private CaffeineEntityCache(Config cacheConfig, EntityStore entityStore) {
+    super(cacheConfig, entityStore);
+    cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory());
+
+    /**
+     * Executor for async cache cleanup, when a cache expires then use this 
executor to sync other
+     * cache and index trees.
+     */
+    ThreadPoolExecutor cleanupExec =
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<>(100),
+            r -> {
+              Thread t = new Thread(r, "CaffeineMetaCache-Cleanup");
+              t.setDaemon(true);
+              return t;
+            },
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder = 
newBaseBuilder(cacheConfig);
+
+    cacheDataBuilder
+        .executor(cleanupExec)
+        .removalListener(
+            (key, value, cause) -> {
+              if (cause == RemovalCause.EXPLICIT || cause == 
RemovalCause.REPLACED) {
+                return;
+              }
+              try {
+                invalidateExpiredItem(key);
+              } catch (Throwable t) {
+                LOG.error(
+                    "Failed to remove entity key={} value={} from cache 
asynchronously, cause={}",
+                    key,
+                    value,
+                    cause,
+                    t);
+              }
+            });
+
+    this.cacheData = cacheDataBuilder.build();
+
+    if (cacheConfig.get(Configs.CACHE_STATUS_ENABLED)) {
+      startCacheStatsMonitor();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> List<E> getOrLoad(
+      NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType)
+      throws IOException {
+    Preconditions.checkArgument(ident != null, "NameIdentifier cannot be 
null");
+    Preconditions.checkArgument(type != null, "EntityType cannot be null");
+    Preconditions.checkArgument(relType != null, 
"SupportsRelationOperations.Type cannot be null");
+
+    return withLockAndThrow(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type, 
relType);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+
+          if (entitiesFromCache != null) {
+            return convertEntity(entitiesFromCache);
+          }
+
+          List<E> entities = entityStore.listEntitiesByRelation(relType, 
ident, type);
+          syncEntitiesToCache(
+              entityCacheKey, entities.stream().map(e -> (Entity) 
e).collect(Collectors.toList()));
+
+          return entities;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> E getOrLoad(
+      NameIdentifier ident, Entity.EntityType type) throws IOException {
+    Preconditions.checkArgument(ident != null, "NameIdentifier cannot be 
null");
+    Preconditions.checkArgument(type != null, "EntityType cannot be null");
+
+    return withLockAndThrow(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+
+          if (entitiesFromCache != null) {
+            return convertEntity(entitiesFromCache.get(0));
+          }
+
+          E entityFromStore = entityStore.get(ident, type, 
getEntityClass(type));
+          syncEntitiesToCache(entityCacheKey, 
Lists.newArrayList(entityFromStore));
+
+          return entityFromStore;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> Optional<List<E>> getIfPresent(
+      SupportsRelationOperations.Type relType,
+      NameIdentifier nameIdentifier,
+      Entity.EntityType identType) {
+    return withLock(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(nameIdentifier, 
identType, relType);
+
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+          if (entitiesFromCache != null) {
+            List<E> convertedEntities = convertEntity(entitiesFromCache);
+            return Optional.of(convertedEntities);
+          }
+
+          return Optional.empty();
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public <E extends Entity & HasIdentifier> Optional<E> getIfPresent(
+      NameIdentifier ident, Entity.EntityType type) {
+    return withLock(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type);
+          List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey);
+          if (entitiesFromCache != null) {
+            return Optional.of(convertEntity(entitiesFromCache.get(0)));
+          }
+
+          return Optional.empty();
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean invalidate(
+      NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType) {
+    return invalidate(ident, type);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean invalidate(NameIdentifier ident, Entity.EntityType type) {
+    return withLock(
+        () -> {
+          EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type);
+
+          boolean existed = contains(ident, type);
+          if (existed) {
+            invalidateEntities(entityCacheKey.identifier());
+          }
+
+          return existed;
+        });
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean contains(
+      NameIdentifier ident, Entity.EntityType type, 
SupportsRelationOperations.Type relType) {
+    return withLock(() -> cacheData.getIfPresent(EntityCacheKey.of(ident, 
type)) != null);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean contains(NameIdentifier ident, Entity.EntityType type) {
+    return withLock(() -> cacheData.getIfPresent(EntityCacheKey.of(ident, 
type)) != null);
+  }

Review Comment:
   So these two methods only checks the cache, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to