This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b598c8b3959 [feature](Azure) Implement Azure's multi part upload in BE 
(#36620)
b598c8b3959 is described below

commit b598c8b3959faa2f0de65f895a9186aa441e6319
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Sat Jun 22 00:17:50 2024 +0800

    [feature](Azure) Implement Azure's multi part upload in BE (#36620)
    
    Previously the multipart upload of Azure was not supported; this PR makes
    it available.
---
 be/src/io/fs/azure_obj_storage_client.cpp         | 20 +++++++++++++-----
 be/src/io/fs/azure_obj_storage_client.h           |  6 +-----
 be/src/io/fs/obj_storage_client.h                 |  7 +++++--
 be/src/io/fs/s3_file_writer.cpp                   | 25 +++++++----------------
 be/src/io/fs/s3_file_writer.h                     |  6 ++----
 be/src/io/fs/s3_obj_storage_client.cpp            | 13 ++++++++----
 be/src/io/fs/s3_obj_storage_client.h              |  7 +------
 be/src/util/s3_util.cpp                           | 22 +++++++++++++++++++-
 be/src/util/s3_util.h                             |  9 --------
 be/src/vec/sink/writer/vhive_partition_writer.cpp |  2 +-
 10 files changed, 62 insertions(+), 55 deletions(-)

diff --git a/be/src/io/fs/azure_obj_storage_client.cpp 
b/be/src/io/fs/azure_obj_storage_client.cpp
index 941470f93c3..4bd0d1b7009 100644
--- a/be/src/io/fs/azure_obj_storage_client.cpp
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -17,6 +17,9 @@
 
 #include "io/fs/azure_obj_storage_client.h"
 
+#include <aws/core/utils/Array.h>
+#include <aws/core/utils/HashingUtils.h>
+
 #include <algorithm>
 #include <azure/core/http/http.hpp>
 #include <azure/core/io/body_stream.hpp>
@@ -38,6 +41,11 @@ std::string wrap_object_storage_path_msg(const 
doris::io::ObjectStoragePathOptio
     return fmt::format("bucket {}, key {}, prefix {}, path {}", opts.bucket, 
opts.key, opts.prefix,
                        opts.path.native());
 }
+
+auto base64_encode_part_num(int part_num) {
+    return Aws::Utils::HashingUtils::Base64Encode(
+            {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
+}
 } // namespace
 
 namespace doris::io {
@@ -90,7 +98,8 @@ ObjectStorageUploadResponse 
AzureObjStorageClient::upload_part(const ObjectStora
     try {
         Azure::Core::IO::MemoryBodyStream memory_body(
                 reinterpret_cast<const uint8_t*>(stream.data()), 
stream.size());
-        client.StageBlock(std::to_string(part_num), memory_body);
+        // The blockId must be base64 encoded
+        auto resp = client.StageBlock(base64_encode_part_num(part_num), 
memory_body);
     } catch (Azure::Core::RequestFailedException& e) {
         auto msg = fmt::format("Azure request failed because {}, error msg {}, 
path msg {}",
                                e.what(), e.Message, 
wrap_object_storage_path_msg(opts));
@@ -110,12 +119,13 @@ ObjectStorageUploadResponse 
AzureObjStorageClient::upload_part(const ObjectStora
 }
 
 ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
-        const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& 
completed_parts) {
+        const ObjectStoragePathOptions& opts,
+        const std::vector<ObjectCompleteMultiPart>& completed_parts) {
     auto client = _client->GetBlockBlobClient(opts.key);
-    const auto& block_ids = static_cast<const 
AzureCompleteMultiParts&>(completed_parts).block_ids;
     std::vector<std::string> string_block_ids;
-    std::ranges::transform(block_ids, std::back_inserter(string_block_ids),
-                           [](int i) { return std::to_string(i); });
+    std::ranges::transform(
+            completed_parts, std::back_inserter(string_block_ids),
+            [](const ObjectCompleteMultiPart& i) { return 
base64_encode_part_num(i.part_num); });
     return do_azure_client_call([&]() { 
client.CommitBlockList(string_block_ids); }, opts);
 }
 
diff --git a/be/src/io/fs/azure_obj_storage_client.h 
b/be/src/io/fs/azure_obj_storage_client.h
index 0dbe56b0b69..7e8678628bd 100644
--- a/be/src/io/fs/azure_obj_storage_client.h
+++ b/be/src/io/fs/azure_obj_storage_client.h
@@ -25,10 +25,6 @@ class BlobContainerClient;
 
 namespace doris::io {
 
-struct AzureCompleteMultiParts : public ObjectCompleteMultiParts {
-    std::vector<int> block_ids;
-};
-
 class ObjClientHolder;
 
 class AzureObjStorageClient final : public ObjStorageClient {
@@ -44,7 +40,7 @@ public:
                                             int partNum) override;
     ObjectStorageResponse complete_multipart_upload(
             const ObjectStoragePathOptions& opts,
-            const ObjectCompleteMultiParts& completed_parts) override;
+            const std::vector<ObjectCompleteMultiPart>& completed_parts) 
override;
     ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& 
opts) override;
     ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, 
void* buffer,
                                      size_t offset, size_t bytes_read,
diff --git a/be/src/io/fs/obj_storage_client.h 
b/be/src/io/fs/obj_storage_client.h
index cd3db3c7371..8f31b55705b 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -45,7 +45,10 @@ struct ObjectStoragePathOptions {
     std::optional<std::string> upload_id = std::nullopt; // only used for S3 
upload
 };
 
-struct ObjectCompleteMultiParts {};
+struct ObjectCompleteMultiPart {
+    int part_num = 0;
+    std::string etag = std::string();
+};
 
 struct ObjectStorageStatus {
     int code = 0;
@@ -92,7 +95,7 @@ public:
     // After a successful execution, the large file can be accessed in the 
object storage
     virtual ObjectStorageResponse complete_multipart_upload(
             const ObjectStoragePathOptions& opts,
-            const ObjectCompleteMultiParts& completed_parts) = 0;
+            const std::vector<ObjectCompleteMultiPart>& completed_parts) = 0;
     // According to the passed bucket and key, it will access whether the 
corresponding file exists in the object storage.
     // If it exists, it will return the corresponding file size
     virtual ObjectStorageHeadResponse head_object(const 
ObjectStoragePathOptions& opts) = 0;
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 91b5aace12b..9af34ea8ca8 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -33,18 +33,12 @@
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/file_block.h"
 #include "io/cache/file_cache_common.h"
-#include "io/fs/err_utils.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/path.h"
-#include "io/fs/s3_common.h"
 #include "io/fs/s3_file_bufferpool.h"
 #include "io/fs/s3_file_system.h"
 #include "io/fs/s3_obj_storage_client.h"
 #include "runtime/exec_env.h"
-#include "util/bvar_helper.h"
-#include "util/defer_op.h"
-#include "util/doris_metrics.h"
-#include "util/runtime_profile.h"
 #include "util/s3_util.h"
 
 namespace doris::io {
@@ -310,12 +304,8 @@ void S3FileWriter::_upload_one_part(int64_t part_num, 
UploadFileBuffer& buf) {
     }
     s3_bytes_written_total << buf.get_size();
 
-    std::unique_ptr<Aws::S3::Model::CompletedPart> completed_part =
-            std::make_unique<Aws::S3::Model::CompletedPart>();
-    completed_part->SetPartNumber(part_num);
-    const auto& etag = *resp.etag;
-    // DCHECK(etag.empty());
-    completed_part->SetETag(etag);
+    ObjectCompleteMultiPart completed_part {
+            static_cast<int>(part_num), resp.etag.has_value() ? 
std::move(resp.etag.value()) : ""};
 
     std::unique_lock<std::mutex> lck {_completed_lock};
     _completed_parts.emplace_back(std::move(completed_part));
@@ -330,8 +320,8 @@ Status S3FileWriter::_complete() {
         _wait_until_finish("early quit");
         return _st;
     }
-    // upload id is empty means there was no multipart upload
-    if (upload_id().empty()) {
+    // When the part num is only one, it means the data is less than 5MB so we 
can just put it.
+    if (_cur_part_num == 1) {
         _wait_until_finish("PutObject");
         return _st;
     }
@@ -349,10 +339,9 @@ Status S3FileWriter::_complete() {
         }
         // make sure _completed_parts are ascending order
         std::sort(_completed_parts.begin(), _completed_parts.end(),
-                  [](auto& p1, auto& p2) { return p1->GetPartNumber() < 
p2->GetPartNumber(); });
+                  [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; 
});
         TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", 
&_completed_parts);
-        auto resp = client->complete_multipart_upload(
-                _obj_storage_path_opts, S3CompleteMultiParts {.parts = 
_completed_parts});
+        auto resp = client->complete_multipart_upload(_obj_storage_path_opts, 
_completed_parts);
         if (resp.status.code != ErrorCode::OK) {
             return {resp.status.code, std::move(resp.status.msg)};
         }
@@ -399,7 +388,7 @@ std::string S3FileWriter::_dump_completed_part() const {
     std::stringstream ss;
     ss << "part_numbers:";
     for (const auto& part : _completed_parts) {
-        ss << " " << part->GetPartNumber();
+        ss << " " << part.part_num;
     }
     return ss.str();
 }
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 67fc9e26998..c67c79ce536 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -62,9 +62,7 @@ public:
         return _cache_builder == nullptr ? nullptr : _cache_builder.get();
     }
 
-    const std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& 
completed_parts() const {
-        return _completed_parts;
-    }
+    const std::vector<ObjectCompleteMultiPart>& completed_parts() const { 
return _completed_parts; }
 
     const std::string& key() const { return _obj_storage_path_opts.key; }
     const std::string& bucket() const { return _obj_storage_path_opts.bucket; }
@@ -92,7 +90,7 @@ private:
     // Current Part Num for CompletedPart
     int _cur_part_num = 1;
     std::mutex _completed_lock;
-    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> 
_completed_parts;
+    std::vector<ObjectCompleteMultiPart> _completed_parts;
 
     // **Attention** call add_count() before submitting buf to async thread 
pool
     bthread::CountdownEvent _countdown_event {0};
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp 
b/be/src/io/fs/s3_obj_storage_client.cpp
index ffe652583c8..c6cda174833 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -179,15 +179,20 @@ ObjectStorageUploadResponse 
S3ObjStorageClient::upload_part(const ObjectStorageP
 }
 
 ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
-        const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& 
completed_parts) {
+        const ObjectStoragePathOptions& opts,
+        const std::vector<ObjectCompleteMultiPart>& completed_parts) {
     CompleteMultipartUploadRequest complete_request;
     
complete_request.WithBucket(opts.bucket).WithKey(opts.key).WithUploadId(*opts.upload_id);
 
     CompletedMultipartUpload completed_upload;
-    const auto aws_complete_parts = static_cast<const 
S3CompleteMultiParts&>(completed_parts);
     std::vector<CompletedPart> complete_parts;
-    std::ranges::transform(aws_complete_parts.parts, 
std::back_inserter(complete_parts),
-                           [](auto&& part_ptr) { return *part_ptr; });
+    std::ranges::transform(completed_parts, std::back_inserter(complete_parts),
+                           [](const ObjectCompleteMultiPart& part_ptr) {
+                               CompletedPart part;
+                               part.SetPartNumber(part_ptr.part_num);
+                               part.SetETag(part_ptr.etag);
+                               return part;
+                           });
     completed_upload.SetParts(std::move(complete_parts));
     complete_request.WithMultipartUpload(completed_upload);
 
diff --git a/be/src/io/fs/s3_obj_storage_client.h 
b/be/src/io/fs/s3_obj_storage_client.h
index 0bc2d5ef5af..ebc81b81b40 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/be/src/io/fs/s3_obj_storage_client.h
@@ -28,11 +28,6 @@ class CompletedPart;
 } // namespace Aws::S3
 
 namespace doris::io {
-
-struct S3CompleteMultiParts : public ObjectCompleteMultiParts {
-    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& parts;
-};
-
 class ObjClientHolder;
 
 class S3ObjStorageClient final : public ObjStorageClient {
@@ -47,7 +42,7 @@ public:
                                             int partNum) override;
     ObjectStorageResponse complete_multipart_upload(
             const ObjectStoragePathOptions& opts,
-            const ObjectCompleteMultiParts& completed_parts) override;
+            const std::vector<ObjectCompleteMultiPart>& completed_parts) 
override;
     ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& 
opts) override;
     ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, 
void* buffer,
                                      size_t offset, size_t bytes_read,
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index e89366f3ab8..630a998bd47 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -73,7 +73,18 @@ bool to_int(std::string_view str, int& res) {
     return ec == std::errc {};
 }
 
-const std::string USE_PATH_STYLE = "use_path_style";
+constexpr char USE_PATH_STYLE[] = "use_path_style";
+
+constexpr char AZURE_PROVIDER_STRING[] = "AZURE";
+constexpr char S3_PROVIDER[] = "provider";
+constexpr char S3_AK[] = "AWS_ACCESS_KEY";
+constexpr char S3_SK[] = "AWS_SECRET_KEY";
+constexpr char S3_ENDPOINT[] = "AWS_ENDPOINT";
+constexpr char S3_REGION[] = "AWS_REGION";
+constexpr char S3_TOKEN[] = "AWS_TOKEN";
+constexpr char S3_MAX_CONN_SIZE[] = "AWS_MAX_CONN_SIZE";
+constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS";
+constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS";
 
 } // namespace
 S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
@@ -196,6 +207,7 @@ std::shared_ptr<io::ObjStorageClient> 
S3ClientFactory::_create_azure_client(
                                         config::s3_client_http_scheme, 
s3_conf.ak, container_name);
 
     auto containerClient = 
std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
+    LOG_INFO("create one azure client with {}", s3_conf.to_string());
     return 
std::make_shared<io::AzureObjStorageClient>(std::move(containerClient));
 }
 
@@ -264,6 +276,7 @@ std::shared_ptr<io::ObjStorageClient> 
S3ClientFactory::_create_s3_client(
     }
 
     auto obj_client = 
std::make_shared<io::S3ObjStorageClient>(std::move(new_client));
+    LOG_INFO("create one s3 client with {}", s3_conf.to_string());
     return obj_client;
 }
 
@@ -302,12 +315,19 @@ Status S3ClientFactory::convert_properties_to_s3_conf(
                                            it->second);
         }
     }
+    if (auto it = properties.find(S3_PROVIDER); it != properties.end()) {
+        if (0 == strcmp(it->second.c_str(), AZURE_PROVIDER_STRING)) {
+            s3_conf->client_conf.provider = io::ObjStorageType::AZURE;
+        }
+    }
 
     if (s3_uri.get_bucket().empty()) {
         return Status::InvalidArgument("Invalid S3 URI {}, bucket is not 
specified",
                                        s3_uri.to_string());
     }
     s3_conf->bucket = s3_uri.get_bucket();
+    // For azure's compatibility
+    s3_conf->client_conf.bucket = s3_uri.get_bucket();
     s3_conf->prefix = "";
 
     // See 
https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 1764b1b8b86..5b0e9c09200 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -81,15 +81,6 @@ inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> 
s3_error_factory() {
 
 #define DO_S3_GET_RATE_LIMIT(code) DO_S3_RATE_LIMIT(S3RateLimitType::GET, code)
 
-const static std::string S3_AK = "AWS_ACCESS_KEY";
-const static std::string S3_SK = "AWS_SECRET_KEY";
-const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
-const static std::string S3_REGION = "AWS_REGION";
-const static std::string S3_TOKEN = "AWS_TOKEN";
-const static std::string S3_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
-const static std::string S3_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
-const static std::string S3_CONN_TIMEOUT_MS = "AWS_CONNECTION_TIMEOUT_MS";
-
 struct S3ClientConf {
     std::string endpoint;
     std::string region;
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp 
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index f65eb0d0972..abf1f9007a0 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -190,7 +190,7 @@ THivePartitionUpdate 
VHivePartitionWriter::_build_partition_update() {
 
         std::map<int, std::string> etags;
         for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
-            etags.insert({completed_part->GetPartNumber(), 
completed_part->GetETag()});
+            etags.insert({completed_part.part_num, completed_part.etag});
         }
         s3_mpu_pending_upload.__set_etags(etags);
         
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to