jerryshao commented on code in PR #7200: URL: https://github.com/apache/gravitino/pull/7200#discussion_r2122621387
########## core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java: ########## @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree; +import com.googlecode.concurrenttrees.radix.RadixTree; +import com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import 
java.util.stream.Collectors; +import org.apache.gravitino.Config; +import org.apache.gravitino.Configs; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.HasIdentifier; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.SupportsRelationOperations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** This class implements a meta cache using Caffeine cache. */ +public class CaffeineEntityCache extends BaseEntityCache { + private static final Logger LOG = LoggerFactory.getLogger(CaffeineEntityCache.class.getName()); + + /** Singleton instance */ + private static volatile CaffeineEntityCache INSTANCE = null; + + /** Cache part */ + private final Cache<EntityCacheKey, List<Entity>> cacheData; + + /** Index part */ + private RadixTree<EntityCacheKey> cacheIndex; + + private final ReentrantLock opLock = new ReentrantLock(); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + @VisibleForTesting + static void resetForTest() { + INSTANCE = null; + } + + /** + * Returns the instance of MetaCacheCaffeine based on the cache configuration and entity store. + * + * @param cacheConfig The cache configuration + * @param entityStore The entity store to load entities from the database + * @return The instance of {@link CaffeineEntityCache} + */ + public static CaffeineEntityCache getInstance(Config cacheConfig, EntityStore entityStore) { + if (INSTANCE == null) { + synchronized (CaffeineEntityCache.class) { + if (INSTANCE == null) { + INSTANCE = new CaffeineEntityCache(cacheConfig, entityStore); + } + } + } + return INSTANCE; + } + + /** + * Constructs a new MetaCacheCaffeine. 
+ * + * @param cacheConfig the cache configuration + * @param entityStore The entity store to load entities from the database + */ + private CaffeineEntityCache(Config cacheConfig, EntityStore entityStore) { + super(cacheConfig, entityStore); + cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory()); + + /** + * Executor for async cache cleanup, when a cache expires then use this executor to sync other + * cache and index trees. + */ + ThreadPoolExecutor cleanupExec = + new ThreadPoolExecutor( + 1, + 1, + 0L, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(100), + r -> { + Thread t = new Thread(r, "CaffeineMetaCache-Cleanup"); + t.setDaemon(true); + return t; + }, + new ThreadPoolExecutor.CallerRunsPolicy()); + + Caffeine<EntityCacheKey, List<Entity>> cacheDataBuilder = newBaseBuilder(cacheConfig); + + cacheDataBuilder + .executor(cleanupExec) + .removalListener( + (key, value, cause) -> { + if (cause == RemovalCause.EXPLICIT || cause == RemovalCause.REPLACED) { + return; + } + try { + invalidateExpiredItem(key); + } catch (Throwable t) { + LOG.error( + "Failed to remove entity key={} value={} from cache asynchronously, cause={}", + key, + value, + cause, + t); + } + }); + + this.cacheData = cacheDataBuilder.build(); + + if (cacheConfig.get(Configs.CACHE_STATUS_ENABLED)) { + startCacheStatsMonitor(); + } + } + + /** {@inheritDoc} */ + @Override + public <E extends Entity & HasIdentifier> List<E> getOrLoad( + NameIdentifier ident, Entity.EntityType type, SupportsRelationOperations.Type relType) + throws IOException { + Preconditions.checkArgument(ident != null, "NameIdentifier cannot be null"); + Preconditions.checkArgument(type != null, "EntityType cannot be null"); + Preconditions.checkArgument(relType != null, "SupportsRelationOperations.Type cannot be null"); + + return withLockAndThrow( + () -> { + EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type, relType); + List<Entity> entitiesFromCache = 
cacheData.getIfPresent(entityCacheKey); + + if (entitiesFromCache != null) { + return convertEntity(entitiesFromCache); + } + + List<E> entities = entityStore.listEntitiesByRelation(relType, ident, type); + syncEntitiesToCache( + entityCacheKey, entities.stream().map(e -> (Entity) e).collect(Collectors.toList())); + + return entities; + }); + } + + /** {@inheritDoc} */ + @Override + public <E extends Entity & HasIdentifier> E getOrLoad( + NameIdentifier ident, Entity.EntityType type) throws IOException { + Preconditions.checkArgument(ident != null, "NameIdentifier cannot be null"); + Preconditions.checkArgument(type != null, "EntityType cannot be null"); + + return withLockAndThrow( + () -> { + EntityCacheKey entityCacheKey = EntityCacheKey.of(ident, type); + List<Entity> entitiesFromCache = cacheData.getIfPresent(entityCacheKey); Review Comment: You'd better add some comments to explain it. ########## core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.cache; + +import com.github.benmanes.caffeine.cache.Weigher; +import java.util.List; +import lombok.NonNull; +import org.apache.gravitino.Entity; +import org.checkerframework.checker.index.qual.NonNegative; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link Weigher} implementation that calculates the weight of an entity based on its type. The + * weight is calculated as follows: + * + * <ul> + * <li>Metalake: 100 + * <li>Catalog: 75 + * <li>Schema: 50 + * <li>Other: 15 + * </ul> + */ +public class EntityCacheWeigher implements Weigher<EntityCacheKey, List<Entity>> { + public static final int METALAKE_WEIGHT = 100; + public static final int CATALOG_WEIGHT = 75; + public static final int SCHEMA_WEIGHT = 50; + public static final int OTHER_WEIGHT = 15; + private static final Logger LOG = LoggerFactory.getLogger(EntityCacheWeigher.class.getName()); + private static final EntityCacheWeigher INSTANCE = new EntityCacheWeigher(); + private static final long MAX_WEIGHT = + 2 * (METALAKE_WEIGHT * 10 + CATALOG_WEIGHT * (10 * 200) + SCHEMA_WEIGHT * (10 * 200 * 1000)); + + private EntityCacheWeigher() {} + + /** + * Returns the maximum weight that can be stored in the cache. + * + * <p>The total weight is estimated based on the expected number of entities: + * + * <ul> + * <li>~10 Metalakes per Gravitino instance + * <li>~200 Catalogs per Metalake + * <li>~1000 Schemas per Catalog + * </ul> + * + * <p>The total estimated entity count is: + * + * <pre> + * 10 * METALAKE_WEIGHT + * + (10 * 200) * CATALOG_WEIGHT + * + (10 * 200 * 1000) * SCHEMA_WEIGHT + * </pre> + * + * <p>To provide headroom and avoid early eviction, the result is multiplied by 2: + * + * <pre> + * total = 2 * (10 * METALAKE_WEIGHT + 2000 * CATALOG_WEIGHT + 2_000_000 * SCHEMA_WEIGHT) + * </pre> + * + * @return The maximum weight that can be stored in the cache. 
+ */ + public static long getMaxWeight() { + return MAX_WEIGHT; + } + + /** + * Returns the singleton instance of the {@link EntityCacheWeigher}. + * + * @return the singleton instance of the {@link EntityCacheWeigher}. + */ + public static EntityCacheWeigher getInstance() { + return INSTANCE; + } + + /** {@inheritDoc} */ + @Override + public @NonNegative int weigh( + @NonNull EntityCacheKey storeEntityCacheKey, @NonNull List<Entity> entities) { + int weight = 0; + for (Entity entity : entities) { + weight += calculateWeight(entity.type()); + } + + if (weight > getMaxWeight()) { + LOG.warn("Entity group exceeds max weight: {}", weight); + } + + return weight; + } + + private int calculateWeight(Entity.EntityType entityType) { + int weight; + switch (entityType) { + case METALAKE: + weight = METALAKE_WEIGHT; + break; + + case CATALOG: + weight = CATALOG_WEIGHT; + break; + + case SCHEMA: + weight = SCHEMA_WEIGHT; + break; + + default: + weight = OTHER_WEIGHT; + break; + } + + return weight; Review Comment: I think you can maintain an immutable static map from entity type to weight, so that you can get the weight directly from the map; there is no need to use a switch-case to check. ########## core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java: ########## @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree; +import com.googlecode.concurrenttrees.radix.RadixTree; +import com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.gravitino.Config; +import org.apache.gravitino.Configs; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.HasIdentifier; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.SupportsRelationOperations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** This class implements the {@link EntityCache} using Caffeine. 
*/ +public class CaffeineEntityCache extends BaseEntityCache { + private final int CACHE_CLEANUP_CORE_THREADS = 1; + private final int CACHE_CLEANUP_MAX_THREADS = 1; + private final int CACHE_CLEANUP_QUEUE_CAPACITY = 100; + private final int CACHE_MONITOR_PERIOD_MINUTES = 5; + private final int CACHE_MONITOR_INITIAL_DELAY_MINUTES = 0; Review Comment: I think these variables can be static. ########## core/src/main/java/org/apache/gravitino/cache/CaffeineEntityCache.java: ########## @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree; +import com.googlecode.concurrenttrees.radix.RadixTree; +import com.googlecode.concurrenttrees.radix.node.concrete.DefaultCharArrayNodeFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.gravitino.Config; +import org.apache.gravitino.Configs; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.HasIdentifier; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.SupportsRelationOperations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** This class implements the {@link EntityCache} using Caffeine. 
*/ +public class CaffeineEntityCache extends BaseEntityCache { + private final int CACHE_CLEANUP_CORE_THREADS = 1; + private final int CACHE_CLEANUP_MAX_THREADS = 1; + private final int CACHE_CLEANUP_QUEUE_CAPACITY = 100; + private final int CACHE_MONITOR_PERIOD_MINUTES = 5; + private final int CACHE_MONITOR_INITIAL_DELAY_MINUTES = 0; + private static final Logger LOG = LoggerFactory.getLogger(CaffeineEntityCache.class.getName()); + private final ReentrantLock opLock = new ReentrantLock(); + + private static volatile CaffeineEntityCache INSTANCE; + + /** Cache part */ + private final Cache<EntityCacheKey, List<Entity>> cacheData; + + /** Index part */ + private RadixTree<EntityCacheKey> cacheIndex; + + private ScheduledExecutorService scheduler; + + @VisibleForTesting + static void resetForTest() { + INSTANCE = null; + } + + /** + * Returns the instance of {@link CaffeineEntityCache} based on the cache configuration and entity + * store. + * + * @param cacheConfig The cache configuration + * @param entityStore The entity store to load entities from the database + * @return The instance of {@link CaffeineEntityCache} + */ + public static CaffeineEntityCache getInstance(Config cacheConfig, EntityStore entityStore) { + if (INSTANCE == null) { + synchronized (CaffeineEntityCache.class) { + if (INSTANCE == null) { + INSTANCE = new CaffeineEntityCache(cacheConfig, entityStore); + } + } + } + return INSTANCE; + } + + /** + * Constructs a new {@link CaffeineEntityCache}. + * + * @param cacheConfig the cache configuration + * @param entityStore The entity store to load entities from the database + */ + private CaffeineEntityCache(Config cacheConfig, EntityStore entityStore) { + super(cacheConfig, entityStore); + cacheIndex = new ConcurrentRadixTree<>(new DefaultCharArrayNodeFactory()); Review Comment: Please add `this.` here. When a class variable is initialized for the first time, it is better to qualify it with `this.`. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
