This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new fd221a3383 feat(credentials): implement IRSA credential support for 
EKS deployments (#7489)
fd221a3383 is described below

commit fd221a33833b4f50c74ff699f89107692023ef2d
Author: Bryan Maloyer <[email protected]>
AuthorDate: Tue Aug 19 06:06:11 2025 +0200

    feat(credentials): implement IRSA credential support for EKS deployments 
(#7489)
    
    What changes were proposed in this pull request?
    
    This PR adds support for AWS IRSA (IAM Roles for Service Accounts)
    authentication in Gravitino, enabling seamless AWS service access when
    running on Amazon EKS without requiring static credentials.
    
    Feat: #7488
---
 .../gravitino/credential/AwsIrsaCredential.java    | 129 +++++++
 .../org.apache.gravitino.credential.Credential     |   1 +
 .../s3/credential/AwsIrsaCredentialProvider.java   | 409 +++++++++++++++++++++
 .../gravitino/s3/fs/S3CredentialsProvider.java     |   8 +
 .../gravitino/s3/fs/S3FileSystemProvider.java      |   5 +-
 .../java/org/apache/gravitino/s3/fs/S3Utils.java   |   7 +-
 ....apache.gravitino.credential.CredentialProvider |   3 +-
 .../credential/CredentialPropertyUtils.java        |  44 ++-
 docs/security/credential-vending.md                |  29 +-
 9 files changed, 611 insertions(+), 24 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/credential/AwsIrsaCredential.java 
b/api/src/main/java/org/apache/gravitino/credential/AwsIrsaCredential.java
new file mode 100644
index 0000000000..0c1888cffc
--- /dev/null
+++ b/api/src/main/java/org/apache/gravitino/credential/AwsIrsaCredential.java
@@ -0,0 +1,129 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+
+/** Generic AWS IRSA credential. */
+public class AwsIrsaCredential implements Credential {
+  /** The credential type for AWS IRSA credentials. */
+  public static final String AWS_IRSA_CREDENTIAL_TYPE = "aws-irsa";
+  /** The key for AWS access key ID in credential info. */
+  public static final String ACCESS_KEY_ID = "access-key-id";
+  /** The key for AWS secret access key in credential info. */
+  public static final String SECRET_ACCESS_KEY = "secret-access-key";
+  /** The key for AWS session token in credential info. */
+  public static final String SESSION_TOKEN = "session-token";
+
+  private String accessKeyId;
+  private String secretAccessKey;
+  private String sessionToken;
+  private long expireTimeInMs;
+
+  /**
+   * Constructs an AWS IRSA credential with the specified parameters.
+   *
+   * @param accessKeyId the AWS access key ID
+   * @param secretAccessKey the AWS secret access key
+   * @param sessionToken the AWS session token
+   * @param expireTimeInMs the expiration time in milliseconds
+   */
+  public AwsIrsaCredential(
+      String accessKeyId, String secretAccessKey, String sessionToken, long 
expireTimeInMs) {
+    validate(accessKeyId, secretAccessKey, sessionToken);
+    this.accessKeyId = accessKeyId;
+    this.secretAccessKey = secretAccessKey;
+    this.sessionToken = sessionToken;
+    this.expireTimeInMs = expireTimeInMs;
+  }
+
+  /** Default constructor for AWS IRSA credential. */
+  public AwsIrsaCredential() {}
+
+  @Override
+  public String credentialType() {
+    return AWS_IRSA_CREDENTIAL_TYPE;
+  }
+
+  @Override
+  public long expireTimeInMs() {
+    return expireTimeInMs;
+  }
+
+  @Override
+  public Map<String, String> credentialInfo() {
+    ImmutableMap.Builder<String, String> builder = new 
ImmutableMap.Builder<>();
+    builder.put(ACCESS_KEY_ID, accessKeyId);
+    builder.put(SECRET_ACCESS_KEY, secretAccessKey);
+    builder.put(SESSION_TOKEN, sessionToken);
+    return builder.build();
+  }
+
+  @Override
+  public void initialize(Map<String, String> credentialInfo, long 
expireTimeInMs) {
+    String accessKeyId = credentialInfo.get(ACCESS_KEY_ID);
+    String secretAccessKey = credentialInfo.get(SECRET_ACCESS_KEY);
+    String sessionToken = credentialInfo.get(SESSION_TOKEN);
+    validate(accessKeyId, secretAccessKey, sessionToken);
+    this.accessKeyId = accessKeyId;
+    this.secretAccessKey = secretAccessKey;
+    this.sessionToken = sessionToken;
+    this.expireTimeInMs = expireTimeInMs;
+  }
+
+  /**
+   * Returns the AWS access key ID.
+   *
+   * @return the access key ID
+   */
+  public String accessKeyId() {
+    return accessKeyId;
+  }
+
+  /**
+   * Returns the AWS secret access key.
+   *
+   * @return the secret access key
+   */
+  public String secretAccessKey() {
+    return secretAccessKey;
+  }
+
+  /**
+   * Returns the AWS session token.
+   *
+   * @return the session token
+   */
+  public String sessionToken() {
+    return sessionToken;
+  }
+
+  private void validate(String accessKeyId, String secretAccessKey, String 
sessionToken) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(accessKeyId), "Access key Id should not be 
empty");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(secretAccessKey), "Secret access key should not 
be empty");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(sessionToken), "Session token should not be 
empty");
+  }
+}
diff --git 
a/api/src/main/resources/META-INF/services/org.apache.gravitino.credential.Credential
 
b/api/src/main/resources/META-INF/services/org.apache.gravitino.credential.Credential
index 6071cb916a..2cd80032a7 100644
--- 
a/api/src/main/resources/META-INF/services/org.apache.gravitino.credential.Credential
+++ 
b/api/src/main/resources/META-INF/services/org.apache.gravitino.credential.Credential
@@ -24,3 +24,4 @@ org.apache.gravitino.credential.OSSTokenCredential
 org.apache.gravitino.credential.OSSSecretKeyCredential
 org.apache.gravitino.credential.ADLSTokenCredential
 org.apache.gravitino.credential.AzureAccountKeyCredential
+org.apache.gravitino.credential.AwsIrsaCredential
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
new file mode 100644
index 0000000000..8a59756d84
--- /dev/null
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
@@ -0,0 +1,409 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.s3.credential;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.AwsIrsaCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.config.S3CredentialConfig;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import 
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
+import software.amazon.awssdk.policybuilder.iam.IamEffect;
+import software.amazon.awssdk.policybuilder.iam.IamPolicy;
+import software.amazon.awssdk.policybuilder.iam.IamResource;
+import software.amazon.awssdk.policybuilder.iam.IamStatement;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityRequest;
+import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+/**
+ * AWS IRSA credential provider that supports both basic IRSA credentials and 
fine-grained
+ * path-based access control using AWS session policies.
+ *
+ * <p>This provider operates in two modes:
+ *
+ * <ul>
+ *   <li><b>Basic IRSA mode</b>: For non-path-based credential contexts, 
returns credentials with
+ *       full permissions of the associated IAM role (backward compatibility)
+ *   <li><b>Fine-grained mode</b>: For path-based credential contexts (e.g., 
table access with
+ *       vended credentials), uses AWS session policies to restrict 
permissions to specific S3 paths
+ * </ul>
+ *
+ * <p>The fine-grained mode leverages AWS session policies with 
AssumeRoleWithWebIdentity to create
+ * temporary credentials with restricted permissions. Session policies can 
only reduce (not expand)
+ * the permissions already granted by the IAM role:
+ *
+ * <ul>
+ *   <li>s3:GetObject, s3:GetObjectVersion for read access to specific table 
paths only
+ *   <li>s3:ListBucket with s3:prefix conditions limiting to table directories 
only
+ *   <li>s3:PutObject, s3:DeleteObject for write operations on specific paths 
only
+ *   <li>s3:GetBucketLocation for bucket metadata access
+ * </ul>
+ *
+ * <p>Prerequisites for fine-grained mode:
+ *
+ * <ul>
+ *   <li>EKS cluster with IRSA properly configured
+ *   <li>AWS_WEB_IDENTITY_TOKEN_FILE environment variable pointing to service 
account token
+ *   <li>IAM role configured for IRSA with broad S3 permissions (session 
policy will restrict them)
+ *   <li>Optional: s3-role-arn for assuming different role (if not provided, 
uses IRSA role
+ *       directly)
+ * </ul>
+ */
+public class AwsIrsaCredentialProvider implements CredentialProvider {
+
+  private WebIdentityTokenFileCredentialsProvider baseCredentialsProvider;
+  private String roleArn;
+  private int tokenExpireSecs;
+  private String region;
+  private String stsEndpoint;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    // Use WebIdentityTokenFileCredentialsProvider for base IRSA configuration
+    this.baseCredentialsProvider = 
WebIdentityTokenFileCredentialsProvider.create();
+
+    S3CredentialConfig s3CredentialConfig = new S3CredentialConfig(properties);
+    this.roleArn = s3CredentialConfig.s3RoleArn();
+    this.tokenExpireSecs = s3CredentialConfig.tokenExpireInSecs();
+    this.region = s3CredentialConfig.region();
+    this.stsEndpoint = s3CredentialConfig.stsEndpoint();
+  }
+
+  @Override
+  public void close() {
+    // No external resources to close
+  }
+
+  @Override
+  public String credentialType() {
+    return AwsIrsaCredential.AWS_IRSA_CREDENTIAL_TYPE;
+  }
+
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      // Fallback to original behavior for non-path-based contexts
+      AwsCredentials creds = baseCredentialsProvider.resolveCredentials();
+      if (creds instanceof AwsSessionCredentials) {
+        AwsSessionCredentials sessionCreds = (AwsSessionCredentials) creds;
+        long expiration =
+            sessionCreds.expirationTime().isPresent()
+                ? sessionCreds.expirationTime().get().toEpochMilli()
+                : 0L;
+        return new AwsIrsaCredential(
+            sessionCreds.accessKeyId(),
+            sessionCreds.secretAccessKey(),
+            sessionCreds.sessionToken(),
+            expiration);
+      } else {
+        throw new IllegalStateException(
+            "AWS IRSA credentials must be of type AwsSessionCredentials. "
+                + "Check your EKS/IRSA configuration. Got: "
+                + creds.getClass().getName());
+      }
+    }
+
+    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+
+    Credentials s3Token =
+        createCredentialsWithSessionPolicy(
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths(),
+            pathBasedCredentialContext.getUserName());
+    return new AwsIrsaCredential(
+        s3Token.accessKeyId(),
+        s3Token.secretAccessKey(),
+        s3Token.sessionToken(),
+        s3Token.expiration().toEpochMilli());
+  }
+
+  private Credentials createCredentialsWithSessionPolicy(
+      Set<String> readLocations, Set<String> writeLocations, String userName) {
+    validateInputParameters(readLocations, writeLocations, userName);
+
+    // Create session policy that restricts access to specific paths
+    IamPolicy sessionPolicy = createSessionPolicy(readLocations, 
writeLocations);
+
+    // Get web identity token file path and validate
+    String webIdentityTokenFile = getValidatedWebIdentityTokenFile();
+
+    // Get role ARN and validate
+    String effectiveRoleArn = getValidatedRoleArn();
+
+    try {
+      String tokenContent =
+          new String(Files.readAllBytes(Paths.get(webIdentityTokenFile)), 
StandardCharsets.UTF_8);
+      if (StringUtils.isBlank(tokenContent)) {
+        throw new IllegalStateException(
+            "Web identity token file is empty: " + webIdentityTokenFile);
+      }
+
+      return assumeRoleWithSessionPolicy(effectiveRoleArn, userName, 
tokenContent, sessionPolicy);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to create credentials with session policy for user: " + 
userName, e);
+    }
+  }
+
+  private IamPolicy createSessionPolicy(Set<String> readLocations, Set<String> 
writeLocations) {
+    IamPolicy.Builder policyBuilder = IamPolicy.builder();
+    String arnPrefix = getArnPrefix();
+
+    // Add read permissions for all locations
+    addReadPermissions(policyBuilder, readLocations, writeLocations, 
arnPrefix);
+
+    // Add write permissions if needed
+    if (!writeLocations.isEmpty()) {
+      addWritePermissions(policyBuilder, writeLocations, arnPrefix);
+    }
+
+    // Add bucket-level permissions
+    addBucketPermissions(policyBuilder, readLocations, writeLocations, 
arnPrefix);
+
+    return policyBuilder.build();
+  }
+
+  private void addReadPermissions(
+      IamPolicy.Builder policyBuilder,
+      Set<String> readLocations,
+      Set<String> writeLocations,
+      String arnPrefix) {
+    IamStatement.Builder allowGetObjectStatementBuilder =
+        IamStatement.builder()
+            .effect(IamEffect.ALLOW)
+            .addAction("s3:GetObject")
+            .addAction("s3:GetObjectVersion");
+
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct()
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              allowGetObjectStatementBuilder.addResource(
+                  IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+            });
+
+    policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
+  }
+
+  private void addWritePermissions(
+      IamPolicy.Builder policyBuilder, Set<String> writeLocations, String 
arnPrefix) {
+    IamStatement.Builder allowPutObjectStatementBuilder =
+        IamStatement.builder()
+            .effect(IamEffect.ALLOW)
+            .addAction("s3:PutObject")
+            .addAction("s3:DeleteObject");
+
+    writeLocations.forEach(
+        location -> {
+          URI uri = URI.create(location);
+          allowPutObjectStatementBuilder.addResource(
+              IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+        });
+
+    policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
+  }
+
+  private void addBucketPermissions(
+      IamPolicy.Builder policyBuilder,
+      Set<String> readLocations,
+      Set<String> writeLocations,
+      String arnPrefix) {
+    Map<String, IamStatement.Builder> bucketListStatementBuilder = new 
HashMap<>();
+    Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
+
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct()
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              String bucketArn = arnPrefix + getBucketName(uri);
+              String rawPath = trimLeadingSlash(uri.getPath());
+
+              // Add list bucket permissions with prefix conditions
+              bucketListStatementBuilder
+                  .computeIfAbsent(
+                      bucketArn,
+                      key ->
+                          IamStatement.builder()
+                              .effect(IamEffect.ALLOW)
+                              .addAction("s3:ListBucket")
+                              .addResource(key))
+                  .addConditions(
+                      IamConditionOperator.STRING_LIKE,
+                      "s3:prefix",
+                      Arrays.asList(
+                          rawPath, // Get raw path metadata information
+                          addWildcardToPath(rawPath))); // Listing objects in 
raw path
+
+              // Add get bucket location permissions
+              bucketGetLocationStatementBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      IamStatement.builder()
+                          .effect(IamEffect.ALLOW)
+                          .addAction("s3:GetBucketLocation")
+                          .addResource(key));
+            });
+
+    // Add bucket list statements
+    addStatementsToPolicy(policyBuilder, bucketListStatementBuilder, 
"s3:ListBucket");
+
+    // Add bucket location statements
+    addStatementsToPolicy(policyBuilder, bucketGetLocationStatementBuilder, 
null);
+  }
+
+  private void addStatementsToPolicy(
+      IamPolicy.Builder policyBuilder,
+      Map<String, IamStatement.Builder> statementBuilders,
+      String fallbackAction) {
+    if (!statementBuilders.isEmpty()) {
+      statementBuilders
+          .values()
+          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
+    } else if (fallbackAction != null) {
+      policyBuilder.addStatement(
+          
IamStatement.builder().effect(IamEffect.ALLOW).addAction(fallbackAction).build());
+    }
+  }
+
+  private String getS3UriWithArn(String arnPrefix, URI uri) {
+    return arnPrefix + addWildcardToPath(removeSchemaFromS3Uri(uri));
+  }
+
+  private String getArnPrefix() {
+    // For session policies, we default to standard AWS S3 ARN prefix
+    // The region can be determined from the AWS environment or configuration
+    if (StringUtils.isNotBlank(region)) {
+      if (region.contains("cn-")) {
+        return "arn:aws-cn:s3:::";
+      } else if (region.contains("us-gov-")) {
+        return "arn:aws-us-gov:s3:::";
+      }
+    }
+    return "arn:aws:s3:::";
+  }
+
+  private static String addWildcardToPath(String path) {
+    return path.endsWith("/") ? path + "*" : path + "/*";
+  }
+
+  // Transform 's3://bucket/path' to /bucket/path
+  private static String removeSchemaFromS3Uri(URI uri) {
+    String bucket = uri.getHost();
+    String path = trimLeadingSlash(uri.getPath());
+    return String.join(
+        "/", Stream.of(bucket, 
path).filter(Objects::nonNull).toArray(String[]::new));
+  }
+
+  private static String trimLeadingSlash(String path) {
+    if (path.startsWith("/")) {
+      path = path.substring(1);
+    }
+    return path;
+  }
+
+  private static String getBucketName(URI uri) {
+    return uri.getHost();
+  }
+
+  private void validateInputParameters(
+      Set<String> readLocations, Set<String> writeLocations, String userName) {
+    if (StringUtils.isBlank(userName)) {
+      throw new IllegalArgumentException("userName cannot be null or empty");
+    }
+    if ((readLocations == null || readLocations.isEmpty())
+        && (writeLocations == null || writeLocations.isEmpty())) {
+      throw new IllegalArgumentException("At least one read or write location 
must be specified");
+    }
+  }
+
+  private String getValidatedWebIdentityTokenFile() {
+    String webIdentityTokenFile = System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE");
+    if (StringUtils.isBlank(webIdentityTokenFile)) {
+      throw new IllegalStateException(
+          "AWS_WEB_IDENTITY_TOKEN_FILE environment variable is not set. "
+              + "Ensure IRSA is properly configured in your EKS cluster.");
+    }
+    if (!Files.exists(Paths.get(webIdentityTokenFile))) {
+      throw new IllegalStateException(
+          "Web identity token file does not exist: " + webIdentityTokenFile);
+    }
+    return webIdentityTokenFile;
+  }
+
+  private String getValidatedRoleArn() {
+    String effectiveRoleArn =
+        StringUtils.isNotBlank(roleArn) ? roleArn : 
System.getenv("AWS_ROLE_ARN");
+    if (StringUtils.isBlank(effectiveRoleArn)) {
+      throw new IllegalStateException(
+          "No role ARN available. Either configure s3-role-arn or ensure 
AWS_ROLE_ARN environment variable is set.");
+    }
+    if (!effectiveRoleArn.startsWith("arn:aws")) {
+      throw new IllegalArgumentException("Invalid role ARN format: " + 
effectiveRoleArn);
+    }
+    return effectiveRoleArn;
+  }
+
+  private Credentials assumeRoleWithSessionPolicy(
+      String roleArn, String userName, String webIdentityToken, IamPolicy 
sessionPolicy) {
+    // Create STS client for this request
+    StsClientBuilder stsBuilder = StsClient.builder();
+    if (StringUtils.isNotBlank(region)) {
+      stsBuilder.region(Region.of(region));
+    }
+    if (StringUtils.isNotBlank(stsEndpoint)) {
+      stsBuilder.endpointOverride(URI.create(stsEndpoint));
+    }
+
+    try (StsClient stsClient = stsBuilder.build()) {
+      AssumeRoleWithWebIdentityRequest request =
+          AssumeRoleWithWebIdentityRequest.builder()
+              .roleArn(roleArn)
+              .roleSessionName("gravitino_irsa_session_" + userName)
+              .durationSeconds(tokenExpireSecs)
+              .webIdentityToken(webIdentityToken)
+              .policy(sessionPolicy.toJson())
+              .build();
+
+      AssumeRoleWithWebIdentityResponse response = 
stsClient.assumeRoleWithWebIdentity(request);
+      return response.credentials();
+    }
+  }
+}
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
index 2fc1458895..e047c51e70 100644
--- 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
@@ -26,6 +26,7 @@ import com.amazonaws.auth.BasicSessionCredentials;
 import java.net.URI;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.AwsIrsaCredential;
 import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.credential.S3TokenCredential;
@@ -75,6 +76,13 @@ public class S3CredentialsProvider implements 
AWSCredentialsProvider {
               s3TokenCredential.accessKeyId(),
               s3TokenCredential.secretAccessKey(),
               s3TokenCredential.sessionToken());
+    } else if (credential instanceof AwsIrsaCredential) {
+      AwsIrsaCredential awsIrsaCredential = (AwsIrsaCredential) credential;
+      basicSessionCredentials =
+          new BasicSessionCredentials(
+              awsIrsaCredential.accessKeyId(),
+              awsIrsaCredential.secretAccessKey(),
+              awsIrsaCredential.sessionToken());
     }
 
     if (credential.expireTimeInMs() > 0) {
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
index ccec76d931..9ef1e00413 100644
--- 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.AwsIrsaCredential;
 import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.credential.S3TokenCredential;
@@ -80,7 +81,9 @@ public class S3FileSystemProvider implements 
FileSystemProvider, SupportsCredent
   public Map<String, String> getFileSystemCredentialConf(Credential[] 
credentials) {
     Credential credential = S3Utils.getSuitableCredential(credentials);
     Map<String, String> result = Maps.newHashMap();
-    if (credential instanceof S3SecretKeyCredential || credential instanceof 
S3TokenCredential) {
+    if (credential instanceof S3SecretKeyCredential
+        || credential instanceof S3TokenCredential
+        || credential instanceof AwsIrsaCredential) {
       result.put(
           Constants.AWS_CREDENTIALS_PROVIDER, 
S3CredentialsProvider.class.getCanonicalName());
     }
diff --git a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java
index 078a1180ba..0c6244b9d2 100644
--- a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java
+++ b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.s3.fs;
 
+import org.apache.gravitino.credential.AwsIrsaCredential;
 import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.S3SecretKeyCredential;
 import org.apache.gravitino.credential.S3TokenCredential;
@@ -39,7 +40,11 @@ public class S3Utils {
         return credential;
       }
     }
-
+    for (Credential credential : credentials) {
+      if (credential instanceof AwsIrsaCredential) {
+        return credential;
+      }
+    }
     // If dynamic credential not found, use the static one
     for (Credential credential : credentials) {
       if (credential instanceof S3SecretKeyCredential) {
diff --git 
a/bundles/aws/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
 
b/bundles/aws/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
index 7349688d6a..78ab962b52 100644
--- 
a/bundles/aws/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
+++ 
b/bundles/aws/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
@@ -17,4 +17,5 @@
 # under the License.
 #
 org.apache.gravitino.s3.credential.S3TokenProvider
-org.apache.gravitino.s3.credential.S3SecretKeyProvider
\ No newline at end of file
+org.apache.gravitino.s3.credential.S3SecretKeyProvider
+org.apache.gravitino.s3.credential.AwsIrsaCredentialProvider
\ No newline at end of file
diff --git 
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
 
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
index d7a3caf067..866de92420 100644
--- 
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
+++ 
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
@@ -47,25 +47,28 @@ public class CredentialPropertyUtils {
   static final String ICEBERG_ADLS_ACCOUNT_KEY = 
"adls.auth.shared-key.account.key";
 
   private static Map<String, String> icebergCredentialPropertyMap =
-      ImmutableMap.of(
-          GCSTokenCredential.GCS_TOKEN_NAME,
-          ICEBERG_GCS_TOKEN,
-          S3SecretKeyCredential.GRAVITINO_S3_STATIC_ACCESS_KEY_ID,
-          ICEBERG_S3_ACCESS_KEY_ID,
-          S3SecretKeyCredential.GRAVITINO_S3_STATIC_SECRET_ACCESS_KEY,
-          ICEBERG_S3_SECRET_ACCESS_KEY,
-          S3TokenCredential.GRAVITINO_S3_TOKEN,
-          ICEBERG_S3_TOKEN,
-          OSSTokenCredential.GRAVITINO_OSS_TOKEN,
-          ICEBERG_OSS_SECURITY_TOKEN,
-          OSSTokenCredential.GRAVITINO_OSS_SESSION_ACCESS_KEY_ID,
-          ICEBERG_OSS_ACCESS_KEY_ID,
-          OSSTokenCredential.GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY,
-          ICEBERG_OSS_ACCESS_KEY_SECRET,
-          AzureAccountKeyCredential.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
-          ICEBERG_ADLS_ACCOUNT_NAME,
-          AzureAccountKeyCredential.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
-          ICEBERG_ADLS_ACCOUNT_KEY);
+      ImmutableMap.<String, String>builder()
+          .put(GCSTokenCredential.GCS_TOKEN_NAME, ICEBERG_GCS_TOKEN)
+          .put(S3SecretKeyCredential.GRAVITINO_S3_STATIC_ACCESS_KEY_ID, 
ICEBERG_S3_ACCESS_KEY_ID)
+          .put(
+              S3SecretKeyCredential.GRAVITINO_S3_STATIC_SECRET_ACCESS_KEY,
+              ICEBERG_S3_SECRET_ACCESS_KEY)
+          .put(S3TokenCredential.GRAVITINO_S3_TOKEN, ICEBERG_S3_TOKEN)
+          .put(OSSTokenCredential.GRAVITINO_OSS_TOKEN, 
ICEBERG_OSS_SECURITY_TOKEN)
+          .put(OSSTokenCredential.GRAVITINO_OSS_SESSION_ACCESS_KEY_ID, 
ICEBERG_OSS_ACCESS_KEY_ID)
+          .put(
+              OSSTokenCredential.GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY,
+              ICEBERG_OSS_ACCESS_KEY_SECRET)
+          .put(
+              AzureAccountKeyCredential.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
+              ICEBERG_ADLS_ACCOUNT_NAME)
+          .put(
+              AzureAccountKeyCredential.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
+              ICEBERG_ADLS_ACCOUNT_KEY)
+          .put(AwsIrsaCredential.ACCESS_KEY_ID, ICEBERG_S3_ACCESS_KEY_ID)
+          .put(AwsIrsaCredential.SECRET_ACCESS_KEY, 
ICEBERG_S3_SECRET_ACCESS_KEY)
+          .put(AwsIrsaCredential.SESSION_TOKEN, ICEBERG_S3_TOKEN)
+          .build();
 
   /**
    * Transforms a specific credential into a map of Iceberg properties.
@@ -78,7 +81,8 @@ public class CredentialPropertyUtils {
         || credential instanceof S3SecretKeyCredential
         || credential instanceof OSSTokenCredential
         || credential instanceof OSSSecretKeyCredential
-        || credential instanceof AzureAccountKeyCredential) {
+        || credential instanceof AzureAccountKeyCredential
+        || credential instanceof AwsIrsaCredential) {
       return transformProperties(credential.credentialInfo(), 
icebergCredentialPropertyMap);
     }
 
diff --git a/docs/security/credential-vending.md 
b/docs/security/credential-vending.md
index 3c4075731b..703cae95b7 100644
--- a/docs/security/credential-vending.md
+++ b/docs/security/credential-vending.md
@@ -14,7 +14,7 @@ Gravitino credential vending is used to generate temporary or 
static credentials
 - Supports Gravitino Iceberg REST server.
 - Supports Gravitino server, only support Hadoop catalog.
 - Supports pluggable credentials with build-in credentials:
-  - S3: `S3TokenCredential`, `S3SecretKeyCredential`
+  - S3: `S3TokenCredential`, `S3SecretKeyCredential`, `AwsIrsaCredential`
   - GCS: `GCSTokenCredential`
   - ADLS: `ADLSTokenCredential`, `AzureAccountKeyCredential`
   - OSS: `OSSTokenCredential`, `OSSSecretKeyCredential`
@@ -33,6 +33,33 @@ Gravitino credential vending is used to generate temporary 
or static credentials
 
 ### S3 credentials
 
+#### S3 IRSA credential
+
+A credential using AWS IAM Roles for Service Accounts (IRSA) to access S3 with 
temporary credentials, typically used in EKS environments. This provider 
supports both basic IRSA credentials and fine-grained path-based access control 
with dynamically generated IAM policies.
+
+**Features:**
+- **Basic IRSA mode**: Returns credentials with full permissions of the 
associated IAM role (for non-path-based contexts)
+- **Fine-grained mode**: Generates path-specific credentials with minimal 
required permissions (for table access with `X-Iceberg-Access-Delegation: 
vended-credentials`)
+- **Automatic policy generation**: Creates custom IAM policies scoped to 
specific table paths including data, metadata, and write locations
+- **EKS integration**: Leverages existing IRSA setup while providing enhanced 
security through path-based restrictions
+
+
+| Gravitino server catalog properties | Gravitino Iceberg REST server 
configurations       | Description                                              
                                                 | Default value | Required | 
Since Version |
+|-------------------------------------|----------------------------------------------------|-----------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
+| `credential-providers`              | 
`gravitino.iceberg-rest.credential-providers`      | `aws-irsa` for AWS IRSA 
credential provider.                                                            
  | (none)        | Yes      | 1.0.0         |
+| `s3-role-arn`                       | `gravitino.iceberg-rest.s3-role-arn`   
            | The ARN of the IAM role to assume. Required for fine-grained 
path-based access control.                   | (none)        | Yes*     | 1.0.0 
        |
+| `s3-region`                         | `gravitino.iceberg-rest.s3-region`     
            | The AWS region for STS operations. Used for fine-grained access 
control.                                  | (none)        | No       | 1.0.0    
     |
+| `s3-token-expire-in-secs`           | 
`gravitino.iceberg-rest.s3-token-expire-in-secs`   | Token expiration time in 
seconds for fine-grained credentials. Cannot exceed role's max session 
duration. | 3600          | No       | 1.0.0         |
+| `s3-token-service-endpoint`         | 
`gravitino.iceberg-rest.s3-token-service-endpoint` | Alternative STS endpoint 
for fine-grained credential generation. Useful for S3-compatible services.      
 | (none)        | No       | 1.0.0         |
+
+**Note**: `s3-role-arn` is required only when using fine-grained path-based 
access control with vended credentials. For basic IRSA usage without path 
restrictions, only `credential-providers=aws-irsa` is needed.
+
+**Prerequisites for fine-grained mode:**
+- EKS cluster with IRSA properly configured
+- `AWS_WEB_IDENTITY_TOKEN_FILE` environment variable pointing to the service 
account token
+- IAM role with permissions to assume the target role specified in 
`s3-role-arn`
+- Target IAM role with necessary S3 permissions for the data locations
+
 #### S3 secret key credential
 
 A credential with static S3 access key id and secret access key.


Reply via email to