This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new b598c8b3959 [feature](Azure) Implement Azure's multi part upload in BE (#36620)
b598c8b3959 is described below

commit b598c8b3959faa2f0de65f895a9186aa441e6319
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Sat Jun 22 00:17:50 2024 +0800

    [feature](Azure) Implement Azure's multi part upload in BE (#36620)

    Previously, multipart upload was not supported for Azure; this PR makes
    it available.
---
 be/src/io/fs/azure_obj_storage_client.cpp         | 20 +++++++++++++-----
 be/src/io/fs/azure_obj_storage_client.h           |  6 +-----
 be/src/io/fs/obj_storage_client.h                 |  7 +++++--
 be/src/io/fs/s3_file_writer.cpp                   | 25 +++++++----------------
 be/src/io/fs/s3_file_writer.h                     |  6 ++----
 be/src/io/fs/s3_obj_storage_client.cpp            | 13 ++++++++----
 be/src/io/fs/s3_obj_storage_client.h              |  7 +------
 be/src/util/s3_util.cpp                           | 22 +++++++++++++++++++-
 be/src/util/s3_util.h                             |  9 --------
 be/src/vec/sink/writer/vhive_partition_writer.cpp |  2 +-
 10 files changed, 62 insertions(+), 55 deletions(-)

diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp
index 941470f93c3..4bd0d1b7009 100644
--- a/be/src/io/fs/azure_obj_storage_client.cpp
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -17,6 +17,9 @@
 
 #include "io/fs/azure_obj_storage_client.h"
 
+#include <aws/core/utils/Array.h>
+#include <aws/core/utils/HashingUtils.h>
+
 #include <algorithm>
 #include <azure/core/http/http.hpp>
 #include <azure/core/io/body_stream.hpp>
@@ -38,6 +41,11 @@ std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOpti
     return fmt::format("bucket {}, key {}, prefix {}, path {}", opts.bucket, opts.key,
                        opts.prefix, opts.path.native());
 }
+
+auto base64_encode_part_num(int part_num) {
+    return Aws::Utils::HashingUtils::Base64Encode(
+            {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
+}
 } // namespace
 
 namespace doris::io {
@@ -90,7 +98,8 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
     try {
         Azure::Core::IO::MemoryBodyStream memory_body(
                 reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
-        client.StageBlock(std::to_string(part_num), memory_body);
+        // The blockId must be base64 encoded
+        auto resp = client.StageBlock(base64_encode_part_num(part_num), memory_body);
     } catch (Azure::Core::RequestFailedException& e) {
         auto msg = fmt::format("Azure request failed because {}, error msg {}, path msg {}",
                                e.what(), e.Message, wrap_object_storage_path_msg(opts));
@@ -110,12 +119,13 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
 }
 
 ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
-        const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) {
+        const ObjectStoragePathOptions& opts,
+        const std::vector<ObjectCompleteMultiPart>& completed_parts) {
     auto client = _client->GetBlockBlobClient(opts.key);
-    const auto& block_ids = static_cast<const AzureCompleteMultiParts&>(completed_parts).block_ids;
     std::vector<std::string> string_block_ids;
-    std::ranges::transform(block_ids, std::back_inserter(string_block_ids),
-                           [](int i) { return std::to_string(i); });
+    std::ranges::transform(
+            completed_parts, std::back_inserter(string_block_ids),
+            [](const ObjectCompleteMultiPart& i) { return base64_encode_part_num(i.part_num); });
    return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts);
 }
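A note on the StageBlock/CommitBlockList change above: Azure block blobs identify staged blocks by a blockId that must be a valid base64 string (and, per the Put Block API, the same length for every block in a blob), which a plain std::to_string(part_num) id does not satisfy. Encoding the raw 4-byte part number keeps every id a uniform length. Below is a minimal standalone sketch of the helper's behavior, using the same AWS SDK utility the patch uses; the main() driver is illustrative only:

    #include <aws/core/utils/Array.h>
    #include <aws/core/utils/HashingUtils.h>

    #include <iostream>

    int main() {
        // Encode the raw 4 bytes of the part number, exactly as
        // base64_encode_part_num() above does. Four input bytes always
        // yield an 8-character base64 id (e.g. 1 -> "AQAAAA==" on a
        // little-endian machine), satisfying the equal-length rule.
        for (int part_num : {1, 2, 300}) {
            auto id = Aws::Utils::HashingUtils::Base64Encode(
                    {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
            std::cout << part_num << " -> " << id << '\n';
        }
    }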
diff --git a/be/src/io/fs/azure_obj_storage_client.h b/be/src/io/fs/azure_obj_storage_client.h
index 0dbe56b0b69..7e8678628bd 100644
--- a/be/src/io/fs/azure_obj_storage_client.h
+++ b/be/src/io/fs/azure_obj_storage_client.h
@@ -25,10 +25,6 @@ class BlobContainerClient;
 
 namespace doris::io {
 
-struct AzureCompleteMultiParts : public ObjectCompleteMultiParts {
-    std::vector<int> block_ids;
-};
-
 class ObjClientHolder;
 
 class AzureObjStorageClient final : public ObjStorageClient {
@@ -44,7 +40,7 @@ public:
                                            int partNum) override;
     ObjectStorageResponse complete_multipart_upload(
             const ObjectStoragePathOptions& opts,
-            const ObjectCompleteMultiParts& completed_parts) override;
+            const std::vector<ObjectCompleteMultiPart>& completed_parts) override;
     ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override;
     ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer,
                                      size_t offset, size_t bytes_read,
diff --git a/be/src/io/fs/obj_storage_client.h b/be/src/io/fs/obj_storage_client.h
index cd3db3c7371..8f31b55705b 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -45,7 +45,10 @@ struct ObjectStoragePathOptions {
     std::optional<std::string> upload_id = std::nullopt; // only used for S3 upload
 };
 
-struct ObjectCompleteMultiParts {};
+struct ObjectCompleteMultiPart {
+    int part_num = 0;
+    std::string etag = std::string();
+};
 
 struct ObjectStorageStatus {
     int code = 0;
@@ -92,7 +95,7 @@ public:
     // After a successful execution, the large file can be accessed in the object storage
     virtual ObjectStorageResponse complete_multipart_upload(
             const ObjectStoragePathOptions& opts,
-            const ObjectCompleteMultiParts& completed_parts) = 0;
+            const std::vector<ObjectCompleteMultiPart>& completed_parts) = 0;
     // According to the passed bucket and key, it will access whether the corresponding file exists in the object storage.
     // If it exists, it will return the corresponding file size
     virtual ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) = 0;
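The obj_storage_client.h change above is the pivot of the whole patch: the empty per-backend ObjectCompleteMultiParts hierarchy (recovered via static_cast in each client) is replaced by one plain value type passed through the same virtual signature everywhere. A trimmed sketch of the resulting shape, with the response and path-option types reduced to placeholders for brevity:

    #include <string>
    #include <vector>

    struct ObjectCompleteMultiPart {
        int part_num = 0;
        std::string etag; // consumed by the S3 path; Azure only needs part_num
    };

    // Both AzureObjStorageClient and S3ObjStorageClient override this one
    // signature, so callers such as S3FileWriter hold a
    // std::vector<ObjectCompleteMultiPart> and never touch SDK-specific types.
    struct ObjStorageClient {
        virtual ~ObjStorageClient() = default;
        virtual void complete_multipart_upload(
                const std::vector<ObjectCompleteMultiPart>& completed_parts) = 0;
    };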
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 91b5aace12b..9af34ea8ca8 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -33,18 +33,12 @@
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/file_block.h"
 #include "io/cache/file_cache_common.h"
-#include "io/fs/err_utils.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/path.h"
-#include "io/fs/s3_common.h"
 #include "io/fs/s3_file_bufferpool.h"
 #include "io/fs/s3_file_system.h"
 #include "io/fs/s3_obj_storage_client.h"
 #include "runtime/exec_env.h"
-#include "util/bvar_helper.h"
-#include "util/defer_op.h"
-#include "util/doris_metrics.h"
-#include "util/runtime_profile.h"
 #include "util/s3_util.h"
 
 namespace doris::io {
@@ -310,12 +304,8 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
     }
     s3_bytes_written_total << buf.get_size();
 
-    std::unique_ptr<Aws::S3::Model::CompletedPart> completed_part =
-            std::make_unique<Aws::S3::Model::CompletedPart>();
-    completed_part->SetPartNumber(part_num);
-    const auto& etag = *resp.etag;
-    // DCHECK(etag.empty());
-    completed_part->SetETag(etag);
+    ObjectCompleteMultiPart completed_part {
+            static_cast<int>(part_num), resp.etag.has_value() ? std::move(resp.etag.value()) : ""};
 
     std::unique_lock<std::mutex> lck {_completed_lock};
     _completed_parts.emplace_back(std::move(completed_part));
@@ -330,8 +320,8 @@ Status S3FileWriter::_complete() {
         _wait_until_finish("early quit");
         return _st;
     }
-    // upload id is empty means there was no multipart upload
-    if (upload_id().empty()) {
+    // When the part num is only one, it means the data is less than 5MB so we can just put it.
+    if (_cur_part_num == 1) {
         _wait_until_finish("PutObject");
         return _st;
     }
@@ -349,10 +339,9 @@ Status S3FileWriter::_complete() {
     }
     // make sure _completed_parts are ascending order
     std::sort(_completed_parts.begin(), _completed_parts.end(),
-              [](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
+              [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
     TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
-    auto resp = client->complete_multipart_upload(
-            _obj_storage_path_opts, S3CompleteMultiParts {.parts = _completed_parts});
+    auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
     if (resp.status.code != ErrorCode::OK) {
         return {resp.status.code, std::move(resp.status.msg)};
     }
@@ -399,7 +388,7 @@ std::string S3FileWriter::_dump_completed_part() const {
     std::stringstream ss;
     ss << "part_numbers:";
     for (const auto& part : _completed_parts) {
-        ss << " " << part->GetPartNumber();
+        ss << " " << part.part_num;
     }
     return ss.str();
 }
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 67fc9e26998..c67c79ce536 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -62,9 +62,7 @@ public:
         return _cache_builder == nullptr ? nullptr : _cache_builder.get();
     }
 
-    const std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& completed_parts() const {
-        return _completed_parts;
-    }
+    const std::vector<ObjectCompleteMultiPart>& completed_parts() const { return _completed_parts; }
 
     const std::string& key() const { return _obj_storage_path_opts.key; }
     const std::string& bucket() const { return _obj_storage_path_opts.bucket; }
@@ -92,7 +90,7 @@ private:
     // Current Part Num for CompletedPart
     int _cur_part_num = 1;
     std::mutex _completed_lock;
-    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
+    std::vector<ObjectCompleteMultiPart> _completed_parts;
 
     // **Attention** call add_count() before submitting buf to async thread pool
     bthread::CountdownEvent _countdown_event {0};
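Worth noting in S3FileWriter::_complete() above: parts finish uploading in whatever order the async buffer pool schedules them, while S3's CompleteMultipartUpload requires the part list in ascending part-number order, hence the std::sort before the call. A minimal self-contained illustration of that step, reusing the struct from obj_storage_client.h with a hypothetical main():

    #include <algorithm>
    #include <cassert>
    #include <string>
    #include <vector>

    struct ObjectCompleteMultiPart {
        int part_num = 0;
        std::string etag;
    };

    int main() {
        // Concurrent uploads complete out of order.
        std::vector<ObjectCompleteMultiPart> parts {{3, "e3"}, {1, "e1"}, {2, "e2"}};
        // Same comparator the patch uses in _complete().
        std::sort(parts.begin(), parts.end(),
                  [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
        assert(parts.front().part_num == 1 && parts.back().part_num == 3);
        return 0;
    }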
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp
index ffe652583c8..c6cda174833 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -179,15 +179,20 @@ ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const ObjectStorageP
 }
 
 ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
-        const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) {
+        const ObjectStoragePathOptions& opts,
+        const std::vector<ObjectCompleteMultiPart>& completed_parts) {
     CompleteMultipartUploadRequest complete_request;
     complete_request.WithBucket(opts.bucket).WithKey(opts.key).WithUploadId(*opts.upload_id);
 
     CompletedMultipartUpload completed_upload;
-    const auto aws_complete_parts = static_cast<const S3CompleteMultiParts&>(completed_parts);
     std::vector<CompletedPart> complete_parts;
-    std::ranges::transform(aws_complete_parts.parts, std::back_inserter(complete_parts),
-                           [](auto&& part_ptr) { return *part_ptr; });
+    std::ranges::transform(completed_parts, std::back_inserter(complete_parts),
+                           [](const ObjectCompleteMultiPart& part_ptr) {
+                               CompletedPart part;
+                               part.SetPartNumber(part_ptr.part_num);
+                               part.SetETag(part_ptr.etag);
+                               return part;
+                           });
     completed_upload.SetParts(std::move(complete_parts));
 
     complete_request.WithMultipartUpload(completed_upload);
diff --git a/be/src/io/fs/s3_obj_storage_client.h b/be/src/io/fs/s3_obj_storage_client.h
index 0bc2d5ef5af..ebc81b81b40 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/be/src/io/fs/s3_obj_storage_client.h
@@ -28,11 +28,6 @@ class CompletedPart;
 } // namespace Aws::S3
 
 namespace doris::io {
-
-struct S3CompleteMultiParts : public ObjectCompleteMultiParts {
-    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& parts;
-};
-
 class ObjClientHolder;
 
 class S3ObjStorageClient final : public ObjStorageClient {
@@ -47,7 +42,7 @@ public:
                                            int partNum) override;
     ObjectStorageResponse complete_multipart_upload(
             const ObjectStoragePathOptions& opts,
-            const ObjectCompleteMultiParts& completed_parts) override;
+            const std::vector<ObjectCompleteMultiPart>& completed_parts) override;
     ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override;
     ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer,
                                      size_t offset, size_t bytes_read,
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index e89366f3ab8..630a998bd47 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -73,7 +73,18 @@ bool to_int(std::string_view str, int& res) {
     return ec == std::errc {};
 }
 
-const std::string USE_PATH_STYLE = "use_path_style";
+constexpr char USE_PATH_STYLE[] = "use_path_style";
+
+constexpr char AZURE_PROVIDER_STRING[] = "AZURE";
+constexpr char S3_PROVIDER[] = "provider";
+constexpr char S3_AK[] = "AWS_ACCESS_KEY";
+constexpr char S3_SK[] = "AWS_SECRET_KEY";
+constexpr char S3_ENDPOINT[] = "AWS_ENDPOINT";
+constexpr char S3_REGION[] = "AWS_REGION";
+constexpr char S3_TOKEN[] = "AWS_TOKEN";
+constexpr char S3_MAX_CONN_SIZE[] = "AWS_MAX_CONN_SIZE";
+constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS";
+constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";
 } // namespace
 
 S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
@@ -196,6 +207,7 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
             config::s3_client_http_scheme, s3_conf.ak, container_name);
     auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
+    LOG_INFO("create one azure client with {}", s3_conf.to_string());
     return std::make_shared<io::AzureObjStorageClient>(std::move(containerClient));
 }
 
@@ -264,6 +276,7 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
     }
     auto obj_client = std::make_shared<io::S3ObjStorageClient>(std::move(new_client));
+    LOG_INFO("create one s3 client with {}", s3_conf.to_string());
     return obj_client;
 }
 
@@ -302,12 +315,19 @@ Status S3ClientFactory::convert_properties_to_s3_conf(
                     it->second);
         }
     }
+    if (auto it = properties.find(S3_PROVIDER); it != properties.end()) {
+        if (0 == strcmp(it->second.c_str(), AZURE_PROVIDER_STRING)) {
+            s3_conf->client_conf.provider = io::ObjStorageType::AZURE;
+        }
+    }
     if (s3_uri.get_bucket().empty()) {
         return Status::InvalidArgument("Invalid S3 URI {}, bucket is not specified",
                                        s3_uri.to_string());
     }
     s3_conf->bucket = s3_uri.get_bucket();
+    // For azure's compatibility
+    s3_conf->client_conf.bucket = s3_uri.get_bucket();
     s3_conf->prefix = "";
 
     // See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
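The convert_properties_to_s3_conf() hunk above is what routes a resource's properties to the right backend: a "provider" property equal to "AZURE" sets client_conf.provider, and the factory then builds an AzureObjStorageClient instead of an S3 one. A reduced sketch of that dispatch, with the factory and client types stubbed to placeholders (create_client is a hypothetical stand-in, not the patch's API):

    #include <map>
    #include <memory>
    #include <string>

    struct ObjStorageClient { virtual ~ObjStorageClient() = default; };
    struct AzureObjStorageClient : ObjStorageClient {};
    struct S3ObjStorageClient : ObjStorageClient {};

    std::shared_ptr<ObjStorageClient> create_client(
            const std::map<std::string, std::string>& properties) {
        // Mirrors the new S3_PROVIDER / AZURE_PROVIDER_STRING check.
        auto it = properties.find("provider");
        if (it != properties.end() && it->second == "AZURE") {
            return std::make_shared<AzureObjStorageClient>();
        }
        return std::make_shared<S3ObjStorageClient>();
    }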
""; // See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h index 1764b1b8b86..5b0e9c09200 100644 --- a/be/src/util/s3_util.h +++ b/be/src/util/s3_util.h @@ -81,15 +81,6 @@ inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() { #define DO_S3_GET_RATE_LIMIT(code) DO_S3_RATE_LIMIT(S3RateLimitType::GET, code) -const static std::string S3_AK = "AWS_ACCESS_KEY"; -const static std::string S3_SK = "AWS_SECRET_KEY"; -const static std::string S3_ENDPOINT = "AWS_ENDPOINT"; -const static std::string S3_REGION = "AWS_REGION"; -const static std::string S3_TOKEN = "AWS_TOKEN"; -const static std::string S3_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE"; -const static std::string S3_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS"; -const static std::string S3_CONN_TIMEOUT_MS = "AWS_CONNECTION_TIMEOUT_MS"; - struct S3ClientConf { std::string endpoint; std::string region; diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp index f65eb0d0972..abf1f9007a0 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.cpp +++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp @@ -190,7 +190,7 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() { std::map<int, std::string> etags; for (auto& completed_part : s3_mpu_file_writer->completed_parts()) { - etags.insert({completed_part->GetPartNumber(), completed_part->GetETag()}); + etags.insert({completed_part.part_num, completed_part.etag}); } s3_mpu_pending_upload.__set_etags(etags); hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload}); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org