Copilot commented on code in PR #9827:
URL: https://github.com/apache/gravitino/pull/9827#discussion_r2744334073


##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -110,15 +103,41 @@ public Optional<IcebergConfig> 
getIcebergCatalogConfig(String catalogName) {
     return 
Optional.of(getIcebergConfigFromCatalogProperties(catalog.properties()));
   }
 
-  @VisibleForTesting
-  void setClient(GravitinoClient client) {
-    this.client = client;
+  /**
+   * Lazily creates the CatalogFetcher based on whether authorization is 
enabled. Uses internal
+   * interface when authorization is enabled, otherwise uses HTTP interface.
+   */
+  private CatalogFetcher getCatalogFetcher() {
+    if (catalogFetcher != null) {
+      return catalogFetcher;
+    }
+    synchronized (this) {
+      if (catalogFetcher == null) {
+        IcebergRESTServerContext serverContext = 
IcebergRESTServerContext.getInstance();
+        if (serverContext.isAuthorizationEnabled()) {
+          catalogFetcher = new InternalCatalogFetcher(gravitinoMetalake);
+        } else {
+          String uri = properties.get(IcebergConstants.GRAVITINO_URI);
+          if (StringUtils.isBlank(uri)) {
+            JettyServerConfig config =
+                JettyServerConfig.fromConfig(
+                    GravitinoEnv.getInstance().config(),
+                    JettyServerConfig.GRAVITINO_SERVER_CONFIG_PREFIX);
+            uri = String.format("http://%s:%d", config.getHost(), 
config.getHttpPort());
+          }
+          Preconditions.checkArgument(
+              StringUtils.isNotBlank(uri), IcebergConstants.GRAVITINO_URI + " 
is blank");
+          catalogFetcher = new HttpCatalogFetcher(uri, gravitinoMetalake, 
properties);
+        }
+      }
+    }
+    return catalogFetcher;
   }
 
   @Override
   public void close() {
-    if (client != null) {
-      client.close();
+    if (catalogFetcher != null) {
+      catalogFetcher.close();
     }

Review Comment:
   close() reads catalogFetcher without synchronization (lines 138-142), while 
getCatalogFetcher() initializes it under double-checked locking. This creates a 
potential race condition:
   1. Thread B calls getCatalogFetcher() and obtains the initialized catalogFetcher
   2. Thread A calls close(), sees catalogFetcher != null, and closes it
   3. Thread B goes on to use the now-closed catalogFetcher
   
   This is unlikely in practice, since close() is typically called during 
shutdown, but the race is possible in principle. Consider having close() 
synchronize on the same lock that getCatalogFetcher() uses.
   ```suggestion
       CatalogFetcher toClose;
       synchronized (this) {
         toClose = catalogFetcher;
         catalogFetcher = null;
       }
       if (toClose != null) {
         toClose.close();
       }
   ```



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
     return 
defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
   }
 
-  // client is lazy loaded because the Gravitino server may not be started yet 
when the provider is
-  // initialized.
-  private GravitinoClient getGravitinoClient() {
-    if (client != null) {
-      return client;
+  @Override
+  public boolean canCheckConfigChange() {
+    // Only support checking config changes when using internal fetcher 
(authorization enabled),
+    // as it can efficiently fetch current config without HTTP overhead.
+    return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+  }
+
+  /** Interface for fetching catalog information. */
+  private interface CatalogFetcher extends Closeable {
+    Catalog loadCatalog(String catalogName) throws NoSuchCatalogException;
+
+    @Override
+    default void close() {}
+  }

Review Comment:
   The CatalogFetcher interface is defined as private within 
DynamicIcebergConfigProvider, which is good for encapsulation. However, the 
interface extends Closeable but provides a default no-op close() implementation 
(line 176). This creates a potential resource leak if a future implementation 
needs cleanup but forgets to override close().
   
   The current implementations handle this correctly (HttpCatalogFetcher closes 
the client, InternalCatalogFetcher has no resources), but consider removing the 
default implementation to force implementers to explicitly handle cleanup, even 
if it's a no-op. This makes resource management explicit and prevents future 
mistakes.
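   
   A minimal sketch of what this could look like, using hypothetical stand-in 
types (Fetcher, InternalFetcher, HttpFetcher) rather than the PR's actual 
classes. Redeclaring close() without a default body makes the compiler reject 
any implementation that does not state its cleanup behavior:
   ```java
   import java.io.Closeable;
   import java.util.concurrent.atomic.AtomicBoolean;

   final class ExplicitCloseSketch {
     /** Hypothetical stand-in for CatalogFetcher, without a default close(). */
     interface Fetcher extends Closeable {
       String loadCatalog(String catalogName);

       // Redeclared as abstract and without the checked IOException,
       // so every implementer has to provide a body.
       @Override
       void close();
     }

     /** Holds no resources, and says so explicitly. */
     static final class InternalFetcher implements Fetcher {
       @Override
       public String loadCatalog(String catalogName) {
         return "internal:" + catalogName;
       }

       @Override
       public void close() {
         // Nothing to release.
       }
     }

     /** Stands in for a fetcher that owns a client; the flag models the client. */
     static final class HttpFetcher implements Fetcher {
       final AtomicBoolean closed = new AtomicBoolean(false);

       @Override
       public String loadCatalog(String catalogName) {
         return "http:" + catalogName;
       }

       @Override
       public void close() {
         closed.set(true);
       }
     }
   }
   ```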



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -9,7 +9,7 @@
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
+ *  Unless required by applicable law or agreed in writing,

Review Comment:
   The Apache license header has been changed from "agreed to in writing" to 
"agreed in writing", which deviates from the standard Apache License 2.0 text. 
The standard wording is "Unless required by applicable law or agreed to in 
writing". Revert this change so the header matches the standard text.
   ```suggestion
    *  Unless required by applicable law or agreed to in writing,
   ```



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ValidatingCache.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache wrapper that validates cached entries against the current 
configuration. When the config
+ * provider can check config changes efficiently, it compares the cached 
wrapper's config with the
+ * current config. If the config has changed, the stale entry is invalidated 
before returning.
+ */
+public class ValidatingCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ValidatingCache.class);
+
+  private final Cache<String, CatalogWrapperForREST> cache;
+  private final IcebergConfigProvider configProvider;
+
+  public ValidatingCache(
+      Cache<String, CatalogWrapperForREST> cache, IcebergConfigProvider 
configProvider) {
+    this.cache = cache;
+    this.configProvider = configProvider;
+  }
+
+  /**
+   * Gets the value from the cache, validating it first if the provider can 
check config changes. If
+   * the cached entry is stale (config has changed), it will be invalidated 
and a new entry will be
+   * created.
+   *
+   * @param key The cache key (catalog name).
+   * @param mappingFunction The function to create a new value if not present 
or invalidated.
+   * @return The cached or newly created CatalogWrapperForREST.
+   */
+  public CatalogWrapperForREST get(
+      String key, Function<String, CatalogWrapperForREST> mappingFunction) {
+    validateCachedEntry(key);
+    return cache.get(key, mappingFunction);
+  }
+
+  /** Invalidates all entries in the cache. */
+  public void invalidateAll() {
+    cache.invalidateAll();
+  }
+
+  /**
+   * Validates if the cached entry is still valid. When the provider can check 
config changes
+   * efficiently, checks if the cached config matches the current config. If 
not, invalidates the
+   * cache entry.
+   */
+  private void validateCachedEntry(String catalogName) {
+    if (!configProvider.canCheckConfigChange()) {
+      return;
+    }
+
+    CatalogWrapperForREST cachedWrapper = cache.getIfPresent(catalogName);
+    if (cachedWrapper == null) {
+      return;
+    }
+
+    Optional<IcebergConfig> currentConfig = 
configProvider.getIcebergCatalogConfig(catalogName);
+    if (currentConfig.isEmpty()) {
+      return;
+    }
+
+    if (!isSameConfig(cachedWrapper.getIcebergConfig(), currentConfig.get())) {
+      LOG.info(
+          "Catalog {} config has changed, invalidating cache and creating new 
wrapper.",
+          catalogName);
+      cache.invalidate(catalogName);
+    }
+  }

Review Comment:
   There is a potential race condition in the validateCachedEntry method. 
Between checking if the cached wrapper exists (line 77) and comparing configs 
(line 87), another thread could invalidate or replace the cache entry. This 
won't cause a crash, since the wrapper is held in a local variable, but the 
subsequent cache.invalidate(catalogName) could evict an entry that another 
thread has just rebuilt, causing an unnecessary reload.
   
   Consider adding synchronization or using a more atomic approach to prevent 
this race condition, especially since this code will be called frequently in a 
multi-threaded REST service environment.
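   
   One possible "more atomic" approach is a conditional remove: evict the entry 
only if the key still maps to the wrapper that was inspected. The sketch below 
is an illustration, not the PR's code. It uses a plain ConcurrentHashMap and a 
hypothetical Wrapper class; with Caffeine, the same call should be available 
through the ConcurrentMap view returned by Cache.asMap().
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   final class ConditionalInvalidateSketch {
     /** Hypothetical stand-in for CatalogWrapperForREST; uses identity equality. */
     static final class Wrapper {
       final Map<String, String> config;

       Wrapper(Map<String, String> config) {
         this.config = config;
       }
     }

     /**
      * Removes the entry for key only if it is stale and still the same wrapper
      * that was inspected. Returns true if an entry was removed.
      */
     static boolean invalidateIfStale(
         ConcurrentMap<String, Wrapper> entries, String key, Map<String, String> currentConfig) {
       Wrapper cached = entries.get(key);
       if (cached == null || cached.config.equals(currentConfig)) {
         return false;
       }
       // remove(key, value) is atomic: it does nothing if another thread has
       // already replaced the entry with a different wrapper.
       return entries.remove(key, cached);
     }

     public static void main(String[] args) {
       ConcurrentMap<String, Wrapper> entries = new ConcurrentHashMap<>();
       entries.put("catalog", new Wrapper(Map.of("uri", "old")));
       System.out.println(invalidateIfStale(entries, "catalog", Map.of("uri", "new")));
     }
   }
   ```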



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java:
##########
@@ -63,4 +63,16 @@ default String getMetalakeName() {
   default String getDefaultCatalogName() {
     return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG;
   }
+
+  /**
+   * Indicates whether the catalog config can be checked for changes 
efficiently. When true, the
+   * caller can compare cached config with current config to detect stale 
entries. This is typically
+   * true only for providers that can fetch config without expensive 
operations (e.g., internal
+   * calls that bypass HTTP).
+   *
+   * @return true if config changes can be detected efficiently, false 
otherwise.
+   */
+  default boolean canCheckConfigChange() {
+    return false;
+  }

Review Comment:
   The canCheckConfigChange method defaults to false, which means it's disabled 
for all existing implementations except DynamicIcebergConfigProvider when 
authorization is enabled. However, the documentation doesn't clearly explain 
the performance implications or when this should return true vs false. 
   
   Consider expanding the JavaDoc to explain:
   1. The performance cost of checking config changes (it calls 
getIcebergCatalogConfig which could be expensive)
   2. When implementations should return true (only when they can fetch config 
efficiently without HTTP calls)
   3. The tradeoff between cache freshness and performance
   
   This will help future implementers make informed decisions.
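   
   As a starting point, the expanded JavaDoc might read roughly as follows. 
ConfigProviderDocSketch is a hypothetical, trimmed-down stand-in for 
IcebergConfigProvider (its config type is simplified to String), and the wording 
is only a suggestion:
   ```java
   import java.util.Optional;

   /** Hypothetical stand-in for IcebergConfigProvider; only the JavaDoc matters here. */
   interface ConfigProviderDocSketch {
     Optional<String> getIcebergCatalogConfig(String catalogName);

     /**
      * Indicates whether the catalog config can be checked for changes efficiently.
      *
      * <p>Cost: when this returns true, the cache calls
      * {@link #getIcebergCatalogConfig(String)} on each lookup of an already cached
      * catalog, so that call sits on the request path.
      *
      * <p>When to return true: only if the current config can be fetched cheaply, for
      * example through an in-process call that bypasses HTTP.
      *
      * <p>Tradeoff: returning true keeps cached entries fresh at the price of one config
      * fetch per lookup; returning false avoids that cost, but a cached catalog may keep
      * using an outdated config.
      *
      * @return true if config changes can be detected efficiently, false otherwise.
      */
     default boolean canCheckConfigChange() {
       return false;
     }
   }
   ```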



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
     return 
defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
   }
 
-  // client is lazy loaded because the Gravitino server may not be started yet 
when the provider is
-  // initialized.
-  private GravitinoClient getGravitinoClient() {
-    if (client != null) {
-      return client;
+  @Override
+  public boolean canCheckConfigChange() {
+    // Only support checking config changes when using internal fetcher 
(authorization enabled),
+    // as it can efficiently fetch current config without HTTP overhead.
+    return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+  }

Review Comment:
   The new canCheckConfigChange() method in IcebergConfigProvider interface is 
not tested in TestDynamicIcebergConfigProvider. This method is critical for the 
ValidatingCache functionality and should have test coverage to verify:
   1. It returns true when authorization is enabled (internal fetcher mode)
   2. It returns false when authorization is disabled (HTTP fetcher mode)
   3. The ValidatingCache respects this flag and only validates when it returns 
true
   
   Add test cases to verify this behavior works correctly for both modes.



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -21,31 +21,100 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.CatalogDispatcher;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
-import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
 import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 public class TestDynamicIcebergConfigProvider {
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    // Create IcebergRESTServerContext with authorization disabled by default
+    createMockServerContext(false);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    // Clean up GravitinoEnv and IcebergRESTServerContext state after each test
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogDispatcher", 
null, true);
+    resetServerContext();
+  }

Review Comment:
   The setUp and tearDown methods manipulate shared singleton state 
(IcebergRESTServerContext and GravitinoEnv) using reflection. If tests are run 
in parallel, this could lead to test interference and flaky tests. The tests 
don't appear to have any mechanism to prevent parallel execution.
   
   Consider adding @Execution(ExecutionMode.SAME_THREAD) at the class level to 
ensure tests run sequentially and don't interfere with each other's singleton 
state. This is especially important since these tests modify global state that 
could affect other tests.



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ValidatingCache.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache wrapper that validates cached entries against the current 
configuration. When the config
+ * provider can check config changes efficiently, it compares the cached 
wrapper's config with the
+ * current config. If the config has changed, the stale entry is invalidated 
before returning.
+ */
+public class ValidatingCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ValidatingCache.class);
+
+  private final Cache<String, CatalogWrapperForREST> cache;
+  private final IcebergConfigProvider configProvider;
+
+  public ValidatingCache(
+      Cache<String, CatalogWrapperForREST> cache, IcebergConfigProvider 
configProvider) {
+    this.cache = cache;
+    this.configProvider = configProvider;
+  }
+
+  /**
+   * Gets the value from the cache, validating it first if the provider can 
check config changes. If
+   * the cached entry is stale (config has changed), it will be invalidated 
and a new entry will be
+   * created.
+   *
+   * @param key The cache key (catalog name).
+   * @param mappingFunction The function to create a new value if not present 
or invalidated.
+   * @return The cached or newly created CatalogWrapperForREST.
+   */
+  public CatalogWrapperForREST get(
+      String key, Function<String, CatalogWrapperForREST> mappingFunction) {
+    validateCachedEntry(key);
+    return cache.get(key, mappingFunction);
+  }
+
+  /** Invalidates all entries in the cache. */
+  public void invalidateAll() {
+    cache.invalidateAll();
+  }
+
+  /**
+   * Validates if the cached entry is still valid. When the provider can check 
config changes
+   * efficiently, checks if the cached config matches the current config. If 
not, invalidates the
+   * cache entry.
+   */
+  private void validateCachedEntry(String catalogName) {
+    if (!configProvider.canCheckConfigChange()) {
+      return;
+    }
+
+    CatalogWrapperForREST cachedWrapper = cache.getIfPresent(catalogName);
+    if (cachedWrapper == null) {
+      return;
+    }
+
+    Optional<IcebergConfig> currentConfig = 
configProvider.getIcebergCatalogConfig(catalogName);
+    if (currentConfig.isEmpty()) {
+      return;
+    }
+
+    if (!isSameConfig(cachedWrapper.getIcebergConfig(), currentConfig.get())) {
+      LOG.info(
+          "Catalog {} config has changed, invalidating cache and creating new 
wrapper.",
+          catalogName);
+      cache.invalidate(catalogName);
+    }
+  }
+
+  private boolean isSameConfig(IcebergConfig cachedConfig, IcebergConfig 
currentConfig) {
+    return cachedConfig.getAllConfig().equals(currentConfig.getAllConfig());
+  }
+}

Review Comment:
   The ValidatingCache class is a new critical component that validates cached 
catalog wrappers against current configuration, but it has no dedicated test 
coverage. This validation logic is essential for the internal catalog fetcher 
feature and should have comprehensive unit tests covering:
   1. Cache validation when config changes
   2. Cache validation when config remains the same
   3. Behavior when canCheckConfigChange() returns false
   4. Behavior when currentConfig is empty
   5. Edge cases with null or invalid configs
   
   Consider adding a TestValidatingCache.java file to ensure this functionality 
works correctly.



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
     return 
defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
   }
 
-  // client is lazy loaded because the Gravitino server may not be started yet 
when the provider is
-  // initialized.
-  private GravitinoClient getGravitinoClient() {
-    if (client != null) {
-      return client;
+  @Override
+  public boolean canCheckConfigChange() {
+    // Only support checking config changes when using internal fetcher 
(authorization enabled),
+    // as it can efficiently fetch current config without HTTP overhead.
+    return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+  }
+
+  /** Interface for fetching catalog information. */
+  private interface CatalogFetcher extends Closeable {
+    Catalog loadCatalog(String catalogName) throws NoSuchCatalogException;
+
+    @Override
+    default void close() {}
+  }
+
+  /**
+   * Internal catalog fetcher that uses CatalogDispatcher directly. This 
bypasses the HTTP layer and
+   * is used when authorization is enabled.
+   *
+   * <p>Note: When authorization is enabled, IcebergCatalogWrapperManager 
bypasses its cache to
+   * avoid consistency issues between the IcebergCatalogWrapper cache and 
CatalogManager cache.
+   */
+  private static class InternalCatalogFetcher implements CatalogFetcher {
+    private final String metalake;
+
+    InternalCatalogFetcher(String metalake) {
+      this.metalake = metalake;
     }
-    synchronized (this) {
-      if (client == null) {
-        client = createGravitinoClient(gravitinoUri, gravitinoMetalake, 
properties);
-      }
+
+    @Override
+    public Catalog loadCatalog(String catalogName) throws 
NoSuchCatalogException {
+      CatalogDispatcher catalogDispatcher = 
GravitinoEnv.getInstance().catalogDispatcher();
+      Preconditions.checkState(
+          catalogDispatcher != null,
+          "CatalogDispatcher is not available. "
+              + "Internal catalog fetcher requires running within Gravitino 
server.");
+      NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(metalake, 
catalogName);
+      return catalogDispatcher.loadCatalog(catalogIdent);
     }
-    return client;
   }
 
-  private GravitinoClient createGravitinoClient(
-      String uri, String metalake, Map<String, String> properties) {
-    ClientBuilder builder = 
GravitinoClient.builder(uri).withMetalake(metalake);
-    String authType =
-        properties.getOrDefault(
-            IcebergConstants.GRAVITINO_AUTH_TYPE, 
AuthProperties.SIMPLE_AUTH_TYPE);
-    if (AuthProperties.isSimple(authType)) {
-      String userName =
+  /**
+   * HTTP catalog fetcher that uses GravitinoClient to access catalogs via 
REST API. This is used
+   * when authorization is not enabled.
+   */
+  private static class HttpCatalogFetcher implements CatalogFetcher {
+    private final String uri;
+    private final String metalake;
+    private final Map<String, String> properties;
+    private volatile GravitinoClient client;
+
+    HttpCatalogFetcher(String uri, String metalake, Map<String, String> 
properties) {
+      this.uri = uri;
+      this.metalake = metalake;
+      this.properties = properties;
+    }
+
+    @Override
+    public Catalog loadCatalog(String catalogName) throws 
NoSuchCatalogException {
+      return 
getGravitinoClient().loadMetalake(metalake).loadCatalog(catalogName);
+    }
+
+    @Override
+    public void close() {
+      if (client != null) {
+        client.close();
+      }
+    }
+
+    // Client is lazy loaded because the Gravitino server may not be started 
yet when the provider
+    // is initialized.
+    private GravitinoClient getGravitinoClient() {
+      if (client != null) {
+        return client;
+      }
+      synchronized (this) {
+        if (client == null) {
+          client = createGravitinoClient(uri, metalake, properties);
+        }
+      }
+      return client;
+    }
+
+    private GravitinoClient createGravitinoClient(
+        String uri, String metalake, Map<String, String> properties) {
+      ClientBuilder builder = 
GravitinoClient.builder(uri).withMetalake(metalake);
+      String authType =
           properties.getOrDefault(
-              IcebergConstants.GRAVITINO_SIMPLE_USERNAME, 
"iceberg-rest-server");
-      builder.withSimpleAuth(userName);
-    } else if (AuthProperties.isOAuth2(authType)) {
-      String oAuthUri = getRequiredConfig(properties, 
IcebergConstants.GRAVITINO_OAUTH2_SERVER_URI);
-      String credential =
-          getRequiredConfig(properties, 
IcebergConstants.GRAVITINO_OAUTH2_CREDENTIAL);
-      String path = getRequiredConfig(properties, 
IcebergConstants.GRAVITINO_OAUTH2_TOKEN_PATH);
-      String scope = getRequiredConfig(properties, 
IcebergConstants.GRAVITINO_OAUTH2_SCOPE);
-      DefaultOAuth2TokenProvider oAuth2TokenProvider =
-          DefaultOAuth2TokenProvider.builder()
-              .withUri(oAuthUri)
-              .withCredential(credential)
-              .withPath(path)
-              .withScope(scope)
-              .build();
-      builder.withOAuth(oAuth2TokenProvider);
-    } else {
-      throw new UnsupportedOperationException("Unsupported auth type: " + 
authType);
+              IcebergConstants.GRAVITINO_AUTH_TYPE, 
AuthProperties.SIMPLE_AUTH_TYPE);
+      if (AuthProperties.isSimple(authType)) {
+        String userName =
+            properties.getOrDefault(
+                IcebergConstants.GRAVITINO_SIMPLE_USERNAME, 
"iceberg-rest-server");
+        builder.withSimpleAuth(userName);
+      } else if (AuthProperties.isOAuth2(authType)) {
+        String oAuthUri =
+            getRequiredConfig(properties, 
IcebergConstants.GRAVITINO_OAUTH2_SERVER_URI);
+        String credential =
+            getRequiredConfig(properties, 
IcebergConstants.GRAVITINO_OAUTH2_CREDENTIAL);
+        String path = getRequiredConfig(properties, 
IcebergConstants.GRAVITINO_OAUTH2_TOKEN_PATH);
+        String scope = getRequiredConfig(properties, 
IcebergConstants.GRAVITINO_OAUTH2_SCOPE);
+        DefaultOAuth2TokenProvider oAuth2TokenProvider =
+            DefaultOAuth2TokenProvider.builder()
+                .withUri(oAuthUri)
+                .withCredential(credential)
+                .withPath(path)
+                .withScope(scope)
+                .build();
+        builder.withOAuth(oAuth2TokenProvider);
+      } else {
+        throw new UnsupportedOperationException("Unsupported auth type: " + 
authType);
+      }
+      return builder.build();
     }
-    return builder.build();
-  }
 
-  private String getRequiredConfig(Map<String, String> properties, String key) 
{
-    String configValue = properties.get(key);
-    Preconditions.checkArgument(StringUtils.isNotBlank(configValue), key + " 
should not be empty");
-    return configValue;
+    private String getRequiredConfig(Map<String, String> properties, String 
key) {
+      String configValue = properties.get(key);
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(configValue), key + " should not be empty");
+      return configValue;
+    }
   }

Review Comment:
   The HttpCatalogFetcher class now contains the GravitinoClient creation logic 
that previously lived in DynamicIcebergConfigProvider. The refactoring provides 
good separation of concerns, but consider extracting the client creation logic 
into a separate static factory method. That would keep the authentication logic 
in one place, make it easier to update in the future, and let other parts of 
the codebase reuse it instead of duplicating it.



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -21,31 +21,100 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.CatalogDispatcher;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
-import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
 import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 public class TestDynamicIcebergConfigProvider {
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    // Create IcebergRESTServerContext with authorization disabled by default
+    createMockServerContext(false);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    // Clean up GravitinoEnv and IcebergRESTServerContext state after each test
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogDispatcher", 
null, true);
+    resetServerContext();
+  }
+
+  private void createMockServerContext(boolean authorizationEnabled) throws 
IllegalAccessException {
+    // Use reflection to set the IcebergRESTServerContext instance
+    IcebergConfigProvider mockProvider = 
Mockito.mock(IcebergConfigProvider.class);
+    Mockito.when(mockProvider.getMetalakeName()).thenReturn("test_metalake");
+    
Mockito.when(mockProvider.getDefaultCatalogName()).thenReturn("default_catalog");
+    IcebergRESTServerContext.create(mockProvider, authorizationEnabled);
+  }
+
+  private void resetServerContext() throws IllegalAccessException {
+    // Reset the IcebergRESTServerContext singleton
+    Class<?> holderClass = 
IcebergRESTServerContext.class.getDeclaredClasses()[0];
+    FieldUtils.writeStaticField(holderClass, "INSTANCE", null, true);
+  }

Review Comment:
   The resetServerContext method uses reflection to reset a static singleton 
(lines 66-70). It assumes that the first declared class is the InstanceHolder 
and selects it with the hardcoded array index [0]. This is fragile: if another 
inner class is added or the order changes, the reset will break.
   
   Consider making the reset method more robust by finding the class by name:
   ```java
   // Requires: import java.util.Arrays;
   private void resetServerContext() throws IllegalAccessException {
     Class<?> holderClass =
         Arrays.stream(IcebergRESTServerContext.class.getDeclaredClasses())
             .filter(c -> c.getSimpleName().equals("InstanceHolder"))
             .findFirst()
             .orElseThrow(() -> new RuntimeException("InstanceHolder not found"));
     FieldUtils.writeStaticField(holderClass, "INSTANCE", null, true);
   }
   ```
   
   Alternatively, consider adding a test-only reset method in 
IcebergRESTServerContext marked with @VisibleForTesting.



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
     return 
defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
   }
 
-  // client is lazy loaded because the Gravitino server may not be started yet 
when the provider is
-  // initialized.
-  private GravitinoClient getGravitinoClient() {
-    if (client != null) {
-      return client;
+  @Override
+  public boolean canCheckConfigChange() {
+    // Only support checking config changes when using internal fetcher 
(authorization enabled),
+    // as it can efficiently fetch current config without HTTP overhead.
+    return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+  }
+
+  /** Interface for fetching catalog information. */
+  private interface CatalogFetcher extends Closeable {
+    Catalog loadCatalog(String catalogName) throws NoSuchCatalogException;
+
+    @Override
+    default void close() {}
+  }
+
+  /**
+   * Internal catalog fetcher that uses CatalogDispatcher directly. This 
bypasses the HTTP layer and
+   * is used when authorization is enabled.
+   *
+   * <p>Note: When authorization is enabled, IcebergCatalogWrapperManager 
bypasses its cache to
+   * avoid consistency issues between the IcebergCatalogWrapper cache and 
CatalogManager cache.
+   */
+  private static class InternalCatalogFetcher implements CatalogFetcher {
+    private final String metalake;
+
+    InternalCatalogFetcher(String metalake) {
+      this.metalake = metalake;
     }
-    synchronized (this) {
-      if (client == null) {
-        client = createGravitinoClient(gravitinoUri, gravitinoMetalake, 
properties);
-      }
+
+    @Override
+    public Catalog loadCatalog(String catalogName) throws 
NoSuchCatalogException {
+      CatalogDispatcher catalogDispatcher = 
GravitinoEnv.getInstance().catalogDispatcher();
+      Preconditions.checkState(
+          catalogDispatcher != null,
+          "CatalogDispatcher is not available. "
+              + "Internal catalog fetcher requires running within Gravitino 
server.");
+      NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(metalake, 
catalogName);
+      return catalogDispatcher.loadCatalog(catalogIdent);
     }

Review Comment:
   The InternalCatalogFetcher's loadCatalog method checks if catalogDispatcher 
is null and throws an IllegalStateException with a helpful message (lines 
196-199). However, this check happens on every catalog load operation. Since 
the catalogDispatcher should be set once during server initialization and never 
change, consider moving this check to the constructor of InternalCatalogFetcher 
instead.
   
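   This would:
   1. Fail fast when the fetcher is constructed rather than on a later catalog
   load (getCatalogFetcher() builds the fetcher lazily, so construction still
   happens on first use, after the provider is initialized)
   2. Eliminate the repeated null check on every catalog load
   3. Make the class invariant clearer (catalogDispatcher must always be
   available)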
   
   The constructor could look like:
   ```java
   // Assumes a new field: private final CatalogDispatcher catalogDispatcher;
   InternalCatalogFetcher(String metalake) {
     this.metalake = metalake;
     CatalogDispatcher dispatcher = GravitinoEnv.getInstance().catalogDispatcher();
     Preconditions.checkState(
         dispatcher != null,
         "CatalogDispatcher is not available. "
             + "Internal catalog fetcher requires running within Gravitino server.");
     this.catalogDispatcher = dispatcher;
   }
   ```



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -170,4 +258,84 @@ void testIcebergConfig() {
     Assertions.assertEquals(
         
icebergConfig.getIcebergCatalogProperties().get("catalog.backend-name"), 
"custom_backend");
   }
+
+  @Test
+  public void testInternalCatalogFetcher() throws IllegalAccessException {
+    String metalakeName = "test_metalake";
+    String catalogName = "internal_catalog";
+
+    // Enable authorization to use internal fetcher
+    createMockServerContext(true);
+
+    // Mock CatalogDispatcher
+    CatalogDispatcher mockCatalogDispatcher = 
Mockito.mock(CatalogDispatcher.class);
+    Catalog mockCatalog = Mockito.mock(Catalog.class);
+
+    NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(metalakeName, 
catalogName);
+    
Mockito.when(mockCatalogDispatcher.loadCatalog(catalogIdent)).thenReturn(mockCatalog);
+    Mockito.when(mockCatalog.provider()).thenReturn("lakehouse-iceberg");
+    Mockito.when(mockCatalog.properties())
+        .thenReturn(
+            new HashMap<String, String>() {
+              {
+                put(IcebergConstants.CATALOG_BACKEND, "custom");
+                put(IcebergConstants.CATALOG_BACKEND_NAME, catalogName);
+              }
+            });
+
+    // Set the mock CatalogDispatcher to GravitinoEnv
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "catalogDispatcher", 
mockCatalogDispatcher, true);
+
+    // Initialize provider with required properties
+    Map<String, String> properties = new HashMap<>();
+    properties.put(IcebergConstants.GRAVITINO_METALAKE, metalakeName);
+
+    DynamicIcebergConfigProvider provider = new DynamicIcebergConfigProvider();
+    provider.initialize(properties);
+
+    // Test that internal interface is used (CatalogDispatcher should be 
called)
+    Optional<IcebergConfig> icebergConfig = 
provider.getIcebergCatalogConfig(catalogName);
+
+    Assertions.assertTrue(icebergConfig.isPresent());
+    Mockito.verify(mockCatalogDispatcher).loadCatalog(catalogIdent);
+  }

Review Comment:
   The test testInternalCatalogFetcher verifies that the InternalCatalogFetcher 
is used when authorization is enabled, but it only tests the happy path. 
Consider adding additional test cases for:
   1. Error handling when CatalogDispatcher is null (before GravitinoEnv is 
initialized)
   2. NoSuchCatalogException propagation from CatalogDispatcher
   3. Thread safety when multiple threads access the provider concurrently
   
   These edge cases are important to ensure the internal fetcher behaves 
correctly in production scenarios.
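   
   For the thread-safety case, one possible shape is sketched below. It uses a
   stand-in lazy holder instead of the real provider, so LazyHolderSketch and
   LazyHolderConcurrencyCheck are hypothetical names; the idea is to release
   several callers at once and check that the factory ran exactly once:
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Supplier;

   // Stand-in for the provider's lazy getCatalogFetcher(); not the real class.
   final class LazyHolderSketch<T> {
     private final Supplier<T> factory;
     private volatile T value;

     LazyHolderSketch(Supplier<T> factory) {
       this.factory = factory;
     }

     T get() {
       T local = value;
       if (local != null) {
         return local;
       }
       synchronized (this) {
         if (value == null) {
           value = factory.get();
         }
         return value;
       }
     }
   }

   final class LazyHolderConcurrencyCheck {
     // Releases `threads` callers together and returns how often the factory ran.
     static int countFactoryCalls(int threads) {
       AtomicInteger created = new AtomicInteger();
       LazyHolderSketch<Object> holder =
           new LazyHolderSketch<>(
               () -> {
                 created.incrementAndGet();
                 return new Object();
               });
       ExecutorService pool = Executors.newFixedThreadPool(threads);
       CountDownLatch start = new CountDownLatch(1);
       CountDownLatch done = new CountDownLatch(threads);
       for (int i = 0; i < threads; i++) {
         pool.execute(
             () -> {
               try {
                 start.await();
                 holder.get();
               } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
               } finally {
                 done.countDown();
               }
             });
       }
       start.countDown();
       try {
         if (!done.await(10, TimeUnit.SECONDS)) {
           throw new IllegalStateException("Workers did not finish in time");
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("Interrupted while waiting", e);
       } finally {
         pool.shutdownNow();
       }
       return created.get();
     }
   }
   ```
   In the real test the factory counter would be replaced by a Mockito
   verification on the mocked CatalogDispatcher or on fetcher creation.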



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
     return 
defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
   }
 
-  // client is lazy loaded because the Gravitino server may not be started yet 
when the provider is
-  // initialized.
-  private GravitinoClient getGravitinoClient() {
-    if (client != null) {
-      return client;
+  @Override
+  public boolean canCheckConfigChange() {
+    // Only support checking config changes when using internal fetcher 
(authorization enabled),
+    // as it can efficiently fetch current config without HTTP overhead.
+    return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+  }
+
+  /** Interface for fetching catalog information. */
+  private interface CatalogFetcher extends Closeable {
+    Catalog loadCatalog(String catalogName) throws NoSuchCatalogException;
+
+    @Override
+    default void close() {}
+  }
+
+  /**
+   * Internal catalog fetcher that uses CatalogDispatcher directly. This 
bypasses the HTTP layer and
+   * is used when authorization is enabled.
+   *
+   * <p>Note: When authorization is enabled, IcebergCatalogWrapperManager 
bypasses its cache to
+   * avoid consistency issues between the IcebergCatalogWrapper cache and 
CatalogManager cache.
+   */
+  private static class InternalCatalogFetcher implements CatalogFetcher {
+    private final String metalake;
+
+    InternalCatalogFetcher(String metalake) {
+      this.metalake = metalake;
     }
-    synchronized (this) {
-      if (client == null) {
-        client = createGravitinoClient(gravitinoUri, gravitinoMetalake, 
properties);
-      }
+
+    @Override
+    public Catalog loadCatalog(String catalogName) throws 
NoSuchCatalogException {
+      CatalogDispatcher catalogDispatcher = 
GravitinoEnv.getInstance().catalogDispatcher();
+      Preconditions.checkState(
+          catalogDispatcher != null,
+          "CatalogDispatcher is not available. "
+              + "Internal catalog fetcher requires running within Gravitino 
server.");
+      NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(metalake, 
catalogName);
+      return catalogDispatcher.loadCatalog(catalogIdent);
     }
-    return client;
   }
 
-  private GravitinoClient createGravitinoClient(
-      String uri, String metalake, Map<String, String> properties) {
-    ClientBuilder builder = 
GravitinoClient.builder(uri).withMetalake(metalake);
-    String authType =
-        properties.getOrDefault(
-            IcebergConstants.GRAVITINO_AUTH_TYPE, 
AuthProperties.SIMPLE_AUTH_TYPE);
-    if (AuthProperties.isSimple(authType)) {
-      String userName =
+  /**
+   * HTTP catalog fetcher that uses GravitinoClient to access catalogs via 
REST API. This is used
+   * when authorization is not enabled.
+   */
+  private static class HttpCatalogFetcher implements CatalogFetcher {
+    private final String uri;
+    private final String metalake;
+    private final Map<String, String> properties;
+    private volatile GravitinoClient client;
+
+    HttpCatalogFetcher(String uri, String metalake, Map<String, String> 
properties) {
+      this.uri = uri;
+      this.metalake = metalake;
+      this.properties = properties;
+    }
+
+    @Override
+    public Catalog loadCatalog(String catalogName) throws 
NoSuchCatalogException {
+      return 
getGravitinoClient().loadMetalake(metalake).loadCatalog(catalogName);
+    }
+
+    @Override
+    public void close() {
+      if (client != null) {
+        client.close();

Review Comment:
   The close() method in HttpCatalogFetcher has the same concurrency issue as 
the outer class. It accesses the volatile client field without synchronization 
(lines 227-230), while getGravitinoClient() uses double-checked locking with 
synchronized blocks. This creates a race condition where close() might close a 
client that another thread is initializing or about to use.
   
   Consider synchronizing the close() method to prevent this race condition:
   ```java
   @Override
   public void close() {
     synchronized (this) {
       if (client != null) {
         client.close();
         client = null;
       }
     }
   }
   ```
   
   Setting client to null after closing also helps prevent use-after-close 
scenarios.
   ```suggestion
         synchronized (this) {
           if (client != null) {
             client.close();
             client = null;
           }
   ```
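   
   One caveat, assuming getGravitinoClient() creates a client whenever the
   field is null: after close() sets client to null, a later loadCatalog() call
   would silently build a fresh client. A closed flag avoids that. The sketch
   below uses stand-in types (FakeClient and ClosableFetcherSketch are
   hypothetical) and a synchronized getter instead of double-checked locking to
   keep it short:
   ```java
   // Stand-in for GravitinoClient; only close() matters for this sketch.
   final class FakeClient implements AutoCloseable {
     volatile boolean closed;

     @Override
     public void close() {
       closed = true;
     }
   }

   // Hypothetical variant of HttpCatalogFetcher's client handling.
   final class ClosableFetcherSketch implements AutoCloseable {
     private FakeClient client; // guarded by "this"
     private boolean closed; // guarded by "this"

     synchronized FakeClient getClient() {
       if (closed) {
         throw new IllegalStateException("Fetcher is closed");
       }
       if (client == null) {
         client = new FakeClient();
       }
       return client;
     }

     @Override
     public synchronized void close() {
       closed = true;
       if (client != null) {
         client.close();
         client = null;
       }
     }
   }
   ```
   Calling close() twice is harmless here, and any use after close fails
   loudly instead of leaking a new client.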



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -21,31 +21,100 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.CatalogDispatcher;
 import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
-import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
 import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import 
org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 public class TestDynamicIcebergConfigProvider {
+
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    // Create IcebergRESTServerContext with authorization disabled by default
+    createMockServerContext(false);
+  }
+
+  @AfterEach
+  public void tearDown() throws IllegalAccessException {
+    // Clean up GravitinoEnv and IcebergRESTServerContext state after each test
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogDispatcher", 
null, true);
+    resetServerContext();
+  }
+
+  private void createMockServerContext(boolean authorizationEnabled) throws 
IllegalAccessException {
+    // Use reflection to set the IcebergRESTServerContext instance
+    IcebergConfigProvider mockProvider = 
Mockito.mock(IcebergConfigProvider.class);
+    Mockito.when(mockProvider.getMetalakeName()).thenReturn("test_metalake");
+    
Mockito.when(mockProvider.getDefaultCatalogName()).thenReturn("default_catalog");
+    IcebergRESTServerContext.create(mockProvider, authorizationEnabled);
+  }
+
+  private void resetServerContext() throws IllegalAccessException {
+    // Reset the IcebergRESTServerContext singleton
+    Class<?> holderClass = 
IcebergRESTServerContext.class.getDeclaredClasses()[0];
+    FieldUtils.writeStaticField(holderClass, "INSTANCE", null, true);
+  }
+
+  /**
+   * Creates a mock CatalogFetcher using reflection to set it on the provider. 
This avoids directly
+   * accessing the private CatalogFetcher interface.
+   */
+  private void setMockCatalogFetcher(
+      DynamicIcebergConfigProvider provider, Map<String, Catalog> catalogMap)
+      throws IllegalAccessException {
+    // Create a mock object that implements the private CatalogFetcher 
interface via reflection
+    Object mockFetcher =
+        java.lang.reflect.Proxy.newProxyInstance(
+            getClass().getClassLoader(),
+            new Class<?>[] {getCatalogFetcherInterface()},
+            (proxy, method, args) -> {
+              if ("loadCatalog".equals(method.getName())) {
+                String catalogName = (String) args[0];
+                Catalog catalog = catalogMap.get(catalogName);
+                if (catalog == null) {
+                  throw new NoSuchCatalogException("Catalog not found: %s", 
catalogName);
+                }
+                return catalog;
+              } else if ("close".equals(method.getName())) {
+                return null;
+              }
+              return null;
+            });
+    FieldUtils.writeField(provider, "catalogFetcher", mockFetcher, true);
+  }
+
+  private Class<?> getCatalogFetcherInterface() {
+    for (Class<?> innerClass : 
DynamicIcebergConfigProvider.class.getDeclaredClasses()) {
+      if (innerClass.getSimpleName().equals("CatalogFetcher")) {
+        return innerClass;
+      }
+    }
+    throw new RuntimeException("CatalogFetcher interface not found");
+  }

Review Comment:
   The test uses reflection to access and mock the private CatalogFetcher 
interface (lines 76-107). While this approach works, it creates tight coupling 
between the test and the internal implementation details of 
DynamicIcebergConfigProvider. If the internal structure changes (e.g., 
CatalogFetcher is renamed or moved), these tests will break.
   
   Consider one of these alternatives:
   1. Make CatalogFetcher package-private instead of private, allowing test 
access without reflection
   2. Add a test-only constructor or setter that accepts a CatalogFetcher 
(marked with @VisibleForTesting)
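   3. Use a framework such as PowerMock that can reach private members (note
   that PowerMock has no official JUnit 5 support, and these tests use JUnit
   Jupiter, so options 1 or 2 are likely simpler)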
   
   This would make the tests more maintainable and less fragile to internal 
refactoring.
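   
   A rough sketch of options 1 and 2 combined, with stand-in types. ProviderSketch,
   its nested CatalogFetcher, loadCatalogProperties, and setCatalogFetcher are
   hypothetical names, not the real DynamicIcebergConfigProvider API:
   ```java
   import java.util.Map;
   import java.util.Optional;

   // Stand-in for DynamicIcebergConfigProvider; all names are hypothetical.
   final class ProviderSketch {
     // Package-private instead of private, so same-package tests can implement it.
     interface CatalogFetcher {
       Map<String, String> loadCatalogProperties(String catalogName);
     }

     private volatile CatalogFetcher catalogFetcher;

     // Test-only hook; the real class would annotate this with @VisibleForTesting.
     void setCatalogFetcher(CatalogFetcher fetcher) {
       this.catalogFetcher = fetcher;
     }

     // Returns an empty Optional when the fetcher reports no such catalog (null).
     Optional<Map<String, String>> getCatalogConfig(String catalogName) {
       return Optional.ofNullable(catalogFetcher.loadCatalogProperties(catalogName));
     }
   }
   ```
   A test could then pass a lambda or a Mockito mock to setCatalogFetcher(),
   with no dynamic proxy and no lookup of the interface by simple name.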



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ValidatingCache.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache wrapper that validates cached entries against the current 
configuration. When the config
+ * provider can check config changes efficiently, it compares the cached 
wrapper's config with the
+ * current config. If the config has changed, the stale entry is invalidated 
before returning.
+ */
+public class ValidatingCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ValidatingCache.class);
+
+  private final Cache<String, CatalogWrapperForREST> cache;
+  private final IcebergConfigProvider configProvider;
+
+  public ValidatingCache(
+      Cache<String, CatalogWrapperForREST> cache, IcebergConfigProvider 
configProvider) {
+    this.cache = cache;
+    this.configProvider = configProvider;
+  }
+
+  /**
+   * Gets the value from the cache, validating it first if the provider can 
check config changes. If
+   * the cached entry is stale (config has changed), it will be invalidated 
and a new entry will be
+   * created.
+   *
+   * @param key The cache key (catalog name).
+   * @param mappingFunction The function to create a new value if not present 
or invalidated.
+   * @return The cached or newly created CatalogWrapperForREST.
+   */
+  public CatalogWrapperForREST get(
+      String key, Function<String, CatalogWrapperForREST> mappingFunction) {
+    validateCachedEntry(key);
+    return cache.get(key, mappingFunction);
+  }
+
+  /** Invalidates all entries in the cache. */
+  public void invalidateAll() {
+    cache.invalidateAll();
+  }
+
+  /**
+   * Validates if the cached entry is still valid. When the provider can check 
config changes
+   * efficiently, checks if the cached config matches the current config. If 
not, invalidates the
+   * cache entry.
+   */
+  private void validateCachedEntry(String catalogName) {
+    if (!configProvider.canCheckConfigChange()) {
+      return;
+    }
+
+    CatalogWrapperForREST cachedWrapper = cache.getIfPresent(catalogName);
+    if (cachedWrapper == null) {
+      return;
+    }
+
+    Optional<IcebergConfig> currentConfig = 
configProvider.getIcebergCatalogConfig(catalogName);
+    if (currentConfig.isEmpty()) {

Review Comment:
   In the validateCachedEntry method, if currentConfig is empty (lines 83-85),
   the method returns early without invalidating the cache. This can leave a
   stale entry in the cache after a catalog is deleted from Gravitino: once the
   catalog is removed, getIcebergCatalogConfig should return Optional.empty(),
   but the cached wrapper is never invalidated and keeps being returned.
   
   Consider invalidating the cache entry when currentConfig is empty to ensure
   removed catalogs are properly cleaned up:
   ```java
   if (currentConfig.isEmpty()) {
     LOG.info("Catalog {} no longer exists, invalidating cache.", catalogName);
     cache.invalidate(catalogName);
     return;
   }
   ```
   ```suggestion
       if (currentConfig.isEmpty()) {
         LOG.info("Catalog {} no longer exists, invalidating cache.", 
catalogName);
         cache.invalidate(catalogName);
   ```
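   
   To illustrate the intended behavior end to end, here is a simplified sketch
   under stated assumptions: a ConcurrentHashMap stands in for the Caffeine
   cache, a plain String stands in for IcebergConfig, and ValidatingCacheSketch
   is a hypothetical name. It is not the PR's implementation:
   ```java
   import java.util.Map;
   import java.util.Optional;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;

   // Simplified stand-in for ValidatingCache; types are replaced for brevity.
   final class ValidatingCacheSketch {
     private final Map<String, String> cache = new ConcurrentHashMap<>();
     private final Function<String, Optional<String>> currentConfig;

     ValidatingCacheSketch(Function<String, Optional<String>> currentConfig) {
       this.currentConfig = currentConfig;
     }

     String get(String key, Function<String, String> loader) {
       validate(key);
       return cache.computeIfAbsent(key, loader);
     }

     boolean contains(String key) {
       return cache.containsKey(key);
     }

     void validate(String key) {
       String cached = cache.get(key);
       if (cached == null) {
         return;
       }
       Optional<String> current = currentConfig.apply(key);
       // Drop the entry when the catalog is gone or its config has changed.
       if (!current.isPresent() || !current.get().equals(cached)) {
         cache.remove(key);
       }
     }
   }
   ```
   The deleted-catalog case and the changed-config case share one
   invalidation path here, which keeps the two from drifting apart.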



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
