This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bc396e1b7b4 [feat](fs-v2): add S3 IAM Role support  (#51229)
bc396e1b7b4 is described below

commit bc396e1b7b40b5e1741ecb53619a7a4575e9b618
Author: Calvin Kirs <[email protected]>
AuthorDate: Fri May 30 10:10:21 2025 +0800

    [feat](fs-v2): add S3 IAM Role support  (#51229)
    
    ### What problem does this PR solve?
    https://github.com/apache/doris/issues/50238
    
    Prefer AK/SK when both AK/SK and an IAM Role are provided; if only an IAM Role
    is set, use role-based credentials. AK/SK are no longer required when an IAM
    Role is configured.
    
    Replaced listBuckets with headBucket for the connectivity check, as it is more
    standard, more widely supported, and compatible across S3-like systems.
---
 .../java/org/apache/doris/analysis/LoadStmt.java   |  15 +-
 .../java/org/apache/doris/common/util/S3Util.java  |  35 +++
 .../storage/AbstractS3CompatibleProperties.java    |  17 ++
 .../datasource/property/storage/S3Properties.java  |  68 ++++-
 .../java/org/apache/doris/fsv2/obj/ObjStorage.java |   4 +
 .../org/apache/doris/fsv2/obj/S3ObjStorage.java    | 332 ++++++++++++++-------
 .../apache/doris/fsv2/remote/RemoteFileSystem.java |   2 +-
 .../org/apache/doris/fsv2/remote/S3FileSystem.java |  23 +-
 .../property/storage/S3PropertiesTest.java         |  73 +++++
 ...t_domain_connection_and_ak_sk_correction.groovy |  36 +--
 10 files changed, 448 insertions(+), 157 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index a119af5345d..02b014fd26d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -607,8 +607,21 @@ public class LoadStmt extends DdlStmt implements 
NotFallbackInParser {
             String endpoint = storageProperties.getEndpoint();
             checkEndpoint(endpoint);
             checkWhiteList(endpoint);
+            List<String> filePaths = new ArrayList<>();
+            if (dataDescriptions != null && !dataDescriptions.isEmpty()) {
+                for (DataDescription dataDescription : dataDescriptions) {
+                    if (dataDescription.getFilePaths() != null) {
+                        for (String filePath : dataDescription.getFilePaths()) 
{
+                            if (filePath != null && !filePath.isEmpty()) {
+                                filePaths.add(filePath);
+                            }
+                        }
+                    }
+                }
+            }
             //should add connectivity test
-            boolean connectivityTest = 
FileSystemFactory.get(brokerDesc.getStorageProperties()).connectivityTest();
+            boolean connectivityTest = 
FileSystemFactory.get(brokerDesc.getStorageProperties())
+                    .connectivityTest(filePaths);
             if (!connectivityTest) {
                 throw new UserException("Failed to access object storage, 
message=connectivity test failed");
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index e204fab2e22..c147fce6801 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -151,6 +151,41 @@ public class S3Util {
                     InstanceProfileCredentialsProvider.create());
     }
 
+    /**
+     * Builds an {@link S3Client} that signs requests with the given credentials provider.
+     *
+     * <p>Requests are retried up to 3 times with equal-jitter backoff (1s base delay, capped at
+     * 1 minute) and signed with {@link AwsS3V4Signer}. The client uses a URL-connection HTTP client
+     * with 30s socket and connection timeouts. Chunked encoding is disabled because BOS does not
+     * support it.
+     *
+     * @param endpoint       endpoint override for the client
+     * @param region         region name, passed to {@link Region#of(String)}
+     * @param isUsePathStyle whether path-style bucket addressing is enabled
+     * @param credential     credentials provider used to sign requests
+     * @return the configured S3 client
+     */
+    public static S3Client buildS3Client(URI endpoint, String region, boolean isUsePathStyle,
+                                         AwsCredentialsProvider credential) {
+        // Retry up to 3 times, backing off with equal jitter between 1s and 1min.
+        RetryPolicy retry = RetryPolicy.builder()
+                .numRetries(3)
+                .backoffStrategy(EqualJitterBackoffStrategy.builder()
+                        .baseDelay(Duration.ofSeconds(1))
+                        .maxBackoffTime(Duration.ofMinutes(1))
+                        .build())
+                .build();
+        ClientOverrideConfiguration overrides = ClientOverrideConfiguration.builder()
+                .retryPolicy(retry)
+                .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
+                .build();
+        // Chunked encoding stays off because BOS does not support it.
+        S3Configuration serviceConf = S3Configuration.builder()
+                .chunkedEncodingEnabled(false)
+                .pathStyleAccessEnabled(isUsePathStyle)
+                .build();
+        return S3Client.builder()
+                .httpClient(UrlConnectionHttpClient.builder()
+                        .socketTimeout(Duration.ofSeconds(30))
+                        .connectionTimeout(Duration.ofSeconds(30))
+                        .build())
+                .endpointOverride(endpoint)
+                .credentialsProvider(credential)
+                .region(Region.of(region))
+                .overrideConfiguration(overrides)
+                .serviceConfiguration(serviceConf)
+                .build();
+    }
+
     public static S3Client buildS3Client(URI endpoint, String region, boolean 
isUsePathStyle, String accessKey,
             String secretKey, String sessionToken, String roleArn, String 
externalId) {
         EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index 3c0422954e5..0d252b2043d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -20,9 +20,14 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.ConnectorProperty;
 
+import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -152,6 +157,18 @@ public abstract class AbstractS3CompatibleProperties 
extends StorageProperties i
         return generateBackendS3Configuration();
     }
 
+    /**
+     * Returns a static credentials provider built from the configured access key and secret key.
+     *
+     * <p>When a session token is configured, session credentials are used; otherwise basic
+     * credentials are used.
+     *
+     * @return a static credentials provider, or {@code null} when the access key or the secret key
+     *         is blank
+     */
+    public AwsCredentialsProvider getAwsCredentialsProvider() {
+        String ak = getAccessKey();
+        String sk = getSecretKey();
+        if (StringUtils.isBlank(ak) || StringUtils.isBlank(sk)) {
+            return null;
+        }
+        return Strings.isNullOrEmpty(sessionToken)
+                ? StaticCredentialsProvider.create(AwsBasicCredentials.create(ak, sk))
+                : StaticCredentialsProvider.create(AwsSessionCredentials.create(ak, sk, sessionToken));
+    }
+
 
     @Override
     protected void initNormalizeAndCheckProps() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index 343b4b29d39..4305a04ebc8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -18,11 +18,22 @@
 package org.apache.doris.datasource.property.storage;
 
 import org.apache.doris.datasource.property.ConnectorProperty;
+import 
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
+import 
software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import 
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import 
software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import 
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.services.sts.StsClient;
+import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
 
 import java.lang.reflect.Field;
 import java.util.List;
@@ -51,11 +62,13 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
 
     @Getter
     @ConnectorProperty(names = {"s3.access_key", "AWS_ACCESS_KEY", 
"access_key", "ACCESS_KEY"},
+            required = false,
             description = "The access key of S3.")
     protected String accessKey = "";
 
     @Getter
     @ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY", 
"secret_key", "SECRET_KEY"},
+            required = false,
             description = "The secret key of S3.")
     protected String secretKey = "";
 
@@ -90,14 +103,12 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
             description = "The sts region of S3.")
     protected String s3StsRegion = "";
 
-    @ConnectorProperty(names = {"s3.iam_role"},
-            supported = false,
+    @ConnectorProperty(names = {"s3.role_arn", "AWS_ROLE_ARN"},
             required = false,
             description = "The iam role of S3.")
     protected String s3IAMRole = "";
 
-    @ConnectorProperty(names = {"s3.external_id"},
-            supported = false,
+    @ConnectorProperty(names = {"s3.external_id", "AWS_EXTERNAL_ID"},
             required = false,
             description = "The external id of S3.")
     protected String s3ExternalId = "";
@@ -122,6 +133,18 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
         super(Type.S3, origProps);
     }
 
+    /**
+     * Validates that at least one authentication method is configured.
+     *
+     * <p>Two configurations are accepted:
+     * <ul>
+     *   <li>a non-blank access key / secret key pair;</li>
+     *   <li>a non-blank role ARN ({@code s3.role_arn} / {@code AWS_ROLE_ARN}).</li>
+     * </ul>
+     * The external id is optional, because the assume-role provider built in
+     * {@link #getAwsCredentialsProvider()} attaches it only when it is set.
+     *
+     * @throws StoragePropertiesException if neither AK/SK nor a role ARN is configured
+     */
+    @Override
+    protected void initNormalizeAndCheckProps() {
+        super.initNormalizeAndCheckProps();
+        // AK/SK alone is sufficient (and wins over a role in getAwsCredentialsProvider()).
+        if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
+            return;
+        }
+        // A role ARN on its own is sufficient; s3.external_id is optional.
+        if (StringUtils.isNotBlank(s3IAMRole)) {
+            return;
+        }
+        throw new StoragePropertiesException("Please set s3.access_key and s3.secret_key or s3.role_arn "
+                + "(optionally with s3.external_id)");
+    }
 
     /**
      * Guess if the storage properties is for this storage type.
@@ -195,7 +218,42 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
 
+    /**
+     * Returns the S3 properties passed to the backend.
+     *
+     * <p>Besides the common connection settings, the role ARN is forwarded as
+     * {@code AWS_ROLE_ARN} whenever it is configured. The external id is forwarded as
+     * {@code AWS_EXTERNAL_ID} when it is also set.
+     *
+     * @return backend S3 configuration properties
+     */
     @Override
     public Map<String, String> getBackendConfigProperties() {
-        return generateBackendS3Configuration(s3ConnectionMaximum,
+        Map<String, String> backendProperties = generateBackendS3Configuration(s3ConnectionMaximum,
                 s3ConnectionRequestTimeoutS, s3ConnectionTimeoutS, String.valueOf(usePathStyle));
+
+        // The external id is optional for AssumeRole, so the role ARN must not depend on it.
+        if (StringUtils.isNotBlank(s3IAMRole)) {
+            backendProperties.put("AWS_ROLE_ARN", s3IAMRole);
+            if (StringUtils.isNotBlank(s3ExternalId)) {
+                backendProperties.put("AWS_EXTERNAL_ID", s3ExternalId);
+            }
+        }
+        return backendProperties;
+    }
+
+    /**
+     * Returns the credentials provider used by FE-side S3 clients.
+     *
+     * <p>Resolution order:
+     * <ol>
+     *   <li>static AK/SK (with the session token, if any) when both keys are set;</li>
+     *   <li>otherwise, when a role ARN is set, an STS AssumeRole provider. Its STS client
+     *       authenticates with the instance profile, and the external id is attached only when
+     *       non-empty;</li>
+     *   <li>otherwise, a chain of system-property, environment-variable, web-identity-token-file,
+     *       profile and instance-profile providers.</li>
+     * </ol>
+     *
+     * @return the credentials provider to use for S3 requests
+     */
+    @Override
+    public AwsCredentialsProvider getAwsCredentialsProvider() {
+        AwsCredentialsProvider credentialsProvider = super.getAwsCredentialsProvider();
+        if (credentialsProvider != null) {
+            return credentialsProvider;
+        }
+        if (StringUtils.isNotBlank(s3IAMRole)) {
+            // NOTE(review): no region is set on this STS client, so the SDK's default region
+            // resolution applies; a new client is also created on every call. Confirm both are intended.
+            StsClient stsClient = StsClient.builder()
+                    .credentialsProvider(InstanceProfileCredentialsProvider.create())
+                    .build();
+
+            return StsAssumeRoleCredentialsProvider.builder()
+                    .stsClient(stsClient)
+                    .refreshRequest(builder -> {
+                        builder.roleArn(s3IAMRole).roleSessionName("aws-sdk-java-v2-fe");
+                        if (!Strings.isNullOrEmpty(s3ExternalId)) {
+                            builder.externalId(s3ExternalId);
+                        }
+                    }).build();
+        }
+        return AwsCredentialsProviderChain.of(SystemPropertyCredentialsProvider.create(),
+                EnvironmentVariableCredentialsProvider.create(),
+                WebIdentityTokenFileCredentialsProvider.create(),
+                ProfileCredentialsProvider.create(),
+                InstanceProfileCredentialsProvider.create());
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/ObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/ObjStorage.java
index d3033bdec23..f45e4b1eebb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/ObjStorage.java
@@ -32,6 +32,10 @@ import java.io.InputStream;
  * @param <C> cloud SDK Client
  */
 public interface ObjStorage<C> {
+
+    // CHUNK_SIZE for multi part upload
+    int CHUNK_SIZE = 5 * 1024 * 1024;
+
     C getClient() throws UserException;
 
     Triple<String, String, String> getStsToken() throws DdlException;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java
index 76c12a85834..761fad75734 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java
@@ -20,7 +20,6 @@ package org.apache.doris.fsv2.obj;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.credentials.CloudCredential;
 import org.apache.doris.common.util.S3URI;
 import org.apache.doris.common.util.S3Util;
 import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
@@ -34,8 +33,14 @@ import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
 import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.Delete;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
@@ -52,7 +57,10 @@ import 
software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.InputStream;
 import java.net.URI;
@@ -60,6 +68,7 @@ import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -101,103 +110,12 @@ public class S3ObjStorage implements 
ObjStorage<S3Client> {
                 endpointStr = "http://"; + endpointStr;
             }
             URI endpoint = URI.create(endpointStr);
-            CloudCredential credential = new CloudCredential();
-            credential.setAccessKey(s3Properties.getAccessKey());
-            credential.setSecretKey(s3Properties.getSecretKey());
-            if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
-                credential.setSessionToken(s3Properties.getSessionToken());
-            }
-            client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(), 
credential, isUsePathStyle);
+            client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(),
+                    isUsePathStyle, s3Properties.getAwsCredentialsProvider());
         }
         return client;
     }
 
-    public Status globList(String remotePath, List<RemoteFile> result, boolean 
fileNameOnly) {
-        long roundCnt = 0;
-        long elementCnt = 0;
-        long matchCnt = 0;
-        long startTime = System.nanoTime();
-        try {
-            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
-            String bucket = uri.getBucket();
-            String globPath = uri.getKey(); // eg: path/to/*.csv
-            if (LOG.isDebugEnabled()) {
-                LOG.info("globList globPath:{}, remotePath:{}", globPath, 
remotePath);
-            }
-
-            java.nio.file.Path pathPattern = Paths.get(globPath);
-            PathMatcher matcher = 
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
-            HashSet<String> directorySet = new HashSet<>();
-            String listPrefix = S3Util.getLongestPrefix(globPath); // similar 
to Azure
-            if (LOG.isDebugEnabled()) {
-                LOG.info("globList listPrefix: {}", listPrefix);
-            }
-            ListObjectsV2Request request = ListObjectsV2Request.builder()
-                    .bucket(bucket)
-                    .prefix(listPrefix)
-                    .build();
-            boolean isTruncated;
-            do {
-                roundCnt++;
-                ListObjectsV2Response response = 
getClient().listObjectsV2(request);
-                for (S3Object obj : response.contents()) {
-                    elementCnt++;
-                    java.nio.file.Path objPath = Paths.get(obj.key());
-
-                    boolean isPrefix = false;
-                    while (objPath != null && 
objPath.normalize().toString().startsWith(listPrefix)) {
-                        if (!matcher.matches(objPath)) {
-                            isPrefix = true;
-                            objPath = objPath.getParent();
-                            continue;
-                        }
-                        if 
(directorySet.contains(objPath.normalize().toString())) {
-                            break;
-                        }
-                        if (isPrefix) {
-                            directorySet.add(objPath.normalize().toString());
-                        }
-
-                        matchCnt++;
-                        RemoteFile remoteFile = new RemoteFile(
-                                fileNameOnly ? 
objPath.getFileName().toString() :
-                                        "s3://" + bucket + "/" + objPath,
-                                !isPrefix,
-                                isPrefix ? -1 : obj.size(),
-                                isPrefix ? -1 : obj.size(),
-                                isPrefix ? 0 : 
obj.lastModified().toEpochMilli()
-                        );
-                        result.add(remoteFile);
-                        objPath = objPath.getParent();
-                        isPrefix = true;
-                    }
-                }
-
-                isTruncated = response.isTruncated();
-                if (isTruncated) {
-                    request = request.toBuilder()
-                            
.continuationToken(response.nextContinuationToken())
-                            .build();
-                }
-            } while (isTruncated);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("remotePath:{}, result:{}", remotePath, result);
-            }
-            return Status.OK;
-        } catch (Exception e) {
-            LOG.warn("Errors while getting file status", e);
-            return new Status(Status.ErrCode.COMMON_ERROR, "Errors while 
getting file status " + e.getMessage());
-        } finally {
-            long endTime = System.nanoTime();
-            long duration = endTime - startTime;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("process {} elements under prefix {} for {} round, 
match {} elements, take {} ms",
-                        elementCnt, remotePath, roundCnt, matchCnt,
-                        duration / 1000);
-            }
-        }
-    }
 
     @Override
     public Triple<String, String, String> getStsToken() throws DdlException {
@@ -207,11 +125,12 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
     @Override
     public Status headObject(String remotePath) {
         try {
-            remotePath = s3Properties.validateAndNormalizeUri(remotePath);
             S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
             HeadObjectResponse response = getClient()
                     
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
-            LOG.info("head file {} success: {}", remotePath, response);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("headObject success: {}, response: {}", remotePath, 
response);
+            }
             return Status.OK;
         } catch (S3Exception e) {
             if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
@@ -229,11 +148,12 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
     @Override
     public Status getObject(String remoteFilePath, File localFile) {
         try {
-            remoteFilePath = 
s3Properties.validateAndNormalizeUri(remoteFilePath);
             S3URI uri = S3URI.create(remoteFilePath, isUsePathStyle, 
forceParsingByStandardUri);
             GetObjectResponse response = getClient().getObject(
                     
GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), 
localFile.toPath());
-            LOG.info("get file {} success: {}", remoteFilePath, response);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("get file {} success: {}", remoteFilePath, response);
+            }
             return Status.OK;
         } catch (S3Exception s3Exception) {
             return new Status(
@@ -250,7 +170,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status putObject(String remotePath, @Nullable InputStream content, 
long contentLength) {
         try {
-            remotePath = s3Properties.validateAndNormalizeUri(remotePath);
             S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
             RequestBody body = RequestBody.fromInputStream(content, 
contentLength);
             PutObjectResponse response =
@@ -258,10 +177,12 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
                             .putObject(
                                     
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
                                     body);
-            LOG.info("put object success: {}", response.toString());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("put object success: {}", response);
+            }
             return Status.OK;
         } catch (S3Exception e) {
-            LOG.warn("put object failed:", e);
+            LOG.warn("put object failed: ", e);
             return new Status(Status.ErrCode.COMMON_ERROR, "put object failed: 
" + e.getMessage());
         } catch (Exception ue) {
             LOG.warn("connect to s3 failed: ", ue);
@@ -272,13 +193,14 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
     @Override
     public Status deleteObject(String remotePath) {
         try {
-            remotePath = s3Properties.validateAndNormalizeUri(remotePath);
             S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
             DeleteObjectResponse response =
                     getClient()
                             .deleteObject(
                                     
DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
-            LOG.info("delete file " + remotePath + " success: " + 
response.toString());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("delete file {} success: {}", remotePath, response);
+            }
             return Status.OK;
         } catch (S3Exception e) {
             LOG.warn("delete file failed: ", e);
@@ -295,7 +217,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status deleteObjects(String absolutePath) {
         try {
-            absolutePath = s3Properties.validateAndNormalizeUri(absolutePath);
             S3URI baseUri = S3URI.create(absolutePath, isUsePathStyle, 
forceParsingByStandardUri);
             String continuationToken = "";
             boolean isTruncated = false;
@@ -316,21 +237,26 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
                             .build();
 
                     DeleteObjectsResponse resp = 
getClient().deleteObjects(req);
-                    if (resp.errors().size() > 0) {
+                    if (!resp.errors().isEmpty()) {
                         LOG.warn("{} errors returned while deleting {} objects 
for dir {}",
                                 resp.errors().size(), objectList.size(), 
absolutePath);
                     }
-                    LOG.info("{} of {} objects deleted for dir {}",
-                            resp.deleted().size(), objectList.size(), 
absolutePath);
-                    totalObjects += objectList.size();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} of {} objects deleted for dir {}",
+                                resp.deleted().size(), objectList.size(), 
absolutePath);
+                        totalObjects += objectList.size();
+                    }
                 }
 
                 isTruncated = objects.isTruncated();
                 continuationToken = objects.getContinuationToken();
             } while (isTruncated);
-            LOG.info("total delete {} objects for dir {}", totalObjects, 
absolutePath);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("total delete {} objects for dir {}", totalObjects, 
absolutePath);
+            }
             return Status.OK;
         } catch (DdlException e) {
+            LOG.warn("deleteObjects:", e);
             return new Status(Status.ErrCode.COMMON_ERROR, "list objects for 
delete objects failed: " + e.getMessage());
         } catch (Exception e) {
             LOG.warn(String.format("delete objects %s failed", absolutePath), 
e);
@@ -341,8 +267,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status copyObject(String origFilePath, String destFilePath) {
         try {
-            origFilePath = s3Properties.validateAndNormalizeUri(origFilePath);
-            destFilePath = s3Properties.validateAndNormalizeUri(destFilePath);
             S3URI origUri = S3URI.create(origFilePath, isUsePathStyle, 
forceParsingByStandardUri);
             S3URI descUri = S3URI.create(destFilePath, isUsePathStyle, 
forceParsingByStandardUri);
             CopyObjectResponse response = getClient()
@@ -352,13 +276,15 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
                                     .destinationBucket(descUri.getBucket())
                                     .destinationKey(descUri.getKey())
                                     .build());
-            LOG.info("copy file from " + origFilePath + " to " + destFilePath 
+ " success: " + response.toString());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("copy file from {} to {} success: {} ", 
origFilePath, destFilePath, response);
+            }
             return Status.OK;
         } catch (S3Exception e) {
-            LOG.error("copy file failed: ", e);
+            LOG.warn("copy file failed: ", e);
             return new Status(Status.ErrCode.COMMON_ERROR, "copy file failed: 
" + e.getMessage());
         } catch (UserException ue) {
-            LOG.error("copy to s3 failed: ", ue);
+            LOG.warn("copy to s3 failed: ", ue);
             return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 
failed: " + ue.getMessage());
         }
     }
@@ -366,7 +292,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public RemoteObjects listObjects(String absolutePath, String 
continuationToken) throws DdlException {
         try {
-            absolutePath = s3Properties.validateAndNormalizeUri(absolutePath);
             S3URI uri = S3URI.create(absolutePath, isUsePathStyle, 
forceParsingByStandardUri);
             String bucket = uri.getBucket();
             String prefix = uri.getKey();
@@ -388,4 +313,181 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
             throw new DdlException("Failed to list objects for S3, Error 
message: " + e.getMessage(), e);
         }
     }
+
+    /**
+     * Uploads up to {@code totalBytes} bytes from {@code inputStream} to {@code remotePath} with the
+     * S3 multipart-upload API, sending one part per {@code CHUNK_SIZE}-byte buffer.
+     * <p>
+     * Reading stops once {@code totalBytes} bytes have been uploaded or the stream reaches EOF,
+     * whichever comes first. On any failure the multipart upload is aborted (best effort) and an
+     * error {@link Status} is returned; this method does not throw.
+     *
+     * @param remotePath  destination object URI
+     * @param inputStream data source; a {@code null} stream is rejected with an error status
+     * @param totalBytes  maximum number of bytes to upload
+     * @return {@link Status#OK} on success, otherwise a {@code COMMON_ERROR} status
+     */
+    public Status multipartUpload(String remotePath, @Nullable InputStream inputStream, long totalBytes) {
+        if (inputStream == null) {
+            // Reject before starting a multipart upload that would only have to be aborted.
+            return new Status(Status.ErrCode.COMMON_ERROR, "Failed to multipartUpload " + remotePath
+                    + " reason: input stream is null");
+        }
+        Status st = Status.OK;
+        long uploadedBytes = 0;
+        byte[] buffer = new byte[CHUNK_SIZE];
+        int partNumber = 1;
+
+        String uploadId = null;
+        S3URI uri = null;
+        Map<Integer, String> etags = new HashMap<>();
+
+        try {
+            uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
+                    .bucket(uri.getBucket())
+                    .key(uri.getKey())
+                    .build();
+            CreateMultipartUploadResponse createMultipartUploadResponse = getClient()
+                    .createMultipartUpload(createMultipartUploadRequest);
+
+            uploadId = createMultipartUploadResponse.uploadId();
+
+            while (uploadedBytes < totalBytes) {
+                // Never read past totalBytes, even if the stream holds more data.
+                int partSize = (int) Math.min(CHUNK_SIZE, totalBytes - uploadedBytes);
+                // A single read() may return fewer bytes than requested, so keep reading until the
+                // part is full or EOF: S3 rejects non-final parts below its minimum part size.
+                int bytesRead = 0;
+                while (bytesRead < partSize) {
+                    int n = inputStream.read(buffer, bytesRead, partSize - bytesRead);
+                    if (n == -1) {
+                        break;
+                    }
+                    bytesRead += n;
+                }
+                if (bytesRead == 0) {
+                    break; // EOF reached before totalBytes
+                }
+
+                UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
+                        .bucket(uri.getBucket())
+                        .key(uri.getKey())
+                        .uploadId(uploadId)
+                        .partNumber(partNumber).build();
+                RequestBody body = RequestBody
+                        .fromInputStream(new ByteArrayInputStream(buffer, 0, bytesRead), bytesRead);
+                UploadPartResponse uploadPartResponse = getClient().uploadPart(uploadPartRequest, body);
+
+                etags.put(partNumber, uploadPartResponse.eTag());
+                partNumber++;
+                // Count each part exactly once; double counting would end the loop early and
+                // silently truncate the uploaded object.
+                uploadedBytes += bytesRead;
+                if (bytesRead < partSize) {
+                    break; // a short part means the stream hit EOF
+                }
+            }
+            if (uploadedBytes < totalBytes) {
+                LOG.warn("multipartUpload {}: stream ended after {} of {} bytes", remotePath, uploadedBytes,
+                        totalBytes);
+            }
+
+            // CompleteMultipartUpload expects parts in ascending part-number order.
+            List<CompletedPart> completedParts = etags.entrySet().stream()
+                    .sorted(Map.Entry.comparingByKey())
+                    .map(entry -> CompletedPart.builder()
+                            .partNumber(entry.getKey())
+                            .eTag(entry.getValue())
+                            .build())
+                    .collect(Collectors.toList());
+            CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
+                    .parts(completedParts)
+                    .build();
+
+            CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
+                    .bucket(uri.getBucket())
+                    .key(uri.getKey())
+                    .uploadId(uploadId)
+                    .multipartUpload(completedMultipartUpload)
+                    .build();
+
+            getClient().completeMultipartUpload(completeMultipartUploadRequest);
+        } catch (Exception e) {
+            LOG.warn("remotePath:{}, ", remotePath, e);
+            st = new Status(Status.ErrCode.COMMON_ERROR, "Failed to multipartUpload " + remotePath
+                    + " reason: " + e.getMessage());
+
+            // Best-effort abort so the already-uploaded parts are not left behind.
+            if (uri != null && uploadId != null) {
+                try {
+                    AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder()
+                            .bucket(uri.getBucket())
+                            .key(uri.getKey())
+                            .uploadId(uploadId)
+                            .build();
+                    getClient().abortMultipartUpload(abortMultipartUploadRequest);
+                } catch (Exception e1) {
+                    LOG.warn("Failed to abort multipartUpload {}", remotePath, e1);
+                }
+            }
+        }
+        return st;
+    }
+
+    /**
+     * Fetches one page of a ListObjectsV2 listing using this storage's S3 client.
+     *
+     * @param request fully built listing request (bucket, prefix, optional continuation token)
+     * @return the raw SDK response for that page
+     * @throws UserException propagated from {@code getClient()}
+     */
+    ListObjectsV2Response listObjectsV2(ListObjectsV2Request request) throws UserException {
+        S3Client client = getClient();
+        return client.listObjectsV2(request);
+    }
+
+    /**
+     * List all files under the given path with glob pattern.
+     * For example, if the path is "s3://bucket/path/to/*.csv",
+     * it will list all files under "s3://bucket/path/to/" with ".csv" suffix.
+     * <p>
+     * Objects are listed page by page under the prefix returned by {@code S3Util.getLongestPrefix}.
+     * For every object key, the key and then each ancestor path whose normalized form still starts
+     * with that prefix is tested against the glob: a matching key is reported as a file, and a
+     * matching ancestor is reported once as a directory.
+     * <p>
+     * Copy from `AzureObjStorage.GlobList`
+     *
+     * @param remotePath   glob URI, e.g. {@code s3://bucket/path/to/*.csv}
+     * @param result       list that matching entries are appended to
+     * @param fileNameOnly if true, report only the last path component instead of the full s3:// URI
+     * @return {@link Status#OK} on success, otherwise a {@code COMMON_ERROR} status describing the failure
+     */
+    public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
+        long roundCnt = 0;
+        long elementCnt = 0;
+        long matchCnt = 0;
+        long startTime = System.nanoTime();
+        try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            String bucket = uri.getBucket();
+            String globPath = uri.getKey(); // eg: path/to/*.csv
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("globList globPath:{}, remotePath:{}", globPath, remotePath);
+            }
+            java.nio.file.Path pathPattern = Paths.get(globPath);
+            PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
+            // Ancestor paths already reported as directories, so each one is emitted only once.
+            HashSet<String> directorySet = new HashSet<>();
+
+            String listPrefix = S3Util.getLongestPrefix(globPath); // similar to Azure
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("globList listPrefix: {}", listPrefix);
+            }
+            ListObjectsV2Request request = ListObjectsV2Request.builder()
+                    .bucket(bucket)
+                    .prefix(listPrefix)
+                    .build();
+
+            boolean isTruncated = false;
+            do {
+                roundCnt++;
+                ListObjectsV2Response response = listObjectsV2(request);
+                for (S3Object obj : response.contents()) {
+                    elementCnt++;
+                    java.nio.file.Path objPath = Paths.get(obj.key());
+
+                    // Walk from the object key up through its ancestors while they stay under listPrefix.
+                    boolean isPrefix = false;
+                    while (objPath != null && objPath.normalize().toString().startsWith(listPrefix)) {
+                        if (!matcher.matches(objPath)) {
+                            isPrefix = true;
+                            objPath = objPath.getParent();
+                            continue;
+                        }
+                        if (directorySet.contains(objPath.normalize().toString())) {
+                            break;
+                        }
+                        if (isPrefix) {
+                            directorySet.add(objPath.normalize().toString());
+                        }
+
+                        matchCnt++;
+                        RemoteFile remoteFile = new RemoteFile(
+                                fileNameOnly ? objPath.getFileName().toString() :
+                                        "s3://" + bucket + "/" + objPath.toString(),
+                                !isPrefix,
+                                isPrefix ? -1 : obj.size(),
+                                isPrefix ? -1 : obj.size(),
+                                isPrefix ? 0 : obj.lastModified().toEpochMilli()
+                        );
+                        result.add(remoteFile);
+                        objPath = objPath.getParent();
+                        isPrefix = true;
+                    }
+                }
+
+                // isTruncated() is a boxed Boolean that S3-compatible services may omit (null);
+                // treat a missing flag as "no more pages" instead of failing on unboxing.
+                isTruncated = Boolean.TRUE.equals(response.isTruncated());
+                if (isTruncated) {
+                    String nextToken = response.nextContinuationToken();
+                    if (nextToken == null) {
+                        // Re-sending the request without a token would restart from the first page forever.
+                        throw new IllegalStateException("truncated listing of " + remotePath
+                                + " returned no continuation token");
+                    }
+                    request = request.toBuilder()
+                            .continuationToken(nextToken)
+                            .build();
+                }
+            } while (isTruncated);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("remotePath:{}, result:{}", remotePath, result);
+            }
+            return Status.OK;
+        } catch (Exception e) {
+            LOG.warn("Errors while getting file status", e);
+            return new Status(Status.ErrCode.COMMON_ERROR, "Errors while getting file status " + e.getMessage());
+        } finally {
+            long endTime = System.nanoTime();
+            long duration = endTime - startTime;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("process {} elements under prefix {} for {} round, match {} elements, take {} ms",
+                        elementCnt, remotePath, roundCnt, matchCnt,
+                        duration / 1000 / 1000);
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/RemoteFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/RemoteFileSystem.java
index c7877d9ed8d..6e50d476d8a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/RemoteFileSystem.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/RemoteFileSystem.java
@@ -135,7 +135,7 @@ public abstract class RemoteFileSystem extends 
PersistentFileSystem implements C
         }
     }
 
-    public  boolean connectivityTest() throws UserException {
+    /**
+     * Checks that the remote storage can be reached for the given paths.
+     * <p>
+     * This base implementation performs no check and always returns {@code true};
+     * subclasses (e.g. S3FileSystem) may override it with a real probe.
+     *
+     * @param filePaths paths the caller intends to access (not inspected here)
+     * @return {@code true} if the storage is considered reachable
+     * @throws UserException never thrown here; declared so overrides can reject invalid input
+     */
+    public  boolean connectivityTest(List<String> filePaths) throws 
UserException {
         return true;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/S3FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/S3FileSystem.java
index de0d89b5cfc..b63f0effa58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/S3FileSystem.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.common.util.S3URI;
 import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
 import org.apache.doris.fsv2.obj.S3ObjStorage;
 
@@ -28,8 +29,11 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import software.amazon.awssdk.services.s3.S3Client;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class S3FileSystem extends ObjFileSystem {
 
@@ -72,16 +76,27 @@ public class S3FileSystem extends ObjFileSystem {
     }
 
     @Override
-    public boolean connectivityTest() throws UserException {
+    public boolean connectivityTest(List<String> filePaths) throws UserException {
+        // Probes every distinct bucket referenced by filePaths with HEAD Bucket;
+        // returns false (after logging) on the first failure, true if all succeed.
+        if (filePaths == null || filePaths.isEmpty()) {
+            throw new UserException("File paths cannot be null or empty for connectivity test.");
+        }
         S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
         try {
-            objStorage.getClient().listBuckets();
+            S3Client client = objStorage.getClient();
+            Set<String> buckets = new HashSet<>();
+            boolean pathStyle = Boolean.parseBoolean(s3Properties.getUsePathStyle());
+            boolean standardUri = Boolean.parseBoolean(s3Properties.getForceParsingByStandardUrl());
+            // Deduplicate first so each bucket is probed only once.
+            for (String path : filePaths) {
+                buckets.add(S3URI.create(path, pathStyle, standardUri).getBucket());
+            }
+            for (String bucket : buckets) {
+                client.headBucket(req -> req.bucket(bucket));
+            }
             return true;
         } catch (Exception e) {
-            LOG.error("S3 connectivityTest error", e);
+            LOG.warn("S3 connectivityTest error: {}", e.getMessage(), e);
         }
         return false;
-
     }
 
     @VisibleForTesting
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
index 9c397c358aa..3afdfee5d0a 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
@@ -20,9 +20,17 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.common.UserException;
 import 
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
 
+import mockit.Expectations;
+import mockit.Mocked;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import 
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -34,6 +42,9 @@ public class S3PropertiesTest {
     private static String accessKey = "";
     private static String hdfsPath = "";
 
+    // Class-wide JMockit mock of StsClient. Presumably it keeps tests that set s3.role_arn from
+    // building a real STS client — TODO confirm it is still needed alongside the per-test mocks.
+    @Mocked
+    StsClient mockStsClient;
+
     @BeforeEach
     public void setUp() {
         origProps = new HashMap<>();
@@ -178,4 +189,66 @@ public class S3PropertiesTest {
         //not support
         Assertions.assertThrowsExactly(StoragePropertiesException.class, () -> 
StorageProperties.createPrimary(s3EndpointProps), "Property cos.endpoint is 
required.");
     }
+
+    @Test
+    public void testS3IamRoleWithExternalId() throws UserException {
+        // Role ARN and external id given as s3.* properties must be exposed to the backend
+        // as AWS_ROLE_ARN / AWS_EXTERNAL_ID.
+        String roleArn = "arn:aws:iam::123456789012:role/MyTestRole";
+        String externalId = "external-123";
+        origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+        origProps.put("s3.role_arn", roleArn);
+        origProps.put("s3.external_id", externalId);
+
+        S3Properties props = (S3Properties) StorageProperties.createPrimary(origProps);
+        Map<String, String> beProps = props.getBackendConfigProperties();
+
+        Assertions.assertEquals(roleArn, beProps.get("AWS_ROLE_ARN"));
+        Assertions.assertEquals(externalId, beProps.get("AWS_EXTERNAL_ID"));
+    }
+
+    /**
+     * With a role ARN (and no AK/SK set in this test) the provider must be an
+     * StsAssumeRoleCredentialsProvider.
+     * <p>
+     * NOTE(review): the {@code mockStsClient} parameter shadows the class field of the same name.
+     */
+    @Test
+    public void testGetAwsCredentialsProviderWithIamRoleAndExternalId(@Mocked 
StsClientBuilder mockBuilder, @Mocked StsClient mockStsClient, @Mocked 
InstanceProfileCredentialsProvider mockInstanceCreds) {
+
+        // Record expectations so that building an StsClient returns the mocked client and
+        // InstanceProfileCredentialsProvider.create() returns a mock. Presumably the instance
+        // profile supplies the source credentials for AssumeRole — TODO confirm in S3Properties.
+        new Expectations() {
+            {
+                StsClient.builder();
+                result = mockBuilder;
+                mockBuilder.credentialsProvider((AwsCredentialsProvider) any);
+                result = mockBuilder;
+                mockBuilder.build();
+                result = mockStsClient;
+                InstanceProfileCredentialsProvider.create();
+                result = mockInstanceCreds;
+            }
+        };
+
+        origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+        origProps.put("s3.role_arn", 
"arn:aws:iam::123456789012:role/MyTestRole");
+        origProps.put("s3.external_id", "external-123");
+        origProps.put("s3.region", "us-west-2");
+        S3Properties s3Props = (S3Properties) 
StorageProperties.createPrimary(origProps);
+        AwsCredentialsProvider provider = s3Props.getAwsCredentialsProvider();
+        Assertions.assertNotNull(provider);
+        Assertions.assertTrue(provider instanceof 
StsAssumeRoleCredentialsProvider);
+    }
+
+    @Test
+    public void testGetAwsCredentialsProviderWithAccessKeyAndSecretKey() throws UserException {
+        // Plain AK/SK resolves to static credentials.
+        origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+        origProps.put("s3.access_key", "myAccessKey");
+        origProps.put("s3.secret_key", "mySecretKey");
+        origProps.put("s3.region", "us-west-2");
+        assertStaticCredentials();
+
+        // Adding a session token still yields static credentials.
+        origProps.put("s3.session_token", "mySessionToken");
+        assertStaticCredentials();
+
+        // AK/SK is preferred even when an IAM role is configured as well.
+        origProps.put("s3.role_arn", "arn:aws:iam::123456789012:role/MyTestRole");
+        origProps.put("s3.external_id", "external-123");
+        assertStaticCredentials();
+    }
+
+    /** Builds S3Properties from {@code origProps} and asserts they resolve to static credentials. */
+    private void assertStaticCredentials() throws UserException {
+        S3Properties s3Props = (S3Properties) StorageProperties.createPrimary(origProps);
+        AwsCredentialsProvider provider = s3Props.getAwsCredentialsProvider();
+        Assertions.assertNotNull(provider);
+        Assertions.assertTrue(provider instanceof StaticCredentialsProvider);
+    }
 }
diff --git 
a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
 
b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
index c88b6c8fcf3..76ffbf47221 100644
--- 
a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
+++ 
b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
@@ -132,7 +132,7 @@ suite("test_domain_connection_and_ak_sk_correction",  
"load_p0") {
     }
 
     label = UUID.randomUUID().toString().replace("-", "")
-
+    try {
         result = sql """
             LOAD LABEL ${label}
             (
@@ -155,36 +155,10 @@ suite("test_domain_connection_and_ak_sk_correction",  
"load_p0") {
             );
         """
         logger.info("the fourth sql result is {}", result)
-    int totalWaitTime = 0
-    int pollInterval = 5
-    int timeout = 120
-    while (totalWaitTime < timeout) {
-        def loadResult = sql """
-        SHOW LOAD WHERE label="${label}"
-    """
-
-        if (loadResult == null || loadResult.isEmpty()) {
-            return false
-        } else if (loadResult.get(0).get(2) in ['CANCELLED', 'FAILED']) {
-            break 
-        } else if (loadResult.get(0).get(2) == 'FINISHED') {
-            throw new RuntimeException("load success, but the first bucket is 
wrong, so the sql should fail")
-        } else {
-            println("load status is ${loadResult.get(0).get(2)}")
-            Thread.sleep(pollInterval * 1000L)
-            totalWaitTime += pollInterval
-        }
-
-       
-    }
-
-    if (totalWaitTime >= timeout) {
-        def queryLoadResult = sql """
-        SHOW LOAD WHERE label="${label}"
-        """
-        if (queryLoadResult != null && queryLoadResult.get(0).get(2) == 
'FINISHED') {
-            throw new RuntimeException("load success, but the first bucket is 
wrong, so the sql should fail")
-        }
+        // Only reached if the LOAD above was accepted even though the bucket in the second
+        // DATA INFILE is wrong. The separator must be a comma: `false. "..."` is parsed by
+        // Groovy as a property access on Boolean and throws MissingPropertyException, which
+        // the catch below would swallow and report with a misleading message.
+        assertTrue(false, "in the second DATA INFILE, the first bucket is wrong, so the sql should fail")
+    } catch (Exception e) {
+        logger.info("the fourth sql exception result is {}", e.getMessage())
+        assertTrue(e.getMessage().contains("Failed to access object storage, message="), e.getMessage())
     }
     sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
     sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to