This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bc396e1b7b4 [feat](fs-v2): add S3 IAM Role support (#51229)
bc396e1b7b4 is described below
commit bc396e1b7b40b5e1741ecb53619a7a4575e9b618
Author: Calvin Kirs <[email protected]>
AuthorDate: Fri May 30 10:10:21 2025 +0800
[feat](fs-v2): add S3 IAM Role support (#51229)
### What problem does this PR solve?
https://github.com/apache/doris/issues/50238
Prefer AK/SK when both AK/SK and an IAM role are provided; if only an IAM
role is set, use role-based credentials. AK/SK are no longer required when
an IAM role is configured.
Replace listBuckets with headBucket for the connectivity check, as it is
more standard, more widely supported, and compatible across S3-like systems.
---
.../java/org/apache/doris/analysis/LoadStmt.java | 15 +-
.../java/org/apache/doris/common/util/S3Util.java | 35 +++
.../storage/AbstractS3CompatibleProperties.java | 17 ++
.../datasource/property/storage/S3Properties.java | 68 ++++-
.../java/org/apache/doris/fsv2/obj/ObjStorage.java | 4 +
.../org/apache/doris/fsv2/obj/S3ObjStorage.java | 332 ++++++++++++++-------
.../apache/doris/fsv2/remote/RemoteFileSystem.java | 2 +-
.../org/apache/doris/fsv2/remote/S3FileSystem.java | 23 +-
.../property/storage/S3PropertiesTest.java | 73 +++++
...t_domain_connection_and_ak_sk_correction.groovy | 36 +--
10 files changed, 448 insertions(+), 157 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index a119af5345d..02b014fd26d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -607,8 +607,21 @@ public class LoadStmt extends DdlStmt implements
NotFallbackInParser {
String endpoint = storageProperties.getEndpoint();
checkEndpoint(endpoint);
checkWhiteList(endpoint);
+ List<String> filePaths = new ArrayList<>();
+ if (dataDescriptions != null && !dataDescriptions.isEmpty()) {
+ for (DataDescription dataDescription : dataDescriptions) {
+ if (dataDescription.getFilePaths() != null) {
+ for (String filePath : dataDescription.getFilePaths())
{
+ if (filePath != null && !filePath.isEmpty()) {
+ filePaths.add(filePath);
+ }
+ }
+ }
+ }
+ }
//should add connectivity test
- boolean connectivityTest =
FileSystemFactory.get(brokerDesc.getStorageProperties()).connectivityTest();
+ boolean connectivityTest =
FileSystemFactory.get(brokerDesc.getStorageProperties())
+ .connectivityTest(filePaths);
if (!connectivityTest) {
throw new UserException("Failed to access object storage,
message=connectivity test failed");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index e204fab2e22..c147fce6801 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -151,6 +151,41 @@ public class S3Util {
InstanceProfileCredentialsProvider.create());
}
+ /**
+ * Builds an S3 client that authenticates with the given credentials provider.
+ * The client retries failed requests up to 3 times with equal-jitter backoff
+ * (1s base delay, capped at 1 minute), signs requests with AwsS3V4Signer, uses a
+ * URL-connection HTTP client with 30s socket and connection timeouts, and has
+ * chunked encoding disabled.
+ * NOTE(review): the retry/signer/HTTP setup appears to duplicate the AK/SK overload
+ * of buildS3Client; consider sharing it.
+ *
+ * @param endpoint endpoint URI used as the endpoint override
+ * @param region region name passed to Region.of
+ * @param isUsePathStyle whether to enable path-style bucket addressing
+ * @param credential provider that supplies request credentials
+ * @return the configured S3 client
+ */
+ public static S3Client buildS3Client(URI endpoint, String region, boolean
isUsePathStyle,
+ AwsCredentialsProvider credential) {
+ EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
+ .builder()
+ .baseDelay(Duration.ofSeconds(1))
+ .maxBackoffTime(Duration.ofMinutes(1))
+ .build();
+ // retry up to 3 times with equal-jitter backoff
+ RetryPolicy retryPolicy = RetryPolicy
+ .builder()
+ .numRetries(3)
+ .backoffStrategy(backoffStrategy)
+ .build();
+ ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
+ .builder()
+ // set retry policy
+ .retryPolicy(retryPolicy)
+ // sign requests with AwsS3V4Signer
+ .putAdvancedOption(SdkAdvancedClientOption.SIGNER,
AwsS3V4Signer.create())
+ .build();
+ return S3Client.builder()
+
.httpClient(UrlConnectionHttpClient.builder().socketTimeout(Duration.ofSeconds(30))
+ .connectionTimeout(Duration.ofSeconds(30)).build())
+ .endpointOverride(endpoint)
+ .credentialsProvider(credential)
+ .region(Region.of(region))
+ .overrideConfiguration(clientConf)
+ // disable chunked encoding because BOS does not support it
+ .serviceConfiguration(S3Configuration.builder()
+ .chunkedEncodingEnabled(false)
+ .pathStyleAccessEnabled(isUsePathStyle)
+ .build())
+ .build();
+ }
+
public static S3Client buildS3Client(URI endpoint, String region, boolean
isUsePathStyle, String accessKey,
String secretKey, String sessionToken, String roleArn, String
externalId) {
EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index 3c0422954e5..0d252b2043d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -20,9 +20,14 @@ package org.apache.doris.datasource.property.storage;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.ConnectorProperty;
+import com.google.common.base.Strings;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import java.util.HashMap;
import java.util.Map;
@@ -152,6 +157,18 @@ public abstract class AbstractS3CompatibleProperties
extends StorageProperties i
return generateBackendS3Configuration();
}
+ public AwsCredentialsProvider getAwsCredentialsProvider() {
+ if (StringUtils.isNotBlank(getAccessKey()) &&
StringUtils.isNotBlank(getSecretKey())) {
+ if (Strings.isNullOrEmpty(sessionToken)) {
+ return
StaticCredentialsProvider.create(AwsBasicCredentials.create(getAccessKey(),
getSecretKey()));
+ } else {
+ return
StaticCredentialsProvider.create(AwsSessionCredentials.create(getAccessKey(),
getSecretKey(),
+ sessionToken));
+ }
+ }
+ return null;
+ }
+
@Override
protected void initNormalizeAndCheckProps() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index 343b4b29d39..4305a04ebc8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -18,11 +18,22 @@
package org.apache.doris.datasource.property.storage;
import org.apache.doris.datasource.property.ConnectorProperty;
+import
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
+import
software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import
software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.services.sts.StsClient;
+import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import java.lang.reflect.Field;
import java.util.List;
@@ -51,11 +62,13 @@ public class S3Properties extends
AbstractS3CompatibleProperties {
@Getter
@ConnectorProperty(names = {"s3.access_key", "AWS_ACCESS_KEY",
"access_key", "ACCESS_KEY"},
+ required = false,
description = "The access key of S3.")
protected String accessKey = "";
@Getter
@ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY",
"secret_key", "SECRET_KEY"},
+ required = false,
description = "The secret key of S3.")
protected String secretKey = "";
@@ -90,14 +103,12 @@ public class S3Properties extends
AbstractS3CompatibleProperties {
description = "The sts region of S3.")
protected String s3StsRegion = "";
- @ConnectorProperty(names = {"s3.iam_role"},
- supported = false,
+ @ConnectorProperty(names = {"s3.role_arn", "AWS_ROLE_ARN"},
required = false,
description = "The iam role of S3.")
protected String s3IAMRole = "";
- @ConnectorProperty(names = {"s3.external_id"},
- supported = false,
+ @ConnectorProperty(names = {"s3.external_id", "AWS_EXTERNAL_ID"},
required = false,
description = "The external id of S3.")
protected String s3ExternalId = "";
@@ -122,6 +133,18 @@ public class S3Properties extends
AbstractS3CompatibleProperties {
super(Type.S3, origProps);
}
+    /**
+     * Validates that a usable credential source is configured: either an access key / secret key
+     * pair, or an IAM role ARN together with an external id.
+     *
+     * @throws StoragePropertiesException if neither combination is fully set
+     */
+    @Override
+    protected void initNormalizeAndCheckProps() {
+        super.initNormalizeAndCheckProps();
+        // An access key / secret key pair is sufficient on its own.
+        if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
+            return;
+        }
+        // NOTE(review): getAwsCredentialsProvider() treats the external id as optional when a role
+        // ARN is set, but this check still requires it; confirm which contract is intended.
+        if (StringUtils.isNotBlank(s3ExternalId) && StringUtils.isNotBlank(s3IAMRole)) {
+            return;
+        }
+        // The role ARN is read from "s3.role_arn" (alias AWS_ROLE_ARN); "s3.iam_role" is no longer a
+        // recognized key, so the message must name the real property.
+        throw new StoragePropertiesException("Please set s3.access_key and s3.secret_key or s3.role_arn and "
+                + "s3.external_id");
+    }
/**
* Guess if the storage properties is for this storage type.
@@ -195,7 +218,42 @@ public class S3Properties extends
AbstractS3CompatibleProperties {
@Override
public Map<String, String> getBackendConfigProperties() {
- return generateBackendS3Configuration(s3ConnectionMaximum,
+ Map<String, String> backendProperties =
generateBackendS3Configuration(s3ConnectionMaximum,
s3ConnectionRequestTimeoutS, s3ConnectionTimeoutS,
String.valueOf(usePathStyle));
+
+ // Forward the assumed-role settings to the backend only when both a role ARN
+ // and an external id are configured.
+ // NOTE(review): getAwsCredentialsProvider() on the FE side accepts a role ARN without an
+ // external id; confirm whether BE should also receive AWS_ROLE_ARN on its own.
+ if (StringUtils.isNotBlank(s3ExternalId)
+ && StringUtils.isNotBlank(s3IAMRole)) {
+ backendProperties.put("AWS_ROLE_ARN", s3IAMRole);
+ backendProperties.put("AWS_EXTERNAL_ID", s3ExternalId);
+ }
+ return backendProperties;
+ }
+
+ /**
+ * Resolves the credentials provider for the FE-side S3 client, in this order:
+ * 1) static AK/SK (with optional session token) from the parent class, when set;
+ * 2) if a role ARN is set, assume that role through STS, using the instance profile
+ *    as the source credentials and passing the external id when one is set;
+ * 3) otherwise, a chain of system properties, environment variables, web identity
+ *    token file, profile file and instance profile credentials.
+ */
+ @Override
+ public AwsCredentialsProvider getAwsCredentialsProvider() {
+ AwsCredentialsProvider credentialsProvider =
super.getAwsCredentialsProvider();
+ if (credentialsProvider != null) {
+ return credentialsProvider;
+ }
+ // Role-based access: assume s3IAMRole via STS.
+ if (StringUtils.isNotBlank(s3IAMRole)) {
+ // NOTE(review): a new StsClient is built on every call and never closed, and no region is
+ // set on it (s3StsRegion is not used here); confirm this is intended.
+ StsClient stsClient = StsClient.builder()
+
.credentialsProvider(InstanceProfileCredentialsProvider.create())
+ .build();
+
+ return StsAssumeRoleCredentialsProvider.builder()
+ .stsClient(stsClient)
+ .refreshRequest(builder -> {
+
builder.roleArn(s3IAMRole).roleSessionName("aws-sdk-java-v2-fe");
+ if (!Strings.isNullOrEmpty(s3ExternalId)) {
+ builder.externalId(s3ExternalId);
+ }
+ }).build();
+ }
+ // Nothing configured explicitly: fall back to ambient credential sources.
+ return
AwsCredentialsProviderChain.of(SystemPropertyCredentialsProvider.create(),
+ EnvironmentVariableCredentialsProvider.create(),
+ WebIdentityTokenFileCredentialsProvider.create(),
+ ProfileCredentialsProvider.create(),
+ InstanceProfileCredentialsProvider.create());
}
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/ObjStorage.java
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/ObjStorage.java
index d3033bdec23..f45e4b1eebb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/ObjStorage.java
@@ -32,6 +32,10 @@ import java.io.InputStream;
* @param <C> cloud SDK Client
*/
public interface ObjStorage<C> {
+
+ // CHUNK_SIZE for multi part upload
+ int CHUNK_SIZE = 5 * 1024 * 1024;
+
C getClient() throws UserException;
Triple<String, String, String> getStsToken() throws DdlException;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java
index 76c12a85834..761fad75734 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/obj/S3ObjStorage.java
@@ -20,7 +20,6 @@ package org.apache.doris.fsv2.obj;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.common.util.S3Util;
import
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
@@ -34,8 +33,14 @@ import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
@@ -52,7 +57,10 @@ import
software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.net.URI;
@@ -60,6 +68,7 @@ import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -101,103 +110,12 @@ public class S3ObjStorage implements
ObjStorage<S3Client> {
endpointStr = "http://" + endpointStr;
}
URI endpoint = URI.create(endpointStr);
- CloudCredential credential = new CloudCredential();
- credential.setAccessKey(s3Properties.getAccessKey());
- credential.setSecretKey(s3Properties.getSecretKey());
- if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
- credential.setSessionToken(s3Properties.getSessionToken());
- }
- client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(),
credential, isUsePathStyle);
+ client = S3Util.buildS3Client(endpoint, s3Properties.getRegion(),
+ isUsePathStyle, s3Properties.getAwsCredentialsProvider());
}
return client;
}
- public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
- long roundCnt = 0;
- long elementCnt = 0;
- long matchCnt = 0;
- long startTime = System.nanoTime();
- try {
- S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
- String bucket = uri.getBucket();
- String globPath = uri.getKey(); // eg: path/to/*.csv
- if (LOG.isDebugEnabled()) {
- LOG.info("globList globPath:{}, remotePath:{}", globPath,
remotePath);
- }
-
- java.nio.file.Path pathPattern = Paths.get(globPath);
- PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
- HashSet<String> directorySet = new HashSet<>();
- String listPrefix = S3Util.getLongestPrefix(globPath); // similar
to Azure
- if (LOG.isDebugEnabled()) {
- LOG.info("globList listPrefix: {}", listPrefix);
- }
- ListObjectsV2Request request = ListObjectsV2Request.builder()
- .bucket(bucket)
- .prefix(listPrefix)
- .build();
- boolean isTruncated;
- do {
- roundCnt++;
- ListObjectsV2Response response =
getClient().listObjectsV2(request);
- for (S3Object obj : response.contents()) {
- elementCnt++;
- java.nio.file.Path objPath = Paths.get(obj.key());
-
- boolean isPrefix = false;
- while (objPath != null &&
objPath.normalize().toString().startsWith(listPrefix)) {
- if (!matcher.matches(objPath)) {
- isPrefix = true;
- objPath = objPath.getParent();
- continue;
- }
- if
(directorySet.contains(objPath.normalize().toString())) {
- break;
- }
- if (isPrefix) {
- directorySet.add(objPath.normalize().toString());
- }
-
- matchCnt++;
- RemoteFile remoteFile = new RemoteFile(
- fileNameOnly ?
objPath.getFileName().toString() :
- "s3://" + bucket + "/" + objPath,
- !isPrefix,
- isPrefix ? -1 : obj.size(),
- isPrefix ? -1 : obj.size(),
- isPrefix ? 0 :
obj.lastModified().toEpochMilli()
- );
- result.add(remoteFile);
- objPath = objPath.getParent();
- isPrefix = true;
- }
- }
-
- isTruncated = response.isTruncated();
- if (isTruncated) {
- request = request.toBuilder()
-
.continuationToken(response.nextContinuationToken())
- .build();
- }
- } while (isTruncated);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("remotePath:{}, result:{}", remotePath, result);
- }
- return Status.OK;
- } catch (Exception e) {
- LOG.warn("Errors while getting file status", e);
- return new Status(Status.ErrCode.COMMON_ERROR, "Errors while
getting file status " + e.getMessage());
- } finally {
- long endTime = System.nanoTime();
- long duration = endTime - startTime;
- if (LOG.isDebugEnabled()) {
- LOG.debug("process {} elements under prefix {} for {} round,
match {} elements, take {} ms",
- elementCnt, remotePath, roundCnt, matchCnt,
- duration / 1000);
- }
- }
- }
@Override
public Triple<String, String, String> getStsToken() throws DdlException {
@@ -207,11 +125,12 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
@Override
public Status headObject(String remotePath) {
try {
- remotePath = s3Properties.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
HeadObjectResponse response = getClient()
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
- LOG.info("head file {} success: {}", remotePath, response);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("headObject success: {}, response: {}", remotePath,
response);
+ }
return Status.OK;
} catch (S3Exception e) {
if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
@@ -229,11 +148,12 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
@Override
public Status getObject(String remoteFilePath, File localFile) {
try {
- remoteFilePath =
s3Properties.validateAndNormalizeUri(remoteFilePath);
S3URI uri = S3URI.create(remoteFilePath, isUsePathStyle,
forceParsingByStandardUri);
GetObjectResponse response = getClient().getObject(
GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
localFile.toPath());
- LOG.info("get file {} success: {}", remoteFilePath, response);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get file {} success: {}", remoteFilePath, response);
+ }
return Status.OK;
} catch (S3Exception s3Exception) {
return new Status(
@@ -250,7 +170,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
@Override
public Status putObject(String remotePath, @Nullable InputStream content,
long contentLength) {
try {
- remotePath = s3Properties.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
RequestBody body = RequestBody.fromInputStream(content,
contentLength);
PutObjectResponse response =
@@ -258,10 +177,12 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
.putObject(
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
body);
- LOG.info("put object success: {}", response.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("put object success: {}", response);
+ }
return Status.OK;
} catch (S3Exception e) {
- LOG.warn("put object failed:", e);
+ LOG.warn("put object failed: ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "put object failed:
" + e.getMessage());
} catch (Exception ue) {
LOG.warn("connect to s3 failed: ", ue);
@@ -272,13 +193,14 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
@Override
public Status deleteObject(String remotePath) {
try {
- remotePath = s3Properties.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
DeleteObjectResponse response =
getClient()
.deleteObject(
DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
- LOG.info("delete file " + remotePath + " success: " +
response.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("delete file {} success: {}", remotePath, response);
+ }
return Status.OK;
} catch (S3Exception e) {
LOG.warn("delete file failed: ", e);
@@ -295,7 +217,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
@Override
public Status deleteObjects(String absolutePath) {
try {
- absolutePath = s3Properties.validateAndNormalizeUri(absolutePath);
S3URI baseUri = S3URI.create(absolutePath, isUsePathStyle,
forceParsingByStandardUri);
String continuationToken = "";
boolean isTruncated = false;
@@ -316,21 +237,26 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
.build();
DeleteObjectsResponse resp =
getClient().deleteObjects(req);
- if (resp.errors().size() > 0) {
+ if (!resp.errors().isEmpty()) {
LOG.warn("{} errors returned while deleting {} objects
for dir {}",
resp.errors().size(), objectList.size(),
absolutePath);
}
- LOG.info("{} of {} objects deleted for dir {}",
- resp.deleted().size(), objectList.size(),
absolutePath);
- totalObjects += objectList.size();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} of {} objects deleted for dir {}",
+ resp.deleted().size(), objectList.size(),
absolutePath);
+ totalObjects += objectList.size();
+ }
}
isTruncated = objects.isTruncated();
continuationToken = objects.getContinuationToken();
} while (isTruncated);
- LOG.info("total delete {} objects for dir {}", totalObjects,
absolutePath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("total delete {} objects for dir {}", totalObjects,
absolutePath);
+ }
return Status.OK;
} catch (DdlException e) {
+ LOG.warn("deleteObjects:", e);
return new Status(Status.ErrCode.COMMON_ERROR, "list objects for
delete objects failed: " + e.getMessage());
} catch (Exception e) {
LOG.warn(String.format("delete objects %s failed", absolutePath),
e);
@@ -341,8 +267,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
@Override
public Status copyObject(String origFilePath, String destFilePath) {
try {
- origFilePath = s3Properties.validateAndNormalizeUri(origFilePath);
- destFilePath = s3Properties.validateAndNormalizeUri(destFilePath);
S3URI origUri = S3URI.create(origFilePath, isUsePathStyle,
forceParsingByStandardUri);
S3URI descUri = S3URI.create(destFilePath, isUsePathStyle,
forceParsingByStandardUri);
CopyObjectResponse response = getClient()
@@ -352,13 +276,15 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
.destinationBucket(descUri.getBucket())
.destinationKey(descUri.getKey())
.build());
- LOG.info("copy file from " + origFilePath + " to " + destFilePath
+ " success: " + response.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("copy file from {} to {} success: {} ",
origFilePath, destFilePath, response);
+ }
return Status.OK;
} catch (S3Exception e) {
- LOG.error("copy file failed: ", e);
+ LOG.warn("copy file failed: ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "copy file failed:
" + e.getMessage());
} catch (UserException ue) {
- LOG.error("copy to s3 failed: ", ue);
+ LOG.warn("copy to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3
failed: " + ue.getMessage());
}
}
@@ -366,7 +292,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
@Override
public RemoteObjects listObjects(String absolutePath, String
continuationToken) throws DdlException {
try {
- absolutePath = s3Properties.validateAndNormalizeUri(absolutePath);
S3URI uri = S3URI.create(absolutePath, isUsePathStyle,
forceParsingByStandardUri);
String bucket = uri.getBucket();
String prefix = uri.getKey();
@@ -388,4 +313,181 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
throw new DdlException("Failed to list objects for S3, Error
message: " + e.getMessage(), e);
}
}
+
+    /**
+     * Uploads {@code totalBytes} bytes read from {@code inputStream} to {@code remotePath} using the
+     * S3 multipart upload API, in parts of {@link #CHUNK_SIZE} bytes (the last part may be smaller).
+     *
+     * <p>The stream is read until {@code totalBytes} bytes have been uploaded or it reaches EOF,
+     * whichever comes first; bytes beyond {@code totalBytes} are never read. If any step fails after
+     * the upload has been created, the upload is aborted on a best-effort basis.
+     *
+     * @param remotePath  destination object URI
+     * @param inputStream data source; a null stream is reported as an error
+     * @param totalBytes  number of bytes to upload
+     * @return {@link Status#OK} on success, otherwise a {@code COMMON_ERROR} status with the reason
+     */
+    public Status multipartUpload(String remotePath, @Nullable InputStream inputStream, long totalBytes) {
+        if (inputStream == null) {
+            // Fail before creating an upload that would otherwise be left pending.
+            return new Status(Status.ErrCode.COMMON_ERROR, "Failed to multipartUpload " + remotePath
+                    + " reason: input stream is null");
+        }
+        Status st = Status.OK;
+        long uploadedBytes = 0;
+        byte[] buffer = new byte[CHUNK_SIZE];
+        int partNumber = 1;
+
+        String uploadId = null;
+        S3URI uri = null;
+        Map<Integer, String> etags = new HashMap<>();
+
+        try {
+            uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
+                    .bucket(uri.getBucket())
+                    .key(uri.getKey())
+                    .build();
+            CreateMultipartUploadResponse createMultipartUploadResponse = getClient()
+                    .createMultipartUpload(createMultipartUploadRequest);
+            uploadId = createMultipartUploadResponse.uploadId();
+
+            while (uploadedBytes < totalBytes) {
+                // Never read past totalBytes, even if the stream holds more data.
+                int partSize = (int) Math.min(CHUNK_SIZE, totalBytes - uploadedBytes);
+                // A single read() may return fewer bytes than requested; keep reading so every
+                // non-final part is full-sized (S3 rejects non-final parts smaller than 5 MiB).
+                int bytesRead = 0;
+                while (bytesRead < partSize) {
+                    int n = inputStream.read(buffer, bytesRead, partSize - bytesRead);
+                    if (n == -1) {
+                        break;
+                    }
+                    bytesRead += n;
+                }
+                if (bytesRead == 0) {
+                    LOG.warn("input stream for {} ended after {} of {} bytes", remotePath, uploadedBytes,
+                            totalBytes);
+                    break;
+                }
+                UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
+                        .bucket(uri.getBucket())
+                        .key(uri.getKey())
+                        .uploadId(uploadId)
+                        .partNumber(partNumber)
+                        .build();
+                RequestBody body = RequestBody
+                        .fromInputStream(new ByteArrayInputStream(buffer, 0, bytesRead), bytesRead);
+                UploadPartResponse uploadPartResponse = getClient().uploadPart(uploadPartRequest, body);
+
+                etags.put(partNumber, uploadPartResponse.eTag());
+                partNumber++;
+                // Count each part exactly once; double counting ends the loop early and silently
+                // uploads only part of the data.
+                uploadedBytes += bytesRead;
+            }
+
+            // CompleteMultipartUpload requires parts in ascending part-number order, and HashMap
+            // iteration order is unspecified, so sort explicitly.
+            List<CompletedPart> completedParts = etags.entrySet().stream()
+                    .sorted(Map.Entry.comparingByKey())
+                    .map(entry -> CompletedPart.builder()
+                            .partNumber(entry.getKey())
+                            .eTag(entry.getValue())
+                            .build())
+                    .collect(Collectors.toList());
+            CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
+                    .parts(completedParts)
+                    .build();
+
+            CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
+                    .bucket(uri.getBucket())
+                    .key(uri.getKey())
+                    .uploadId(uploadId)
+                    .multipartUpload(completedMultipartUpload)
+                    .build();
+
+            getClient().completeMultipartUpload(completeMultipartUploadRequest);
+        } catch (Exception e) {
+            LOG.warn("remotePath:{}, ", remotePath, e);
+            st = new Status(Status.ErrCode.COMMON_ERROR, "Failed to multipartUpload " + remotePath
+                    + " reason: " + e.getMessage());
+
+            // Best-effort abort of the pending upload; a failure here is only logged.
+            if (uri != null && uploadId != null) {
+                try {
+                    AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder()
+                            .bucket(uri.getBucket())
+                            .key(uri.getKey())
+                            .uploadId(uploadId)
+                            .build();
+                    getClient().abortMultipartUpload(abortMultipartUploadRequest);
+                } catch (Exception e1) {
+                    LOG.warn("Failed to abort multipartUpload {}", remotePath, e1);
+                }
+            }
+        }
+        return st;
+    }
+
+    /**
+     * Sends one ListObjectsV2 request using the client returned by {@code getClient()}.
+     *
+     * @param request the list request (bucket, prefix, continuation token)
+     * @return one page of the listing as returned by S3
+     * @throws UserException if the S3 client cannot be obtained
+     */
+    ListObjectsV2Response listObjectsV2(ListObjectsV2Request request) throws UserException {
+        S3Client s3Client = getClient();
+        return s3Client.listObjectsV2(request);
+    }
+
+ /**
+ * Lists the entries selected by a glob pattern.
+ * For example, for "s3://bucket/path/to/*.csv" it lists the keys under the prefix
+ * computed by S3Util.getLongestPrefix (presumably "path/to/") and keeps those that
+ * match the pattern.
+ * <p>
+ * A matching object key is returned with its size and modification time; a matching
+ * ancestor path of a listed key is returned once (sizes -1, modification time 0).
+ * <p>
+ * Adapted from `AzureObjStorage.GlobList`.
+ *
+ * @param remotePath S3 URI whose key part is the glob pattern
+ * @param result output list the matching entries are appended to
+ * @param fileNameOnly if true, entries hold only the last path component; otherwise "s3://bucket/key"
+ * @return Status.OK on success, or a COMMON_ERROR status if listing fails
+ */
+ public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
+ long roundCnt = 0;
+ long elementCnt = 0;
+ long matchCnt = 0;
+ long startTime = System.nanoTime();
+ try {
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
+ String bucket = uri.getBucket();
+ String globPath = uri.getKey(); // eg: path/to/*.csv
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("globList globPath:{}, remotePath:{}", globPath,
remotePath);
+ }
+ java.nio.file.Path pathPattern = Paths.get(globPath);
+ PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
+ // Ancestor paths already reported, so each is emitted only once.
+ HashSet<String> directorySet = new HashSet<>();
+
+ String listPrefix = S3Util.getLongestPrefix(globPath); // similar
to Azure
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("globList listPrefix: {}", listPrefix);
+ }
+ ListObjectsV2Request request = ListObjectsV2Request.builder()
+ .bucket(bucket)
+ .prefix(listPrefix)
+ .build();
+
+ boolean isTruncated = false;
+ do {
+ roundCnt++;
+ ListObjectsV2Response response = listObjectsV2(request);
+ for (S3Object obj : response.contents()) {
+ elementCnt++;
+ java.nio.file.Path objPath = Paths.get(obj.key());
+
+ boolean isPrefix = false;
+ // Walk from the key up through its parents while still under listPrefix.
+ // The key itself is reported if it matches; every matching ancestor is
+ // reported as a prefix, and the walk stops at an ancestor already reported.
+ while (objPath != null &&
objPath.normalize().toString().startsWith(listPrefix)) {
+ if (!matcher.matches(objPath)) {
+ isPrefix = true;
+ objPath = objPath.getParent();
+ continue;
+ }
+ if
(directorySet.contains(objPath.normalize().toString())) {
+ break;
+ }
+ if (isPrefix) {
+ directorySet.add(objPath.normalize().toString());
+ }
+
+ matchCnt++;
+ RemoteFile remoteFile = new RemoteFile(
+ fileNameOnly ?
objPath.getFileName().toString() :
+ "s3://" + bucket + "/" +
objPath.toString(),
+ !isPrefix,
+ isPrefix ? -1 : obj.size(),
+ isPrefix ? -1 : obj.size(),
+ isPrefix ? 0 :
obj.lastModified().toEpochMilli()
+ );
+ result.add(remoteFile);
+ objPath = objPath.getParent();
+ isPrefix = true;
+ }
+ }
+
+ // Follow the continuation token until every page has been listed.
+ isTruncated = response.isTruncated();
+ if (isTruncated) {
+ request = request.toBuilder()
+
.continuationToken(response.nextContinuationToken())
+ .build();
+ }
+ } while (isTruncated);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remotePath:{}, result:{}", remotePath, result);
+ }
+ return Status.OK;
+ } catch (Exception e) {
+ LOG.warn("Errors while getting file status", e);
+ return new Status(Status.ErrCode.COMMON_ERROR, "Errors while
getting file status " + e.getMessage());
+ } finally {
+ long endTime = System.nanoTime();
+ // Elapsed time in nanoseconds; converted to milliseconds for the log line.
+ long duration = endTime - startTime;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process {} elements under prefix {} for {} round,
match {} elements, take {} ms",
+ elementCnt, remotePath, roundCnt, matchCnt,
+ duration / 1000 / 1000);
+ }
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/RemoteFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/RemoteFileSystem.java
index c7877d9ed8d..6e50d476d8a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/RemoteFileSystem.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/RemoteFileSystem.java
@@ -135,7 +135,7 @@ public abstract class RemoteFileSystem extends
PersistentFileSystem implements C
}
}
- public boolean connectivityTest() throws UserException {
+ /**
+ * Checks whether the remote file system is reachable for the given paths.
+ * This base implementation performs no check and always returns true;
+ * subclasses may override it.
+ *
+ * @param filePaths paths the caller intends to access
+ * @return true if the connectivity test passes
+ * @throws UserException if an overriding implementation cannot perform the check
+ */
+ public boolean connectivityTest(List<String> filePaths) throws
UserException {
return true;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/S3FileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/S3FileSystem.java
index de0d89b5cfc..b63f0effa58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/S3FileSystem.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.common.util.S3URI;
import
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
import org.apache.doris.fsv2.obj.S3ObjStorage;
@@ -28,8 +29,11 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import software.amazon.awssdk.services.s3.S3Client;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class S3FileSystem extends ObjFileSystem {
@@ -72,16 +76,27 @@ public class S3FileSystem extends ObjFileSystem {
}
@Override
- public boolean connectivityTest() throws UserException {
+ public boolean connectivityTest(List<String> filePaths) throws
UserException {
+ if (filePaths == null || filePaths.isEmpty()) {
+ throw new UserException("File paths cannot be null or empty for
connectivity test.");
+ }
S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
try {
- objStorage.getClient().listBuckets();
+ S3Client s3Client = objStorage.getClient();
+ Set<String> bucketNames = new HashSet<>();
+ boolean usePathStyle =
Boolean.parseBoolean(s3Properties.getUsePathStyle());
+ boolean forceParsingByStandardUri =
Boolean.parseBoolean(s3Properties.getForceParsingByStandardUrl());
+ for (String filePath : filePaths) {
+ S3URI s3uri;
+ s3uri = S3URI.create(filePath, usePathStyle,
forceParsingByStandardUri);
+ bucketNames.add(s3uri.getBucket());
+ }
+ bucketNames.forEach(bucketName -> s3Client.headBucket(b ->
b.bucket(bucketName)));
return true;
} catch (Exception e) {
- LOG.error("S3 connectivityTest error", e);
+ LOG.warn("S3 connectivityTest error: {}", e.getMessage(), e);
}
return false;
-
}
@VisibleForTesting
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
index 9c397c358aa..3afdfee5d0a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
@@ -20,9 +20,17 @@ package org.apache.doris.datasource.property.storage;
import org.apache.doris.common.UserException;
import
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
+import mockit.Expectations;
+import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import java.util.HashMap;
import java.util.Map;
@@ -34,6 +42,9 @@ public class S3PropertiesTest {
private static String accessKey = "";
private static String hdfsPath = "";
+ @Mocked
+ StsClient mockStsClient;
+
@BeforeEach
public void setUp() {
origProps = new HashMap<>();
@@ -178,4 +189,66 @@ public class S3PropertiesTest {
//not support
Assertions.assertThrowsExactly(StoragePropertiesException.class, () ->
StorageProperties.createPrimary(s3EndpointProps), "Property cos.endpoint is
required.");
}
+
+ @Test
+ public void testS3IamRoleWithExternalId() throws UserException {
+ origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+ origProps.put("s3.role_arn",
"arn:aws:iam::123456789012:role/MyTestRole");
+ origProps.put("s3.external_id", "external-123");
+
+ S3Properties s3Props = (S3Properties)
StorageProperties.createPrimary(origProps);
+ Map<String, String> backendProperties =
s3Props.getBackendConfigProperties();
+
+ Assertions.assertEquals("arn:aws:iam::123456789012:role/MyTestRole",
backendProperties.get("AWS_ROLE_ARN"));
+ Assertions.assertEquals("external-123",
backendProperties.get("AWS_EXTERNAL_ID"));
+ }
+
+ @Test
+ public void testGetAwsCredentialsProviderWithIamRoleAndExternalId(@Mocked
StsClientBuilder mockBuilder, @Mocked StsClient mockStsClient, @Mocked
InstanceProfileCredentialsProvider mockInstanceCreds) {
+
+ new Expectations() {
+ {
+ StsClient.builder();
+ result = mockBuilder;
+ mockBuilder.credentialsProvider((AwsCredentialsProvider) any);
+ result = mockBuilder;
+ mockBuilder.build();
+ result = mockStsClient;
+ InstanceProfileCredentialsProvider.create();
+ result = mockInstanceCreds;
+ }
+ };
+
+ origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+ origProps.put("s3.role_arn",
"arn:aws:iam::123456789012:role/MyTestRole");
+ origProps.put("s3.external_id", "external-123");
+ origProps.put("s3.region", "us-west-2");
+ S3Properties s3Props = (S3Properties)
StorageProperties.createPrimary(origProps);
+ AwsCredentialsProvider provider = s3Props.getAwsCredentialsProvider();
+ Assertions.assertNotNull(provider);
+ Assertions.assertTrue(provider instanceof
StsAssumeRoleCredentialsProvider);
+ }
+
+ @Test
+ public void testGetAwsCredentialsProviderWithAccessKeyAndSecretKey()
throws UserException {
+ origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+ origProps.put("s3.access_key", "myAccessKey");
+ origProps.put("s3.secret_key", "mySecretKey");
+ origProps.put("s3.region", "us-west-2");
+ S3Properties s3Props = (S3Properties)
StorageProperties.createPrimary(origProps);
+ AwsCredentialsProvider provider = s3Props.getAwsCredentialsProvider();
+ Assertions.assertNotNull(provider);
+ Assertions.assertTrue(provider instanceof StaticCredentialsProvider);
+ origProps.put("s3.session_token", "mySessionToken");
+ s3Props = (S3Properties) StorageProperties.createPrimary(origProps);
+ provider = s3Props.getAwsCredentialsProvider();
+ Assertions.assertNotNull(provider);
+ Assertions.assertTrue(provider instanceof StaticCredentialsProvider);
+ origProps.put("s3.role_arn",
"arn:aws:iam::123456789012:role/MyTestRole");
+ origProps.put("s3.external_id", "external-123");
+ s3Props = (S3Properties) StorageProperties.createPrimary(origProps);
+ provider = s3Props.getAwsCredentialsProvider();
+ Assertions.assertNotNull(provider);
+ Assertions.assertTrue(provider instanceof StaticCredentialsProvider);
+ }
}
diff --git
a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
index c88b6c8fcf3..76ffbf47221 100644
---
a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
+++
b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
@@ -132,7 +132,7 @@ suite("test_domain_connection_and_ak_sk_correction",
"load_p0") {
}
label = UUID.randomUUID().toString().replace("-", "")
-
+ try {
result = sql """
LOAD LABEL ${label}
(
@@ -155,36 +155,10 @@ suite("test_domain_connection_and_ak_sk_correction",
"load_p0") {
);
"""
logger.info("the fourth sql result is {}", result)
- int totalWaitTime = 0
- int pollInterval = 5
- int timeout = 120
- while (totalWaitTime < timeout) {
- def loadResult = sql """
- SHOW LOAD WHERE label="${label}"
- """
-
- if (loadResult == null || loadResult.isEmpty()) {
- return false
- } else if (loadResult.get(0).get(2) in ['CANCELLED', 'FAILED']) {
- break
- } else if (loadResult.get(0).get(2) == 'FINISHED') {
- throw new RuntimeException("load success, but the first bucket is
wrong, so the sql should fail")
- } else {
- println("load status is ${loadResult.get(0).get(2)}")
- Thread.sleep(pollInterval * 1000L)
- totalWaitTime += pollInterval
- }
-
-
- }
-
- if (totalWaitTime >= timeout) {
- def queryLoadResult = sql """
- SHOW LOAD WHERE label="${label}"
- """
- if (queryLoadResult != null && queryLoadResult.get(0).get(2) ==
'FINISHED') {
- throw new RuntimeException("load success, but the first bucket is
wrong, so the sql should fail")
- }
+ assertTrue(false. "in the second DATA INFILE, the first bucket is
wrong, so the sql should fail")
+ } catch (Exception e) {
+ logger.info("the fourth sql exception result is {}", e.getMessage())
+ assertTrue(e.getMessage().contains("Failed to access object storage,
message="), e.getMessage())
}
sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]