This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1ab8dc1aede [feat](s3client) Add role-based authorization for s3client
(#49541) (#50924)
1ab8dc1aede is described below
commit 1ab8dc1aede9be2ea21e3a79b46eb549435542e2
Author: Lei Zhang <[email protected]>
AuthorDate: Thu May 15 16:31:55 2025 +0800
[feat](s3client) Add role-based authorization for s3client (#49541) (#50924)
---
be/cmake/thirdparty.cmake | 2 +
be/src/io/fs/s3_file_system.cpp | 4 +
be/src/util/s3_util.cpp | 133 ++++++++---
be/src/util/s3_util.h | 19 +-
be/test/io/fs/s3_obj_storage_client_role_test.cpp | 185 +++++++++++++++
cloud/cmake/thirdparty.cmake | 3 +
cloud/src/meta-service/meta_service_resource.cpp | 160 ++++++++-----
cloud/src/recycler/s3_accessor.cpp | 63 +++--
cloud/src/recycler/s3_accessor.h | 9 +
cloud/test/meta_service_http_test.cpp | 89 +++++++
cloud/test/meta_service_test.cpp | 264 +++++++++++++++++++++
cloud/test/s3_accessor_test.cpp | 102 +++++++-
common/cpp/aws_common.cpp | 40 ++++
common/cpp/aws_common.h | 27 +++
common/cpp/aws_logger.h | 2 +
.../java/org/apache/doris/common/util/S3Util.java | 98 +++++++-
.../datasource/property/S3ClientBEProperties.java | 8 +
.../property/constants/S3Properties.java | 35 +++
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 12 +-
.../doris/tablefunction/S3TableValuedFunction.java | 8 +
.../org/apache/doris/catalog/S3ResourceTest.java | 31 +++
gensrc/proto/cloud.proto | 11 +
gensrc/thrift/AgentService.thrift | 11 +
.../org/apache/doris/regression/Config.groovy | 34 +++
.../apache/doris/regression/suite/Syncer.groovy | 23 ++
.../test_backup_restore_with_role.groovy | 97 ++++++++
.../aws_iam_role_p0/test_export_with_role.groovy | 117 +++++++++
.../test_external_catalog_with_role.groovy | 86 +++++++
.../aws_iam_role_p0/test_resource_with_role.groovy | 144 +++++++++++
.../aws_iam_role_p0/test_s3_load_with_role.groovy | 146 ++++++++++++
.../aws_iam_role_p0/test_s3_vault_with_role.groovy | 73 ++++++
.../test_select_into_outfile_with_role.groovy | 70 ++++++
.../aws_iam_role_p0/test_tvf_with_role.groovy | 49 ++++
33 files changed, 2025 insertions(+), 130 deletions(-)
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 764719ae00c..81e59539dbd 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -137,6 +137,8 @@ add_thirdparty(aws-c-mqtt LIB64)
add_thirdparty(aws-checksums LIB64)
add_thirdparty(aws-c-s3 LIB64)
add_thirdparty(aws-c-sdkutils LIB64)
+add_thirdparty(aws-cpp-sdk-identity-management LIB64)
+add_thirdparty(aws-cpp-sdk-sts LIB64)
if (NOT OS_MACOSX)
add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
endif()
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index bc6aefd92cc..ec1c63c9106 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -97,6 +97,10 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) {
reset_conf.token = conf.token;
reset_conf.bucket = conf.bucket;
reset_conf.use_virtual_addressing = conf.use_virtual_addressing;
+
+ reset_conf.role_arn = conf.role_arn;
+ reset_conf.external_id = conf.external_id;
+ reset_conf.cred_provider_type = conf.cred_provider_type;
// Should check endpoint here?
}
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 0edce85e192..39887625942 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -24,7 +24,9 @@
#include <aws/core/utils/logging/LogLevel.h>
#include <aws/core/utils/logging/LogSystemInterface.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
+#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
+#include <aws/sts/STSClient.h>
#include <bvar/reducer.h>
#include <util/string_util.h>
@@ -77,11 +79,14 @@ doris::Status is_s3_conf_valid(const S3ClientConf& conf) {
if (conf.region.empty()) {
return Status::InvalidArgument<false>("Invalid s3 conf, empty region");
}
- if (conf.ak.empty()) {
- return Status::InvalidArgument<false>("Invalid s3 conf, empty ak");
- }
- if (conf.sk.empty()) {
- return Status::InvalidArgument<false>("Invalid s3 conf, empty sk");
+
+ if (conf.role_arn.empty()) {
+ if (conf.ak.empty()) {
+ return Status::InvalidArgument<false>("Invalid s3 conf, empty ak");
+ }
+ if (conf.sk.empty()) {
+ return Status::InvalidArgument<false>("Invalid s3 conf, empty sk");
+ }
}
return Status::OK();
}
@@ -106,6 +111,8 @@ constexpr char S3_REQUEST_TIMEOUT_MS[] =
"AWS_REQUEST_TIMEOUT_MS";
constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";
constexpr char S3_NEED_OVERRIDE_ENDPOINT[] = "AWS_NEED_OVERRIDE_ENDPOINT";
+constexpr char S3_ROLE_ARN[] = "AWS_ROLE_ARN";
+constexpr char S3_EXTERNAL_ID[] = "AWS_EXTERNAL_ID";
} // namespace
bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns");
@@ -217,6 +224,36 @@ std::shared_ptr<io::ObjStorageClient>
S3ClientFactory::_create_azure_client(
#endif
}
+// Choose the credentials provider for a new S3 client, in this order:
+// 1. ak and sk both set -> static credentials (plus session token if given);
+// 2. InstanceProfile without role_arn -> the instance profile itself;
+// 3. InstanceProfile with role_arn -> STS AssumeRole(role_arn, external_id),
+// where the STS client authenticates with the instance profile;
+// 4. anything else -> the SDK default provider chain.
+// ak/sk always win, and role_arn is ignored unless cred_provider_type is InstanceProfile.
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3ClientFactory::get_aws_credentials_provider(
+ const S3ClientConf& s3_conf) {
+ if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) {
+ Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
+ DCHECK(!aws_cred.IsExpiredOrEmpty());
+ if (!s3_conf.token.empty()) {
+ aws_cred.SetSessionToken(s3_conf.token);
+ }
+ return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred));
+ }
+
+ if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) {
+ if (s3_conf.role_arn.empty()) {
+ return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
+ }
+
+ Aws::Client::ClientConfiguration clientConfiguration =
+ S3ClientFactory::getClientConfiguration();
+
+ // The STS client signs its AssumeRole calls with the instance profile credentials.
+ auto stsClient = std::make_shared<Aws::STS::STSClient>(
+ std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(),
+ clientConfiguration);
+
+ // Empty session name. NOTE(review): confirm the SDK generates a session name in this case.
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ s3_conf.role_arn, Aws::String(), s3_conf.external_id,
+ Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient);
+ }
+ return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+}
+
std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
const S3ClientConf& s3_conf) {
TEST_SYNC_POINT_RETURN_WITH_VALUE(
@@ -265,25 +302,11 @@ std::shared_ptr<io::ObjStorageClient>
S3ClientFactory::_create_s3_client(
aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>(
config::max_s3_client_retry /*scaleFactor = 25*/);
- std::shared_ptr<Aws::S3::S3Client> new_client;
- if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) {
- Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
- DCHECK(!aws_cred.IsExpiredOrEmpty());
- if (!s3_conf.token.empty()) {
- aws_cred.SetSessionToken(s3_conf.token);
- }
- new_client = std::make_shared<Aws::S3::S3Client>(
- std::move(aws_cred), std::move(aws_config),
- Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
- s3_conf.use_virtual_addressing);
- } else {
- std::shared_ptr<Aws::Auth::AWSCredentialsProvider> aws_provider_chain =
-
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
- new_client = std::make_shared<Aws::S3::S3Client>(
- std::move(aws_provider_chain), std::move(aws_config),
- Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
- s3_conf.use_virtual_addressing);
- }
+
+ std::shared_ptr<Aws::S3::S3Client> new_client =
std::make_shared<Aws::S3::S3Client>(
+ get_aws_credentials_provider(s3_conf), std::move(aws_config),
+ Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+ s3_conf.use_virtual_addressing);
auto obj_client =
std::make_shared<io::S3ObjStorageClient>(std::move(new_client));
LOG_INFO("create one s3 client with {}", s3_conf.to_string());
@@ -350,28 +373,62 @@ Status S3ClientFactory::convert_properties_to_s3_conf(
s3_conf->client_conf.use_virtual_addressing = it->second != "true";
}
+ if (auto it = properties.find(S3_ROLE_ARN); it != properties.end()) {
+ s3_conf->client_conf.cred_provider_type =
CredProviderType::InstanceProfile;
+ s3_conf->client_conf.role_arn = it->second;
+ }
+
+ if (auto it = properties.find(S3_EXTERNAL_ID); it != properties.end()) {
+ s3_conf->client_conf.external_id = it->second;
+ }
+
if (auto st = is_s3_conf_valid(s3_conf->client_conf); !st.ok()) {
return st;
}
return Status::OK();
}
+// Map the thrift credential-provider enum onto the BE-side CredProviderType.
+// Unrecognized values are logged and mapped to CredProviderType::Default.
+static CredProviderType cred_provider_type_from_thrift(TCredProviderType::type cred_provider_type) {
+ switch (cred_provider_type) {
+ case TCredProviderType::DEFAULT:
+ return CredProviderType::Default;
+ case TCredProviderType::SIMPLE:
+ return CredProviderType::Simple;
+ case TCredProviderType::INSTANCE_PROFILE:
+ return CredProviderType::InstanceProfile;
+ default:
+ // Must not be __builtin_unreachable(): a thrift enum field can hold any integer
+ // that was on the wire, and reaching the builtin would be undefined behavior.
+ LOG(WARNING) << "Invalid TCredProviderType value: " << cred_provider_type
+ << ", use default instead.";
+ return CredProviderType::Default;
+ }
+}
+
S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) {
S3Conf ret {
.bucket = info.bucket(),
.prefix = info.prefix(),
- .client_conf {.endpoint = info.endpoint(),
- .region = info.region(),
- .ak = info.ak(),
- .sk = info.sk(),
- .token {},
- .bucket = info.bucket(),
- .provider = io::ObjStorageType::AWS,
- .use_virtual_addressing =
- info.has_use_path_style() ? !info.use_path_style() : true},
+ .client_conf {
+ .endpoint = info.endpoint(),
+ .region = info.region(),
+ .ak = info.ak(),
+ .sk = info.sk(),
+ .token {},
+ .bucket = info.bucket(),
+ .provider = io::ObjStorageType::AWS,
+ .use_virtual_addressing =
+ info.has_use_path_style() ? !info.use_path_style() : true,
+
+ // Role-based (AssumeRole) authorization settings copied from the PB.
+ .role_arn = info.role_arn(),
+ .external_id = info.external_id(),
+ },
.sse_enabled = info.sse_enabled(),
};
+ // Keep the struct default unless the PB explicitly carries a provider type.
+ if (info.has_cred_provider_type()) {
+ ret.client_conf.cred_provider_type = cred_provider_type_from_pb(info.cred_provider_type());
+ }
+
io::ObjStorageType type = io::ObjStorageType::AWS;
switch (info.provider()) {
case cloud::ObjectStoreInfoPB_Provider_OSS:
@@ -396,8 +453,8 @@ S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) {
type = io::ObjStorageType::AZURE;
break;
default:
LOG_FATAL("unknown provider type {}, info {}", info.provider(), ret.to_string());
__builtin_unreachable();
}
ret.client_conf.provider = type;
return ret;
@@ -421,7 +478,15 @@ S3Conf S3Conf::get_s3_conf(const TS3StorageParam& param) {
// When using cold heat separation in minio, user might
use ip address directly,
// which needs enable use_virtual_addressing to true
.use_virtual_addressing = !param.use_path_style,
+ .role_arn = param.role_arn,
+ .external_id = param.external_id,
}};
+
+ if (param.__isset.cred_provider_type) {
+ ret.client_conf.cred_provider_type =
+ cred_provider_type_from_thrift(param.cred_provider_type);
+ }
+
io::ObjStorageType type = io::ObjStorageType::AWS;
switch (param.provider) {
case TObjStorageType::UNKNOWN:
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 793d0b8a956..c45c6afa6ef 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -32,6 +32,7 @@
#include <unordered_map>
#include "common/status.h"
+#include "cpp/aws_common.h"
#include "cpp/s3_rate_limiter.h"
#include "io/fs/obj_storage_client.h"
#include "vec/common/string_ref.h"
@@ -61,7 +62,6 @@ extern bvar::LatencyRecorder s3_copy_object_latency;
}; // namespace s3_bvar
class S3URI;
-
struct S3ClientConf {
std::string endpoint;
std::string region;
@@ -78,6 +78,10 @@ struct S3ClientConf {
// For aws s3, no need to override endpoint
bool need_override_endpoint = true;
+ CredProviderType cred_provider_type = CredProviderType::Default;
+ std::string role_arn;
+ std::string external_id;
+
uint64_t get_hash() const {
uint64_t hash_code = 0;
hash_code ^= crc32_hash(ak);
@@ -91,15 +95,21 @@ struct S3ClientConf {
hash_code ^= connect_timeout_ms;
hash_code ^= use_virtual_addressing;
hash_code ^= static_cast<int>(provider);
+
+ hash_code ^= static_cast<int>(cred_provider_type);
+ hash_code ^= crc32_hash(role_arn);
+ hash_code ^= crc32_hash(external_id);
return hash_code;
}
std::string to_string() const {
return fmt::format(
"(ak={}, token={}, endpoint={}, region={}, bucket={},
max_connections={}, "
- "request_timeout_ms={}, connect_timeout_ms={},
use_virtual_addressing={}",
+ "request_timeout_ms={}, connect_timeout_ms={},
use_virtual_addressing={}, "
+ "cred_provider_type={},role_arn={}, external_id={}",
ak, token, endpoint, region, bucket, max_connections,
request_timeout_ms,
- connect_timeout_ms, use_virtual_addressing);
+ connect_timeout_ms, use_virtual_addressing,
cred_provider_type, role_arn,
+ external_id);
}
};
@@ -144,6 +154,9 @@ public:
private:
std::shared_ptr<io::ObjStorageClient> _create_s3_client(const
S3ClientConf& s3_conf);
std::shared_ptr<io::ObjStorageClient> _create_azure_client(const
S3ClientConf& s3_conf);
+ std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
get_aws_credentials_provider(
+ const S3ClientConf& s3_conf);
+
S3ClientFactory();
static std::string get_valid_ca_cert_path();
diff --git a/be/test/io/fs/s3_obj_storage_client_role_test.cpp
b/be/test/io/fs/s3_obj_storage_client_role_test.cpp
new file mode 100644
index 00000000000..5760e6e4df3
--- /dev/null
+++ b/be/test/io/fs/s3_obj_storage_client_role_test.cpp
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "io/fs/obj_storage_client.h"
+#include "util/s3_util.h"
+
+namespace doris {
+
+// Fixture sharing one S3 client that authenticates through AssumeRole on top of the
+// instance profile. Every test is skipped unless the AWS_* environment is configured.
+class S3ObjStorageClientRoleTest : public testing::Test {
+protected:
+ static std::shared_ptr<io::ObjStorageClient> obj_storage_client;
+ static std::string bucket;
+ static std::string prefix;
+
+ // Reads AWS_ROLE_ARN, AWS_EXTERNAL_ID, AWS_ENDPOINT, AWS_REGION and AWS_BUCKET
+ // (all required) plus the optional AWS_PREFIX, then builds the shared client.
+ static void SetUpTestSuite() {
+ const char* role_arn = std::getenv("AWS_ROLE_ARN");
+ const char* external_id = std::getenv("AWS_EXTERNAL_ID");
+ const char* endpoint = std::getenv("AWS_ENDPOINT");
+ const char* region = std::getenv("AWS_REGION");
+ const char* env_bucket = std::getenv("AWS_BUCKET");
+ // AWS_BUCKET must be checked as well: building a std::string from nullptr is UB.
+ if (!role_arn || !external_id || !endpoint || !region || !env_bucket) {
+ return;
+ }
+
+ S3ObjStorageClientRoleTest::bucket = env_bucket;
+ const char* env_prefix = std::getenv("AWS_PREFIX");
+ S3ObjStorageClientRoleTest::prefix = env_prefix ? env_prefix : "";
+
+ S3ObjStorageClientRoleTest::obj_storage_client = S3ClientFactory::instance().create(
+ {.endpoint = endpoint,
+ .region = region,
+ .bucket = bucket,
+ .provider = io::ObjStorageType::AWS,
+ .use_virtual_addressing = false,
+ .cred_provider_type = CredProviderType::InstanceProfile,
+ .role_arn = role_arn,
+ .external_id = external_id});
+
+ ASSERT_TRUE(S3ObjStorageClientRoleTest::obj_storage_client != nullptr);
+ }
+
+ void SetUp() override {
+ if (S3ObjStorageClientRoleTest::obj_storage_client == nullptr) {
+ GTEST_SKIP() << "Skipping S3 test, because AWS environment not set";
+ }
+ }
+};
+
+std::shared_ptr<io::ObjStorageClient> S3ObjStorageClientRoleTest::obj_storage_client = nullptr;
+std::string S3ObjStorageClientRoleTest::bucket;
+std::string S3ObjStorageClientRoleTest::prefix;
+
+// Round-trip one object: put it, see it listed, delete it, see the listing empty.
+TEST_F(S3ObjStorageClientRoleTest, put_list_delete_object) {
+ LOG(INFO) << "S3ObjStorageClientRoleTest::put_list_delete_object, prefix:" << prefix;
+
+ // The same path serves as the object key and as the listing prefix.
+ const std::string object_path = prefix + "S3ObjStorageClientRoleTest/put_list_delete_object";
+ auto* client = S3ObjStorageClientRoleTest::obj_storage_client.get();
+ std::vector<io::FileInfo> listed;
+
+ auto resp = client->put_object({.bucket = bucket, .key = object_path}, std::string("aaaa"));
+ EXPECT_EQ(resp.status.code, ErrorCode::OK);
+
+ resp = client->list_objects({.bucket = bucket, .prefix = object_path}, &listed);
+ EXPECT_EQ(resp.status.code, ErrorCode::OK);
+ EXPECT_EQ(listed.size(), 1);
+ listed.clear();
+
+ resp = client->delete_object({.bucket = bucket, .key = object_path});
+ EXPECT_EQ(resp.status.code, ErrorCode::OK);
+
+ // Once deleted, nothing may remain under the path.
+ resp = client->list_objects({.bucket = bucket, .prefix = object_path}, &listed);
+ EXPECT_EQ(resp.status.code, ErrorCode::OK);
+ EXPECT_EQ(listed.size(), 0);
+}
+
+// Create a batch of objects under one key prefix and remove them with a single
+// recursive delete.
+TEST_F(S3ObjStorageClientRoleTest, delete_objects_recursively) {
+ LOG(INFO) << "S3ObjStorageClientRoleTest::delete_objects_recursively";
+
+ const std::string base = prefix + "S3ObjStorageClientRoleTest/delete_objects_recursively";
+ auto* client = S3ObjStorageClientRoleTest::obj_storage_client.get();
+ constexpr int kObjectCount = 22;
+
+ for (int idx = 0; idx < kObjectCount; ++idx) {
+ const std::string key = base + std::to_string(idx);
+ auto put_resp = client->put_object({.bucket = bucket, .key = key}, std::string("aaaa"));
+ EXPECT_EQ(put_resp.status.code, ErrorCode::OK);
+ LOG(INFO) << "put " << key << " OK";
+ }
+
+ std::vector<io::FileInfo> listed;
+ auto resp = client->list_objects({.bucket = bucket, .prefix = base}, &listed);
+ EXPECT_EQ(resp.status.code, ErrorCode::OK);
+ EXPECT_EQ(listed.size(), kObjectCount);
+ listed.clear();
+
+ resp = client->delete_objects_recursively({.bucket = bucket, .prefix = base});
+ EXPECT_EQ(resp.status.code, ErrorCode::OK);
+
+ // Everything under the prefix must be gone after the recursive delete.
+ resp = client->list_objects({.bucket = bucket, .prefix = base}, &listed);
+ EXPECT_EQ(resp.status.code, ErrorCode::OK);
+ EXPECT_EQ(listed.size(), 0);
+}
+
+// Upload one object in two parts through the role-based client, complete the
+// upload, then delete the object so the bucket is left clean.
+TEST_F(S3ObjStorageClientRoleTest, multipart_upload) {
+ LOG(INFO) << "S3ObjStorageClientRoleTest::multipart_upload";
+
+ const std::string key = prefix + "S3ObjStorageClientRoleTest/multipart_upload";
+
+ auto response = S3ObjStorageClientRoleTest::obj_storage_client->create_multipart_upload(
+ {.bucket = bucket, .key = key});
+ // Every later call needs a valid upload id, so stop here if creation failed.
+ ASSERT_EQ(response.resp.status.code, ErrorCode::OK);
+ auto upload_id = response.upload_id;
+
+ // S3 requires every part except the last to be at least 5 MiB.
+ std::string body = "S3ObjStorageClientRoleTest::multipart_upload";
+ body.resize(5 * 1024 * 1024);
+
+ std::vector<doris::io::ObjectCompleteMultiPart> completed_parts;
+ for (int part_num = 1; part_num <= 2; ++part_num) {
+ response = S3ObjStorageClientRoleTest::obj_storage_client->upload_part(
+ {.bucket = bucket, .key = key, .upload_id = upload_id}, body, part_num);
+ EXPECT_EQ(response.resp.status.code, ErrorCode::OK);
+ completed_parts.emplace_back(doris::io::ObjectCompleteMultiPart {
+ part_num, response.etag.has_value() ? std::move(response.etag.value()) : ""});
+ }
+
+ auto response2 = S3ObjStorageClientRoleTest::obj_storage_client->complete_multipart_upload(
+ {.bucket = bucket, .key = key, .upload_id = upload_id}, completed_parts);
+ EXPECT_EQ(response2.status.code, ErrorCode::OK);
+
+ // Clean up like the other tests do, so repeated runs leave no data behind.
+ auto delete_response = S3ObjStorageClientRoleTest::obj_storage_client->delete_object(
+ {.bucket = bucket, .key = key});
+ EXPECT_EQ(delete_response.status.code, ErrorCode::OK);
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/cloud/cmake/thirdparty.cmake b/cloud/cmake/thirdparty.cmake
index 80e70c87eca..6b1a614b395 100644
--- a/cloud/cmake/thirdparty.cmake
+++ b/cloud/cmake/thirdparty.cmake
@@ -89,6 +89,9 @@ add_thirdparty(aws-checksums LIB64)
add_thirdparty(aws-c-s3 LIB64)
add_thirdparty(aws-c-sdkutils LIB64)
add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
+add_thirdparty(aws-cpp-sdk-identity-management LIB64)
+add_thirdparty(aws-cpp-sdk-sts LIB64)
+
# end aws libs
add_thirdparty(jsoncpp LIB64)
add_thirdparty(uuid LIB64)
diff --git a/cloud/src/meta-service/meta_service_resource.cpp
b/cloud/src/meta-service/meta_service_resource.cpp
index 6b1c1c63c55..96a00a48e97 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -421,25 +421,37 @@ static void create_object_info_with_encrypt(const
InstanceInfoPB& instance, Obje
std::string external_endpoint = obj->has_external_endpoint() ?
obj->external_endpoint() : "";
std::string region = obj->has_region() ? obj->region() : "";
- // ATTN: prefix may be empty
- if (plain_ak.empty() || plain_sk.empty() || bucket.empty() ||
endpoint.empty() ||
- region.empty() || !obj->has_provider() || external_endpoint.empty()) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = "s3 conf info err, please check it";
- return;
- }
- EncryptionInfoPB encryption_info;
- AkSkPair cipher_ak_sk_pair;
- auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info,
&cipher_ak_sk_pair, code,
- msg);
- TEST_SYNC_POINT_CALLBACK("create_object_info_with_encrypt", &ret, &code,
&msg);
- if (ret != 0) {
- return;
+ if (obj->has_role_arn()) {
+ if (obj->role_arn().empty() || !obj->has_cred_provider_type() ||
+ obj->cred_provider_type() != CredProviderTypePB::INSTANCE_PROFILE
||
+ !obj->has_provider() || obj->provider() != ObjectStoreInfoPB::S3
|| bucket.empty() ||
+ endpoint.empty() || region.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "s3 conf info err with role_arn, please check it";
+ return;
+ }
+ } else {
+ // ATTN: prefix may be empty
+ if (plain_ak.empty() || plain_sk.empty() || bucket.empty() ||
endpoint.empty() ||
+ region.empty() || !obj->has_provider() ||
external_endpoint.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "s3 conf info err, please check it";
+ return;
+ }
+
+ EncryptionInfoPB encryption_info;
+ AkSkPair cipher_ak_sk_pair;
+ auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info,
&cipher_ak_sk_pair,
+ code, msg);
+ TEST_SYNC_POINT_CALLBACK("create_object_info_with_encrypt", &ret,
&code, &msg);
+ if (ret != 0) {
+ return;
+ }
+ obj->set_ak(std::move(cipher_ak_sk_pair.first));
+ obj->set_sk(std::move(cipher_ak_sk_pair.second));
+ obj->mutable_encryption_info()->CopyFrom(encryption_info);
}
- obj->set_ak(std::move(cipher_ak_sk_pair.first));
- obj->set_sk(std::move(cipher_ak_sk_pair.second));
- obj->mutable_encryption_info()->CopyFrom(encryption_info);
obj->set_bucket(bucket);
obj->set_prefix(prefix);
obj->set_endpoint(endpoint);
@@ -781,6 +793,9 @@ struct ObjectStorageDesc {
std::string& external_endpoint;
std::string& region;
bool& use_path_style;
+
+ std::string& role_arn;
+ std::string& external_id;
};
+// Validate the object-store info in an alter request and fill obj_desc from it.
+// With ak/sk, the pair is encrypted into encryption_info/cipher_ak_sk_pair.
+// With role_arn, the request must use S3 and the INSTANCE_PROFILE provider type.
static int extract_object_storage_info(const AlterObjStoreInfoRequest* request,
@@ -793,39 +808,54 @@ static int extract_object_storage_info(const AlterObjStoreInfoRequest* request,
msg = "s3 obj info err " + proto_to_json(*request);
return -1;
}
- auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style] = obj_desc;
+
const auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info();
- // Prepare data
- if (!obj.has_ak() || !obj.has_sk()) {
+
+ // obj size > 1k, refuse
+ if (obj.ByteSizeLong() > 1024) {
code = MetaServiceCode::INVALID_ARGUMENT;
- msg = "s3 obj info err " + proto_to_json(*request);
+ msg = "s3 obj info greater than 1k " + proto_to_json(*request);
return -1;
}
- std::string plain_ak = obj.has_ak() ? obj.ak() : "";
- std::string plain_sk = obj.has_sk() ? obj.sk() : "";
+ auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style, role_arn,
+ external_id] = obj_desc;
- auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code,
- msg);
- if (ret != 0) {
- return -1;
+ if (!obj.has_role_arn()) {
+ // ak/sk authorization: both keys are mandatory and get stored encrypted.
+ if (!obj.has_ak() || !obj.has_sk()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "s3 obj info err " + proto_to_json(*request);
+ return -1;
+ }
+
+ std::string plain_ak = obj.ak();
+ std::string plain_sk = obj.sk();
+ auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair,
+ code, msg);
+ if (ret != 0) {
+ return -1;
+ }
+
+ ak = cipher_ak_sk_pair.first;
+ sk = cipher_ak_sk_pair.second;
+ } else {
+ // Role-based authorization: require a non-empty role_arn with INSTANCE_PROFILE on S3,
+ // the same rule create_object_info_with_encrypt applies.
+ if (obj.role_arn().empty() || !obj.has_cred_provider_type() ||
+ obj.cred_provider_type() != CredProviderTypePB::INSTANCE_PROFILE ||
+ !obj.has_provider() || obj.provider() != ObjectStoreInfoPB::S3) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "s3 conf info err with role_arn, please check it";
+ return -1;
+ }
+ role_arn = obj.role_arn();
+ external_id = obj.has_external_id() ? obj.external_id() : "";
}
TEST_SYNC_POINT_CALLBACK("extract_object_storage_info:get_aksk_pair", &cipher_ak_sk_pair);
-
- ak = cipher_ak_sk_pair.first;
- sk = cipher_ak_sk_pair.second;
bucket = obj.has_bucket() ? obj.bucket() : "";
prefix = obj.has_prefix() ? obj.prefix() : "";
endpoint = obj.has_endpoint() ? obj.endpoint() : "";
external_endpoint = obj.has_external_endpoint() ? obj.external_endpoint() : "";
region = obj.has_region() ? obj.region() : "";
use_path_style = obj.use_path_style();
- // obj size > 1k, refuse
- if (obj.ByteSizeLong() > 1024) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = "s3 obj info greater than 1k " + proto_to_json(*request);
- return -1;
- };
return 0;
}
@@ -835,7 +865,8 @@ static ObjectStoreInfoPB
object_info_pb_factory(ObjectStorageDesc& obj_desc,
EncryptionInfoPB&
encryption_info,
AkSkPair& cipher_ak_sk_pair) {
ObjectStoreInfoPB last_item;
- auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region,
use_path_style] = obj_desc;
+ auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region,
use_path_style, role_arn,
+ external_id] = obj_desc;
auto now_time = std::chrono::system_clock::now();
uint64_t time =
std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count();
@@ -845,9 +876,16 @@ static ObjectStoreInfoPB
object_info_pb_factory(ObjectStorageDesc& obj_desc,
if (obj.has_user_id()) {
last_item.set_user_id(obj.user_id());
}
- last_item.set_ak(std::move(cipher_ak_sk_pair.first));
- last_item.set_sk(std::move(cipher_ak_sk_pair.second));
- last_item.mutable_encryption_info()->CopyFrom(encryption_info);
+
+ if (!obj.has_role_arn()) {
+ last_item.set_ak(std::move(cipher_ak_sk_pair.first));
+ last_item.set_sk(std::move(cipher_ak_sk_pair.second));
+ last_item.mutable_encryption_info()->CopyFrom(encryption_info);
+ } else {
+ last_item.set_role_arn(role_arn);
+ last_item.set_external_id(external_id);
+ last_item.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
+ }
last_item.set_bucket(bucket);
// format prefix, such as `/aa/bb/`, `aa/bb//`, `//aa/bb`, ` /aa/bb` ->
`aa/bb`
trim(prefix);
@@ -858,6 +896,7 @@ static ObjectStoreInfoPB
object_info_pb_factory(ObjectStorageDesc& obj_desc,
last_item.set_provider(obj.provider());
last_item.set_sse_enabled(instance.sse_enabled());
last_item.set_use_path_style(use_path_style);
+
return last_item;
}
@@ -865,7 +904,7 @@ void
MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
const AlterObjStoreInfoRequest*
request,
AlterObjStoreInfoResponse* response,
::google::protobuf::Closure* done) {
- std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region;
+ std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region,
role_arn, external_id;
bool use_path_style;
EncryptionInfoPB encryption_info;
AkSkPair cipher_ak_sk_pair;
@@ -873,8 +912,11 @@ void
MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
switch (request->op()) {
case AlterObjStoreInfoRequest::ADD_S3_VAULT:
case AlterObjStoreInfoRequest::DROP_S3_VAULT: {
- auto tmp_desc = ObjectStorageDesc {
- ak, sk, bucket, prefix, endpoint, external_endpoint, region,
use_path_style};
+ auto tmp_desc = ObjectStorageDesc {ak, sk,
+ bucket, prefix,
+ endpoint, external_endpoint,
+ region, use_path_style,
+ role_arn, external_id};
if (0 != extract_object_storage_info(request, code, msg, tmp_desc,
encryption_info,
cipher_ak_sk_pair)) {
return;
@@ -995,7 +1037,8 @@ void
MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
return;
}
// ATTN: prefix may be empty
- if (ak.empty() || sk.empty() || bucket.empty() || endpoint.empty() ||
region.empty()) {
+ if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty()
||
+ endpoint.empty() || region.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "s3 conf info err, please check it";
return;
@@ -1013,8 +1056,11 @@ void
MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
}
}
// calc id
- auto tmp_tuple = ObjectStorageDesc {
- ak, sk, bucket, prefix, endpoint, external_endpoint, region,
use_path_style};
+ auto tmp_tuple = ObjectStorageDesc {ak, sk,
+ bucket, prefix,
+ endpoint, external_endpoint,
+ region, use_path_style,
+ role_arn, external_id};
ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj,
instance,
encryption_info,
cipher_ak_sk_pair);
if (instance.storage_vault_names().end() !=
@@ -1156,7 +1202,7 @@ void
MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont
const AlterObjStoreInfoRequest*
request,
AlterObjStoreInfoResponse* response,
::google::protobuf::Closure* done) {
- std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region;
+ std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region,
role_arn, external_id;
bool use_path_style;
EncryptionInfoPB encryption_info;
AkSkPair cipher_ak_sk_pair;
@@ -1165,8 +1211,11 @@ void
MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont
case AlterObjStoreInfoRequest::ADD_OBJ_INFO:
case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK:
case AlterObjStoreInfoRequest::UPDATE_AK_SK: {
- auto tmp_desc = ObjectStorageDesc {
- ak, sk, bucket, prefix, endpoint, external_endpoint, region,
use_path_style};
+ auto tmp_desc = ObjectStorageDesc {ak, sk,
+ bucket, prefix,
+ endpoint, external_endpoint,
+ region, use_path_style,
+ role_arn, external_id};
if (0 != extract_object_storage_info(request, code, msg, tmp_desc,
encryption_info,
cipher_ak_sk_pair)) {
return;
@@ -1287,8 +1336,8 @@ void
MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont
return;
}
// ATTN: prefix may be empty
- if (ak.empty() || sk.empty() || bucket.empty() || endpoint.empty() ||
region.empty() ||
- prefix.empty()) {
+ if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty()
||
+ endpoint.empty() || region.empty() || prefix.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "s3 conf info err, please check it";
return;
@@ -1306,8 +1355,11 @@ void
MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont
}
}
// calc id
- auto tmp_tuple = ObjectStorageDesc {
- ak, sk, bucket, prefix, endpoint, external_endpoint, region,
use_path_style};
+ auto tmp_tuple = ObjectStorageDesc {ak, sk,
+ bucket, prefix,
+ endpoint, external_endpoint,
+ region, use_path_style,
+ role_arn, external_id};
ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj,
instance,
encryption_info,
cipher_ak_sk_pair);
instance.add_obj_info()->CopyFrom(last_item);
diff --git a/cloud/src/recycler/s3_accessor.cpp
b/cloud/src/recycler/s3_accessor.cpp
index 9d365cffa04..63844665e3a 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -17,10 +17,13 @@
#include "recycler/s3_accessor.h"
-#include <aws/core/Aws.h>
+#include <aws/core/auth/AWSAuthSigner.h>
#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/DefaultRetryStrategy.h>
+#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
+#include <aws/sts/STSClient.h>
#include <bvar/reducer.h>
#include <gen_cpp/cloud.pb.h>
@@ -181,20 +184,28 @@ std::optional<S3Conf> S3Conf::from_obj_store_info(const
ObjectStoreInfoPB& obj_i
}
if (!skip_aksk) {
- if (obj_info.has_encryption_info()) {
- AkSkPair plain_ak_sk_pair;
- int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(),
obj_info.encryption_info(),
- &plain_ak_sk_pair);
- if (ret != 0) {
- LOG_WARNING("fail to decrypt ak sk").tag("obj_info",
proto_to_json(obj_info));
- return std::nullopt;
+ if (!obj_info.ak().empty() && !obj_info.sk().empty()) {
+ if (obj_info.has_encryption_info()) {
+ AkSkPair plain_ak_sk_pair;
+ int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(),
+ obj_info.encryption_info(),
&plain_ak_sk_pair);
+ if (ret != 0) {
+ LOG_WARNING("fail to decrypt ak sk").tag("obj_info",
proto_to_json(obj_info));
+ return std::nullopt;
+ } else {
+ s3_conf.ak = std::move(plain_ak_sk_pair.first);
+ s3_conf.sk = std::move(plain_ak_sk_pair.second);
+ }
} else {
- s3_conf.ak = std::move(plain_ak_sk_pair.first);
- s3_conf.sk = std::move(plain_ak_sk_pair.second);
+ s3_conf.ak = obj_info.ak();
+ s3_conf.sk = obj_info.sk();
}
- } else {
- s3_conf.ak = obj_info.ak();
- s3_conf.sk = obj_info.sk();
+ }
+
+ if (obj_info.has_role_arn() && !obj_info.role_arn().empty()) {
+ s3_conf.role_arn = obj_info.role_arn();
+ s3_conf.external_id = obj_info.external_id();
+ s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
}
}
@@ -236,6 +247,29 @@ int S3Accessor::create(S3Conf conf,
std::shared_ptr<S3Accessor>* accessor) {
static std::shared_ptr<SimpleThreadPool> worker_pool;
+// Chooses the AWS credentials provider for the S3 client built in init().
+// Order of precedence, as implemented below:
+// 1. ak and sk both non-empty -> SimpleAWSCredentialsProvider (static keys);
+// role settings are ignored in this case.
+// 2. cred_provider_type == InstanceProfile and role_arn empty
+// -> InstanceProfileCredentialsProvider.
+// 3. cred_provider_type == InstanceProfile and role_arn set -> STS AssumeRole on
+// role_arn with external_id, via an STS client authenticated by the instance profile.
+// 4. Anything else -> DefaultAWSCredentialsProviderChain.
+// NOTE(review): the session name is passed as an empty Aws::String() and external_id is
+// forwarded even when empty; confirm the pinned SDK version substitutes/omits these.
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
S3Accessor::get_aws_credentials_provider(
+ const S3Conf& s3_conf) {
+ // Static keys win over any role configuration.
+ if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) {
+ Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
+ DCHECK(!aws_cred.IsExpiredOrEmpty());
+ return
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred));
+ }
+
+ if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) {
+ // No role to assume: use the instance profile credentials directly.
+ if (s3_conf.role_arn.empty()) {
+ return
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
+ }
+
+ // The STS client itself signs with the instance profile; AssumeRole then
+ // returns temporary credentials for role_arn.
+ auto stsClient = std::make_shared<Aws::STS::STSClient>(
+
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>());
+
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ s3_conf.role_arn, Aws::String(), s3_conf.external_id,
+ Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient);
+ }
+ return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+}
+
int S3Accessor::init() {
static std::once_flag log_annotated_tags_key_once;
std::call_once(log_annotated_tags_key_once, [&]() {
@@ -287,7 +321,6 @@ int S3Accessor::init() {
static S3Environment s3_env;
// S3Conf::S3
- Aws::Auth::AWSCredentials aws_cred(conf_.ak, conf_.sk);
Aws::Client::ClientConfiguration aws_config;
aws_config.endpointOverride = conf_.endpoint;
aws_config.region = conf_.region;
@@ -302,7 +335,7 @@ int S3Accessor::init() {
aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>(
config::max_s3_client_retry /*scaleFactor = 25*/);
auto s3_client = std::make_shared<Aws::S3::S3Client>(
- std::move(aws_cred), std::move(aws_config),
+ get_aws_credentials_provider(conf_), std::move(aws_config),
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
conf_.use_virtual_addressing /* useVirtualAddressing */);
obj_client_ = std::make_shared<S3ObjClient>(std::move(s3_client),
conf_.endpoint);
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index e9640b5693a..faa8392373c 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -17,12 +17,14 @@
#pragma once
+#include <aws/core/Aws.h>
#include <bvar/latency_recorder.h>
#include <array>
#include <cstdint>
#include <memory>
+#include "cpp/aws_common.h"
#include "recycler/obj_storage_client.h"
#include "recycler/storage_vault_accessor.h"
@@ -71,6 +73,10 @@ struct S3Conf {
std::string prefix;
bool use_virtual_addressing {true};
+ CredProviderType cred_provider_type = CredProviderType::Default;
+ std::string role_arn;
+ std::string external_id;
+
enum Provider : uint8_t {
S3,
GCS,
@@ -125,6 +131,9 @@ protected:
virtual int delete_prefix_impl(const std::string& path_prefix, int64_t
expiration_time = 0);
+ std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
get_aws_credentials_provider(
+ const S3Conf& s3_conf);
+
std::string get_key(const std::string& relative_path) const;
std::string to_uri(const std::string& relative_path) const;
diff --git a/cloud/test/meta_service_http_test.cpp
b/cloud/test/meta_service_http_test.cpp
index e51e6fa819b..e9ff2956307 100644
--- a/cloud/test/meta_service_http_test.cpp
+++ b/cloud/test/meta_service_http_test.cpp
@@ -1834,4 +1834,93 @@ TEST(HttpEncodeKeyTest, ProcessHttpSetValue) {
EXPECT_EQ(updated_rowset_meta.data_disk_size(), 2048);
}
+// create_instance with obj_info: requests that carry neither ak/sk nor role_arn
+// (and no bucket) are rejected with INVALID_ARGUMENT; a request that carries only
+// role_arn/external_id/cred_provider_type succeeds and the stored obj_info keeps
+// those fields without ak/sk.
+TEST(MetaServiceHttpTest, CreateInstanceWithIamRoleTest) {
+ HttpContext ctx;
+
+ brpc::Controller cntl;
+ std::string instance_id = "iam_role_test_instance_id";
+
+ {
+ ObjectStoreInfoPB obj;
+ obj.set_endpoint("s3.us-east-1.amazonaws.com");
+ obj.set_region("us-east-1");
+ obj.set_prefix("/test-prefix");
+ obj.set_provider(ObjectStoreInfoPB::S3);
+
+ // No ak/sk, no role_arn and no bucket: the request must be rejected.
+ CreateInstanceRequest create_instance_req;
+ create_instance_req.set_instance_id(instance_id);
+ create_instance_req.set_user_id("test_user");
+ create_instance_req.set_name("test_name");
+ create_instance_req.mutable_obj_info()->CopyFrom(obj);
+ CreateInstanceResponse create_instance_res;
+ ctx.meta_service_->create_instance(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&create_instance_req,
+ &create_instance_res, nullptr);
+ LOG(INFO) << create_instance_res.DebugString();
+ ASSERT_EQ(create_instance_res.status().code(),
MetaServiceCode::INVALID_ARGUMENT);
+ }
+
+ {
+ ObjectStoreInfoPB obj;
+ obj.set_endpoint("s3.us-east-1.amazonaws.com");
+ obj.set_region("us-east-1");
+ obj.set_prefix("/test-prefix");
+ obj.set_provider(ObjectStoreInfoPB::S3);
+
+ // NOTE(review): this case is byte-for-byte the same as the one above (no credentials,
+ // no bucket); presumably it was meant to cover a different invalid input — confirm.
+ CreateInstanceRequest create_instance_req;
+ create_instance_req.set_instance_id(instance_id);
+ create_instance_req.set_user_id("test_user");
+ create_instance_req.set_name("test_name");
+ create_instance_req.mutable_obj_info()->CopyFrom(obj);
+ CreateInstanceResponse create_instance_res;
+ ctx.meta_service_->create_instance(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&create_instance_req,
+ &create_instance_res, nullptr);
+ LOG(INFO) << create_instance_res.DebugString();
+ ASSERT_EQ(create_instance_res.status().code(),
MetaServiceCode::INVALID_ARGUMENT);
+ }
+
+ // Role-based credentials only (no ak/sk): creation succeeds and the role fields are persisted.
+ {
+ ObjectStoreInfoPB obj;
+ obj.set_endpoint("s3.us-east-1.amazonaws.com");
+ obj.set_region("us-east-1");
+ obj.set_bucket("test-bucket");
+ obj.set_prefix("test-prefix");
+ obj.set_provider(ObjectStoreInfoPB::S3);
+ obj.set_role_arn("arn:aws:iam::123456789012:role/test-role");
+ obj.set_external_id("test-external-id");
+ obj.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
+
+ CreateInstanceRequest create_instance_req;
+ create_instance_req.set_instance_id(instance_id);
+ create_instance_req.set_user_id("test_user");
+ create_instance_req.set_name("test_name");
+ create_instance_req.mutable_obj_info()->CopyFrom(obj);
+ CreateInstanceResponse create_instance_res;
+ ctx.meta_service_->create_instance(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&create_instance_req,
+ &create_instance_res, nullptr);
+ LOG(INFO) << create_instance_res.DebugString();
+ ASSERT_EQ(create_instance_res.status().code(), MetaServiceCode::OK);
+
+ InstanceInfoPB instance = ctx.get_instance_info(instance_id);
+ LOG(INFO) << instance.DebugString();
+
+ ASSERT_EQ(instance.obj_info().Get(0).endpoint(),
"s3.us-east-1.amazonaws.com");
+ ASSERT_EQ(instance.obj_info().Get(0).region(), "us-east-1");
+ ASSERT_EQ(instance.obj_info().Get(0).bucket(), "test-bucket");
+ ASSERT_EQ(instance.obj_info().Get(0).prefix(), "test-prefix");
+ ASSERT_EQ(instance.obj_info().Get(0).provider(),
ObjectStoreInfoPB::S3);
+ ASSERT_EQ(instance.obj_info().Get(0).role_arn(),
+ "arn:aws:iam::123456789012:role/test-role");
+ ASSERT_EQ(instance.obj_info().Get(0).external_id(),
"test-external-id");
+ ASSERT_EQ(instance.obj_info().Get(0).cred_provider_type(),
+ CredProviderTypePB::INSTANCE_PROFILE);
+ // No ak/sk fields may be stored for a role-only object store.
+ ASSERT_EQ(instance.obj_info().Get(0).has_ak(), false);
+ ASSERT_EQ(instance.obj_info().Get(0).has_sk(), false);
+ }
+}
+
} // namespace doris::cloud
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 0a1be69e1eb..0e8e543c494 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -8666,4 +8666,268 @@ TEST(MetaServiceTest, UpdateTmpRowsetTest) {
}
}
+// alter_storage_vault(ADD_S3_VAULT) on an instance that already owns vault "2":
+// - an ak/sk vault is stored as vault "3" (ak kept, sk not stored as the plaintext
+// input) and vault "2" is left unchanged;
+// - a role-only vault (role_arn/external_id, no ak/sk) is stored as vault "4" with
+// empty ak/sk and all role/location fields preserved.
+TEST(MetaServiceTest, CreateS3VaultWithIamRole) {
+ auto sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ // Stub the encryption-key lookup: ret = 0, a fixed 32-byte key, key_id = 1.
+ sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
+ auto* ret = try_any_cast<int*>(args[0]);
+ *ret = 0;
+ auto* key = try_any_cast<std::string*>(args[1]);
+ *key = "selectdbselectdbselectdbselectdb";
+ auto* key_id = try_any_cast<int64_t*>(args[2]);
+ *key_id = 1;
+ });
+ // NOTE(review): `pair` records the extracted ak/sk pair but is never asserted on.
+ std::pair<std::string, std::string> pair;
+ sp->set_call_back("extract_object_storage_info:get_aksk_pair", [&](auto&&
args) {
+ auto* ret = try_any_cast<std::pair<std::string,
std::string>*>(args[0]);
+ pair = *ret;
+ });
+
+ auto meta_service = get_meta_service();
+
+ // Seed: instance key "test_instance" -> instance "GetObjStoreInfoTestInstance"
+ // owning one ak/sk vault with id "2".
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string key;
+ std::string val;
+ InstanceKeyInfo key_info {"test_instance"};
+ instance_key(key_info, &key);
+
+ ObjectStoreInfoPB obj_info;
+ obj_info.set_id("1");
+ obj_info.set_ak("ak");
+ obj_info.set_sk("sk");
+ StorageVaultPB vault;
+ constexpr char vault_name[] = "test_alter_s3_vault";
+ vault.mutable_obj_info()->MergeFrom(obj_info);
+ vault.set_name(vault_name);
+ vault.set_id("2");
+ InstanceInfoPB instance;
+ instance.add_storage_vault_names(vault.name());
+ instance.add_resource_ids(vault.id());
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+ instance.set_enable_storage_vault(true);
+ val = instance.SerializeAsString();
+ txn->put(key, val);
+ txn->put(storage_vault_key({instance.instance_id(), "2"}),
vault.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ txn = nullptr;
+
+ // Reads back the persisted InstanceInfoPB stored under "test_instance".
+ auto get_test_instance = [&](InstanceInfoPB& i) {
+ std::string key;
+ std::string val;
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ InstanceKeyInfo key_info {"test_instance"};
+ instance_key(key_info, &key);
+ ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+ i.ParseFromString(val);
+ };
+
+ // Case 1: vault authenticated with ak/sk.
+ {
+ AlterObjStoreInfoRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_op(AlterObjStoreInfoRequest::ADD_S3_VAULT);
+ StorageVaultPB vault;
+ vault.mutable_obj_info()->set_endpoint("s3.us-east-1.amazonaws.com");
+ vault.mutable_obj_info()->set_region("us-east-1");
+ vault.mutable_obj_info()->set_bucket("test_bucket");
+ vault.mutable_obj_info()->set_prefix("test_prefix");
+ vault.mutable_obj_info()->set_ak("new_ak");
+ vault.mutable_obj_info()->set_sk("new_sk");
+ vault.mutable_obj_info()->set_provider(
+ ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3);
+
+ vault.set_name("ak_sk_s3_vault");
+ req.mutable_vault()->CopyFrom(vault);
+
+ brpc::Controller cntl;
+ AlterObjStoreInfoResponse res;
+ meta_service->alter_storage_vault(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+ // The pre-existing vault "2" is left untouched.
+ {
+ InstanceInfoPB instance;
+ get_test_instance(instance);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ std::string val;
+ ASSERT_EQ(txn->get(storage_vault_key({instance.instance_id(),
"2"}), &val),
+ TxnErrorCode::TXN_OK);
+ StorageVaultPB get_obj;
+ get_obj.ParseFromString(val);
+ ASSERT_EQ(get_obj.obj_info().ak(), "ak") <<
get_obj.obj_info().ak();
+ }
+
+ // The new vault is stored under id "3"; its sk must differ from the plaintext "new_sk".
+ {
+ InstanceInfoPB instance;
+ get_test_instance(instance);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ std::string val;
+ ASSERT_EQ(txn->get(storage_vault_key({instance.instance_id(),
"3"}), &val),
+ TxnErrorCode::TXN_OK);
+ StorageVaultPB get_obj;
+ get_obj.ParseFromString(val);
+ ASSERT_EQ(get_obj.obj_info().ak(), "new_ak") <<
get_obj.obj_info().ak();
+ ASSERT_NE(get_obj.obj_info().sk(), "new_sk") <<
get_obj.obj_info().sk();
+ }
+ }
+
+ // Case 2: vault authenticated only through role_arn/external_id (no ak/sk).
+ {
+ AlterObjStoreInfoRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_op(AlterObjStoreInfoRequest::ADD_S3_VAULT);
+ StorageVaultPB vault;
+ vault.mutable_obj_info()->set_endpoint("s3.us-east-1.amazonaws.com");
+ vault.mutable_obj_info()->set_region("us-east-1");
+ vault.mutable_obj_info()->set_bucket("test_bucket");
+ vault.mutable_obj_info()->set_prefix("test_prefix");
+
vault.mutable_obj_info()->set_role_arn("arn:aws:iam::123456789012:role/test-role");
+ vault.mutable_obj_info()->set_external_id("external_id");
+ vault.mutable_obj_info()->set_provider(
+ ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3);
+
vault.mutable_obj_info()->set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
+
+ vault.set_name("ak_sk_s3_vault_with_role");
+ req.mutable_vault()->CopyFrom(vault);
+
+ brpc::Controller cntl;
+ AlterObjStoreInfoResponse res;
+ meta_service->alter_storage_vault(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+ // Stored as vault "4": empty ak/sk, every role and location field preserved.
+ {
+ InstanceInfoPB instance;
+ get_test_instance(instance);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ std::string val;
+ ASSERT_EQ(txn->get(storage_vault_key({instance.instance_id(),
"4"}), &val),
+ TxnErrorCode::TXN_OK);
+ StorageVaultPB get_obj;
+ get_obj.ParseFromString(val);
+ ASSERT_EQ(get_obj.obj_info().ak().empty(), true) <<
get_obj.obj_info().ak();
+ ASSERT_EQ(get_obj.obj_info().sk().empty(), true) <<
get_obj.obj_info().sk();
+
+ ASSERT_EQ(get_obj.obj_info().role_arn(),
"arn:aws:iam::123456789012:role/test-role")
+ << get_obj.obj_info().role_arn();
+ ASSERT_EQ(get_obj.obj_info().external_id(), "external_id")
+ << get_obj.obj_info().external_id();
+
+ ASSERT_EQ(get_obj.obj_info().endpoint(),
"s3.us-east-1.amazonaws.com")
+ << get_obj.obj_info().endpoint();
+ ASSERT_EQ(get_obj.obj_info().region(), "us-east-1") <<
get_obj.obj_info().region();
+ ASSERT_EQ(get_obj.obj_info().bucket(), "test_bucket") <<
get_obj.obj_info().bucket();
+ ASSERT_EQ(get_obj.obj_info().prefix(), "test_prefix") <<
get_obj.obj_info().prefix();
+ ASSERT_EQ(get_obj.obj_info().provider(),
+
ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3)
+ << get_obj.obj_info().provider();
+ ASSERT_EQ(get_obj.name(), "ak_sk_s3_vault_with_role") <<
get_obj.name();
+ ASSERT_EQ(get_obj.id(), "4") << get_obj.id();
+ ASSERT_EQ(get_obj.obj_info().cred_provider_type(),
CredProviderTypePB::INSTANCE_PROFILE)
+ << get_obj.obj_info().cred_provider_type();
+ }
+ }
+
+ // NOTE(review): `instance` here is the local seed built above, not the persisted
+ // instance after the two ADD_S3_VAULT calls.
+ LOG(INFO) << "instance:" << instance.ShortDebugString();
+ SyncPoint::get_instance()->disable_processing();
+ SyncPoint::get_instance()->clear_all_call_backs();
+}
+
+// alter_obj_store_info(ADD_OBJ_INFO) on an empty instance with role_arn/external_id and
+// no ak/sk: the request succeeds and the stored obj_info (id "1") keeps the role and
+// location fields with empty ak/sk.
+TEST(MetaServiceTest, AddObjInfoWithRole) {
+ auto meta_service = get_meta_service();
+
+ auto sp = SyncPoint::get_instance();
+ sp->enable_processing();
+
+ // Stub the encryption-key lookup: ret = 0, a fixed 32-byte key, key_id = 1.
+ sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
+ auto* ret = try_any_cast<int*>(args[0]);
+ *ret = 0;
+ auto* key = try_any_cast<std::string*>(args[1]);
+ *key = "selectdbselectdbselectdbselectdb";
+ auto* key_id = try_any_cast<int64_t*>(args[2]);
+ *key_id = 1;
+ });
+
+ // Seed an empty InstanceInfoPB under key "test_instance".
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string key;
+ std::string val;
+ InstanceKeyInfo key_info {"test_instance"};
+ instance_key(key_info, &key);
+
+ InstanceInfoPB instance;
+ val = instance.SerializeAsString();
+ txn->put(key, val);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ // Reads back the persisted InstanceInfoPB stored under "test_instance".
+ auto get_test_instance = [&](InstanceInfoPB& i) {
+ std::string key;
+ std::string val;
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ InstanceKeyInfo key_info {"test_instance"};
+ instance_key(key_info, &key);
+ ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+ i.ParseFromString(val);
+ };
+
+ {
+ AlterObjStoreInfoRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_op(AlterObjStoreInfoRequest::ADD_OBJ_INFO);
+ auto sp = SyncPoint::get_instance();
+ // Force the create_object_info_with_encrypt sync point to report success (ret = 0).
+ sp->set_call_back("create_object_info_with_encrypt", [](auto&& args) {
+ auto* ret = try_any_cast<int*>(args[0]);
+ *ret = 0;
+ });
+ sp->enable_processing();
+
+ ObjectStoreInfoPB obj_info;
+ obj_info.set_endpoint("s3.us-east-1.amazonaws.com");
+ obj_info.set_region("us-east-1");
+ obj_info.set_bucket("test_bucket");
+ obj_info.set_prefix("test_prefix");
+ obj_info.set_role_arn("arn:aws:iam::123456789012:role/test-role");
+ obj_info.set_external_id("external_id");
+
obj_info.set_provider(ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3);
+ obj_info.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
+
+ req.mutable_obj()->MergeFrom(obj_info);
+
+ brpc::Controller cntl;
+ AlterObjStoreInfoResponse res;
+ meta_service->alter_obj_store_info(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
+
+ // The first obj_info of the instance gets id "1" and carries no ak/sk.
+ InstanceInfoPB instance;
+ get_test_instance(instance);
+ const auto& obj = instance.obj_info().at(0);
+ ASSERT_EQ(obj.id(), "1");
+ ASSERT_EQ(obj.ak().empty(), true) << obj.ak();
+ ASSERT_EQ(obj.sk().empty(), true) << obj.sk();
+
+ ASSERT_EQ(obj.role_arn(), "arn:aws:iam::123456789012:role/test-role")
<< obj.role_arn();
+ ASSERT_EQ(obj.external_id(), "external_id") << obj.external_id();
+
+ ASSERT_EQ(obj.endpoint(), "s3.us-east-1.amazonaws.com") <<
obj.endpoint();
+ ASSERT_EQ(obj.region(), "us-east-1") << obj.region();
+ ASSERT_EQ(obj.bucket(), "test_bucket") << obj.bucket();
+ ASSERT_EQ(obj.prefix(), "test_prefix") << obj.prefix();
+ ASSERT_EQ(obj.provider(),
ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3)
+ << obj.provider();
+ ASSERT_EQ(obj.cred_provider_type(),
CredProviderTypePB::INSTANCE_PROFILE)
+ << obj.cred_provider_type();
+
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ }
+
+ // NOTE(review): callbacks were already cleared inside the block; this is a redundant safety net.
+ SyncPoint::get_instance()->disable_processing();
+ SyncPoint::get_instance()->clear_all_call_backs();
+}
} // namespace doris::cloud
diff --git a/cloud/test/s3_accessor_test.cpp b/cloud/test/s3_accessor_test.cpp
index c19f5f6a1df..f95fb2dba18 100644
--- a/cloud/test/s3_accessor_test.cpp
+++ b/cloud/test/s3_accessor_test.cpp
@@ -30,6 +30,7 @@
#include "common/config.h"
#include "common/configbase.h"
#include "common/logging.h"
+#include "cpp/aws_common.h"
#include "cpp/sync_point.h"
using namespace doris;
@@ -41,15 +42,11 @@ int main(int argc, char** argv) {
return -1;
}
- if (cloud::config::test_s3_ak.empty()) {
- std::cout << "empty test_s3_ak, skip S3AccessorTest" << std::endl;
- return 0;
- }
-
if (!cloud::init_glog("s3_accessor_test")) {
std::cerr << "failed to init glog" << std::endl;
return -1;
}
+ doris::cloud::config::aws_log_level = 5;
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
@@ -57,6 +54,14 @@ int main(int argc, char** argv) {
namespace doris::cloud {
namespace {
+// Fixture for the S3 / Azure / GCS accessor tests. Each test is skipped when
+// cloud::config::test_s3_ak is not configured.
+class S3AccessorTest : public testing::Test {
+protected:
+    void SetUp() override {
+        const bool missing_test_ak = cloud::config::test_s3_ak.empty();
+        if (missing_test_ak) {
+            GTEST_SKIP() << "empty test_s3_ak, skip S3AccessorTest";
+        }
+    }
+};
+
void test_s3_accessor(S3Accessor& accessor) {
std::string file1 = "data/10000/1_0.dat";
@@ -196,7 +201,7 @@ void test_s3_accessor(S3Accessor& accessor) {
} // namespace
-TEST(S3AccessorTest, s3) {
+TEST_F(S3AccessorTest, s3) {
std::shared_ptr<S3Accessor> accessor;
int ret = S3Accessor::create(
S3Conf {
@@ -238,7 +243,7 @@ TEST(S3AccessorTest, s3) {
test_s3_accessor(*accessor);
}
-TEST(S3AccessorTest, azure) {
+TEST_F(S3AccessorTest, azure) {
std::shared_ptr<S3Accessor> accessor;
int ret = S3Accessor::create(
S3Conf {
@@ -280,7 +285,7 @@ TEST(S3AccessorTest, azure) {
test_s3_accessor(*accessor);
}
-TEST(S3AccessorTest, gcs) {
+TEST_F(S3AccessorTest, gcs) {
std::shared_ptr<S3Accessor> accessor;
int ret = S3Accessor::create(
S3Conf {
@@ -322,7 +327,7 @@ TEST(S3AccessorTest, gcs) {
test_s3_accessor(*accessor);
}
-TEST(S3AccessorTest, path_style_test) {
+TEST_F(S3AccessorTest, path_style_test) {
ObjectStoreInfoPB obj_info;
obj_info.set_prefix("doris-debug-instance-prefix");
obj_info.set_provider(ObjectStoreInfoPB_Provider_S3);
@@ -388,4 +393,83 @@ TEST(S3AccessorTest, path_style_test) {
}
}
+// Fixture for accessor tests that authenticate by assuming an IAM role instead of
+// using ak/sk. Settings come from the AWS_ROLE_ARN, AWS_EXTERNAL_ID, AWS_ENDPOINT,
+// AWS_REGION, AWS_BUCKET and AWS_PREFIX environment variables.
+class S3AccessorRoleTest : public testing::Test {
+protected:
+    // Suite-level hook (must not be private: gtest resolves &T::SetUpTestSuite from
+    // outside the fixture). Loads the settings only when all six variables are set;
+    // otherwise the statics stay empty and SetUp() skips every test.
+    static void SetUpTestSuite() {
+        if (!std::getenv("AWS_ROLE_ARN") || !std::getenv("AWS_EXTERNAL_ID") ||
+            !std::getenv("AWS_ENDPOINT") || !std::getenv("AWS_REGION") ||
+            !std::getenv("AWS_BUCKET") || !std::getenv("AWS_PREFIX")) {
+            return;
+        }
+
+        role_arn = std::getenv("AWS_ROLE_ARN");
+        external_id = std::getenv("AWS_EXTERNAL_ID");
+        endpoint = std::getenv("AWS_ENDPOINT");
+        region = std::getenv("AWS_REGION");
+        bucket = std::getenv("AWS_BUCKET");
+        prefix = std::getenv("AWS_PREFIX");
+    }
+
+    // Skips the test when any of the role settings is missing.
+    void SetUp() override {
+        if (role_arn.empty() || external_id.empty() || endpoint.empty() || region.empty() ||
+            bucket.empty() || prefix.empty()) {
+            GTEST_SKIP() << "Skipping S3 test, because AWS environment not set";
+        }
+    }
+
+public:
+    static std::string endpoint;
+    static std::string region;
+    static std::string bucket;
+    static std::string prefix;
+    static std::string role_arn;
+    static std::string external_id;
+};
+
+std::string S3AccessorRoleTest::endpoint;
+std::string S3AccessorRoleTest::region;
+std::string S3AccessorRoleTest::bucket;
+std::string S3AccessorRoleTest::prefix;
+std::string S3AccessorRoleTest::role_arn;
+std::string S3AccessorRoleTest::external_id;
+
+// Runs the shared accessor scenario against a real bucket while authenticating via
+// STS AssumeRole (role_arn + external_id) rather than ak/sk. Objects are written under
+// a fresh GUID sub-prefix of AWS_PREFIX on every run.
+TEST_F(S3AccessorRoleTest, s3) {
+    std::shared_ptr<S3Accessor> accessor;
+    int ret = S3Accessor::create(
+            S3Conf {.endpoint = endpoint,
+                    .region = region,
+                    .bucket = bucket,
+                    .prefix = prefix + "/S3AccessorRoleTest/" + butil::GenerateGUID(),
+                    .cred_provider_type = CredProviderType::InstanceProfile,
+                    .role_arn = role_arn,
+                    .external_id = external_id,
+                    .provider = S3Conf::S3},
+            &accessor);
+    ASSERT_EQ(ret, 0);
+
+    // Shrink the list page size and delete batch sizes to 7, presumably so the
+    // pagination and batching paths are exercised with only a few objects.
+    auto* sp = SyncPoint::get_instance();
+    std::vector<SyncPoint::CallbackGuard> guards;
+    sp->set_call_back(
+            "S3ObjListIterator",
+            [](auto&& args) {
+                auto* req = try_any_cast<Aws::S3::Model::ListObjectsV2Request*>(args[0]);
+                req->SetMaxKeys(7);
+            },
+            &guards.emplace_back());
+    sp->set_call_back(
+            "S3ObjClient::delete_objects",
+            [](auto&& args) {
+                auto* delete_batch_size = try_any_cast<size_t*>(args[0]);
+                *delete_batch_size = 7;
+            },
+            &guards.emplace_back());
+    sp->set_call_back(
+            "ObjStorageClient::delete_objects_recursively_",
+            [](auto&& args) {
+                // `args` is the callback's argument list; the batch-size pointer is its
+                // first element, exactly as in the two callbacks above.
+                auto* delete_batch_size = try_any_cast<size_t*>(args[0]);
+                *delete_batch_size = 7;
+            },
+            &guards.emplace_back());
+
+    test_s3_accessor(*accessor);
+}
} // namespace doris::cloud
diff --git a/common/cpp/aws_common.cpp b/common/cpp/aws_common.cpp
new file mode 100644
index 00000000000..15a34f7c11a
--- /dev/null
+++ b/common/cpp/aws_common.cpp
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "aws_common.h"
+
+#include <glog/logging.h>
+
+namespace doris {
+// Maps the protobuf CredProviderTypePB onto the in-process CredProviderType enum.
+CredProviderType cred_provider_type_from_pb(cloud::CredProviderTypePB cred_provider_type) {
+    switch (cred_provider_type) {
+    case cloud::CredProviderTypePB::DEFAULT:
+        return CredProviderType::Default;
+    case cloud::CredProviderTypePB::SIMPLE:
+        return CredProviderType::Simple;
+    case cloud::CredProviderTypePB::INSTANCE_PROFILE:
+        return CredProviderType::InstanceProfile;
+    default:
+        // Reachable for values outside the known set (e.g. a newer peer): warn, don't UB.
+        LOG(WARNING) << "Invalid CredProviderTypePB value: " << cred_provider_type
+                     << ", use default instead.";
+        return CredProviderType::Default;
+    }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/common/cpp/aws_common.h b/common/cpp/aws_common.h
new file mode 100644
index 00000000000..895ba7a6736
--- /dev/null
+++ b/common/cpp/aws_common.h
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/cloud.pb.h>
+
+namespace doris {
+// AWS credentials provider type; one value per cloud::CredProviderTypePB value.
+enum class CredProviderType { Default = 0, Simple = 1, InstanceProfile = 2 };
+// Converts the protobuf credentials provider type into CredProviderType.
+CredProviderType cred_provider_type_from_pb(cloud::CredProviderTypePB cred_provider_type);
+} // namespace doris
\ No newline at end of file
diff --git a/common/cpp/aws_logger.h b/common/cpp/aws_logger.h
index 8fb74c437db..ca607cab056 100644
--- a/common/cpp/aws_logger.h
+++ b/common/cpp/aws_logger.h
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#pragma once
+
#include <aws/core/utils/logging/LogLevel.h>
#include <aws/core/utils/logging/LogSystemInterface.h>
#include <glog/logging.h> // IWYU pragma: export
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 9e5e258a3ea..d0764385201 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -19,6 +19,7 @@ package org.apache.doris.common.util;
import org.apache.doris.common.credentials.CloudCredential;
+import com.google.common.base.Strings;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -39,32 +40,114 @@ import
software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.sts.StsClient;
+import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import java.net.URI;
import java.time.Duration;
public class S3Util {
-
- public static S3Client buildS3Client(URI endpoint, String region,
CloudCredential credential,
- boolean isUsePathStyle) {
- AwsCredentialsProvider scp;
+ /**
+ * Builds the credentials provider for a CloudCredential.
+ * When the credential is not "whole" (isWhole() == false), the default provider chain is
+ * used: system properties, environment, web identity token file, profile, instance profile.
+ * Otherwise a static provider wraps basic credentials, or session credentials when
+ * isTemporary() is true.
+ */
+ private static AwsCredentialsProvider
getAwsCredencialsProvider(CloudCredential credential) {
AwsCredentials awsCredential;
+ AwsCredentialsProvider awsCredentialsProvider;
if (!credential.isTemporary()) {
awsCredential =
AwsBasicCredentials.create(credential.getAccessKey(),
credential.getSecretKey());
} else {
awsCredential =
AwsSessionCredentials.create(credential.getAccessKey(),
credential.getSecretKey(),
credential.getSessionToken());
}
+
+ // awsCredential is only consumed by the static branch below.
if (!credential.isWhole()) {
- scp = AwsCredentialsProviderChain.of(
+ awsCredentialsProvider = AwsCredentialsProviderChain.of(
SystemPropertyCredentialsProvider.create(),
EnvironmentVariableCredentialsProvider.create(),
WebIdentityTokenFileCredentialsProvider.create(),
ProfileCredentialsProvider.create(),
InstanceProfileCredentialsProvider.create());
} else {
- scp = StaticCredentialsProvider.create(awsCredential);
+ awsCredentialsProvider =
StaticCredentialsProvider.create(awsCredential);
+ }
+
+ return awsCredentialsProvider;
+ }
+
+ /**
+ * Builds an S3 client authenticated from a CloudCredential. The client retries 3 times with
+ * equal-jitter backoff (1s base delay, 1 min cap), signs with AwsS3V4Signer, disables chunked
+ * encoding and applies the given path-style setting.
+ *
+ * @deprecated prefer the buildS3Client overload taking accessKey/secretKey/sessionToken/
+ * roleArn/externalId, which can also authenticate by assuming an IAM role.
+ */
+ @Deprecated
+ public static S3Client buildS3Client(URI endpoint, String region,
CloudCredential credential,
+ boolean isUsePathStyle) {
+ EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
+ .builder()
+ .baseDelay(Duration.ofSeconds(1))
+ .maxBackoffTime(Duration.ofMinutes(1))
+ .build();
+ // retry 3 times with equal-jitter backoff
+ RetryPolicy retryPolicy = RetryPolicy
+ .builder()
+ .numRetries(3)
+ .backoffStrategy(backoffStrategy)
+ .build();
+ ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
+ .builder()
+ // set retry policy
+ .retryPolicy(retryPolicy)
+ // using AwsS3V4Signer
+ .putAdvancedOption(SdkAdvancedClientOption.SIGNER,
AwsS3V4Signer.create())
+ .build();
+ return S3Client.builder()
+ .httpClient(UrlConnectionHttpClient.create())
+ .endpointOverride(endpoint)
+ .credentialsProvider(getAwsCredencialsProvider(credential))
+ .region(Region.of(region))
+ .overrideConfiguration(clientConf)
+ // disable chunked encoding because BOS does not support it
+ .serviceConfiguration(S3Configuration.builder()
+ .chunkedEncodingEnabled(false)
+ .pathStyleAccessEnabled(isUsePathStyle)
+ .build())
+ .build();
+ }
+
+    /**
+     * Chooses the credentials provider for an S3 client built from explicit settings.
+     * <ul>
+     *   <li>accessKey and secretKey both set: static credentials, as session credentials
+     *       when sessionToken is also set;</li>
+     *   <li>otherwise, roleArn set: STS AssumeRole, where the STS client itself
+     *       authenticates with the EC2 instance profile;</li>
+     *   <li>otherwise: the default chain (system properties, environment, web identity
+     *       token file, profile file, instance profile).</li>
+     * </ul>
+     *
+     * @param endpoint AWS endpoint (eg: "https://s3.us-east-1.amazonaws.com"); not used here
+     * @param region AWS region identifier (eg: "us-east-1"); not used here
+     * @param accessKey AWS access key ID
+     * @param secretKey AWS secret access key, paired with accessKey
+     * @param sessionToken AWS temporary session token for short-term credentials
+     * @param roleArn AWS IAM role ARN to assume (format: "arn:aws:iam::123456789012:role/role-name")
+     * @param externalId AWS external ID for cross-account role assumption; sent only when non-empty
+     * @return the provider to install on the S3 client; never null
+     */
+    private static AwsCredentialsProvider getAwsCredencialsProvider(URI endpoint, String region, String accessKey,
+            String secretKey, String sessionToken, String roleArn, String externalId) {
+        if (!Strings.isNullOrEmpty(accessKey) && !Strings.isNullOrEmpty(secretKey)) {
+            if (Strings.isNullOrEmpty(sessionToken)) {
+                return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
+            }
+            return StaticCredentialsProvider.create(
+                    AwsSessionCredentials.create(accessKey, secretKey, sessionToken));
+        }
+
+        if (!Strings.isNullOrEmpty(roleArn)) {
+            StsClient stsClient = StsClient.builder()
+                    .credentialsProvider(InstanceProfileCredentialsProvider.create())
+                    .build();
+            return StsAssumeRoleCredentialsProvider.builder()
+                    .stsClient(stsClient)
+                    .refreshRequest(r -> {
+                        r.roleArn(roleArn).roleSessionName("aws-sdk-java-v2-fe");
+                        // STS rejects an empty ExternalId (min length 2); send it only when provided.
+                        if (!Strings.isNullOrEmpty(externalId)) {
+                            r.externalId(externalId);
+                        }
+                    })
+                    .build();
+        }
+
+        return AwsCredentialsProviderChain.of(SystemPropertyCredentialsProvider.create(),
+                EnvironmentVariableCredentialsProvider.create(),
+                WebIdentityTokenFileCredentialsProvider.create(),
+                ProfileCredentialsProvider.create(),
+                InstanceProfileCredentialsProvider.create());
+    }
+
+ public static S3Client buildS3Client(URI endpoint, String region, boolean
isUsePathStyle, String accessKey,
+ String secretKey, String sessionToken, String roleArn, String
externalId) {
EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
.builder()
.baseDelay(Duration.ofSeconds(1))
@@ -86,7 +169,8 @@ public class S3Util {
return S3Client.builder()
.httpClient(UrlConnectionHttpClient.create())
.endpointOverride(endpoint)
- .credentialsProvider(scp)
+ .credentialsProvider(getAwsCredencialsProvider(endpoint,
region, accessKey, secretKey,
+ sessionToken, roleArn, externalId))
.region(Region.of(region))
.overrideConfiguration(clientConf)
// disable chunkedEncoding because of bos not supported
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java
index 5cc19339ce0..c4f4c01b0b0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java
@@ -96,6 +96,14 @@ public class S3ClientBEProperties {
if (properties.containsKey(S3Properties.PROVIDER)) {
beProperties.put(S3Properties.PROVIDER,
properties.get(S3Properties.PROVIDER));
}
+
+ if (properties.containsKey(S3Properties.ROLE_ARN)) {
+ beProperties.put(S3Properties.Env.ROLE_ARN,
properties.get(S3Properties.ROLE_ARN));
+ }
+
+ if (properties.containsKey(S3Properties.EXTERNAL_ID)) {
+ beProperties.put(S3Properties.Env.EXTERNAL_ID,
properties.get(S3Properties.EXTERNAL_ID));
+ }
return beProperties;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index 3f8e875ae68..41f4de716e9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -18,6 +18,7 @@
package org.apache.doris.datasource.property.constants;
import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.CredProviderTypePB;
import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB.Provider;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
@@ -25,6 +26,7 @@ import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.credentials.DataLakeAWSCredentialsProvider;
import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.thrift.TCredProviderType;
import org.apache.doris.thrift.TS3StorageParam;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
@@ -57,6 +59,10 @@ public class S3Properties extends BaseProperties {
public static final String ACCESS_KEY = "s3.access_key";
public static final String SECRET_KEY = "s3.secret_key";
public static final String SESSION_TOKEN = "s3.session_token";
+
+ public static final String ROLE_ARN = "s3.role_arn";
+ public static final String EXTERNAL_ID = "s3.external_id";
+
public static final String MAX_CONNECTIONS = "s3.connection.maximum";
public static final String REQUEST_TIMEOUT_MS =
"s3.connection.request.timeout";
public static final String CONNECTION_TIMEOUT_MS = "s3.connection.timeout";
@@ -121,6 +127,9 @@ public class S3Properties extends BaseProperties {
public static final String DEFAULT_CONNECTION_TIMEOUT_MS = "1000";
public static final String NEED_OVERRIDE_ENDPOINT =
"AWS_NEED_OVERRIDE_ENDPOINT";
+ public static final String ROLE_ARN = "AWS_ROLE_ARN";
+ public static final String EXTERNAL_ID = "AWS_EXTERNAL_ID";
+
public static final List<String> REQUIRED_FIELDS =
Arrays.asList(ENDPOINT);
public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT,
REGION, ACCESS_KEY, SECRET_KEY, TOKEN,
ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS,
CONNECTION_TIMEOUT_MS);
@@ -284,10 +293,27 @@ public class S3Properties extends BaseProperties {
if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
properties.putIfAbsent(PropertyConverter.USE_PATH_STYLE,
properties.get(PropertyConverter.USE_PATH_STYLE));
}
+
+ if (properties.containsKey(S3Properties.Env.ROLE_ARN)) {
+ properties.putIfAbsent(S3Properties.ROLE_ARN,
properties.get(S3Properties.Env.ROLE_ARN));
+ }
+
+ if (properties.containsKey(S3Properties.Env.EXTERNAL_ID)) {
+ properties.putIfAbsent(S3Properties.EXTERNAL_ID,
properties.get(S3Properties.Env.EXTERNAL_ID));
+ }
}
public static TS3StorageParam getS3TStorageParam(Map<String, String>
properties) {
TS3StorageParam s3Info = new TS3StorageParam();
+
+ if (properties.containsKey(S3Properties.ROLE_ARN)) {
+ s3Info.setRoleArn(properties.get(S3Properties.ROLE_ARN));
+ if (properties.containsKey(S3Properties.EXTERNAL_ID)) {
+ s3Info.setExternalId(properties.get(S3Properties.EXTERNAL_ID));
+ }
+ s3Info.setCredProviderType(TCredProviderType.INSTANCE_PROFILE);
+ }
+
s3Info.setEndpoint(properties.get(S3Properties.ENDPOINT));
s3Info.setRegion(properties.get(S3Properties.REGION));
s3Info.setAk(properties.get(S3Properties.ACCESS_KEY));
@@ -348,6 +374,15 @@ public class S3Properties extends BaseProperties {
"Invalid use_path_style value: %s only 'true' or 'false'
is acceptable", value);
builder.setUsePathStyle(value.equalsIgnoreCase("true"));
}
+
+ if (properties.containsKey(S3Properties.ROLE_ARN)) {
+ builder.setRoleArn(properties.get(S3Properties.ROLE_ARN));
+ if (properties.containsKey(S3Properties.EXTERNAL_ID)) {
+
builder.setExternalId(properties.get(S3Properties.EXTERNAL_ID));
+ }
+ builder.setCredProviderType(CredProviderTypePB.INSTANCE_PROFILE);
+ }
+
return builder;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 5249c9f49d8..6adb98f1d7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -20,7 +20,6 @@ package org.apache.doris.fs.obj;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.property.PropertyConverter;
@@ -127,13 +126,10 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
endpointStr = "http://" + endpointStr;
}
URI endpoint = URI.create(endpointStr);
- CloudCredential credential = new CloudCredential();
- credential.setAccessKey(properties.get(S3Properties.ACCESS_KEY));
- credential.setSecretKey(properties.get(S3Properties.SECRET_KEY));
- if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
-
credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
- }
- client = S3Util.buildS3Client(endpoint,
properties.get(S3Properties.REGION), credential, isUsePathStyle);
+ client = S3Util.buildS3Client(endpoint,
properties.get(S3Properties.REGION), isUsePathStyle,
+ properties.get(S3Properties.ACCESS_KEY),
properties.get(S3Properties.SECRET_KEY),
+ properties.get(S3Properties.SESSION_TOKEN),
properties.get(S3Properties.ROLE_ARN),
+ properties.get(S3Properties.EXTERNAL_ID));
}
return client;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 56c438c303e..93dc28d84a3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -104,6 +104,14 @@ public class S3TableValuedFunction extends
ExternalFileTableValuedFunction {
// For Azure's compatibility, we need bucket to connect to the
blob storage's container
locationProperties.put(S3Properties.BUCKET, s3uri.getBucket());
}
+
+ if (properties.containsKey(S3Properties.ROLE_ARN)) {
+ locationProperties.put(S3Properties.ROLE_ARN,
properties.get(S3Properties.ROLE_ARN));
+ if (properties.containsKey(S3Properties.EXTERNAL_ID)) {
+ locationProperties.put(S3Properties.EXTERNAL_ID,
properties.get(S3Properties.EXTERNAL_ID));
+ }
+ }
+
locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties));
locationProperties.putAll(otherProps);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java
index b7d14ab7017..b39e2fbfef8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java
@@ -277,4 +277,35 @@ public class S3ResourceTest {
Assert.assertTrue(e.getMessage(), false);
}
}
+
+ @Test
+ public void testPingS3WithRoleArn() { // live S3 ping with role-based credentials; skipped unless the env vars below are set
+ try {
+ String endpoint = System.getenv("ENDPOINT");
+ String region = System.getenv("REGION");
+ String provider = System.getenv("PROVIDER");
+
+ String roleArn = System.getenv("ROLE_ARN");
+ String externalId = System.getenv("EXTERNAL_ID");
+ String bucket = System.getenv("BUCKET");
+
+ Assume.assumeTrue("ENDPOINT isNullOrEmpty.",
!Strings.isNullOrEmpty(endpoint));
+ Assume.assumeTrue("REGION isNullOrEmpty.",
!Strings.isNullOrEmpty(region));
+ Assume.assumeTrue("PROVIDER isNullOrEmpty.",
!Strings.isNullOrEmpty(provider));
+ Assume.assumeTrue("ROLE_ARN isNullOrEmpty.",
!Strings.isNullOrEmpty(roleArn));
+ Assume.assumeTrue("EXTERNAL_ID isNullOrEmpty.",
!Strings.isNullOrEmpty(externalId));
+ Assume.assumeTrue("BUCKET isNullOrEmpty.",
!Strings.isNullOrEmpty(bucket));
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("s3.endpoint", endpoint);
+ properties.put("s3.region", region);
+ properties.put("s3.role_arn", roleArn);
+ properties.put("s3.external_id", externalId);
+ properties.put("provider", provider);
+ S3Resource.pingS3(bucket, "fe_ut_role_prefix", properties); // no access/secret key set: auth relies on s3.role_arn / s3.external_id
+ } catch (DdlException e) {
+ LOG.info("testPingS3WithRoleArn exception:", e);
+ Assert.assertTrue(e.getMessage(), false); // fail the test, reporting the exception message
+ }
+ }
}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index c18b35ce15f..1b6cdd9bd5a 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -170,6 +170,13 @@ enum NodeStatusPB {
NODE_STATUS_DECOMMISSIONED = 4;
}
+enum CredProviderTypePB {
+ // Selects which AWS credentials provider is used when creating an S3 client.
+ DEFAULT = 1; // DefaultAWSCredentialsProviderChain. NOTE(review): Thrift TCredProviderType numbers from 0 - convert by name, not value
+ SIMPLE = 2; // SimpleAWSCredentialsProvider, built from (ak, sk)
+ INSTANCE_PROFILE = 3; // InstanceProfileCredentialsProvider; FE's S3Properties sets it when a role ARN is given
+}
+
message ObjectStoreInfoPB {
// presigned url use
// oss,aws,cos,obs,bos
@@ -198,6 +205,10 @@ message ObjectStoreInfoPB {
optional EncryptionInfoPB encryption_info = 14;
optional bool sse_enabled = 15;
optional bool use_path_style = 16;
+
+ optional CredProviderTypePB cred_provider_type = 17;
+ optional string role_arn = 18; // aws assumed role's arn
+ optional string external_id = 19; // aws assumed role's external_id if
configured
}
// The legacy ObjectStoreInfoPB is stored in InstanceInfoPB
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index aa1ee2f5b9f..cf206f5c564 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -77,6 +77,13 @@ enum TObjStorageType {
GCP = 7
}
+enum TCredProviderType {
+ // Selects which AWS credentials provider is used when creating an S3 client.
+ DEFAULT = 0, // DefaultAWSCredentialsProviderChain. NOTE(review): proto CredProviderTypePB numbers from 1 - convert by name, not value
+ SIMPLE = 1, // SimpleAWSCredentialsProvider, built from (ak, sk)
+ INSTANCE_PROFILE = 2 // InstanceProfileCredentialsProvider; FE's S3Properties sets it when a role ARN is given
+}
+
struct TS3StorageParam {
1: optional string endpoint
2: optional string region
@@ -90,6 +97,10 @@ struct TS3StorageParam {
10: optional bool use_path_style = false
11: optional string token
12: optional TObjStorageType provider
+
+ 13: optional TCredProviderType cred_provider_type
+ 14: optional string role_arn // aws assumed role's arn
+ 15: optional string external_id // aws assumed role's external_id if
configured
}
struct TStoragePolicy {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
index b57976c0578..da76c682c57 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
@@ -150,6 +150,23 @@ class Config {
public String s3Source
+ // for aws role arn regression test
+ public String awsRoleArn
+ public String awsExternalId
+ public String awsEndpoint
+ public String awsRegion
+ public String awsBucket
+ public String awsPrefix
+ public String awsAccessKey
+ public String awsSecretKey
+
+ public String regressionAwsRoleArn
+ public String regressionAwsExternalId
+ public String regressionAwsEndpoint
+ public String regressionAwsRegion
+ public String regressionAwsBucket
+ public String regressionAwsPrefix
+
Config() {}
Config(
@@ -599,6 +616,23 @@ class Config {
config.dockerEndNoKill = configToBoolean(obj.dockerEndNoKill)
config.excludeDockerTest = configToBoolean(obj.excludeDockerTest)
+ // settings for the AWS IAM role regression tests
+ config.awsRoleArn = configToString(obj.awsRoleArn)
+ config.awsExternalId = configToString(obj.awsExternalId)
+ config.awsPrefix = configToString(obj.awsPrefix)
+ config.awsEndpoint = configToString(obj.awsEndpoint)
+ config.awsRegion = configToString(obj.awsRegion)
+ config.awsBucket = configToString(obj.awsBucket)
+ config.awsAccessKey = configToString(obj.awsAccessKey)
+ config.awsSecretKey = configToString(obj.awsSecretKey)
+
+ config.regressionAwsRoleArn = configToString(obj.regressionAwsRoleArn)
+ config.regressionAwsExternalId =
configToString(obj.regressionAwsExternalId)
+ config.regressionAwsEndpoint =
configToString(obj.regressionAwsEndpoint)
+ config.regressionAwsRegion = configToString(obj.regressionAwsRegion)
+ config.regressionAwsBucket = configToString(obj.regressionAwsBucket)
+ config.regressionAwsPrefix = configToString(obj.regressionAwsPrefix)
+
def declareFileNames = config.getClass()
.getDeclaredFields()
.collect({f -> f.name})
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 8f6a66d50eb..49046a0807f 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -977,4 +977,27 @@ class Syncer {
)
"""
}
+
+ void createS3RepositoryWithRole(String name, boolean readOnly = false) { // (re)creates S3 repo `name` authenticated by the configured IAM role instead of access keys
+ String roleArn = suite.context.config.awsRoleArn
+ String externalId = suite.context.config.awsExternalId
+ String endpoint = suite.context.config.awsEndpoint
+ String region = suite.context.config.awsRegion
+ String bucket = suite.context.config.awsBucket
+ String prefix = suite.context.config.awsPrefix
+
+ suite.try_sql "DROP REPOSITORY `${name}`" // clear a leftover repository of the same name; presumably a missing repo is tolerated by try_sql
+ suite.sql """
+ CREATE ${readOnly ? "READ ONLY" : ""} REPOSITORY `${name}`
+ WITH S3
+ ON LOCATION "s3://${bucket}/${prefix}/aws_iam_role_p0/${name}"
+ PROPERTIES
+ (
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "${region}",
+ "s3.role_arn" = "${roleArn}",
+ "s3.external_id" = "${externalId}"
+ )
+ """
+ }
}
diff --git
a/regression-test/suites/aws_iam_role_p0/test_backup_restore_with_role.groovy
b/regression-test/suites/aws_iam_role_p0/test_backup_restore_with_role.groovy
new file mode 100644
index 00000000000..33349bdfbbf
--- /dev/null
+++
b/regression-test/suites/aws_iam_role_p0/test_backup_restore_with_role.groovy
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.base.Strings;
+
+suite("test_backup_restore_with_role") { // BACKUP/RESTORE through an S3 repository that authenticates with an IAM role
+ if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { // opt-in: runs only when awsRoleArn is configured
+ logger.info("skip ${name} case, because awsRoleArn is null or empty")
+ return
+ }
+
+ if (isCloudMode()) {
+ logger.info("skip ${name} case, because cloud mode not support")
+ return
+ }
+
+ logger.info("role info:
${context.config.awsRoleArn}|${context.config.awsExternalId}|${context.config.awsPrefix}")
+
+ String suiteName = "test_backup_restore_with_role"
+
+ String dbName = "${suiteName}_db"
+ String tableName = "${suiteName}_table"
+ String repoName = "${suiteName}_repo_" +
UUID.randomUUID().toString().replace("-", "") // random suffix: a distinct repository per run
+ String snapshotName = "${suiteName}_snapshot"
+
+ def syncer = getSyncer()
+ syncer.createS3RepositoryWithRole(repoName) // repository uses awsRoleArn/awsExternalId, no access keys
+
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE;"
+ sql """
+ CREATE TABLE ${dbName}.${tableName} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0")
+ AGGREGATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+
+ List<String> values = []
+ for (int i = 1; i <= 10; ++i) {
+ values.add("(${i}, ${i})")
+ }
+ sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+ def result = sql "SELECT * FROM ${dbName}.${tableName}"
+ assertEquals(result.size(), values.size()); // all 10 inserted rows are visible before the backup
+
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ ON (${tableName})
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+ def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+ assertTrue(snapshot != null) // presumably null when the snapshot is not found in the repository
+
+ sql "TRUNCATE TABLE ${dbName}.${tableName}" // emptied so the final row count can only come from the restore
+
+ sql """
+ RESTORE SNAPSHOT ${dbName}.${snapshotName}
+ FROM `${repoName}`
+ ON ( `${tableName}`)
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "reserve_replica" = "true"
+ )
+ """
+
+ syncer.waitAllRestoreFinish(dbName)
+
+ result = sql "SELECT * FROM ${dbName}.${tableName}"
+ assertEquals(result.size(), values.size()); // every row came back from the role-authenticated repository
+
+ sql "DROP TABLE ${dbName}.${tableName} FORCE"
+ sql "DROP DATABASE ${dbName} FORCE"
+ sql "DROP REPOSITORY `${repoName}`"
+}
\ No newline at end of file
diff --git
a/regression-test/suites/aws_iam_role_p0/test_export_with_role.groovy
b/regression-test/suites/aws_iam_role_p0/test_export_with_role.groovy
new file mode 100644
index 00000000000..1fbf4f3a8f0
--- /dev/null
+++ b/regression-test/suites/aws_iam_role_p0/test_export_with_role.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.base.Strings;
+
+suite("test_export_with_role") { // EXPORT to S3 and read back via the s3() TVF, both authenticating with s3.role_arn
+ if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { // opt-in: runs only when awsRoleArn is configured
+ logger.info("skip ${name} case, because awsRoleArn is null or empty")
+ return
+ }
+
+
+ def randomStr = UUID.randomUUID().toString().replace("-", "")
+ def label = "label_" + randomStr
+ def tableName = "test_export_with_role"
+
+ def endpoint = context.config.awsEndpoint
+ def region = context.config.awsRegion
+ def bucket = context.config.awsBucket
+ def roleArn = context.config.awsRoleArn
+ def externalId = context.config.awsExternalId
+
+ def prefix = context.config.awsPrefix
+
+ sql """
+ DROP TABLE IF EXISTS ${tableName} FORCE;
+ """
+
+ sql """
+ CREATE TABLE ${tableName}
+ (
+ siteid INT DEFAULT '10',
+ citycode SMALLINT NOT NULL,
+ username VARCHAR(32) DEFAULT '',
+ pv BIGINT SUM DEFAULT '0'
+ )
+ AGGREGATE KEY(siteid, citycode, username)
+ DISTRIBUTED BY HASH(siteid) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ sql """insert into ${tableName}(siteid, citycode, username, pv) values (1,
1, "xxx", 1),
+ (2, 2, "yyy", 2),
+ (3, 3, "zzz", 3)
+ """
+
+ sql """
+ EXPORT TABLE ${tableName} TO
"s3://${bucket}/${prefix}/aws_iam_role_p0/test_export_with_role"
+ PROPERTIES(
+ "label" = "${label}",
+ "format" = "csv",
+ "column_separator"=",",
+ "delete_existing_files"= "true"
+ )
+ WITH s3 (
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "${region}",
+ "s3.role_arn" = "${roleArn}",
+ "s3.external_id"="${externalId}",
+ "provider" = "AWS"
+ );
+ """
+
+ def maxTryMs = 600000 // poll SHOW EXPORT every 5s for up to 10 minutes
+ def outfileUrl = ""
+ while (maxTryMs > 0) {
+ String[][] result = sql """ show export where label = "${label}" """
+ logger.info("result: ${result}")
+ if (result[0][2].equals("FINISHED")) {
+ def json = parseJson(result[0][11])
+ logger.info("json: ${json}")
+ assert json instanceof List
+ assertEquals("1", json.fileNumber[0][0])
+ log.info("outfileUrl: ${json.url[0][0]}")
+ outfileUrl = json.url[0][0] // NOTE(review): read back below as "s3://<bucket>/" + substring(5 + bucket.length()); if the URL is "s3://<bucket>/...", the kept '/' yields "//" - confirm the TVF resolves it as intended
+ break;
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ assertTrue(false, "Export ${label} cancelled: ${result}")
+ break
+ }
+ Thread.sleep(5000)
+ maxTryMs -= 5000
+ if (maxTryMs <= 0) {
+ assertTrue(false, "Export ${label} timeout")
+ }
+ }
+
+ def result = sql """
+ select count(*) from s3("uri" =
"s3://${bucket}/${outfileUrl.substring(5 + bucket.length())}0.csv",
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "${region}",
+ "s3.role_arn"= "${roleArn}",
+ "s3.external_id" = "${externalId}",
+ "format" = "csv",
+ "column_separator" = ",",
+ "use_path_style" = "false");
+ """
+ log.info("result: ${result}")
+ assertEquals(3, result[0][0]) // the three rows inserted above
+}
\ No newline at end of file
diff --git
a/regression-test/suites/aws_iam_role_p0/test_external_catalog_with_role.groovy
b/regression-test/suites/aws_iam_role_p0/test_external_catalog_with_role.groovy
new file mode 100644
index 00000000000..0734c34b4a1
--- /dev/null
+++
b/regression-test/suites/aws_iam_role_p0/test_external_catalog_with_role.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.base.Strings;
+
+suite("test_external_catalog_with_role") { // CTAS into an Iceberg hadoop catalog whose S3 warehouse is accessed with s3.role_arn
+ if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { // opt-in: runs only when awsRoleArn is configured
+ logger.info("skip ${name} case, because awsRoleArn is null or empty")
+ return
+ }
+
+ def endpoint = context.config.awsEndpoint
+ def region = context.config.awsRegion // NOTE(review): unused - the catalog below sets no s3.region; confirm the region is meant to be inferred from the endpoint
+ def bucket = context.config.awsBucket
+ def roleArn = context.config.awsRoleArn
+ def externalId = context.config.awsExternalId
+ def prefix = context.config.awsPrefix
+
+ def randomStr = UUID.randomUUID().toString().replace("-", "") // fresh warehouse path per run
+
+ def tableName = "test_external_catalog_with_role"
+
+ sql """ drop table if exists ${tableName} force;"""
+
+ sql """
+ CREATE TABLE ${tableName}
+ (
+ siteid INT DEFAULT '10',
+ citycode INT NOT NULL,
+ username VARCHAR(32) DEFAULT '',
+ pv BIGINT SUM DEFAULT '0'
+ )
+ AGGREGATE KEY(siteid, citycode, username)
+ DISTRIBUTED BY HASH(siteid) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ sql """insert into ${tableName}(siteid, citycode, username, pv) values (1,
1, "xxx", 1),
+ (2, 2, "yyy", 2),
+ (3, 3, "zzz", 3)
+ """
+ sql """sync;"""
+
+
+ sql """ drop catalog if exists aws_iam_role_p0_iceberg;"""
+
+ sql """
+ CREATE CATALOG aws_iam_role_p0_iceberg PROPERTIES (
+ "type" = "iceberg",
+ "iceberg.catalog.type" = "hadoop",
+ "warehouse" =
"s3://${bucket}/${prefix}/aws_iam_role_p0/test_external_catalog_with_role/${randomStr}",
+ "s3.endpoint" = "${endpoint}",
+ "s3.role_arn" = "${roleArn}",
+ "s3.external_id" = "${externalId}"
+ );
+ """
+
+ sql """ create database if not exists
aws_iam_role_p0_iceberg.test_role_db; """
+
+ sql """ drop table if exists
aws_iam_role_p0_iceberg.test_role_db.${tableName}_2;"""
+
+ sql """ CREATE TABLE aws_iam_role_p0_iceberg.test_role_db.${tableName}_2
+ PROPERTIES("file_format" = "parquet") AS SELECT * FROM
${tableName};"""
+
+ sql """ sync; """
+
+ def result = sql "SELECT count(*) FROM
aws_iam_role_p0_iceberg.test_role_db.${tableName}_2"
+ logger.info("result: ${result}")
+ assertEquals(result[0][0], 3); // the three rows inserted into the source table
+}
\ No newline at end of file
diff --git
a/regression-test/suites/aws_iam_role_p0/test_resource_with_role.groovy
b/regression-test/suites/aws_iam_role_p0/test_resource_with_role.groovy
new file mode 100644
index 00000000000..66410ff48d4
--- /dev/null
+++ b/regression-test/suites/aws_iam_role_p0/test_resource_with_role.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.base.Strings;
+import groovy.json.JsonSlurper
+
+suite("test_resource_with_role") { // cooldown of table data to an S3 resource that authenticates with AWS_ROLE_ARN
+ if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { // opt-in: runs only when awsRoleArn is configured
+ logger.info("skip ${name} case, because awsRoleArn is null or empty")
+ return
+ }
+
+
+ if (isCloudMode()) {
+ logger.info("skip ${name} case, because cloud mode not support")
+ return
+ }
+
+ def tableName = "test_resource_with_role"
+ def randomStr = UUID.randomUUID().toString().replace("-", "")
+ def resourceName = "resource_${randomStr}"
+ def policyName = "policy_${randomStr}"
+
+ def awsEndpoint = context.config.awsEndpoint
+ def region = context.config.awsRegion
+ def bucket = context.config.awsBucket
+ def roleArn = context.config.awsRoleArn
+ def externalId = context.config.awsExternalId
+ def prefix = context.config.awsPrefix
+
+ sql """
+ CREATE RESOURCE IF NOT EXISTS "${resourceName}"
+ PROPERTIES(
+ "type"="s3",
+ "AWS_ENDPOINT" = "${awsEndpoint}",
+ "AWS_REGION" = "${region}",
+ "AWS_BUCKET" = "${bucket}",
+ "AWS_ROOT_PATH" =
"${prefix}/aws_iam_role_p0/test_resource_with_role/${randomStr}",
+ "AWS_ROLE_ARN" = "${roleArn}",
+ "AWS_EXTERNAL_ID" = "${externalId}",
+ "AWS_MAX_CONNECTIONS" = "50",
+ "AWS_REQUEST_TIMEOUT_MS" = "3000",
+ "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+ "s3_validity_check" = "true"
+ );
+ """
+
+ sql """
+ CREATE STORAGE POLICY IF NOT EXISTS ${policyName}
+ PROPERTIES(
+ "storage_resource" = "${resourceName}",
+ "cooldown_ttl" = "1"
+ )
+ """
+
+ sql """
+ DROP TABLE IF EXISTS ${tableName} FORCE;
+ """
+
+ sql """
+ CREATE TABLE ${tableName}
+ (
+ siteid INT DEFAULT '10',
+ citycode SMALLINT NOT NULL,
+ username VARCHAR(32) DEFAULT '',
+ pv BIGINT SUM DEFAULT '0'
+ )
+ AGGREGATE KEY(siteid, citycode, username)
+ DISTRIBUTED BY HASH(siteid) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "storage_policy" = "${policyName}"
+ )
+ """
+
+ sql """insert into ${tableName}(siteid, citycode, username, pv) values (1,
1, "xxx", 1),
+ (2, 2, "yyy", 2),
+ (3, 3, "zzz", 3)
+ """
+
+ // fetchDataSize fills data_sizes[0]/[1] with a tablet's local/remote data size; t is one SHOW TABLETS row
+ def fetchDataSize = {List<Long> data_sizes, Map<String, Object> t ->
+ def tabletId = t.TabletId
+ def meta_url = t.MetaUrl
+ def clos = { respCode, body ->
+ logger.info("test ttl expired resp Code {}",
"${respCode}".toString())
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def obj = new JsonSlurper().parseText(out)
+ data_sizes[0] = obj.local_data_size
+ data_sizes[1] = obj.remote_data_size
+ }
+ meta_url = meta_url.replace("header", "data_size") // query the data_size API instead of the header API
+
+ def i = meta_url.indexOf("/api")
+ def endPoint = meta_url.substring(0, i)
+ def metaUri = meta_url.substring(i)
+ logger.info("test fetchBeHttp, endpoint:${endPoint},
metaUri:${metaUri}")
+ i = endPoint.lastIndexOf('/')
+ endPoint = endPoint.substring(i + 1)
+
+ httpTest {
+ endpoint {endPoint}
+ uri metaUri
+ op "get"
+ check clos
+ }
+ }
+
+ sleep(60000) // presumably lets the 1s cooldown_ttl expire so cooldown can start - TODO confirm
+
+ List<Long> sizes = [-1, -1]
+ def tablets = sql_return_maparray """
+ SHOW TABLETS FROM ${tableName}
+ """
+ assertTrue(tablets.size() > 0, "no tablets found for ${tableName}") // checked before tablets[0] is dereferenced below
+ log.info( "test tablets not empty:${tablets}")
+ fetchDataSize(sizes, tablets[0])
+ def retry = 100 // poll up to 100 times, 10s apart, until remote data size is non-zero
+ while (sizes[1] == 0 && retry-- > 0) {
+ log.info( "test remote size is zero, sleep 10s")
+ sleep(10000)
+ tablets = sql_return_maparray """
+ SHOW TABLETS FROM ${tableName}
+ """
+ fetchDataSize(sizes, tablets[0])
+ }
+ assertTrue(sizes[1] != 0, "remote size is still zero, maybe some error
occurred")
+ log.info( "test remote size not zero")
+}
\ No newline at end of file
diff --git
a/regression-test/suites/aws_iam_role_p0/test_s3_load_with_role.groovy
b/regression-test/suites/aws_iam_role_p0/test_s3_load_with_role.groovy
new file mode 100644
index 00000000000..25c759802cf
--- /dev/null
+++ b/regression-test/suites/aws_iam_role_p0/test_s3_load_with_role.groovy
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.base.Strings;
+
+suite("test_s3_load_with_role") {
+    // Broker-loads the TPC-H sf0.01 customer file from S3 twice. Both loads
+    // authenticate only through an IAM role (role ARN + external id, no
+    // access/secret key): the first passes it with the AWS_* property names,
+    // the second with the s3.* property names. Each load must append the
+    // file's 1500 rows to the table.
+
+    if (Strings.isNullOrEmpty(context.config.regressionAwsRoleArn)) {
+        logger.info("skip ${name} case, because regressionAwsRoleArn is null or empty")
+        return
+    }
+
+    def tableName = "test_s3_load_with_role"
+    def endpoint = context.config.regressionAwsEndpoint
+    def region = context.config.regressionAwsRegion
+    def bucket = context.config.regressionAwsBucket
+    def roleArn = context.config.regressionAwsRoleArn
+    // NOTE(review): if regressionAwsExternalId is unset, the GStrings below
+    // render it as the literal "null" - confirm the config default is "".
+    def externalId = context.config.regressionAwsExternalId
+
+    // Poll interval and overall deadline for each load job, in milliseconds.
+    def pollIntervalMs = 5000
+    def loadDeadlineMs = 600000
+
+    // Polls SHOW LOAD for `label` until the job is FINISHED. Fails the case
+    // as soon as the job is CANCELLED (or missing), or once loadDeadlineMs
+    // has elapsed without the job finishing.
+    def waitLoadFinished = { String label ->
+        // Check first, then sleep: an already-finished job costs no wait.
+        for (int waitedMs = 0; waitedMs < loadDeadlineMs; waitedMs += pollIntervalMs) {
+            String[][] result = sql """ show load where label="${label}" order by createtime desc limit 1; """
+            assertTrue(result.length > 0, "Load ${label} not found")
+            // Column 2 of the SHOW LOAD row is the job state.
+            if (result[0][2].equals("FINISHED")) {
+                logger.info("Load ${label} finished: $result")
+                return
+            }
+            if (result[0][2].equals("CANCELLED")) {
+                assertTrue(false, "Load ${label} cancelled: ${result}")
+            }
+            Thread.sleep(pollIntervalMs)
+        }
+        assertTrue(false, "Load ${label} timeout")
+    }
+
+    // Recreate the target table so the row counts below start from zero.
+    sql """
+        DROP TABLE IF EXISTS ${tableName} FORCE;
+        """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            C_CUSTKEY INTEGER NOT NULL,
+            C_NAME VARCHAR(25) NOT NULL,
+            C_ADDRESS VARCHAR(40) NOT NULL,
+            C_NATIONKEY INTEGER NOT NULL,
+            C_PHONE CHAR(15) NOT NULL,
+            C_ACCTBAL DECIMAL(15,2) NOT NULL,
+            C_MKTSEGMENT CHAR(10) NOT NULL,
+            C_COMMENT VARCHAR(117) NOT NULL
+        )
+        DUPLICATE KEY(C_CUSTKEY, C_NAME)
+        DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+        """
+
+    // Load #1: role credentials passed with the AWS_* property names.
+    def randomStr = UUID.randomUUID().toString().replace("-", "")
+    def loadLabel = "label_" + randomStr
+    sql """
+        LOAD LABEL ${loadLabel} (
+            DATA INFILE("s3://${bucket}/regression/tpch/sf0.01/customer.csv.gz")
+            INTO TABLE ${tableName}
+            COLUMNS TERMINATED BY "|"
+            (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment)
+        )
+        WITH S3 (
+            "AWS_ENDPOINT" = "${endpoint}",
+            "AWS_REGION" = "${region}",
+            "AWS_ROLE_ARN" = "${roleArn}",
+            "AWS_EXTERNAL_ID" = "${externalId}",
+            "compress_type" = "GZ"
+        )
+        properties(
+            "timeout" = "28800",
+            "exec_mem_limit" = "8589934592"
+        )
+        """
+    waitLoadFinished(loadLabel)
+
+    // customer.csv.gz (TPC-H sf0.01) holds 1500 rows.
+    def result = sql """ select count(*) from ${tableName}; """
+    logger.info("result:${result}");
+    assertTrue(result[0][0] == 1500)
+
+    // Load #2: the same file again, with the s3.* property names. The table
+    // is DUPLICATE KEY, so the rows are appended and the count doubles.
+    def randomStr2 = UUID.randomUUID().toString().replace("-", "")
+    def loadLabel2 = "label_" + randomStr2
+    sql """
+        LOAD LABEL ${loadLabel2} (
+            DATA INFILE("s3://${bucket}/regression/tpch/sf0.01/customer.csv.gz")
+            INTO TABLE ${tableName}
+            COLUMNS TERMINATED BY "|"
+            (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment)
+        )
+        WITH S3 (
+            "s3.endpoint" = "${endpoint}",
+            "s3.region" = "${region}",
+            "s3.role_arn" = "${roleArn}",
+            "s3.external_id" = "${externalId}",
+            "compress_type" = "GZ"
+        )
+        properties(
+            "timeout" = "28800",
+            "exec_mem_limit" = "8589934592"
+        )
+        """
+    waitLoadFinished(loadLabel2)
+
+    result = sql """ select count(*) from ${tableName}; """
+    logger.info("result:${result}");
+    assertTrue(result[0][0] == 3000)
+}
\ No newline at end of file
diff --git a/regression-test/suites/aws_iam_role_p0/test_s3_vault_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_s3_vault_with_role.groovy
new file mode 100644
index 00000000000..f73a3319c85
--- /dev/null
+++ b/regression-test/suites/aws_iam_role_p0/test_s3_vault_with_role.groovy
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.base.Strings;
+
+suite("test_s3_vault_with_role") {
+    if (!isCloudMode()) {
+        logger.info("skip ${name} case, because not cloud mode")
+        return
+    }
+    if (!enableStoragevault()) {
+        logger.info("skip ${name} case, because storage vault not enabled")
+        return
+    }
+    // The vault below authenticates only through the IAM role; without one, skip.
+    if (Strings.isNullOrEmpty(context.config.awsRoleArn)) {
+        logger.info("skip ${name} case, because awsRoleArn is null or empty")
+        return
+    }
+
+    def s3VaultName = "s3_" + UUID.randomUUID().toString().replace("-", "")
+    def endpoint = context.config.awsEndpoint
+    def region = context.config.awsRegion
+    def bucket = context.config.awsBucket
+    def roleArn = context.config.awsRoleArn
+    def externalId = context.config.awsExternalId
+    def prefix = context.config.awsPrefix
+    sql """
+        CREATE STORAGE VAULT IF NOT EXISTS ${s3VaultName}
+        PROPERTIES (
+            "type"="S3",
+            "s3.endpoint"="${endpoint}",
+            "s3.region" = "${region}",
+            "s3.role_arn" = "${roleArn}",
+            "s3.external_id" = "${externalId}",
+            "s3.root.path" = "${prefix}/aws_iam_role_p0/${s3VaultName}",
+            "s3.bucket" = "${bucket}",
+            "s3.external_endpoint" = "",
+            "provider" = "S3",
+            "use_path_style" = "false"
+        );
+        """
+    sql """
+        CREATE TABLE ${s3VaultName} (
+            C_CUSTKEY INTEGER NOT NULL,
+            C_NAME INTEGER NOT NULL
+        )
+        DUPLICATE KEY(C_CUSTKEY, C_NAME)
+        DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "storage_vault_name" = "${s3VaultName}"
+        )
+        """
+    sql """ insert into ${s3VaultName} values(1, 1); """
+    sql """ sync;"""
+    def result = sql """ select * from ${s3VaultName}; """
+    assertEquals(1, result.size())
+}
\ No newline at end of file
diff --git a/regression-test/suites/aws_iam_role_p0/test_select_into_outfile_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_select_into_outfile_with_role.groovy
new file mode 100644
index 00000000000..613d4ee8be9
--- /dev/null
+++ b/regression-test/suites/aws_iam_role_p0/test_select_into_outfile_with_role.groovy
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.base.Strings;
+
+suite("test_select_into_outfile_with_role") {
+    if (Strings.isNullOrEmpty(context.config.awsRoleArn)) {
+        logger.info("skip ${name} case, because awsRoleArn is null or empty")
+        return
+    }
+
+    def endpoint = context.config.awsEndpoint
+    def region = context.config.awsRegion
+    def bucket = context.config.awsBucket
+    def roleArn = context.config.awsRoleArn
+    def externalId = context.config.awsExternalId
+    def prefix = context.config.awsPrefix
+    def tableName = "test_select_into_outfile_with_role"
+
+    sql """ drop table if exists ${tableName} force;"""
+    sql """
+        CREATE TABLE ${tableName}
+        (
+            siteid INT DEFAULT '10',
+            citycode SMALLINT NOT NULL,
+            username VARCHAR(32) DEFAULT '',
+            pv BIGINT SUM DEFAULT '0'
+        )
+        AGGREGATE KEY(siteid, citycode, username)
+        DISTRIBUTED BY HASH(siteid) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+        """
+
+    sql """insert into ${tableName}(siteid, citycode, username, pv) values (1, 1, "xxx", 1),
+        (2, 2, "yyy", 2),
+        (3, 3, "zzz", 3)
+        """
+    sql """sync;"""
+
+    // Export the three distinct-key rows using only the IAM role. Per the Doris
+    // OUTFILE docs the result row is (FileNumber, TotalRows, FileSize, URL, ...).
+    def result = sql """
+        SELECT * FROM ${tableName}
+        INTO OUTFILE "s3://${bucket}/${prefix}/aws_iam_role_p0/test_select_into_outfile_with_role"
+        FORMAT AS CSV
+        PROPERTIES(
+            "s3.endpoint" = "${endpoint}",
+            "s3.region" = "${region}",
+            "s3.role_arn" = "${roleArn}",
+            "s3.external_id"="${externalId}"
+        );
+        """
+    assertEquals("3", result[0][1].toString())
+}
\ No newline at end of file
diff --git a/regression-test/suites/aws_iam_role_p0/test_tvf_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_tvf_with_role.groovy
new file mode 100644
index 00000000000..ebecc6348ed
--- /dev/null
+++ b/regression-test/suites/aws_iam_role_p0/test_tvf_with_role.groovy
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.base.Strings;
+
+suite("test_tvf_with_role") {
+    // Every value below comes from the regressionAws* config entries, so the
+    // regressionAws* role ARN alone decides whether this case can run.
+    if (Strings.isNullOrEmpty(context.config.regressionAwsRoleArn)) {
+        logger.info("skip ${name} case, because regressionAwsRoleArn is null or empty")
+        return
+    }
+
+    def endpoint = context.config.regressionAwsEndpoint
+    def region = context.config.regressionAwsRegion
+    def bucket = context.config.regressionAwsBucket
+    def roleArn = context.config.regressionAwsRoleArn
+    def externalId = context.config.regressionAwsExternalId
+
+    // Read the TPC-H sf0.01 customer file through the s3() TVF, authenticating
+    // with the IAM role (role_arn + external_id) instead of access keys; the
+    // file holds 1500 rows, the count test_s3_load_with_role also expects.
+    def result = sql """
+        select count(*) from s3("uri" = "s3://${bucket}/regression/tpch/sf0.01/customer.csv.gz",
+            "s3.endpoint" = "${endpoint}",
+            "s3.region" = "${region}",
+            "s3.role_arn"= "${roleArn}",
+            "s3.external_id" = "${externalId}",
+            "format" = "csv",
+            "compress_type" = "gz",
+            "column_separator" = "|",
+            "use_path_style" = "false");
+        """
+    assertTrue(result[0][0] == 1500, "unexpected row count: ${result}")
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]