This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 082bbdc157 [#5620] feat(fileset): Support credential vending for 
fileset catalog (#5682)
082bbdc157 is described below

commit 082bbdc157206b43d07997025acb608aa8478e2a
Author: FANNG <xiaoj...@datastrato.com>
AuthorDate: Thu Dec 26 12:19:27 2024 +0800

    [#5620] feat(fileset): Support credential vending for fileset catalog 
(#5682)
    
    ### What changes were proposed in this pull request?
    
    Support credential vending for fileset catalog
    
    1. Add the `credential-providers` property for the fileset catalog,
    schema, and fileset.
    2. Look up `credential-providers` in the order of fileset, schema,
    and catalog.
    3. The user can set multiple credential providers.
    
    
    ### Why are the changes needed?
    
    Fix: #5620
    
    ### Does this PR introduce _any_ user-facing change?
    
    Documentation will be added after this PR is merged.
    
    ### How was this patch tested?
    Added an integration test and tested against a local Gravitino server setup.
---
 bundles/aws-bundle/build.gradle.kts                |   2 +
 .../gravitino/credential/CredentialConstants.java  |   1 +
 .../hadoop/HadoopCatalogPropertiesMetadata.java    |   2 +
 .../hadoop/HadoopFilesetPropertiesMetadata.java    |   2 +
 .../hadoop/HadoopSchemaPropertiesMetadata.java     |   2 +
 .../hadoop/SecureHadoopCatalogOperations.java      |  63 ++++++--
 .../test/FilesetCatalogCredentialIT.java           | 160 +++++++++++++++++++++
 .../java/org/apache/gravitino/GravitinoEnv.java    |  15 +-
 .../apache/gravitino/catalog/CatalogManager.java   |   5 +
 .../gravitino/catalog/CredentialManager.java       |  53 -------
 .../apache/gravitino/connector/BaseCatalog.java    |  19 +++
 .../connector/credential/PathContext.java          |  63 ++++++++
 .../credential/SupportsPathBasedCredentials.java   |  43 ++++++
 .../credential/CatalogCredentialManager.java       |  70 +++++++++
 .../credential/CredentialOperationDispatcher.java  | 124 ++++++++++++++++
 ...edentialUtils.java => CredentialPrivilege.java} |  14 +-
 .../gravitino/credential/CredentialUtils.java      |  63 +++++++-
 .../credential/config/CredentialConfig.java        |  42 ++++++
 .../credential/Dummy2CredentialProvider.java       |  89 ++++++++++++
 .../gravitino/credential/TestCredentialUtils.java  |  66 +++++++++
 ....apache.gravitino.credential.CredentialProvider |   3 +-
 .../apache/gravitino/server/GravitinoServer.java   |   6 +-
 .../rest/MetadataObjectCredentialOperations.java   |  24 +++-
 .../TestMetadataObjectCredentialOperations.java    |  13 +-
 24 files changed, 845 insertions(+), 99 deletions(-)

diff --git a/bundles/aws-bundle/build.gradle.kts 
b/bundles/aws-bundle/build.gradle.kts
index 94c7d1cb2c..3af5c8b4f3 100644
--- a/bundles/aws-bundle/build.gradle.kts
+++ b/bundles/aws-bundle/build.gradle.kts
@@ -37,6 +37,7 @@ dependencies {
   implementation(libs.aws.iam)
   implementation(libs.aws.policy)
   implementation(libs.aws.sts)
+  implementation(libs.commons.lang3)
   implementation(libs.hadoop3.aws)
   implementation(project(":catalogs:catalog-common")) {
     exclude("*")
@@ -46,6 +47,7 @@ dependencies {
 tasks.withType(ShadowJar::class.java) {
   isZip64 = true
   configurations = listOf(project.configurations.runtimeClasspath.get())
+  relocate("org.apache.commons", 
"org.apache.gravitino.aws.shaded.org.apache.commons")
   archiveClassifier.set("")
 }
 
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
index 29f9241c89..d2753f24b5 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.credential;
 
 public class CredentialConstants {
   public static final String CREDENTIAL_PROVIDER_TYPE = 
"credential-provider-type";
+  public static final String CREDENTIAL_PROVIDERS = "credential-providers";
   public static final String S3_TOKEN_CREDENTIAL_PROVIDER = "s3-token";
   public static final String S3_TOKEN_EXPIRE_IN_SECS = 
"s3-token-expire-in-secs";
 
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
index 397e13aa4a..22cf0d5b2c 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
@@ -27,6 +27,7 @@ import 
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider;
 import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.credential.config.CredentialConfig;
 
 public class HadoopCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata {
 
@@ -84,6 +85,7 @@ public class HadoopCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetada
           // The following two are about authentication.
           .putAll(KERBEROS_PROPERTY_ENTRIES)
           .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
+          .putAll(CredentialConfig.CREDENTIAL_PROPERTY_ENTRIES)
           .build();
 
   @Override
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
index 250a48d292..84862dd094 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
@@ -24,6 +24,7 @@ import 
org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
 import 
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.credential.config.CredentialConfig;
 
 public class HadoopFilesetPropertiesMetadata extends BasePropertiesMetadata {
 
@@ -32,6 +33,7 @@ public class HadoopFilesetPropertiesMetadata extends 
BasePropertiesMetadata {
     ImmutableMap.Builder<String, PropertyEntry<?>> builder = 
ImmutableMap.builder();
     builder.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
     builder.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES);
+    builder.putAll(CredentialConfig.CREDENTIAL_PROPERTY_ENTRIES);
     return builder.build();
   }
 }
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
index 8892433ac6..9028cc48f3 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchemaPropertiesMetadata.java
@@ -24,6 +24,7 @@ import 
org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
 import 
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.credential.config.CredentialConfig;
 
 public class HadoopSchemaPropertiesMetadata extends BasePropertiesMetadata {
 
@@ -49,6 +50,7 @@ public class HadoopSchemaPropertiesMetadata extends 
BasePropertiesMetadata {
                   false /* hidden */))
           .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
           .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
+          .putAll(CredentialConfig.CREDENTIAL_PROPERTY_ENTRIES)
           .build();
 
   @Override
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
index 2180e45d42..7ae10805b5 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
@@ -20,11 +20,16 @@
 package org.apache.gravitino.catalog.hadoop;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 import javax.security.auth.Subject;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
@@ -38,6 +43,9 @@ import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.SupportsSchemas;
+import org.apache.gravitino.connector.credential.PathContext;
+import org.apache.gravitino.connector.credential.SupportsPathBasedCredentials;
+import org.apache.gravitino.credential.CredentialUtils;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -50,13 +58,14 @@ import org.apache.gravitino.file.FilesetCatalog;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("removal")
 public class SecureHadoopCatalogOperations
-    implements CatalogOperations, SupportsSchemas, FilesetCatalog {
+    implements CatalogOperations, SupportsSchemas, FilesetCatalog, 
SupportsPathBasedCredentials {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(SecureHadoopCatalogOperations.class);
 
@@ -66,6 +75,8 @@ public class SecureHadoopCatalogOperations
 
   private UserContext catalogUserContext;
 
+  private Map<String, String> catalogProperties;
+
   public SecureHadoopCatalogOperations() {
     this.hadoopCatalogOperations = new HadoopCatalogOperations();
   }
@@ -74,6 +85,20 @@ public class SecureHadoopCatalogOperations
     this.hadoopCatalogOperations = new HadoopCatalogOperations(store);
   }
 
+  @Override
+  public void initialize(
+      Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
+      throws RuntimeException {
+    hadoopCatalogOperations.initialize(config, info, propertiesMetadata);
+    this.catalogUserContext =
+        UserContext.getUserContext(
+            NameIdentifier.of(info.namespace(), info.name()),
+            config,
+            hadoopCatalogOperations.getHadoopConf(),
+            info);
+    this.catalogProperties = info.properties();
+  }
+
   @VisibleForTesting
   public HadoopCatalogOperations getBaseHadoopCatalogOperations() {
     return hadoopCatalogOperations;
@@ -163,19 +188,6 @@ public class SecureHadoopCatalogOperations
     }
   }
 
-  @Override
-  public void initialize(
-      Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
-      throws RuntimeException {
-    hadoopCatalogOperations.initialize(config, info, propertiesMetadata);
-    catalogUserContext =
-        UserContext.getUserContext(
-            NameIdentifier.of(info.namespace(), info.name()),
-            config,
-            hadoopCatalogOperations.getHadoopConf(),
-            info);
-  }
-
   @Override
   public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
       throws NoSuchFilesetException, IllegalArgumentException {
@@ -245,6 +257,29 @@ public class SecureHadoopCatalogOperations
     hadoopCatalogOperations.testConnection(catalogIdent, type, provider, 
comment, properties);
   }
 
+  @Override
+  public List<PathContext> getPathContext(NameIdentifier filesetIdentifier) {
+    Fileset fileset = loadFileset(filesetIdentifier);
+    String path = fileset.storageLocation();
+    Preconditions.checkState(
+        StringUtils.isNotBlank(path), "The location of fileset should not be 
empty.");
+
+    Set<String> providers =
+        CredentialUtils.getCredentialProvidersByOrder(
+            () -> fileset.properties(),
+            () -> {
+              Namespace namespace = filesetIdentifier.namespace();
+              NameIdentifier schemaIdentifier =
+                  NameIdentifierUtil.ofSchema(
+                      namespace.level(0), namespace.level(1), 
namespace.level(2));
+              return loadSchema(schemaIdentifier).properties();
+            },
+            () -> catalogProperties);
+    return providers.stream()
+        .map(provider -> new PathContext(path, provider))
+        .collect(Collectors.toList());
+  }
+
   /**
    * Add the user to the subject so that we can get the last user in the 
subject. Hadoop catalog
    * uses this method to pass api user from the client side, so that we can 
get the user in the
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
new file mode 100644
index 0000000000..94239fef28
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/FilesetCatalogCredentialIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.S3Properties;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIfEnvironmentVariable(named = "GRAVITINO_TEST_CLOUD_IT", matches = 
"true")
+public class FilesetCatalogCredentialIT extends BaseIT {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilesetCatalogCredentialIT.class);
+
+  public static final String BUCKET_NAME = System.getenv("S3_BUCKET_NAME");
+  public static final String S3_ACCESS_KEY = System.getenv("S3_ACCESS_KEY_ID");
+  public static final String S3_SECRET_KEY = 
System.getenv("S3_SECRET_ACCESS_KEY");
+  public static final String S3_ROLE_ARN = System.getenv("S3_ROLE_ARN");
+
+  private String metalakeName = 
GravitinoITUtils.genRandomName("gvfs_it_metalake");
+  private String catalogName = GravitinoITUtils.genRandomName("catalog");
+  private String schemaName = GravitinoITUtils.genRandomName("schema");
+  private GravitinoMetalake metalake;
+
+  @BeforeAll
+  public void startIntegrationTest() {
+    // Do nothing
+  }
+
+  @BeforeAll
+  public void startUp() throws Exception {
+    copyBundleJarsToHadoop("aws-bundle");
+    // Need to download jars to gravitino server
+    super.startIntegrationTest();
+
+    metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+    catalogName = GravitinoITUtils.genRandomName("catalog");
+    schemaName = GravitinoITUtils.genRandomName("schema");
+
+    Assertions.assertFalse(client.metalakeExists(metalakeName));
+    metalake = client.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
+    Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(FILESYSTEM_PROVIDERS, "s3");
+    properties.put(
+        CredentialConstants.CREDENTIAL_PROVIDERS,
+        S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE
+            + ","
+            + S3SecretKeyCredential.S3_SECRET_KEY_CREDENTIAL_TYPE);
+    properties.put(
+        CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
+        S3SecretKeyCredential.S3_SECRET_KEY_CREDENTIAL_TYPE);
+    properties.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
+    properties.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
+    properties.put(S3Properties.GRAVITINO_S3_ENDPOINT, 
"s3.ap-southeast-2.amazonaws.com");
+    properties.put(S3Properties.GRAVITINO_S3_REGION, "ap-southeast-2");
+    properties.put(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN);
+
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+    Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+  }
+
+  @AfterAll
+  public void tearDown() throws IOException {
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
+
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  protected String genStorageLocation(String fileset) {
+    return String.format("s3a://%s/%s", BUCKET_NAME, fileset);
+  }
+
+  @Test
+  void testGetCatalogCredential() {
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    Credential[] credentials = catalog.supportsCredentials().getCredentials();
+    Assertions.assertEquals(1, credentials.length);
+    Assertions.assertTrue(credentials[0] instanceof S3SecretKeyCredential);
+  }
+
+  @Test
+  void testGetFilesetCredential() {
+    String filesetName = 
GravitinoITUtils.genRandomName("test_fileset_credential");
+    NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    String storageLocation = genStorageLocation(filesetName);
+    catalog
+        .asFilesetCatalog()
+        .createFileset(
+            filesetIdent,
+            "fileset comment",
+            Fileset.Type.MANAGED,
+            storageLocation,
+            ImmutableMap.of(
+                CredentialConstants.CREDENTIAL_PROVIDERS,
+                S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE));
+
+    Fileset fileset = catalog.asFilesetCatalog().loadFileset(filesetIdent);
+    Credential[] credentials = fileset.supportsCredentials().getCredentials();
+    Assertions.assertEquals(1, credentials.length);
+    Assertions.assertTrue(credentials[0] instanceof S3TokenCredential);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 96c60b834f..57f04a0cfb 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -28,7 +28,6 @@ import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
 import org.apache.gravitino.catalog.CatalogDispatcher;
 import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.catalog.CatalogNormalizeDispatcher;
-import org.apache.gravitino.catalog.CredentialManager;
 import org.apache.gravitino.catalog.FilesetDispatcher;
 import org.apache.gravitino.catalog.FilesetNormalizeDispatcher;
 import org.apache.gravitino.catalog.FilesetOperationDispatcher;
@@ -47,6 +46,7 @@ import org.apache.gravitino.catalog.TableOperationDispatcher;
 import org.apache.gravitino.catalog.TopicDispatcher;
 import org.apache.gravitino.catalog.TopicNormalizeDispatcher;
 import org.apache.gravitino.catalog.TopicOperationDispatcher;
+import org.apache.gravitino.credential.CredentialOperationDispatcher;
 import org.apache.gravitino.hook.AccessControlHookDispatcher;
 import org.apache.gravitino.hook.CatalogHookDispatcher;
 import org.apache.gravitino.hook.FilesetHookDispatcher;
@@ -108,7 +108,7 @@ public class GravitinoEnv {
 
   private MetalakeDispatcher metalakeDispatcher;
 
-  private CredentialManager credentialManager;
+  private CredentialOperationDispatcher credentialOperationDispatcher;
 
   private TagDispatcher tagDispatcher;
 
@@ -264,12 +264,12 @@ public class GravitinoEnv {
   }
 
   /**
-   * Get the {@link CredentialManager} associated with the Gravitino 
environment.
+   * Get the {@link CredentialOperationDispatcher} associated with the 
Gravitino environment.
    *
-   * @return The {@link CredentialManager} instance.
+   * @return The {@link CredentialOperationDispatcher} instance.
    */
-  public CredentialManager credentialManager() {
-    return credentialManager;
+  public CredentialOperationDispatcher credentialOperationDispatcher() {
+    return credentialOperationDispatcher;
   }
 
   /**
@@ -432,7 +432,8 @@ public class GravitinoEnv {
         new CatalogNormalizeDispatcher(catalogHookDispatcher);
     this.catalogDispatcher = new CatalogEventDispatcher(eventBus, 
catalogNormalizeDispatcher);
 
-    this.credentialManager = new CredentialManager(catalogManager, 
entityStore, idGenerator);
+    this.credentialOperationDispatcher =
+        new CredentialOperationDispatcher(catalogManager, entityStore, 
idGenerator);
 
     SchemaOperationDispatcher schemaOperationDispatcher =
         new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 4a46952f87..1e9c1d9d94 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -126,6 +126,7 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
 
   /** Wrapper class for a catalog instance and its class loader. */
   public static class CatalogWrapper {
+
     private BaseCatalog catalog;
     private IsolatedClassLoader classLoader;
 
@@ -169,6 +170,10 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
           });
     }
 
+    public <R> R doWithCredentialOps(ThrowableFunction<BaseCatalog, R> fn) 
throws Exception {
+      return classLoader.withClassLoader(cl -> fn.apply(catalog));
+    }
+
     public <R> R doWithTopicOps(ThrowableFunction<TopicCatalog, R> fn) throws 
Exception {
       return classLoader.withClassLoader(
           cl -> {
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CredentialManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CredentialManager.java
deleted file mode 100644
index 808fc96fb0..0000000000
--- a/core/src/main/java/org/apache/gravitino/catalog/CredentialManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.gravitino.catalog;
-
-import java.util.List;
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.gravitino.EntityStore;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.connector.BaseCatalog;
-import org.apache.gravitino.credential.Credential;
-import org.apache.gravitino.exceptions.NoSuchCatalogException;
-import org.apache.gravitino.storage.IdGenerator;
-import org.apache.gravitino.utils.NameIdentifierUtil;
-
-/** Get credentials with the specific catalog classloader. */
-public class CredentialManager extends OperationDispatcher {
-
-  public CredentialManager(
-      CatalogManager catalogManager, EntityStore store, IdGenerator 
idGenerator) {
-    super(catalogManager, store, idGenerator);
-  }
-
-  public List<Credential> getCredentials(NameIdentifier identifier) {
-    return doWithCatalog(
-        NameIdentifierUtil.getCatalogIdentifier(identifier),
-        c -> getCredentials(c.catalog(), identifier),
-        NoSuchCatalogException.class);
-  }
-
-  private List<Credential> getCredentials(BaseCatalog catalog, NameIdentifier 
identifier) {
-    throw new NotImplementedException(
-        String.format(
-            "Load credentials is not implemented for catalog: %s, identifier: 
%s",
-            catalog.name(), identifier));
-  }
-}
diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java 
b/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
index 218c2a428b..14b1912b4d 100644
--- a/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
+++ b/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
@@ -31,6 +31,7 @@ import org.apache.gravitino.annotation.Evolving;
 import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
 import org.apache.gravitino.connector.authorization.BaseAuthorization;
 import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.credential.CatalogCredentialManager;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.utils.IsolatedClassLoader;
 import org.slf4j.Logger;
@@ -51,6 +52,7 @@ import org.slf4j.LoggerFactory;
 @Evolving
 public abstract class BaseCatalog<T extends BaseCatalog>
     implements Catalog, CatalogProvider, HasPropertyMetadata, Closeable {
+
   private static final Logger LOG = LoggerFactory.getLogger(BaseCatalog.class);
 
   // This variable is used as a key in properties of catalogs to inject custom 
operation to
@@ -72,6 +74,8 @@ public abstract class BaseCatalog<T extends BaseCatalog>
 
   private volatile Map<String, String> properties;
 
+  private volatile CatalogCredentialManager catalogCredentialManager;
+
   private static String ENTITY_IS_NOT_SET = "entity is not set";
 
   // Any Gravitino configuration that starts with this prefix will be trim and 
passed to the
@@ -225,6 +229,10 @@ public abstract class BaseCatalog<T extends BaseCatalog>
       authorizationPlugin.close();
       authorizationPlugin = null;
     }
+    if (catalogCredentialManager != null) {
+      catalogCredentialManager.close();
+      catalogCredentialManager = null;
+    }
   }
 
   public Capability capability() {
@@ -239,6 +247,17 @@ public abstract class BaseCatalog<T extends BaseCatalog>
     return capability;
   }
 
+  public CatalogCredentialManager catalogCredentialManager() {
+    if (catalogCredentialManager == null) {
+      synchronized (this) {
+        if (catalogCredentialManager == null) {
+          this.catalogCredentialManager = new CatalogCredentialManager(name(), 
properties());
+        }
+      }
+    }
+    return catalogCredentialManager;
+  }
+
   private CatalogOperations createOps(Map<String, String> conf) {
     String customCatalogOperationClass = conf.get(CATALOG_OPERATION_IMPL);
     return Optional.ofNullable(customCatalogOperationClass)
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/credential/PathContext.java 
b/core/src/main/java/org/apache/gravitino/connector/credential/PathContext.java
new file mode 100644
index 0000000000..5c520d6bfd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/connector/credential/PathContext.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.connector.credential;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/**
+ * The {@code PathContext} class represents the path and its associated 
credential type to generate
+ * a credential for {@link 
org.apache.gravitino.credential.CredentialOperationDispatcher}.
+ */
+@DeveloperApi
+public class PathContext {
+
+  private final String path;
+
+  private final String credentialType;
+
+  /**
+   * Constructs a new {@code PathContext} instance with the given path and 
credential type.
+   *
+   * @param path The path string.
+   * @param credentialType The type of the credential.
+   */
+  public PathContext(String path, String credentialType) {
+    this.path = path;
+    this.credentialType = credentialType;
+  }
+
+  /**
+   * Gets the path string.
+   *
+   * @return The path associated with this instance.
+   */
+  public String path() {
+    return path;
+  }
+
+  /**
+   * Gets the credential type.
+   *
+   * @return The credential type associated with this instance.
+   */
+  public String credentialType() {
+    return credentialType;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/credential/SupportsPathBasedCredentials.java
 
b/core/src/main/java/org/apache/gravitino/connector/credential/SupportsPathBasedCredentials.java
new file mode 100644
index 0000000000..93e08a3906
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/connector/credential/SupportsPathBasedCredentials.java
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.connector.credential;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Catalog operations should implement this interface to generate path-based credentials. */
+@DeveloperApi
+public interface SupportsPathBasedCredentials {
+
+  /**
+   * Gets the list of {@link PathContext}s for a resource.
+   *
+   * <p>In most cases there will be only one element in the list. For catalogs which support
+   * multiple locations, like fileset, there may be multiple elements.
+   *
+   * <p>The name identifier is the identifier of a resource like a fileset or a table, not of a
+   * metalake, catalog or schema.
+   *
+   * @param nameIdentifier The identifier for fileset, table, etc.
+   * @return A list of {@link PathContext}
+   */
+  List<PathContext> getPathContext(NameIdentifier nameIdentifier);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/credential/CatalogCredentialManager.java
 
b/core/src/main/java/org/apache/gravitino/credential/CatalogCredentialManager.java
new file mode 100644
index 0000000000..2fe6fedccd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/credential/CatalogCredentialManager.java
@@ -0,0 +1,70 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage lifetime of the credential provider in one catalog, dispatch 
credential request to the
+ * corresponding credential provider.
+ */
+public class CatalogCredentialManager implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CatalogCredentialManager.class);
+
+  private final String catalogName;
+  private final Map<String, CredentialProvider> credentialProviders;
+
+  public CatalogCredentialManager(String catalogName, Map<String, String> 
catalogProperties) {
+    this.catalogName = catalogName;
+    this.credentialProviders = 
CredentialUtils.loadCredentialProviders(catalogProperties);
+  }
+
+  public Credential getCredential(String credentialType, CredentialContext 
context) {
+    // todo: add credential cache
+    Preconditions.checkState(
+        credentialProviders.containsKey(credentialType),
+        String.format("Credential %s not found", credentialType));
+    return credentialProviders.get(credentialType).getCredential(context);
+  }
+
+  @Override
+  public void close() {
+    credentialProviders
+        .values()
+        .forEach(
+            credentialProvider -> {
+              try {
+                credentialProvider.close();
+              } catch (IOException e) {
+                LOG.warn(
+                    "Close credential provider failed, catalog: {}, credential 
provider: {}",
+                    catalogName,
+                    credentialProvider.credentialType(),
+                    e);
+              }
+            });
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/credential/CredentialOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/credential/CredentialOperationDispatcher.java
new file mode 100644
index 0000000000..2ec76aeb4a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/credential/CredentialOperationDispatcher.java
@@ -0,0 +1,124 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.ws.rs.NotAuthorizedException;
+import javax.ws.rs.NotSupportedException;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.CatalogManager;
+import org.apache.gravitino.catalog.OperationDispatcher;
+import org.apache.gravitino.connector.BaseCatalog;
+import org.apache.gravitino.connector.credential.PathContext;
+import org.apache.gravitino.connector.credential.SupportsPathBasedCredentials;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/** Get credentials with the specific catalog classloader. */
+public class CredentialOperationDispatcher extends OperationDispatcher {
+
+  public CredentialOperationDispatcher(
+      CatalogManager catalogManager, EntityStore store, IdGenerator 
idGenerator) {
+    super(catalogManager, store, idGenerator);
+  }
+
+  public List<Credential> getCredentials(NameIdentifier identifier) {
+    CredentialPrivilege privilege =
+        getCredentialPrivilege(PrincipalUtils.getCurrentUserName(), 
identifier);
+    return doWithCatalog(
+        NameIdentifierUtil.getCatalogIdentifier(identifier),
+        catalogWrapper ->
+            catalogWrapper.doWithCredentialOps(
+                baseCatalog -> getCredentials(baseCatalog, identifier, 
privilege)),
+        NoSuchCatalogException.class);
+  }
+
+  private List<Credential> getCredentials(
+      BaseCatalog baseCatalog, NameIdentifier nameIdentifier, 
CredentialPrivilege privilege) {
+    Map<String, CredentialContext> contexts =
+        getCredentialContexts(baseCatalog, nameIdentifier, privilege);
+    return contexts.entrySet().stream()
+        .map(
+            entry ->
+                baseCatalog
+                    .catalogCredentialManager()
+                    .getCredential(entry.getKey(), entry.getValue()))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  private Map<String, CredentialContext> getCredentialContexts(
+      BaseCatalog baseCatalog, NameIdentifier nameIdentifier, 
CredentialPrivilege privilege) {
+    if 
(nameIdentifier.equals(NameIdentifierUtil.getCatalogIdentifier(nameIdentifier)))
 {
+      return getCatalogCredentialContexts(baseCatalog.properties());
+    }
+
+    if (baseCatalog.ops() instanceof SupportsPathBasedCredentials) {
+      List<PathContext> pathContexts =
+          ((SupportsPathBasedCredentials) 
baseCatalog.ops()).getPathContext(nameIdentifier);
+      return getPathBasedCredentialContexts(privilege, pathContexts);
+    }
+    throw new NotSupportedException(
+        String.format("Catalog %s doesn't support generate credentials", 
baseCatalog.name()));
+  }
+
+  private Map<String, CredentialContext> getCatalogCredentialContexts(
+      Map<String, String> catalogProperties) {
+    CatalogCredentialContext context =
+        new CatalogCredentialContext(PrincipalUtils.getCurrentUserName());
+    Set<String> providers = CredentialUtils.getCredentialProvidersByOrder(() 
-> catalogProperties);
+    return providers.stream().collect(Collectors.toMap(provider -> provider, 
provider -> context));
+  }
+
+  public static Map<String, CredentialContext> getPathBasedCredentialContexts(
+      CredentialPrivilege privilege, List<PathContext> pathContexts) {
+    return pathContexts.stream()
+        .collect(
+            Collectors.toMap(
+                pathContext -> pathContext.credentialType(),
+                pathContext -> {
+                  String path = pathContext.path();
+                  Set<String> writePaths = new HashSet<>();
+                  Set<String> readPaths = new HashSet<>();
+                  if (CredentialPrivilege.WRITE.equals(privilege)) {
+                    writePaths.add(path);
+                  } else {
+                    readPaths.add(path);
+                  }
+                  return new PathBasedCredentialContext(
+                      PrincipalUtils.getCurrentUserName(), writePaths, 
readPaths);
+                }));
+  }
+
+  @SuppressWarnings("UnusedVariable")
+  private CredentialPrivilege getCredentialPrivilege(String user, 
NameIdentifier identifier)
+      throws NotAuthorizedException {
+    // TODO: will implement in another PR
+    return CredentialPrivilege.WRITE;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java 
b/core/src/main/java/org/apache/gravitino/credential/CredentialPrivilege.java
similarity index 63%
copy from 
core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java
copy to 
core/src/main/java/org/apache/gravitino/credential/CredentialPrivilege.java
index 09439d58ae..3ff77cd3e8 100644
--- a/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java
+++ 
b/core/src/main/java/org/apache/gravitino/credential/CredentialPrivilege.java
@@ -19,14 +19,8 @@
 
 package org.apache.gravitino.credential;
 
-import com.google.common.collect.ImmutableSet;
-import org.apache.gravitino.utils.PrincipalUtils;
-
-public class CredentialUtils {
-  public static Credential vendCredential(CredentialProvider 
credentialProvider, String[] path) {
-    PathBasedCredentialContext pathBasedCredentialContext =
-        new PathBasedCredentialContext(
-            PrincipalUtils.getCurrentUserName(), ImmutableSet.copyOf(path), 
ImmutableSet.of());
-    return credentialProvider.getCredential(pathBasedCredentialContext);
-  }
+/** Represents the privilege to get credential from credential providers. */
+public enum CredentialPrivilege {
+  /** Read privilege; the credential dispatcher requests the path as a read path. */
+  READ,
+  /** Write privilege; the credential dispatcher requests the path as a write path. */
+  WRITE,
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java 
b/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java
index 09439d58ae..9a202ec974 100644
--- a/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java
+++ b/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java
@@ -19,14 +19,75 @@
 
 package org.apache.gravitino.credential;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableSet;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.gravitino.utils.PrincipalUtils;
 
 public class CredentialUtils {
+
+  private static final Splitter splitter = Splitter.on(",");
+
   public static Credential vendCredential(CredentialProvider 
credentialProvider, String[] path) {
     PathBasedCredentialContext pathBasedCredentialContext =
         new PathBasedCredentialContext(
-            PrincipalUtils.getCurrentUserName(), ImmutableSet.copyOf(path), 
ImmutableSet.of());
+            PrincipalUtils.getCurrentUserName(), ImmutableSet.copyOf(path), 
Collections.emptySet());
     return credentialProvider.getCredential(pathBasedCredentialContext);
   }
+
+  /**
+   * Creates one credential provider per credential type configured in the catalog properties.
+   *
+   * @param catalogProperties The catalog properties; also passed to the provider factory.
+   * @return A map from credential type to the provider created for it.
+   */
+  public static Map<String, CredentialProvider> loadCredentialProviders(
+      Map<String, String> catalogProperties) {
+    return getCredentialProvidersByOrder(() -> catalogProperties).stream()
+        .collect(
+            Collectors.toMap(
+                type -> type, type -> CredentialProviderFactory.create(type, catalogProperties)));
+  }
+
+  /**
+   * Get credential providers from the properties suppliers.
+   *
+   * <p>The suppliers are consulted in the input order; the providers of the first supplier whose
+   * properties declare at least one credential provider are returned and the remaining suppliers
+   * are not invoked.
+   *
+   * @param propertiesSuppliers The properties suppliers, highest priority first.
+   * @return A set of credential providers, empty if no supplier declares any.
+   */
+  @SafeVarargs // The varargs array is only iterated, never stored or written to.
+  public static Set<String> getCredentialProvidersByOrder(
+      Supplier<Map<String, String>>... propertiesSuppliers) {
+
+    for (Supplier<Map<String, String>> supplier : propertiesSuppliers) {
+      Map<String, String> properties = supplier.get();
+      Set<String> providers = getCredentialProvidersFromProperties(properties);
+      if (!providers.isEmpty()) {
+        return providers;
+      }
+    }
+
+    return Collections.emptySet();
+  }
+
+  /**
+   * Parses the comma separated credential provider list from the properties; entries are trimmed
+   * and empty entries dropped. Returns an empty set when the properties or the value are absent.
+   */
+  private static Set<String> getCredentialProvidersFromProperties(Map<String, String> properties) {
+    String configured =
+        properties == null ? null : properties.get(CredentialConstants.CREDENTIAL_PROVIDERS);
+    if (configured == null) {
+      return Collections.emptySet();
+    }
+
+    Splitter providerSplitter = splitter.trimResults().omitEmptyStrings();
+    return providerSplitter.splitToStream(configured).collect(Collectors.toSet());
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/credential/config/CredentialConfig.java
 
b/core/src/main/java/org/apache/gravitino/credential/config/CredentialConfig.java
new file mode 100644
index 0000000000..d8823417cd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/credential/config/CredentialConfig.java
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential.config;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.credential.CredentialConstants;
+
+/** Property metadata shared by the entities that accept credential related properties. */
+public class CredentialConfig {
+
+  /**
+   * Property entries for credential vending. The credential providers value is a comma separated
+   * list of credential types, so it is declared as a string property.
+   */
+  public static final Map<String, PropertyEntry<?>> CREDENTIAL_PROPERTY_ENTRIES =
+      new ImmutableMap.Builder<String, PropertyEntry<?>>()
+          .put(
+              CredentialConstants.CREDENTIAL_PROVIDERS,
+              PropertyEntry.stringPropertyEntry(
+                  CredentialConstants.CREDENTIAL_PROVIDERS,
+                  "Credential providers for the Gravitino catalog, schema, fileset, table, etc.",
+                  false /* required */,
+                  false /* immutable */,
+                  null /* default value */,
+                  false /* hidden */,
+                  false /* reserved */))
+          .build();
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/credential/Dummy2CredentialProvider.java
 
b/core/src/test/java/org/apache/gravitino/credential/Dummy2CredentialProvider.java
new file mode 100644
index 0000000000..63f63d61d0
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/credential/Dummy2CredentialProvider.java
@@ -0,0 +1,89 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.NotSupportedException;
+import lombok.Getter;
+
+public class Dummy2CredentialProvider implements CredentialProvider {
+  Map<String, String> properties;
+  static final String CREDENTIAL_TYPE = "dummy2";
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.properties = properties;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public String credentialType() {
+    return CREDENTIAL_TYPE;
+  }
+
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    Preconditions.checkArgument(
+        context instanceof PathBasedCredentialContext
+            || context instanceof CatalogCredentialContext,
+        "Doesn't support context: " + context.getClass().getSimpleName());
+    if (context instanceof PathBasedCredentialContext) {
+      return new Dummy2Credential((PathBasedCredentialContext) context);
+    }
+    return null;
+  }
+
+  public static class Dummy2Credential implements Credential {
+
+    @Getter private Set<String> writeLocations;
+    @Getter private Set<String> readLocations;
+
+    public Dummy2Credential(PathBasedCredentialContext locationContext) {
+      this.writeLocations = locationContext.getWritePaths();
+      this.readLocations = locationContext.getReadPaths();
+    }
+
+    @Override
+    public String credentialType() {
+      return Dummy2CredentialProvider.CREDENTIAL_TYPE;
+    }
+
+    @Override
+    public long expireTimeInMs() {
+      return 0;
+    }
+
+    @Override
+    public Map<String, String> credentialInfo() {
+      return ImmutableMap.of(
+          "writeLocation", writeLocations.toString(), "readLocation", 
readLocations.toString());
+    }
+
+    @Override
+    public void initialize(Map<String, String> credentialInfo, long 
expireTimeInMs) {
+      throw new NotSupportedException();
+    }
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/credential/TestCredentialUtils.java 
b/core/src/test/java/org/apache/gravitino/credential/TestCredentialUtils.java
new file mode 100644
index 0000000000..c31affdc15
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/credential/TestCredentialUtils.java
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import java.util.Map;
+import java.util.Set;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableSet;
+
+/** Tests for loading credential providers and resolving them from ordered properties. */
+public class TestCredentialUtils {
+
+  @Test
+  void testLoadCredentialProviders() {
+    Map<String, String> catalogProperties =
+        ImmutableMap.of(
+            CredentialConstants.CREDENTIAL_PROVIDERS,
+            DummyCredentialProvider.CREDENTIAL_TYPE
+                + ","
+                + Dummy2CredentialProvider.CREDENTIAL_TYPE);
+    Map<String, CredentialProvider> providers =
+        CredentialUtils.loadCredentialProviders(catalogProperties);
+    Assertions.assertEquals(2, providers.size());
+
+    Assertions.assertTrue(providers.containsKey(DummyCredentialProvider.CREDENTIAL_TYPE));
+    Assertions.assertEquals(
+        DummyCredentialProvider.CREDENTIAL_TYPE,
+        providers.get(DummyCredentialProvider.CREDENTIAL_TYPE).credentialType());
+    Assertions.assertTrue(providers.containsKey(Dummy2CredentialProvider.CREDENTIAL_TYPE));
+    Assertions.assertEquals(
+        Dummy2CredentialProvider.CREDENTIAL_TYPE,
+        providers.get(Dummy2CredentialProvider.CREDENTIAL_TYPE).credentialType());
+  }
+
+  @Test
+  void testGetCredentialProviders() {
+    // The fileset declares no providers, so the schema's list wins over the catalog's.
+    Map<String, String> filesetProperties = ImmutableMap.of();
+    Map<String, String> schemaProperties =
+        ImmutableMap.of(CredentialConstants.CREDENTIAL_PROVIDERS, "a,b");
+    Map<String, String> catalogProperties =
+        ImmutableMap.of(CredentialConstants.CREDENTIAL_PROVIDERS, "a,b,c");
+
+    Set<String> credentialProviders =
+        CredentialUtils.getCredentialProvidersByOrder(
+            () -> filesetProperties, () -> schemaProperties, () -> catalogProperties);
+    Assertions.assertEquals(ImmutableSet.of("a", "b"), credentialProviders);
+  }
+}
diff --git 
a/core/src/test/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
 
b/core/src/test/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
index cbdbff0bee..6e1fdde4bd 100644
--- 
a/core/src/test/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
+++ 
b/core/src/test/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
@@ -16,4 +16,5 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-org.apache.gravitino.credential.DummyCredentialProvider
\ No newline at end of file
+org.apache.gravitino.credential.DummyCredentialProvider
+org.apache.gravitino.credential.Dummy2CredentialProvider
diff --git 
a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java 
b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
index 16a2096f32..63e53aefd5 100644
--- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
+++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
@@ -26,12 +26,12 @@ import javax.servlet.Servlet;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.catalog.CatalogDispatcher;
-import org.apache.gravitino.catalog.CredentialManager;
 import org.apache.gravitino.catalog.FilesetDispatcher;
 import org.apache.gravitino.catalog.PartitionDispatcher;
 import org.apache.gravitino.catalog.SchemaDispatcher;
 import org.apache.gravitino.catalog.TableDispatcher;
 import org.apache.gravitino.catalog.TopicDispatcher;
+import org.apache.gravitino.credential.CredentialOperationDispatcher;
 import org.apache.gravitino.metalake.MetalakeDispatcher;
 import org.apache.gravitino.metrics.MetricsSystem;
 import org.apache.gravitino.metrics.source.MetricsSource;
@@ -115,7 +115,9 @@ public class GravitinoServer extends ResourceConfig {
             
bind(gravitinoEnv.filesetDispatcher()).to(FilesetDispatcher.class).ranked(1);
             
bind(gravitinoEnv.topicDispatcher()).to(TopicDispatcher.class).ranked(1);
             
bind(gravitinoEnv.tagDispatcher()).to(TagDispatcher.class).ranked(1);
-            
bind(gravitinoEnv.credentialManager()).to(CredentialManager.class).ranked(1);
+            bind(gravitinoEnv.credentialOperationDispatcher())
+                .to(CredentialOperationDispatcher.class)
+                .ranked(1);
           }
         });
     register(JsonProcessingExceptionMapper.class);
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
index 7c6ea4a8eb..1046bbba1a 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectCredentialOperations.java
@@ -21,11 +21,14 @@ package org.apache.gravitino.server.web.rest;
 
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
+import com.google.common.collect.ImmutableSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Set;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.GET;
+import javax.ws.rs.NotSupportedException;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -34,8 +37,8 @@ import javax.ws.rs.core.Response;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.catalog.CredentialManager;
 import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialOperationDispatcher;
 import org.apache.gravitino.dto.credential.CredentialDTO;
 import org.apache.gravitino.dto.responses.CredentialResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
@@ -51,15 +54,18 @@ public class MetadataObjectCredentialOperations {
   private static final Logger LOG =
       LoggerFactory.getLogger(MetadataObjectCredentialOperations.class);
 
-  private CredentialManager credentialManager;
+  private static final Set<MetadataObject.Type> 
supportsCredentialMetadataTypes =
+      ImmutableSet.of(MetadataObject.Type.CATALOG, 
MetadataObject.Type.FILESET);
+
+  private CredentialOperationDispatcher credentialOperationDispatcher;
 
   @SuppressWarnings("unused")
   @Context
   private HttpServletRequest httpRequest;
 
   @Inject
-  public MetadataObjectCredentialOperations(CredentialManager dispatcher) {
-    this.credentialManager = dispatcher;
+  public MetadataObjectCredentialOperations(CredentialOperationDispatcher 
dispatcher) {
+    this.credentialOperationDispatcher = dispatcher;
   }
 
   @GET
@@ -83,9 +89,13 @@ public class MetadataObjectCredentialOperations {
             MetadataObject object =
                 MetadataObjects.parse(
                     fullName, 
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+            if (!supportsCredentialOperations(object)) {
+              throw new NotSupportedException(
+                  "Doesn't support credential operations for metadata object 
type");
+            }
 
             NameIdentifier identifier = 
MetadataObjectUtil.toEntityIdent(metalake, object);
-            List<Credential> credentials = 
credentialManager.getCredentials(identifier);
+            List<Credential> credentials = 
credentialOperationDispatcher.getCredentials(identifier);
             if (credentials == null) {
               return Utils.ok(new CredentialResponse(new CredentialDTO[0]));
             }
@@ -97,4 +107,8 @@ public class MetadataObjectCredentialOperations {
       return ExceptionHandlers.handleCredentialException(OperationType.GET, 
fullName, e);
     }
   }
+
+  private static boolean supportsCredentialOperations(MetadataObject 
metadataObject) {
+    return supportsCredentialMetadataTypes.contains(metadataObject.type());
+  }
 }
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetadataObjectCredentialOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetadataObjectCredentialOperations.java
index 1ac5d38135..464ccd8698 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetadataObjectCredentialOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestMetadataObjectCredentialOperations.java
@@ -31,8 +31,8 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
-import org.apache.gravitino.catalog.CredentialManager;
 import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialOperationDispatcher;
 import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.dto.responses.CredentialResponse;
 import org.apache.gravitino.dto.responses.ErrorConstants;
@@ -59,7 +59,8 @@ public class TestMetadataObjectCredentialOperations extends 
JerseyTest {
     }
   }
 
-  private CredentialManager credentialManager = mock(CredentialManager.class);
+  private CredentialOperationDispatcher credentialOperationDispatcher =
+      mock(CredentialOperationDispatcher.class);
 
   private String metalake = "test_metalake";
 
@@ -78,7 +79,7 @@ public class TestMetadataObjectCredentialOperations extends 
JerseyTest {
         new AbstractBinder() {
           @Override
           protected void configure() {
-            bind(credentialManager).to(CredentialManager.class).ranked(2);
+            
bind(credentialOperationDispatcher).to(CredentialOperationDispatcher.class).ranked(2);
             
bindFactory(MockServletRequestFactory.class).to(HttpServletRequest.class);
           }
         });
@@ -101,7 +102,7 @@ public class TestMetadataObjectCredentialOperations extends 
JerseyTest {
 
     S3SecretKeyCredential credential = new S3SecretKeyCredential("access-id", 
"secret-key");
     // Test return one credential
-    
when(credentialManager.getCredentials(any())).thenReturn(Arrays.asList(credential));
+    
when(credentialOperationDispatcher.getCredentials(any())).thenReturn(Arrays.asList(credential));
     Response response =
         target(basePath(metalake))
             .path(metadataObject.type().toString())
@@ -123,7 +124,7 @@ public class TestMetadataObjectCredentialOperations extends 
JerseyTest {
     Assertions.assertEquals(0, credentialToTest.expireTimeInMs());
 
     // Test doesn't return credential
-    when(credentialManager.getCredentials(any())).thenReturn(null);
+    when(credentialOperationDispatcher.getCredentials(any())).thenReturn(null);
     response =
         target(basePath(metalake))
             .path(metadataObject.type().toString())
@@ -140,7 +141,7 @@ public class TestMetadataObjectCredentialOperations extends 
JerseyTest {
 
     // Test throws NoSuchCredentialException
     doThrow(new NoSuchCredentialException("mock error"))
-        .when(credentialManager)
+        .when(credentialOperationDispatcher)
         .getCredentials(any());
     response =
         target(basePath(metalake))

Reply via email to