Copilot commented on code in PR #9827:
URL: https://github.com/apache/gravitino/pull/9827#discussion_r2744334073
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -110,15 +103,41 @@ public Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName) {
return Optional.of(getIcebergConfigFromCatalogProperties(catalog.properties()));
}
- @VisibleForTesting
- void setClient(GravitinoClient client) {
- this.client = client;
+ /**
+ * Lazily creates the CatalogFetcher based on whether authorization is enabled. Uses internal
+ * interface when authorization is enabled, otherwise uses HTTP interface.
+ */
+ private CatalogFetcher getCatalogFetcher() {
+ if (catalogFetcher != null) {
+ return catalogFetcher;
+ }
+ synchronized (this) {
+ if (catalogFetcher == null) {
+ IcebergRESTServerContext serverContext = IcebergRESTServerContext.getInstance();
+ if (serverContext.isAuthorizationEnabled()) {
+ catalogFetcher = new InternalCatalogFetcher(gravitinoMetalake);
+ } else {
+ String uri = properties.get(IcebergConstants.GRAVITINO_URI);
+ if (StringUtils.isBlank(uri)) {
+ JettyServerConfig config =
+ JettyServerConfig.fromConfig(
+ GravitinoEnv.getInstance().config(),
+ JettyServerConfig.GRAVITINO_SERVER_CONFIG_PREFIX);
+ uri = String.format("http://%s:%d", config.getHost(), config.getHttpPort());
+ }
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(uri), IcebergConstants.GRAVITINO_URI + " is blank");
+ catalogFetcher = new HttpCatalogFetcher(uri, gravitinoMetalake, properties);
+ }
+ }
+ }
+ return catalogFetcher;
}
@Override
public void close() {
- if (client != null) {
- client.close();
+ if (catalogFetcher != null) {
+ catalogFetcher.close();
}
Review Comment:
The close() method reads catalogFetcher without synchronization (lines
138-142). Meanwhile, getCatalogFetcher() initializes it with double-checked
locking inside synchronized (this). This creates a potential race condition:
1. Thread B calls getCatalogFetcher(), enters the synchronized block, and starts
creating a new fetcher
2. Thread A calls close(), still reads catalogFetcher as null, and returns
without closing anything
3. Thread B assigns and returns the new fetcher, which is never closed (in HTTP
mode, the GravitinoClient it later creates is leaked)
Separately, close() never clears the field, so any later getCatalogFetcher()
call returns the already-closed fetcher.
This scenario is unlikely in practice (close() is typically called during
shutdown), but it is a real race. Consider synchronizing close() on the same
lock that getCatalogFetcher() uses (this). Also make sure catalogFetcher is
declared volatile, which double-checked locking requires.
```suggestion
CatalogFetcher toClose;
synchronized (this) {
toClose = catalogFetcher;
catalogFetcher = null;
}
if (toClose != null) {
toClose.close();
}
```
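The suggestion above clears the field, so a later getCatalogFetcher() call would lazily create a fresh fetcher after shutdown. If that should be prevented too, a slightly fuller variant could track a closed flag under the same lock. This is a sketch under that assumption only. LazyCloseableHolder is a hypothetical, generic stand-in for the catalogFetcher field and its accessor, not existing Gravitino code:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

/** Hypothetical holder: lazily creates a Closeable once and refuses to re-create it after close. */
final class LazyCloseableHolder<T extends Closeable> {
  private final Supplier<T> factory;
  private volatile T resource; // volatile is required for double-checked locking
  private boolean closed; // guarded by "this"

  LazyCloseableHolder(Supplier<T> factory) {
    this.factory = factory;
  }

  T get() {
    T current = resource;
    if (current != null) {
      return current; // fast path without locking
    }
    synchronized (this) {
      if (closed) {
        throw new IllegalStateException("Already closed");
      }
      if (resource == null) {
        resource = factory.get();
      }
      return resource;
    }
  }

  void close() {
    T toClose;
    synchronized (this) {
      // Same lock as get(): a concurrent creation either finishes first (and is closed here)
      // or observes closed == true afterwards and throws instead of creating a new instance.
      closed = true;
      toClose = resource;
      resource = null;
    }
    if (toClose != null) {
      try {
        toClose.close(); // outside the lock so a slow close does not block other threads
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}
```

Calling close() twice is harmless here, because the second call finds no resource to close.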
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
return defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
}
- // client is lazy loaded because the Gravitino server may not be started yet when the provider is
- // initialized.
- private GravitinoClient getGravitinoClient() {
- if (client != null) {
- return client;
+ @Override
+ public boolean canCheckConfigChange() {
+ // Only support checking config changes when using internal fetcher (authorization enabled),
+ // as it can efficiently fetch current config without HTTP overhead.
+ return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+ }
+
+ /** Interface for fetching catalog information. */
+ private interface CatalogFetcher extends Closeable {
+ Catalog loadCatalog(String catalogName) throws NoSuchCatalogException;
+
+ @Override
+ default void close() {}
+ }
Review Comment:
The CatalogFetcher interface is defined as private within
DynamicIcebergConfigProvider, which is good for encapsulation. However, the
interface extends Closeable but provides a default no-op close() implementation
(line 176). This creates a potential resource leak if a future implementation
needs cleanup but forgets to override close().
The current implementations handle this correctly (HttpCatalogFetcher closes
the client, InternalCatalogFetcher has no resources), but consider removing the
default implementation to force implementers to explicitly handle cleanup, even
if it's a no-op. This makes resource management explicit and prevents future
mistakes.
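Below is a minimal sketch of what removing the default could look like, assuming close() is also narrowed so it throws no checked exception. CatalogStub is a hypothetical stand-in for org.apache.gravitino.Catalog so the example compiles on its own. NoResourceCatalogFetcher is illustrative and is not the PR's InternalCatalogFetcher:

```java
import java.io.Closeable;

/** Hypothetical stand-in for org.apache.gravitino.Catalog. */
interface CatalogStub {
  String name();
}

/** No default close(): every implementation must state its cleanup explicitly. */
interface CatalogFetcher extends Closeable {
  CatalogStub loadCatalog(String catalogName);

  // Re-declared as abstract and narrowed to throw no checked exception.
  @Override
  void close();
}

/** A fetcher that owns no resources still has to spell out its (empty) close(). */
final class NoResourceCatalogFetcher implements CatalogFetcher {
  @Override
  public CatalogStub loadCatalog(String catalogName) {
    return () -> catalogName;
  }

  @Override
  public void close() {
    // Intentionally empty: this fetcher holds nothing that needs releasing.
  }
}
```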
##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -9,7 +9,7 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
+ * Unless required by applicable law or agreed in writing,
Review Comment:
The Apache license header has been modified. "agreed to in writing" was
changed to "agreed in writing", which deviates from the standard Apache License
2.0 header text ("Unless required by applicable law or agreed to in
writing"). This change should be reverted to match the standard header.
```suggestion
* Unless required by applicable law or agreed to in writing,
```
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ValidatingCache.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache wrapper that validates cached entries against the current configuration. When the config
+ * provider can check config changes efficiently, it compares the cached wrapper's config with the
+ * current config. If the config has changed, the stale entry is invalidated before returning.
+ */
+public class ValidatingCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ValidatingCache.class);
+
+ private final Cache<String, CatalogWrapperForREST> cache;
+ private final IcebergConfigProvider configProvider;
+
+ public ValidatingCache(
+ Cache<String, CatalogWrapperForREST> cache, IcebergConfigProvider configProvider) {
+ this.cache = cache;
+ this.configProvider = configProvider;
+ }
+
+ /**
+ * Gets the value from the cache, validating it first if the provider can check config changes. If
+ * the cached entry is stale (config has changed), it will be invalidated and a new entry will be
+ * created.
+ *
+ * @param key The cache key (catalog name).
+ * @param mappingFunction The function to create a new value if not present or invalidated.
+ * @return The cached or newly created CatalogWrapperForREST.
+ */
+ public CatalogWrapperForREST get(
+ String key, Function<String, CatalogWrapperForREST> mappingFunction) {
+ validateCachedEntry(key);
+ return cache.get(key, mappingFunction);
+ }
+
+ /** Invalidates all entries in the cache. */
+ public void invalidateAll() {
+ cache.invalidateAll();
+ }
+
+ /**
+ * Validates if the cached entry is still valid. When the provider can check config changes
+ * efficiently, checks if the cached config matches the current config. If not, invalidates the
+ * cache entry.
+ */
+ private void validateCachedEntry(String catalogName) {
+ if (!configProvider.canCheckConfigChange()) {
+ return;
+ }
+
+ CatalogWrapperForREST cachedWrapper = cache.getIfPresent(catalogName);
+ if (cachedWrapper == null) {
+ return;
+ }
+
+ Optional<IcebergConfig> currentConfig = configProvider.getIcebergCatalogConfig(catalogName);
+ if (currentConfig.isEmpty()) {
+ return;
+ }
+
+ if (!isSameConfig(cachedWrapper.getIcebergConfig(), currentConfig.get())) {
+ LOG.info(
+ "Catalog {} config has changed, invalidating cache and creating new wrapper.",
+ catalogName);
+ cache.invalidate(catalogName);
+ }
+ }
Review Comment:
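validateCachedEntry performs a non-atomic check-then-act sequence. It reads the
cached wrapper (line 77), compares configs (line 87), and then calls
cache.invalidate(catalogName). Between those steps, another thread may already
have invalidated the stale entry and cached a fresh wrapper built from the new
config. This thread then invalidates that fresh wrapper, forcing an unnecessary
rebuild. This cannot cause a crash, since cachedWrapper is a local reference
that stays non-null once read. However, concurrent requests for the same
catalog can trigger redundant invalidations and repeated wrapper creation.
Coarse synchronization would serialize a path that runs on every request in a
multi-threaded REST service, so consider a more atomic approach instead. For
example, cache.asMap().remove(catalogName, cachedWrapper) removes the entry only
if it still maps to the wrapper that was compared.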
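Below is a sketch of the conditional-removal approach, assuming the Caffeine cache is accessed through its ConcurrentMap view (Cache.asMap()). To keep the example self-contained, a ConcurrentHashMap stands in for that view. The hypothetical Wrapper class stands in for CatalogWrapperForREST:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical sketch: validate-then-invalidate without clobbering a fresh entry. */
final class ConditionalInvalidation {
  /** Hypothetical cached value that remembers the config it was built from. */
  static final class Wrapper {
    private final Map<String, String> config;

    Wrapper(Map<String, String> config) {
      this.config = config;
    }

    Map<String, String> config() {
      return config;
    }
  }

  // Stand-in for Caffeine's cache.asMap(), which is also a ConcurrentMap.
  private final ConcurrentMap<String, Wrapper> cache = new ConcurrentHashMap<>();

  Wrapper get(String key, Map<String, String> currentConfig, Function<String, Wrapper> loader) {
    Wrapper cached = cache.get(key);
    if (cached != null && !Objects.equals(cached.config(), currentConfig)) {
      // remove(key, value) compares values with equals(). Wrapper does not override it, so this
      // removes the entry only if it still maps to the exact stale instance that was compared.
      // If another thread already replaced it with a fresh wrapper, this call does nothing.
      cache.remove(key, cached);
    }
    return cache.computeIfAbsent(key, loader);
  }
}
```

Caffeine describes asMap() as a view backed by the cache, so the same remove(key, value) call should apply to the real cache. That is worth confirming against the Caffeine version in use.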
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java:
##########
@@ -63,4 +63,16 @@ default String getMetalakeName() {
default String getDefaultCatalogName() {
return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG;
}
+
+ /**
+ * Indicates whether the catalog config can be checked for changes efficiently. When true, the
+ * caller can compare cached config with current config to detect stale entries. This is typically
+ * true only for providers that can fetch config without expensive operations (e.g., internal
+ * calls that bypass HTTP).
+ *
+ * @return true if config changes can be detected efficiently, false otherwise.
+ */
+ default boolean canCheckConfigChange() {
+ return false;
+ }
Review Comment:
The canCheckConfigChange method defaults to false, which means it's disabled
for all existing implementations except DynamicIcebergConfigProvider when
authorization is enabled. However, the documentation doesn't clearly explain
the performance implications or when this should return true vs false.
Consider expanding the JavaDoc to explain:
1. The performance cost of checking config changes (it calls
getIcebergCatalogConfig which could be expensive)
2. When implementations should return true (only when they can fetch config
efficiently without HTTP calls)
3. The tradeoff between cache freshness and performance
This will help future implementers make informed decisions.
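Below is one possible shape for the expanded JavaDoc, shown on a hypothetical, trimmed-down interface. ConfigProviderSketch and getCatalogConfig are illustrative names, not Gravitino APIs. The wording assumes a validating cache that re-fetches the config on each lookup, as ValidatingCache does:

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical, trimmed-down provider interface illustrating the expanded JavaDoc. */
interface ConfigProviderSketch {
  Optional<Map<String, String>> getCatalogConfig(String catalogName);

  /**
   * Indicates whether callers may re-fetch a catalog's config to detect stale cache entries.
   *
   * <p>Performance: when this returns true, a validating cache calls {@link
   * #getCatalogConfig(String)} on every cache lookup, so that call sits on the request hot path.
   *
   * <p>Return true only if {@link #getCatalogConfig(String)} is cheap, for example an in-process
   * lookup with no HTTP round trip. Return false if it is remote or otherwise expensive.
   *
   * <p>Trade-off: true keeps cached entries fresher at the cost of one config fetch per lookup;
   * false avoids that cost, but a cached entry may keep using a stale config until it is evicted
   * or invalidated.
   *
   * @return true if config changes can be detected cheaply, false otherwise (the default).
   */
  default boolean canCheckConfigChange() {
    return false;
  }
}
```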
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
return defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
}
- // client is lazy loaded because the Gravitino server may not be started yet when the provider is
- // initialized.
- private GravitinoClient getGravitinoClient() {
- if (client != null) {
- return client;
+ @Override
+ public boolean canCheckConfigChange() {
+ // Only support checking config changes when using internal fetcher (authorization enabled),
+ // as it can efficiently fetch current config without HTTP overhead.
+ return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+ }
Review Comment:
The new canCheckConfigChange() method in IcebergConfigProvider interface is
not tested in TestDynamicIcebergConfigProvider. This method is critical for the
ValidatingCache functionality and should have test coverage to verify:
1. It returns true when authorization is enabled (internal fetcher mode)
2. It returns false when authorization is disabled (HTTP fetcher mode)
3. The ValidatingCache respects this flag and only validates when it returns
true
Add test cases to verify this behavior works correctly for both modes.
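Below is a dependency-free sketch of check 3. It assumes a hypothetical counting fake in place of a mocked IcebergConfigProvider. It also uses a hypothetical MiniValidatingCache that mirrors the control flow of ValidatingCache.validateCachedEntry. The test counts how often the cache asks the provider for the current config: never when the flag is false, and once per cache hit when it is true:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical fake provider that counts how often its current config is requested. */
final class CountingProvider {
  final AtomicInteger configFetches = new AtomicInteger();
  private final boolean canCheck;

  CountingProvider(boolean canCheck) {
    this.canCheck = canCheck;
  }

  boolean canCheckConfigChange() {
    return canCheck;
  }

  Optional<String> getCatalogConfig(String catalogName) {
    configFetches.incrementAndGet();
    return Optional.of("config-v1");
  }
}

/** Hypothetical cache mirroring ValidatingCache: validate cached hits only when the flag is true. */
final class MiniValidatingCache {
  private final Map<String, String> cache = new ConcurrentHashMap<>();
  private final CountingProvider provider;

  MiniValidatingCache(CountingProvider provider) {
    this.provider = provider;
  }

  String get(String key, Function<String, String> loader) {
    if (provider.canCheckConfigChange()) {
      String cached = cache.get(key);
      if (cached != null) {
        provider
            .getCatalogConfig(key)
            .filter(current -> !current.equals(cached))
            .ifPresent(current -> cache.remove(key, cached));
      }
    }
    return cache.computeIfAbsent(key, loader);
  }
}
```

In the real test, Mockito.verify(provider, Mockito.never()).getIcebergCatalogConfig(Mockito.anyString()) would express the same "flag false" check against a mocked provider.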
##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -21,31 +21,100 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.CatalogDispatcher;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
-import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
+import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class TestDynamicIcebergConfigProvider {
+
+ @BeforeEach
+ public void setUp() throws IllegalAccessException {
+ // Create IcebergRESTServerContext with authorization disabled by default
+ createMockServerContext(false);
+ }
+
+ @AfterEach
+ public void tearDown() throws IllegalAccessException {
+ // Clean up GravitinoEnv and IcebergRESTServerContext state after each test
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogDispatcher", null, true);
+ resetServerContext();
+ }
Review Comment:
The setUp and tearDown methods mutate shared singleton state
(IcebergRESTServerContext and GravitinoEnv) through reflection. If JUnit
parallel execution is enabled (junit.jupiter.execution.parallel.enabled=true),
these tests can interfere with each other, or with other test classes that touch
the same singletons, and become flaky. The class has no mechanism to prevent
this.
Consider adding @Execution(ExecutionMode.SAME_THREAD) at the class level so its
own tests run sequentially. This does not stop other test classes from running
concurrently. If other tests also modify GravitinoEnv or
IcebergRESTServerContext, @Isolated (or a shared @ResourceLock) gives a stronger
guarantee for this global state.
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ValidatingCache.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache wrapper that validates cached entries against the current configuration. When the config
+ * provider can check config changes efficiently, it compares the cached wrapper's config with the
+ * current config. If the config has changed, the stale entry is invalidated before returning.
+ */
+public class ValidatingCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ValidatingCache.class);
+
+ private final Cache<String, CatalogWrapperForREST> cache;
+ private final IcebergConfigProvider configProvider;
+
+ public ValidatingCache(
+ Cache<String, CatalogWrapperForREST> cache, IcebergConfigProvider configProvider) {
+ this.cache = cache;
+ this.configProvider = configProvider;
+ }
+
+ /**
+ * Gets the value from the cache, validating it first if the provider can check config changes. If
+ * the cached entry is stale (config has changed), it will be invalidated and a new entry will be
+ * created.
+ *
+ * @param key The cache key (catalog name).
+ * @param mappingFunction The function to create a new value if not present or invalidated.
+ * @return The cached or newly created CatalogWrapperForREST.
+ */
+ public CatalogWrapperForREST get(
+ String key, Function<String, CatalogWrapperForREST> mappingFunction) {
+ validateCachedEntry(key);
+ return cache.get(key, mappingFunction);
+ }
+
+ /** Invalidates all entries in the cache. */
+ public void invalidateAll() {
+ cache.invalidateAll();
+ }
+
+ /**
+ * Validates if the cached entry is still valid. When the provider can check config changes
+ * efficiently, checks if the cached config matches the current config. If not, invalidates the
+ * cache entry.
+ */
+ private void validateCachedEntry(String catalogName) {
+ if (!configProvider.canCheckConfigChange()) {
+ return;
+ }
+
+ CatalogWrapperForREST cachedWrapper = cache.getIfPresent(catalogName);
+ if (cachedWrapper == null) {
+ return;
+ }
+
+ Optional<IcebergConfig> currentConfig = configProvider.getIcebergCatalogConfig(catalogName);
+ if (currentConfig.isEmpty()) {
+ return;
+ }
+
+ if (!isSameConfig(cachedWrapper.getIcebergConfig(), currentConfig.get())) {
+ LOG.info(
+ "Catalog {} config has changed, invalidating cache and creating new wrapper.",
+ catalogName);
+ cache.invalidate(catalogName);
+ }
+ }
+
+ private boolean isSameConfig(IcebergConfig cachedConfig, IcebergConfig currentConfig) {
+ return cachedConfig.getAllConfig().equals(currentConfig.getAllConfig());
+ }
+}
Review Comment:
The ValidatingCache class is a new critical component that validates cached
catalog wrappers against current configuration, but it has no dedicated test
coverage. This validation logic is essential for the internal catalog fetcher
feature and should have comprehensive unit tests covering:
1. Cache validation when config changes
2. Cache validation when config remains the same
3. Behavior when canCheckConfigChange() returns false
4. Behavior when currentConfig is empty
5. Edge cases with null or invalid configs
Consider adding a TestValidatingCache.java file to ensure this functionality
works correctly.
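Below is a minimal, dependency-free model of the decision logic such tests would exercise. It assumes a hypothetical ValidatingCacheModel in which cached values are config maps and the current config comes from a function that may return Optional.empty(). The model mirrors ValidatingCache's rules:
- validate only when the flag is set;
- keep the entry when the current config is empty;
- rebuild the entry when the configs differ.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical, dependency-free model of ValidatingCache's validation rules. */
final class ValidatingCacheModel {
  private final Map<String, Map<String, String>> cache = new ConcurrentHashMap<>();
  private final boolean canCheckConfigChange;
  private final Function<String, Optional<Map<String, String>>> currentConfig;

  ValidatingCacheModel(
      boolean canCheckConfigChange,
      Function<String, Optional<Map<String, String>>> currentConfig) {
    this.canCheckConfigChange = canCheckConfigChange;
    this.currentConfig = currentConfig;
  }

  Map<String, String> get(String key, Function<String, Map<String, String>> loader) {
    if (canCheckConfigChange) {
      Map<String, String> cached = cache.get(key);
      if (cached != null) {
        Optional<Map<String, String>> current = currentConfig.apply(key);
        // An empty current config leaves the cached entry in place.
        if (current.isPresent() && !cached.equals(current.get())) {
          cache.remove(key, cached);
        }
      }
    }
    return cache.computeIfAbsent(key, loader);
  }
}
```

The checks below cover cases 1 to 4. Case 5 depends on how IcebergConfig handles null or invalid input, which is not visible in this diff.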
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
return defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
}
- // client is lazy loaded because the Gravitino server may not be started yet when the provider is
- // initialized.
- private GravitinoClient getGravitinoClient() {
- if (client != null) {
- return client;
+ @Override
+ public boolean canCheckConfigChange() {
+ // Only support checking config changes when using internal fetcher (authorization enabled),
+ // as it can efficiently fetch current config without HTTP overhead.
+ return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+ }
+
+ /** Interface for fetching catalog information. */
+ private interface CatalogFetcher extends Closeable {
+ Catalog loadCatalog(String catalogName) throws NoSuchCatalogException;
+
+ @Override
+ default void close() {}
+ }
+
+ /**
+ * Internal catalog fetcher that uses CatalogDispatcher directly. This bypasses the HTTP layer and
+ * is used when authorization is enabled.
+ *
+ * <p>Note: When authorization is enabled, IcebergCatalogWrapperManager bypasses its cache to
+ * avoid consistency issues between the IcebergCatalogWrapper cache and CatalogManager cache.
+ */
+ private static class InternalCatalogFetcher implements CatalogFetcher {
+ private final String metalake;
+
+ InternalCatalogFetcher(String metalake) {
+ this.metalake = metalake;
}
- synchronized (this) {
- if (client == null) {
- client = createGravitinoClient(gravitinoUri, gravitinoMetalake, properties);
- }
+
+ @Override
+ public Catalog loadCatalog(String catalogName) throws NoSuchCatalogException {
+ CatalogDispatcher catalogDispatcher = GravitinoEnv.getInstance().catalogDispatcher();
+ Preconditions.checkState(
+ catalogDispatcher != null,
+ "CatalogDispatcher is not available. "
+ + "Internal catalog fetcher requires running within Gravitino server.");
+ NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(metalake, catalogName);
+ return catalogDispatcher.loadCatalog(catalogIdent);
}
- return client;
}
- private GravitinoClient createGravitinoClient(
- String uri, String metalake, Map<String, String> properties) {
- ClientBuilder builder = GravitinoClient.builder(uri).withMetalake(metalake);
- String authType =
- properties.getOrDefault(
- IcebergConstants.GRAVITINO_AUTH_TYPE, AuthProperties.SIMPLE_AUTH_TYPE);
- if (AuthProperties.isSimple(authType)) {
- String userName =
+ /**
+ * HTTP catalog fetcher that uses GravitinoClient to access catalogs via REST API. This is used
+ * when authorization is not enabled.
+ */
+ private static class HttpCatalogFetcher implements CatalogFetcher {
+ private final String uri;
+ private final String metalake;
+ private final Map<String, String> properties;
+ private volatile GravitinoClient client;
+
+ HttpCatalogFetcher(String uri, String metalake, Map<String, String> properties) {
+ this.uri = uri;
+ this.metalake = metalake;
+ this.properties = properties;
+ }
+
+ @Override
+ public Catalog loadCatalog(String catalogName) throws NoSuchCatalogException {
+ return getGravitinoClient().loadMetalake(metalake).loadCatalog(catalogName);
+ }
+
+ @Override
+ public void close() {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ // Client is lazy loaded because the Gravitino server may not be started yet when the provider
+ // is initialized.
+ private GravitinoClient getGravitinoClient() {
+ if (client != null) {
+ return client;
+ }
+ synchronized (this) {
+ if (client == null) {
+ client = createGravitinoClient(uri, metalake, properties);
+ }
+ }
+ return client;
+ }
+
+ private GravitinoClient createGravitinoClient(
+ String uri, String metalake, Map<String, String> properties) {
+ ClientBuilder builder = GravitinoClient.builder(uri).withMetalake(metalake);
+ String authType =
properties.getOrDefault(
- IcebergConstants.GRAVITINO_SIMPLE_USERNAME, "iceberg-rest-server");
- builder.withSimpleAuth(userName);
- } else if (AuthProperties.isOAuth2(authType)) {
- String oAuthUri = getRequiredConfig(properties, IcebergConstants.GRAVITINO_OAUTH2_SERVER_URI);
- String credential =
- getRequiredConfig(properties, IcebergConstants.GRAVITINO_OAUTH2_CREDENTIAL);
- String path = getRequiredConfig(properties, IcebergConstants.GRAVITINO_OAUTH2_TOKEN_PATH);
- String scope = getRequiredConfig(properties, IcebergConstants.GRAVITINO_OAUTH2_SCOPE);
- DefaultOAuth2TokenProvider oAuth2TokenProvider =
- DefaultOAuth2TokenProvider.builder()
- .withUri(oAuthUri)
- .withCredential(credential)
- .withPath(path)
- .withScope(scope)
- .build();
- builder.withOAuth(oAuth2TokenProvider);
- } else {
- throw new UnsupportedOperationException("Unsupported auth type: " + authType);
+ IcebergConstants.GRAVITINO_AUTH_TYPE, AuthProperties.SIMPLE_AUTH_TYPE);
+ if (AuthProperties.isSimple(authType)) {
+ String userName =
+ properties.getOrDefault(
+ IcebergConstants.GRAVITINO_SIMPLE_USERNAME, "iceberg-rest-server");
+ builder.withSimpleAuth(userName);
+ } else if (AuthProperties.isOAuth2(authType)) {
+ String oAuthUri =
+ getRequiredConfig(properties, IcebergConstants.GRAVITINO_OAUTH2_SERVER_URI);
+ String credential =
+ getRequiredConfig(properties, IcebergConstants.GRAVITINO_OAUTH2_CREDENTIAL);
+ String path = getRequiredConfig(properties, IcebergConstants.GRAVITINO_OAUTH2_TOKEN_PATH);
+ String scope = getRequiredConfig(properties, IcebergConstants.GRAVITINO_OAUTH2_SCOPE);
+ DefaultOAuth2TokenProvider oAuth2TokenProvider =
+ DefaultOAuth2TokenProvider.builder()
+ .withUri(oAuthUri)
+ .withCredential(credential)
+ .withPath(path)
+ .withScope(scope)
+ .build();
+ builder.withOAuth(oAuth2TokenProvider);
+ } else {
+ throw new UnsupportedOperationException("Unsupported auth type: " + authType);
+ }
+ return builder.build();
}
- return builder.build();
- }
- private String getRequiredConfig(Map<String, String> properties, String key) {
- String configValue = properties.get(key);
- Preconditions.checkArgument(StringUtils.isNotBlank(configValue), key + " should not be empty");
- return configValue;
+ private String getRequiredConfig(Map<String, String> properties, String key) {
+ String configValue = properties.get(key);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(configValue), key + " should not be empty");
+ return configValue;
+ }
}
Review Comment:
The HttpCatalogFetcher class now holds the GravitinoClient creation logic that
was previously in DynamicIcebergConfigProvider. In this diff the logic is moved
rather than duplicated, and the move gives good separation of concerns. The
same authentication handling (simple vs. OAuth2 auth, required-config checks)
may also exist elsewhere in the codebase. If so, consider extracting it into a
shared static factory method, so that future authentication changes are made
in one place and stay consistent.
##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -21,31 +21,100 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.CatalogDispatcher;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
-import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
+import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class TestDynamicIcebergConfigProvider {
+
+ @BeforeEach
+ public void setUp() throws IllegalAccessException {
+ // Create IcebergRESTServerContext with authorization disabled by default
+ createMockServerContext(false);
+ }
+
+ @AfterEach
+ public void tearDown() throws IllegalAccessException {
+ // Clean up GravitinoEnv and IcebergRESTServerContext state after each test
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogDispatcher", null, true);
+ resetServerContext();
+ }
+
+ private void createMockServerContext(boolean authorizationEnabled) throws IllegalAccessException {
+ // Use reflection to set the IcebergRESTServerContext instance
+ IcebergConfigProvider mockProvider = Mockito.mock(IcebergConfigProvider.class);
+ Mockito.when(mockProvider.getMetalakeName()).thenReturn("test_metalake");
+ Mockito.when(mockProvider.getDefaultCatalogName()).thenReturn("default_catalog");
+ IcebergRESTServerContext.create(mockProvider, authorizationEnabled);
+ }
+
+ private void resetServerContext() throws IllegalAccessException {
+ // Reset the IcebergRESTServerContext singleton
+ Class<?> holderClass = IcebergRESTServerContext.class.getDeclaredClasses()[0];
+ FieldUtils.writeStaticField(holderClass, "INSTANCE", null, true);
+ }
Review Comment:
The resetServerContext method uses reflection to reset a static singleton
(lines 66-70). This approach is fragile:
1. It assumes the first class returned by getDeclaredClasses() is the
InstanceHolder
2. Adding another nested class, or any change in the order of that array,
would make it reset the wrong class or fail
3. The hard-coded index [0] hides this assumption from readers
Consider making the reset method more robust by finding the class by name:
```java
// Requires: import java.util.Arrays;
private void resetServerContext() throws IllegalAccessException {
  Class<?> holderClass =
      Arrays.stream(IcebergRESTServerContext.class.getDeclaredClasses())
          .filter(c -> c.getSimpleName().equals("InstanceHolder"))
          .findFirst()
          .orElseThrow(() -> new IllegalStateException("InstanceHolder not found"));
  FieldUtils.writeStaticField(holderClass, "INSTANCE", null, true);
}
```
Alternatively, consider adding a test-only reset method in
IcebergRESTServerContext marked with @VisibleForTesting.
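Below is a sketch of the @VisibleForTesting alternative. It assumes a hypothetical ServerContextSketch singleton held in a volatile static field; IcebergRESTServerContext's actual holder class may be structured differently. With a test-only reset hook like this, tearDown() would no longer need reflection:

```java
/** Hypothetical context singleton with a test-only reset hook instead of reflective resets. */
final class ServerContextSketch {
  private static volatile ServerContextSketch instance;

  private final boolean authorizationEnabled;

  private ServerContextSketch(boolean authorizationEnabled) {
    this.authorizationEnabled = authorizationEnabled;
  }

  static synchronized ServerContextSketch create(boolean authorizationEnabled) {
    instance = new ServerContextSketch(authorizationEnabled);
    return instance;
  }

  static ServerContextSketch getInstance() {
    ServerContextSketch current = instance;
    if (current == null) {
      throw new IllegalStateException("ServerContextSketch has not been created");
    }
    return current;
  }

  boolean isAuthorizationEnabled() {
    return authorizationEnabled;
  }

  /** Test-only hook; in Gravitino this would carry Guava's @VisibleForTesting annotation. */
  static synchronized void resetForTesting() {
    instance = null;
  }
}
```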
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
return defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
}
- // client is lazy loaded because the Gravitino server may not be started yet when the provider is
- // initialized.
- private GravitinoClient getGravitinoClient() {
- if (client != null) {
- return client;
+ @Override
+ public boolean canCheckConfigChange() {
+ // Only support checking config changes when using internal fetcher (authorization enabled),
+ // as it can efficiently fetch current config without HTTP overhead.
+ return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+ }
+
+ /** Interface for fetching catalog information. */
+ private interface CatalogFetcher extends Closeable {
+ Catalog loadCatalog(String catalogName) throws NoSuchCatalogException;
+
+ @Override
+ default void close() {}
+ }
+
+ /**
+ * Internal catalog fetcher that uses CatalogDispatcher directly. This bypasses the HTTP layer and
+ * is used when authorization is enabled.
+ *
+ * <p>Note: When authorization is enabled, IcebergCatalogWrapperManager bypasses its cache to
+ * avoid consistency issues between the IcebergCatalogWrapper cache and CatalogManager cache.
+ */
+ private static class InternalCatalogFetcher implements CatalogFetcher {
+ private final String metalake;
+
+ InternalCatalogFetcher(String metalake) {
+ this.metalake = metalake;
}
- synchronized (this) {
- if (client == null) {
- client = createGravitinoClient(gravitinoUri, gravitinoMetalake, properties);
- }
+
+ @Override
+ public Catalog loadCatalog(String catalogName) throws NoSuchCatalogException {
+ CatalogDispatcher catalogDispatcher = GravitinoEnv.getInstance().catalogDispatcher();
+ Preconditions.checkState(
+ catalogDispatcher != null,
+ "CatalogDispatcher is not available. "
+ + "Internal catalog fetcher requires running within Gravitino server.");
+ NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(metalake, catalogName);
+ return catalogDispatcher.loadCatalog(catalogIdent);
}
Review Comment:
The InternalCatalogFetcher's loadCatalog method checks whether catalogDispatcher
is null and throws an IllegalStateException with a helpful message (lines
196-199), so the check runs on every catalog load. The catalogDispatcher should
be set once during server initialization and never change. Consider moving this
check into the InternalCatalogFetcher constructor and storing the dispatcher in
a final field.
This would:
1. Fail fast when the fetcher is created rather than on each load (because
getCatalogFetcher() creates the fetcher lazily, this is still the first
catalog request, not server startup)
2. Remove the repeated null check from every catalog load (the cost is
negligible, so the benefit is mainly clarity)
3. Make the class invariant clearer (catalogDispatcher must always be
available)
The constructor could look like:
```java
private final String metalake;
private final CatalogDispatcher catalogDispatcher;

InternalCatalogFetcher(String metalake) {
  this.metalake = metalake;
  CatalogDispatcher dispatcher = GravitinoEnv.getInstance().catalogDispatcher();
  Preconditions.checkState(
      dispatcher != null,
      "CatalogDispatcher is not available. "
          + "Internal catalog fetcher requires running within Gravitino server.");
  // loadCatalog() can then use this.catalogDispatcher without a null check.
  this.catalogDispatcher = dispatcher;
}
```
##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -170,4 +258,84 @@ void testIcebergConfig() {
Assertions.assertEquals(
icebergConfig.getIcebergCatalogProperties().get("catalog.backend-name"),
"custom_backend");
}
+
+ @Test
+ public void testInternalCatalogFetcher() throws IllegalAccessException {
+ String metalakeName = "test_metalake";
+ String catalogName = "internal_catalog";
+
+ // Enable authorization to use internal fetcher
+ createMockServerContext(true);
+
+ // Mock CatalogDispatcher
+ CatalogDispatcher mockCatalogDispatcher = Mockito.mock(CatalogDispatcher.class);
+ Catalog mockCatalog = Mockito.mock(Catalog.class);
+
+ NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(metalakeName, catalogName);
+ Mockito.when(mockCatalogDispatcher.loadCatalog(catalogIdent)).thenReturn(mockCatalog);
+ Mockito.when(mockCatalog.provider()).thenReturn("lakehouse-iceberg");
+ Mockito.when(mockCatalog.properties())
+ .thenReturn(
+ new HashMap<String, String>() {
+ {
+ put(IcebergConstants.CATALOG_BACKEND, "custom");
+ put(IcebergConstants.CATALOG_BACKEND_NAME, catalogName);
+ }
+ });
+
+ // Set the mock CatalogDispatcher to GravitinoEnv
+ FieldUtils.writeField(
+ GravitinoEnv.getInstance(), "catalogDispatcher", mockCatalogDispatcher, true);
+
+ // Initialize provider with required properties
+ Map<String, String> properties = new HashMap<>();
+ properties.put(IcebergConstants.GRAVITINO_METALAKE, metalakeName);
+
+ DynamicIcebergConfigProvider provider = new DynamicIcebergConfigProvider();
+ provider.initialize(properties);
+
+ // Test that internal interface is used (CatalogDispatcher should be called)
+ Optional<IcebergConfig> icebergConfig = provider.getIcebergCatalogConfig(catalogName);
+
+ Assertions.assertTrue(icebergConfig.isPresent());
+ Mockito.verify(mockCatalogDispatcher).loadCatalog(catalogIdent);
+ }
Review Comment:
The test testInternalCatalogFetcher verifies that InternalCatalogFetcher is used when authorization is enabled, but it covers only the happy path. Consider adding test cases for:
1. Error handling when CatalogDispatcher is null (before GravitinoEnv is initialized)
2. Propagation of NoSuchCatalogException from CatalogDispatcher
3. Thread safety when multiple threads access the provider concurrently
These edge cases determine how the internal fetcher behaves in production.
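For the thread-safety case, one option is a small concurrency smoke test around the lazy initialization in getCatalogFetcher(). The sketch below is a minimal, self-contained illustration that assumes the fetcher is created with double-checked locking on a volatile field; LazyHolder and ConcurrentInitCheck are hypothetical stand-ins, not Gravitino classes. It releases many threads at once, then lets the caller check that the factory ran exactly once and that every thread received the same instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical stand-in for getCatalogFetcher(): double-checked locking on a volatile field. */
final class LazyHolder<T> {
  private final Supplier<T> factory;
  private volatile T value;

  LazyHolder(Supplier<T> factory) {
    this.factory = factory;
  }

  T get() {
    T local = value;
    if (local != null) {
      return local;
    }
    synchronized (this) {
      if (value == null) {
        value = factory.get();
      }
      return value;
    }
  }
}

/** Calls holder.get() from many threads that are released at the same moment. */
final class ConcurrentInitCheck {
  static <T> List<T> callConcurrently(LazyHolder<T> holder, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    List<Future<T>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < threads; i++) {
        Callable<T> task =
            () -> {
              start.await(); // wait until every task is submitted, to maximize contention
              return holder.get();
            };
        futures.add(pool.submit(task));
      }
      start.countDown();
      List<T> results = new ArrayList<>();
      for (Future<T> future : futures) {
        results.add(future.get(10, TimeUnit.SECONDS));
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException("worker failed or timed out", e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

A passing run does not prove the absence of races, since interleavings are nondeterministic, but a failure always points to a real bug; repeating the check in a loop raises the chance of catching one.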
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java:
##########
@@ -142,54 +161,126 @@ public String getDefaultCatalogName() {
return defaultDynamicCatalogName.orElse(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG);
}
- // client is lazy loaded because the Gravitino server may not be started yet when the provider is
- // initialized.
- private GravitinoClient getGravitinoClient() {
- if (client != null) {
- return client;
+ @Override
+ public boolean canCheckConfigChange() {
+ // Only support checking config changes when using internal fetcher (authorization enabled),
+ // as it can efficiently fetch current config without HTTP overhead.
+ return IcebergRESTServerContext.getInstance().isAuthorizationEnabled();
+ }
+
+ /** Interface for fetching catalog information. */
+ private interface CatalogFetcher extends Closeable {
+ Catalog loadCatalog(String catalogName) throws NoSuchCatalogException;
+
+ @Override
+ default void close() {}
+ }
+
+ /**
+ * Internal catalog fetcher that uses CatalogDispatcher directly. This bypasses the HTTP layer and
+ * is used when authorization is enabled.
+ *
+ * <p>Note: When authorization is enabled, IcebergCatalogWrapperManager bypasses its cache to
+ * avoid consistency issues between the IcebergCatalogWrapper cache and CatalogManager cache.
+ */
+ private static class InternalCatalogFetcher implements CatalogFetcher {
+ private final String metalake;
+
+ InternalCatalogFetcher(String metalake) {
+ this.metalake = metalake;
}
- synchronized (this) {
- if (client == null) {
- client = createGravitinoClient(gravitinoUri, gravitinoMetalake, properties);
- }
+
+ @Override
+ public Catalog loadCatalog(String catalogName) throws NoSuchCatalogException {
+ CatalogDispatcher catalogDispatcher = GravitinoEnv.getInstance().catalogDispatcher();
+ Preconditions.checkState(
+ catalogDispatcher != null,
+ "CatalogDispatcher is not available. "
+ + "Internal catalog fetcher requires running within Gravitino server.");
+ NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(metalake, catalogName);
+ return catalogDispatcher.loadCatalog(catalogIdent);
}
- return client;
}
- private GravitinoClient createGravitinoClient(
- String uri, String metalake, Map<String, String> properties) {
- ClientBuilder builder = GravitinoClient.builder(uri).withMetalake(metalake);
- String authType =
- properties.getOrDefault(
- IcebergConstants.GRAVITINO_AUTH_TYPE, AuthProperties.SIMPLE_AUTH_TYPE);
- if (AuthProperties.isSimple(authType)) {
- String userName =
+ /**
+ * HTTP catalog fetcher that uses GravitinoClient to access catalogs via REST API. This is used
+ * when authorization is not enabled.
+ */
+ private static class HttpCatalogFetcher implements CatalogFetcher {
+ private final String uri;
+ private final String metalake;
+ private final Map<String, String> properties;
+ private volatile GravitinoClient client;
+
+ HttpCatalogFetcher(String uri, String metalake, Map<String, String> properties) {
+ this.uri = uri;
+ this.metalake = metalake;
+ this.properties = properties;
+ }
+
+ @Override
+ public Catalog loadCatalog(String catalogName) throws NoSuchCatalogException {
+ return getGravitinoClient().loadMetalake(metalake).loadCatalog(catalogName);
+ }
+
+ @Override
+ public void close() {
+ if (client != null) {
+ client.close();
Review Comment:
The close() method in HttpCatalogFetcher has the same concurrency issue as the outer class. It reads the volatile client field without synchronization (lines 227-230), while getGravitinoClient() creates the client with double-checked locking in a synchronized block. As a result, close() can race with client creation: for example, close() may see null while another thread is still creating the client, leaving the new client unclosed. It can also close a client that another thread is about to use.
Consider synchronizing the close() method to prevent this race condition:
```java
@Override
public void close() {
synchronized (this) {
if (client != null) {
client.close();
client = null;
}
}
}
```
Setting client to null after closing also keeps later calls from being handed the already closed client.
```suggestion
synchronized (this) {
if (client != null) {
client.close();
client = null;
}
```
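One caveat with nulling the field: assuming getGravitinoClient() keeps the usual double-checked pattern, a call made after close() would lazily build a fresh client. If use after close should fail instead, a closed flag guarded by the same lock is one way to get that. The sketch below assumes that behavior is wanted; LazyClientHolder is a hypothetical simplification of HttpCatalogFetcher, not the PR's code.

```java
import java.util.function.Supplier;

/**
 * Hypothetical simplification of HttpCatalogFetcher: the client is created lazily with
 * double-checked locking, and close() runs under the same lock that guards creation.
 */
final class LazyClientHolder<C extends AutoCloseable> implements AutoCloseable {
  private final Supplier<C> factory;
  private volatile C client;
  private boolean closed; // guarded by "this"

  LazyClientHolder(Supplier<C> factory) {
    this.factory = factory;
  }

  C get() {
    C local = client;
    if (local != null) {
      return local; // fast path: created and not yet closed
    }
    synchronized (this) {
      if (closed) {
        throw new IllegalStateException("fetcher is closed");
      }
      if (client == null) {
        client = factory.get();
      }
      return client;
    }
  }

  @Override
  public synchronized void close() {
    closed = true;
    C local = client;
    client = null; // later get() calls take the locked path and see closed == true
    if (local != null) {
      try {
        local.close();
      } catch (Exception e) {
        throw new IllegalStateException("failed to close client", e);
      }
    }
  }
}
```

No lock stops a thread that already obtained the client reference from using it after close(); that still depends on shutting down callers before closing the provider.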
##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java:
##########
@@ -21,31 +21,100 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.CatalogDispatcher;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
-import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import org.apache.gravitino.iceberg.service.authorization.IcebergRESTServerContext;
+import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class TestDynamicIcebergConfigProvider {
+
+ @BeforeEach
+ public void setUp() throws IllegalAccessException {
+ // Create IcebergRESTServerContext with authorization disabled by default
+ createMockServerContext(false);
+ }
+
+ @AfterEach
+ public void tearDown() throws IllegalAccessException {
+ // Clean up GravitinoEnv and IcebergRESTServerContext state after each test
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "catalogDispatcher", null, true);
+ resetServerContext();
+ }
+
+ private void createMockServerContext(boolean authorizationEnabled) throws IllegalAccessException {
+ // Use reflection to set the IcebergRESTServerContext instance
+ IcebergConfigProvider mockProvider = Mockito.mock(IcebergConfigProvider.class);
+ Mockito.when(mockProvider.getMetalakeName()).thenReturn("test_metalake");
+ Mockito.when(mockProvider.getDefaultCatalogName()).thenReturn("default_catalog");
+ IcebergRESTServerContext.create(mockProvider, authorizationEnabled);
+ }
+
+ private void resetServerContext() throws IllegalAccessException {
+ // Reset the IcebergRESTServerContext singleton
+ Class<?> holderClass = IcebergRESTServerContext.class.getDeclaredClasses()[0];
+ FieldUtils.writeStaticField(holderClass, "INSTANCE", null, true);
+ }
+
+ /**
+ * Creates a mock CatalogFetcher using reflection to set it on the provider. This avoids directly
+ * accessing the private CatalogFetcher interface.
+ */
+ private void setMockCatalogFetcher(
+ DynamicIcebergConfigProvider provider, Map<String, Catalog> catalogMap)
+ throws IllegalAccessException {
+ // Create a mock object that implements the private CatalogFetcher interface via reflection
+ Object mockFetcher =
+ java.lang.reflect.Proxy.newProxyInstance(
+ getClass().getClassLoader(),
+ new Class<?>[] {getCatalogFetcherInterface()},
+ (proxy, method, args) -> {
+ if ("loadCatalog".equals(method.getName())) {
+ String catalogName = (String) args[0];
+ Catalog catalog = catalogMap.get(catalogName);
+ if (catalog == null) {
+ throw new NoSuchCatalogException("Catalog not found: %s", catalogName);
+ }
+ return catalog;
+ } else if ("close".equals(method.getName())) {
+ return null;
+ }
+ return null;
+ });
+ FieldUtils.writeField(provider, "catalogFetcher", mockFetcher, true);
+ }
+
+ private Class<?> getCatalogFetcherInterface() {
+ for (Class<?> innerClass : DynamicIcebergConfigProvider.class.getDeclaredClasses()) {
+ if (innerClass.getSimpleName().equals("CatalogFetcher")) {
+ return innerClass;
+ }
+ }
+ throw new RuntimeException("CatalogFetcher interface not found");
+ }
Review Comment:
The test uses reflection to access and mock the private CatalogFetcher interface (lines 76-107). While this approach works, it couples the test tightly to the internal implementation details of DynamicIcebergConfigProvider: if the internal structure changes (e.g., CatalogFetcher is renamed or moved), these tests will break.
Consider one of these alternatives:
1. Make CatalogFetcher package-private instead of private, so tests in the same package can implement it without reflection
2. Add a test-only constructor or setter that accepts a CatalogFetcher (marked with @VisibleForTesting)
3. Use PowerMock or a similar framework that handles this more elegantly
Any of these would make the tests more maintainable and less fragile under internal refactoring.
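Alternatives 1 and 2 combined could look roughly like the sketch below: with a package-private interface, a same-package test can supply the fetcher as a lambda instead of building a java.lang.reflect.Proxy. This is a simplified, hypothetical shape (ConfigProvider and loadCatalogProperties are illustrative names, and property maps stand in for Gravitino's Catalog); in Gravitino the injecting constructor would typically carry Guava's @VisibleForTesting.

```java
import java.util.Map;

/** Package-private (not private), so same-package tests can implement it directly. */
interface CatalogFetcher extends AutoCloseable {
  // Hypothetical simplification: returns catalog properties instead of a Gravitino Catalog.
  Map<String, String> loadCatalogProperties(String catalogName);

  @Override
  default void close() {}
}

/** Hypothetical, stripped-down provider that receives its fetcher through a test seam. */
final class ConfigProvider implements AutoCloseable {
  private final CatalogFetcher catalogFetcher;

  // Test seam: the real provider would keep its lazy, authorization-based fetcher selection
  // for production and expose a constructor like this (or a setter) only for tests.
  ConfigProvider(CatalogFetcher catalogFetcher) {
    this.catalogFetcher = catalogFetcher;
  }

  Map<String, String> catalogProperties(String catalogName) {
    return catalogFetcher.loadCatalogProperties(catalogName);
  }

  @Override
  public void close() {
    catalogFetcher.close();
  }
}
```

Because close() is a default method, CatalogFetcher stays a functional interface, so a test fake is a one-line lambda. In a real test the lambda would return a Mockito-mocked Catalog and throw NoSuchCatalogException for unknown names.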
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ValidatingCache.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache wrapper that validates cached entries against the current configuration. When the config
+ * provider can check config changes efficiently, it compares the cached wrapper's config with the
+ * current config. If the config has changed, the stale entry is invalidated before returning.
+ */
+public class ValidatingCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ValidatingCache.class);
+
+ private final Cache<String, CatalogWrapperForREST> cache;
+ private final IcebergConfigProvider configProvider;
+
+ public ValidatingCache(
+ Cache<String, CatalogWrapperForREST> cache, IcebergConfigProvider configProvider) {
+ this.cache = cache;
+ this.configProvider = configProvider;
+ }
+
+ /**
+ * Gets the value from the cache, validating it first if the provider can check config changes. If
+ * the cached entry is stale (config has changed), it will be invalidated and a new entry will be
+ * created.
+ *
+ * @param key The cache key (catalog name).
+ * @param mappingFunction The function to create a new value if not present or invalidated.
+ * @return The cached or newly created CatalogWrapperForREST.
+ */
+ public CatalogWrapperForREST get(
+ String key, Function<String, CatalogWrapperForREST> mappingFunction) {
+ validateCachedEntry(key);
+ return cache.get(key, mappingFunction);
+ }
+
+ /** Invalidates all entries in the cache. */
+ public void invalidateAll() {
+ cache.invalidateAll();
+ }
+
+ /**
+ * Validates if the cached entry is still valid. When the provider can check config changes
+ * efficiently, checks if the cached config matches the current config. If not, invalidates the
+ * cache entry.
+ */
+ private void validateCachedEntry(String catalogName) {
+ if (!configProvider.canCheckConfigChange()) {
+ return;
+ }
+
+ CatalogWrapperForREST cachedWrapper = cache.getIfPresent(catalogName);
+ if (cachedWrapper == null) {
+ return;
+ }
+
+ Optional<IcebergConfig> currentConfig = configProvider.getIcebergCatalogConfig(catalogName);
+ if (currentConfig.isEmpty()) {
Review Comment:
In the validateCachedEntry method, if currentConfig is empty (lines 83-85), the method returns early without invalidating the cache. This can leave stale entries in the cache when a catalog is deleted from Gravitino: getIcebergCatalogConfig should then return Optional.empty(), but this check never invalidates the cached entry, so it keeps being served until it is evicted for some other reason.
Consider invalidating the cache entry when currentConfig is empty so that removed catalogs are properly cleaned up:
```java
if (currentConfig.isEmpty()) {
LOG.info("Catalog {} no longer exists, invalidating cache.", catalogName);
cache.invalidate(catalogName);
return;
}
```
```suggestion
if (currentConfig.isEmpty()) {
LOG.info("Catalog {} no longer exists, invalidating cache.",
catalogName);
cache.invalidate(catalogName);
```
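To make the intended behavior concrete, here is a simplified, self-contained sketch of the validation step with the suggested change applied. It is not ValidatingCache itself: a ConcurrentHashMap replaces the Caffeine cache, cached values are plain config strings instead of CatalogWrapperForREST, and the currentConfig function is a hypothetical stand-in for IcebergConfigProvider.getIcebergCatalogConfig(). Entries are evicted both when the config has changed and when the catalog no longer exists.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for ValidatingCache; values are the config each entry was built from. */
final class ValidatingCacheSketch {
  private final Map<String, String> cache = new ConcurrentHashMap<>();
  // Returns the catalog's current config, or Optional.empty() if the catalog does not exist.
  private final Function<String, Optional<String>> currentConfig;

  ValidatingCacheSketch(Function<String, Optional<String>> currentConfig) {
    this.currentConfig = currentConfig;
  }

  String get(String key, Function<String, String> mappingFunction) {
    validateCachedEntry(key);
    return cache.computeIfAbsent(key, mappingFunction);
  }

  Optional<String> getIfPresent(String key) {
    return Optional.ofNullable(cache.get(key));
  }

  /** Drops the cached entry if its catalog was removed or its config changed. */
  void validateCachedEntry(String key) {
    String cached = cache.get(key);
    if (cached == null) {
      return;
    }
    Optional<String> current = currentConfig.apply(key);
    if (current.isEmpty() || !current.get().equals(cached)) {
      // Conditional remove: only evicts the entry that was checked, never a newer replacement.
      cache.remove(key, cached);
    }
  }
}
```

The two-argument remove(key, value) avoids dropping an entry that another thread rebuilt between the check and the eviction. With Caffeine, the same conditional removal should be reachable through the map view, cache.asMap().remove(key, value), whereas invalidate(key) removes unconditionally.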
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]