This is an automated email from the ASF dual-hosted git repository.

plat1ko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 45661f941e8 [feature](Cloud) Introduce obj storage client interface to recycler (#35447)

45661f941e8 is described below

commit 45661f941e80265039429b26bf8bb6ef8ec410d5
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Mon Jun 3 20:12:14 2024 +0800

    [feature](Cloud) Introduce obj storage client interface to recycler (#35447)

    Extract a basic interface to suit different kinds of ObjectStorage.
---
 be/src/common/status.h                  |  18 +-
 be/src/io/fs/err_utils.cpp              |   7 +
 be/src/io/fs/obj_storage_client.h       |  24 +-
 be/src/io/fs/s3_file_reader.cpp         |   5 +-
 be/src/io/fs/s3_file_system.cpp         |  73 ++--
 be/src/io/fs/s3_file_writer.cpp         |  16 +-
 be/src/io/fs/s3_obj_storage_client.cpp  | 112 ++++--
 be/src/io/fs/s3_obj_storage_client.h    |   7 +-
 be/test/io/fs/s3_file_writer_test.cpp   |  17 +-
 cloud/src/recycler/obj_store_accessor.h |  57 +++
 cloud/src/recycler/s3_accessor.cpp      | 389 ++++-----------
 cloud/src/recycler/s3_accessor.h        |   6 +-
 cloud/src/recycler/s3_obj_client.cpp    | 371 ++++++++++++++++++++
 .../src/recycler/s3_obj_client.h        |  48 ++-
 cloud/test/s3_accessor_test.cpp         |  18 +
 15 files changed, 715 insertions(+), 453 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 137a268b5aa..34e13749165 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -25,6 +25,14 @@
 
 namespace doris {
 
+namespace io {
+struct ObjectStorageStatus;
+}
+
+class Status;
+
+extern io::ObjectStorageStatus convert_to_obj_response(Status st);
+
 class PStatus;
 
 namespace ErrorCode {
@@ -352,11 +360,11 @@ public:
     Status() : _code(ErrorCode::OK), _err_msg(nullptr) {}
 
     // used to convert Exception to Status
-    Status(int code, std::string msg, std::string stack) : _code(code) {
+    Status(int code, std::string msg, std::string stack = "") : _code(code) {
         _err_msg = std::make_unique<ErrMsg>();
-        _err_msg->_msg = msg;
+        _err_msg->_msg = std::move(msg);
 #ifdef ENABLE_STACKTRACE
-        _err_msg->_stack = stack;
+        _err_msg->_stack = std::move(stack);
 #endif
     }
 
@@ -529,6 +537,10 @@ public:
 
     std::string_view msg() const { return _err_msg ? _err_msg->_msg : std::string_view(""); }
 
+    std::pair<int, std::string> retrieve_error_msg() { return {_code, std::move(_err_msg->_msg)}; }
+
+    friend io::ObjectStorageStatus convert_to_obj_response(Status st);
+
 private:
     int _code;
     struct ErrMsg {
diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
index 8552c647cdd..6552d454824 100644
--- a/be/src/io/fs/err_utils.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -27,10 +27,17 @@
 
 #include "common/status.h"
 #include "io/fs/hdfs.h"
+#include "io/fs/obj_storage_client.h"
 
 namespace doris {
 using namespace ErrorCode;
 
+io::ObjectStorageStatus convert_to_obj_response(Status st) {
+    int code = st._code;
+    std::string msg = st._err_msg == nullptr ? "" : std::move(st._err_msg->_msg);
+    return io::ObjectStorageStatus {.code = code, .msg = std::move(msg)};
+}
+
 namespace io {
 
 std::string errno_to_str() {
diff --git a/be/src/io/fs/obj_storage_client.h b/be/src/io/fs/obj_storage_client.h
index 40e0ff9a8fe..3ab0a8e2dea 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -46,14 +46,26 @@ struct ObjectStoragePathOptions {
 
 struct ObjectCompleteMultiParts {};
 
+struct ObjectStorageStatus {
+    int code = 0;
+    std::string msg = std::string();
+};
+
+// We only store error code along with err_msg instead of Status to unify BE and recycler's error handle logic
 struct ObjectStorageResponse {
-    Status status = Status::OK();
+    ObjectStorageStatus status {};
+    int http_code {200};
+    std::string request_id = std::string();
+};
+
+struct ObjectStorageUploadResponse {
+    ObjectStorageResponse resp {};
     std::optional<std::string> upload_id = std::nullopt;
     std::optional<std::string> etag = std::nullopt;
 };
 
 struct ObjectStorageHeadResponse {
-    Status status = Status::OK();
+    ObjectStorageResponse resp {};
     long long file_size {0};
 };
 
@@ -62,7 +74,8 @@ public:
     virtual ~ObjStorageClient() = default;
     // Create a multi-part upload request. On AWS-compatible systems, it will return an upload ID, but not on Azure.
     // The input parameters should include the bucket and key for the object storage.
-    virtual ObjectStorageResponse create_multipart_upload(const ObjectStoragePathOptions& opts) = 0;
+    virtual ObjectStorageUploadResponse create_multipart_upload(
+            const ObjectStoragePathOptions& opts) = 0;
     // To directly upload a piece of data to object storage and generate a user-visible file.
     // You need to clearly specify the bucket and key
     virtual ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
@@ -71,8 +84,8 @@ public:
     // The temporary file's ID is the value of the part_num passed in
    // You need to specify the bucket and key along with the upload_id if it's AWS-compatible system
     // For the same bucket and key, as well as the same part_num, it will directly replace the original temporary file.
-    virtual ObjectStorageResponse upload_part(const ObjectStoragePathOptions& opts,
-                                              std::string_view stream, int part_num) = 0;
+    virtual ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts,
+                                                    std::string_view stream, int part_num) = 0;
     // To combine the previously uploaded multiple file parts into a complete file, the file name is the name of the key passed in.
     // If it is an AWS-compatible system, the upload_id needs to be included.
     // After a successful execution, the large file can be accessed in the object storage
@@ -88,6 +101,7 @@ public:
                                              size_t offset, size_t bytes_read, size_t* size_return) = 0;
     // According to the passed bucket and prefix, it traverses and retrieves all files under the prefix, and returns the name and file size of all files.
+    // **Notice**: The files returned by this function contains the full key in object storage.
     virtual ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
                                                std::vector<FileInfo>* files) = 0;
     // According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array.
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index 1ea6d5c3c03..e7775803198 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -117,8 +117,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea auto resp = client->get_object( { .bucket = _bucket, .key = _key, }, to, offset, bytes_req, bytes_read); // clang-format on - if (!resp.status.ok()) { - return resp.status.append(fmt::format("failed to read from {}", _path.native())); + if (resp.status.code != ErrorCode::OK) { + return std::move(Status(resp.status.code, std::move(resp.status.msg)) + .append(fmt::format("failed to read from {}", _path.native()))); } if (*bytes_read != bytes_req) { return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})", diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 2cd40ea87a6..27aff992f4c 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -131,9 +131,10 @@ Result<int64_t> ObjClientHolder::object_file_size(const std::string& bucket, .key = key, }); - if (!resp.status.ok()) { - return ResultError(resp.status.append( - fmt::format("failed to head s3 file {}", full_s3_path(bucket, key)))); + if (resp.resp.status.code != ErrorCode::OK) { + return ResultError(std::move(Status(resp.resp.status.code, std::move(resp.resp.status.msg)) + .append(fmt::format("failed to head s3 file {}", + full_s3_path(bucket, key))))); } return resp.file_size; @@ -207,10 +208,11 @@ Status S3FileSystem::delete_file_impl(const Path& file) { auto resp = client->delete_object({.bucket = _bucket, .key = key}); - if (resp.status.ok() || resp.status.is<ErrorCode::NOT_FOUND>()) { + if (resp.status.code == ErrorCode::OK || resp.status.code == ErrorCode::NOT_FOUND) { return Status::OK(); } - return resp.status.append(fmt::format("failed to delete file {}", full_s3_path(key))); + return std::move(Status(resp.status.code, std::move(resp.status.msg)) + .append(fmt::format("failed to delete file {}", full_s3_path(key)))); } Status S3FileSystem::delete_directory_impl(const Path& dir) { @@ -222,13 +224,12 @@ Status S3FileSystem::delete_directory_impl(const Path& dir) { prefix.push_back('/'); } - return client - ->delete_objects_recursively({ - .path = full_s3_path(prefix), - .bucket = _bucket, - .prefix = prefix, - }) - .status; + auto resp = client->delete_objects_recursively({ + .path = full_s3_path(prefix), + .bucket = _bucket, + .prefix = prefix, + }); + return {resp.status.code, std::move(resp.status.msg)}; } Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) { @@ -251,8 +252,9 @@ Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) { return Status::OK(); } // clang-format off - RETURN_IF_ERROR(client->delete_objects( {.bucket = _bucket,}, std::move(objects)) - .status); + if (auto resp = client->delete_objects( {.bucket = _bucket,}, std::move(objects)); resp.status.code != ErrorCode::OK) { + return {resp.status.code, std::move(resp.status.msg)}; + } // clang-format on } while (path_iter != remote_files.end()); @@ -266,12 +268,14 @@ Status S3FileSystem::exists_impl(const Path& path, bool* res) const { auto resp = client->head_object({.bucket = _bucket, .key = key}); - if (resp.status.ok()) { + if (resp.resp.status.code == ErrorCode::OK) { *res = true; - } else if (resp.status.is<ErrorCode::NOT_FOUND>()) { + } else if (resp.resp.status.code == ErrorCode::NOT_FOUND) { *res = false; } else { - return 
resp.status.append(fmt::format("failed to check exists {}", full_s3_path(key))); + return std::move( + Status(resp.resp.status.code, std::move(resp.resp.status.msg)) + .append(fmt::format(" failed to check exists {}", full_s3_path(key)))); } return Status::OK(); } @@ -297,8 +301,13 @@ Status S3FileSystem::list_impl(const Path& dir, bool only_file, std::vector<File // clang-format off auto resp = client->list_objects( {.bucket = _bucket, .prefix = prefix,}, files); // clang-format on + if (resp.status.code == ErrorCode::OK) { + for (auto&& file : *files) { + file.file_name.erase(0, prefix.size()); + } + } - return resp.status; + return {resp.status.code, std::move(resp.status.msg)}; } Status S3FileSystem::rename_impl(const Path& orig_name, const Path& new_name) { @@ -347,10 +356,13 @@ Status S3FileSystem::batch_upload_impl(const std::vector<Path>& local_files, std::vector<FileWriterPtr> obj_writers(local_files.size()); - auto upload_task = [this](Path local_file, Path remote_file, FileWriterPtr* obj_writer) { + auto upload_task = [&, this](size_t idx) { + const auto& local_file = local_files[idx]; + const auto& remote_file = remote_files[idx]; + auto& obj_writer = obj_writers[idx]; auto key = DORIS_TRY(get_key(remote_file)); LOG(INFO) << "Start to upload " << local_file.native() << " to " << full_s3_path(key); - RETURN_IF_ERROR(create_file_impl(key, obj_writer, nullptr)); + RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr)); FileReaderSPtr local_reader; RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, &local_reader)); size_t local_buffer_size = config::s3_file_system_local_upload_buffer_size; @@ -360,24 +372,18 @@ Status S3FileSystem::batch_upload_impl(const std::vector<Path>& local_files, size_t bytes_read = 0; RETURN_IF_ERROR(local_reader->read_at( cur_read, Slice {write_buffer.get(), local_buffer_size}, &bytes_read)); - RETURN_IF_ERROR((*obj_writer)->append({write_buffer.get(), bytes_read})); + RETURN_IF_ERROR((*obj_writer).append({write_buffer.get(), bytes_read})); cur_read += bytes_read; } - RETURN_IF_ERROR((*obj_writer)->close()); + RETURN_IF_ERROR((*obj_writer).close()); return Status::OK(); }; std::vector<std::future<Status>> futures; for (int i = 0; i < local_files.size(); ++i) { - std::shared_ptr<std::packaged_task<Status(Path local_file, Path remote_file, - FileWriterPtr * obj_writer)>> - task = std::make_shared<std::packaged_task<Status(Path local_file, Path remote_file, - FileWriterPtr * obj_writer)>>( - upload_task); + auto task = std::make_shared<std::packaged_task<Status(size_t idx)>>(upload_task); futures.emplace_back(task->get_future()); - default_executor()->Submit( - [t = std::move(task), local = local_files[i], remote = remote_files[i], - obj_writer = &obj_writers[i]]() mutable { (*t)(local, remote, obj_writer); }); + default_executor()->Submit([t = std::move(task), idx = i]() mutable { (*t)(idx); }); } Status s = Status::OK(); for (auto&& f : futures) { @@ -401,16 +407,15 @@ Status S3FileSystem::download_impl(const Path& remote_file, const Path& local_fi auto resp = client->get_object( {.bucket = _bucket, .key = key,}, buf.get(), 0, size, &bytes_read); // clang-format on - if (!resp.status.ok()) { - return resp.status; + if (resp.status.code != ErrorCode::OK) { + return {resp.status.code, std::move(resp.status.msg)}; } Aws::OFStream local_file_s; local_file_s.open(local_file, std::ios::out | std::ios::binary); if (local_file_s.good()) { local_file_s << StringViewStream(buf.get(), size).rdbuf(); } else { - return Status::IOError("failed to 
download {}: failed to write file: {}", - remote_file.native(), local_file.native()); + return localfs_error(errno, fmt::format("failed to write file {}", local_file.native())); } return Status::OK(); diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 32319235964..91b5aace12b 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -98,10 +98,10 @@ Status S3FileWriter::_create_multi_upload_request() { return Status::InternalError<false>("invalid obj storage client"); } auto resp = client->create_multipart_upload(_obj_storage_path_opts); - if (resp.status.ok()) { + if (resp.resp.status.code == ErrorCode::OK) { _obj_storage_path_opts.upload_id = resp.upload_id; } - return resp.status; + return {resp.resp.status.code, std::move(resp.resp.status.msg)}; } void S3FileWriter::_wait_until_finish(std::string_view task_name) { @@ -304,8 +304,8 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) { return; } auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num); - if (!resp.status.ok()) { - buf.set_status(std::move(resp.status)); + if (resp.resp.status.code != ErrorCode::OK) { + buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg))); return; } s3_bytes_written_total << buf.get_size(); @@ -353,8 +353,8 @@ Status S3FileWriter::_complete() { TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts); auto resp = client->complete_multipart_upload( _obj_storage_path_opts, S3CompleteMultiParts {.parts = _completed_parts}); - if (!resp.status.ok()) { - return resp.status; + if (resp.status.code != ErrorCode::OK) { + return {resp.status.code, std::move(resp.status.msg)}; } } s3_file_created_total << 1; @@ -388,8 +388,8 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) { } TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf); auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data()); - if (!resp.status.ok()) { - buf.set_status(std::move(resp.status)); + if (resp.status.code != ErrorCode::OK) { + buf.set_status({resp.status.code, std::move(resp.status.msg)}); return; } s3_file_created_total << 1; diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp index 33d15e516b4..c45074e262c 100644 --- a/be/src/io/fs/s3_obj_storage_client.cpp +++ b/be/src/io/fs/s3_obj_storage_client.cpp @@ -82,9 +82,8 @@ using Aws::S3::Model::UploadPartOutcome; namespace doris::io { using namespace Aws::S3::Model; -using Aws::S3::S3Client; -ObjectStorageResponse S3ObjStorageClient::create_multipart_upload( +ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload( const ObjectStoragePathOptions& opts) { CreateMultipartUploadRequest create_request; create_request.WithBucket(opts.bucket).WithKey(opts.key); @@ -97,13 +96,19 @@ ObjectStorageResponse S3ObjStorageClient::create_multipart_upload( SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome); if (outcome.IsSuccess()) { - return ObjectStorageResponse {.upload_id {outcome.GetResult().GetUploadId()}}; + return ObjectStorageUploadResponse {.upload_id {outcome.GetResult().GetUploadId()}}; } - return ObjectStorageResponse { - .status = s3fs_error( - outcome.GetError(), - fmt::format("failed to create multipart upload {} ", opts.path.native()))}; + + return ObjectStorageUploadResponse { + .resp = {convert_to_obj_response( + s3fs_error(outcome.GetError(), + fmt::format("failed to create multipart upload {} ", + 
opts.path.native()))), + static_cast<int>(outcome.GetError().GetResponseCode()), + outcome.GetError().GetRequestId()}, + }; } + ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOptions& opts, std::string_view stream) { Aws::S3::Model::PutObjectRequest request; @@ -122,12 +127,15 @@ ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOpti auto st = s3fs_error(response.GetError(), fmt::format("failed to put object {}", opts.path.native())); LOG(WARNING) << st; - return ObjectStorageResponse {.status = std::move(st)}; + return ObjectStorageResponse {convert_to_obj_response(std::move(st)), + static_cast<int>(response.GetError().GetResponseCode()), + response.GetError().GetRequestId()}; } return {}; } -ObjectStorageResponse S3ObjStorageClient::upload_part(const ObjectStoragePathOptions& opts, - std::string_view stream, int part_num) { + +ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const ObjectStoragePathOptions& opts, + std::string_view stream, int part_num) { UploadPartRequest upload_request; upload_request.WithBucket(opts.bucket) .WithKey(opts.key) @@ -160,10 +168,14 @@ ObjectStorageResponse S3ObjStorageClient::upload_part(const ObjectStoragePathOpt upload_part_outcome.GetError().GetExceptionName(), upload_part_outcome.GetError().GetResponseCode()); LOG_WARNING(s.to_string()); - return ObjectStorageResponse {.status = std::move(s)}; + return ObjectStorageUploadResponse { + .resp = {convert_to_obj_response(std::move(s)), + static_cast<int>(upload_part_outcome.GetError().GetResponseCode()), + upload_part_outcome.GetError().GetRequestId()}}; } - return ObjectStorageResponse {.etag = upload_part_outcome.GetResult().GetETag()}; + return ObjectStorageUploadResponse {.etag = upload_part_outcome.GetResult().GetETag()}; } + ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload( const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) { CompleteMultipartUploadRequest complete_request; @@ -188,10 +200,13 @@ ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload( fmt::format("failed to complete multi part upload {}, upload_id={}", opts.path.native(), *opts.upload_id)); LOG(WARNING) << st; - return {.status = std::move(st)}; + return {convert_to_obj_response(std::move(st)), + static_cast<int>(complete_outcome.GetError().GetResponseCode()), + complete_outcome.GetError().GetRequestId()}; } return {}; } + ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePathOptions& opts) { Aws::S3::Model::HeadObjectRequest request; request.WithBucket(opts.bucket).WithKey(opts.key); @@ -200,15 +215,20 @@ ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePat auto outcome = SYNC_POINT_HOOK_RETURN_VALUE( _client->HeadObject(request), "s3_file_system::head_object", std::ref(request).get()); if (outcome.IsSuccess()) { - return {.status = Status::OK(), .file_size = outcome.GetResult().GetContentLength()}; + return {.resp = {convert_to_obj_response(Status::OK())}, + .file_size = outcome.GetResult().GetContentLength()}; } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { - return {.status = Status::NotFound("")}; + return {.resp = {convert_to_obj_response(Status::NotFound(""))}}; } else { - return {.status = s3fs_error(outcome.GetError(), - fmt::format("failed to check exists {}", opts.key))}; + return {.resp = {convert_to_obj_response( + s3fs_error(outcome.GetError(), + fmt::format("failed to check exists {}", 
opts.key))), + static_cast<int>(outcome.GetError().GetResponseCode()), + outcome.GetError().GetRequestId()}}; } return {}; } + ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOptions& opts, void* buffer, size_t offset, size_t bytes_read, size_t* size_return) { @@ -220,17 +240,21 @@ ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOpti SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency); auto outcome = _client->GetObject(request); if (!outcome.IsSuccess()) { - return {.status = s3fs_error(outcome.GetError(), - fmt::format("failed to read from {}", opts.path.native()))}; + return {convert_to_obj_response( + s3fs_error(outcome.GetError(), + fmt::format("failed to read from {}", opts.path.native()))), + static_cast<int>(outcome.GetError().GetResponseCode()), + outcome.GetError().GetRequestId()}; } *size_return = outcome.GetResult().GetContentLength(); if (*size_return != bytes_read) { - return {.status = Status::InternalError( - "failed to read from {}(bytes read: {}, bytes req: {})", opts.path.native(), - *size_return, bytes_read)}; + return {convert_to_obj_response( + Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})", + opts.path.native(), *size_return, bytes_read))}; } return {}; } + ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOptions& opts, std::vector<FileInfo>* files) { Aws::S3::Model::ListObjectsV2Request request; @@ -243,8 +267,11 @@ ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOp outcome = _client->ListObjectsV2(request); } if (!outcome.IsSuccess()) { - return {.status = s3fs_error(outcome.GetError(), - fmt::format("failed to list {}", opts.prefix))}; + files->clear(); + return {convert_to_obj_response(s3fs_error( + outcome.GetError(), fmt::format("failed to list {}", opts.prefix))), + static_cast<int>(outcome.GetError().GetResponseCode()), + outcome.GetError().GetRequestId()}; } for (const auto& obj : outcome.GetResult().GetContents()) { std::string key = obj.GetKey(); @@ -260,6 +287,7 @@ ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOp } while (is_trucated); return {}; } + ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePathOptions& opts, std::vector<std::string> objs) { Aws::S3::Model::DeleteObjectsRequest delete_request; @@ -276,16 +304,20 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePath SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency); auto delete_outcome = _client->DeleteObjects(delete_request); if (!delete_outcome.IsSuccess()) { - return {.status = s3fs_error(delete_outcome.GetError(), - fmt::format("failed to delete dir {}", opts.key))}; + return {convert_to_obj_response( + s3fs_error(delete_outcome.GetError(), + fmt::format("failed to delete dir {}", opts.key))), + static_cast<int>(delete_outcome.GetError().GetResponseCode()), + delete_outcome.GetError().GetRequestId()}; } if (!delete_outcome.GetResult().GetErrors().empty()) { const auto& e = delete_outcome.GetResult().GetErrors().front(); - return {.status = Status::InternalError("failed to delete object {}: {}", e.GetKey(), - e.GetMessage())}; + return {convert_to_obj_response(Status::InternalError("failed to delete object {}: {}", + e.GetKey(), e.GetMessage()))}; } return {}; } + ObjectStorageResponse S3ObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) { Aws::S3::Model::DeleteObjectRequest request; request.WithBucket(opts.bucket).WithKey(opts.key); @@ -296,8 +328,10 @@ 
ObjectStorageResponse S3ObjStorageClient::delete_object(const ObjectStoragePathO outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { return {}; } - return {.status = s3fs_error(outcome.GetError(), - fmt::format("failed to delete file {}", opts.key))}; + return {convert_to_obj_response(s3fs_error(outcome.GetError(), + fmt::format("failed to delete file {}", opts.key))), + static_cast<int>(outcome.GetError().GetResponseCode()), + outcome.GetError().GetRequestId()}; } ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively( @@ -314,9 +348,11 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively( outcome = _client->ListObjectsV2(request); } if (!outcome.IsSuccess()) { - return {.status = s3fs_error( + return {convert_to_obj_response(s3fs_error( outcome.GetError(), - fmt::format("failed to list objects when delete dir {}", opts.prefix))}; + fmt::format("failed to list objects when delete dir {}", opts.prefix))), + static_cast<int>(outcome.GetError().GetResponseCode()), + outcome.GetError().GetRequestId()}; } const auto& result = outcome.GetResult(); Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects; @@ -331,13 +367,16 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively( SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency); auto delete_outcome = _client->DeleteObjects(delete_request); if (!delete_outcome.IsSuccess()) { - return {.status = s3fs_error(delete_outcome.GetError(), - fmt::format("failed to delete dir {}", opts.key))}; + return {convert_to_obj_response( + s3fs_error(delete_outcome.GetError(), + fmt::format("failed to delete dir {}", opts.key))), + static_cast<int>(delete_outcome.GetError().GetResponseCode()), + delete_outcome.GetError().GetRequestId()}; } if (!delete_outcome.GetResult().GetErrors().empty()) { const auto& e = delete_outcome.GetResult().GetErrors().front(); - return {.status = Status::InternalError("failed to delete object {}: {}", opts.key, - e.GetMessage())}; + return {convert_to_obj_response(Status::InternalError( + "failed to delete object {}: {}", opts.key, e.GetMessage()))}; } } is_trucated = result.GetIsTruncated(); @@ -345,4 +384,5 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively( } while (is_trucated); return {}; } + } // namespace doris::io \ No newline at end of file diff --git a/be/src/io/fs/s3_obj_storage_client.h b/be/src/io/fs/s3_obj_storage_client.h index 47a9ed733db..ea2b00e58df 100644 --- a/be/src/io/fs/s3_obj_storage_client.h +++ b/be/src/io/fs/s3_obj_storage_client.h @@ -39,11 +39,12 @@ class S3ObjStorageClient final : public ObjStorageClient { public: S3ObjStorageClient(std::shared_ptr<Aws::S3::S3Client> client) : _client(std::move(client)) {} ~S3ObjStorageClient() override = default; - ObjectStorageResponse create_multipart_upload(const ObjectStoragePathOptions& opts) override; + ObjectStorageUploadResponse create_multipart_upload( + const ObjectStoragePathOptions& opts) override; ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts, std::string_view stream) override; - ObjectStorageResponse upload_part(const ObjectStoragePathOptions& opts, std::string_view, - int partNum) override; + ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts, std::string_view, + int partNum) override; ObjectStorageResponse complete_multipart_upload( const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) override; diff --git a/be/test/io/fs/s3_file_writer_test.cpp 
b/be/test/io/fs/s3_file_writer_test.cpp index 7bad5ad8630..75a49d813a4 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -943,9 +943,8 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_2) { sp->set_call_back("S3FileWriter::_complete:2", [](auto&& outcome) { // Deliberately make one upload one part task fail to test if s3 file writer could // handle io error - std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>* parts = - try_any_cast<std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>( - outcome.back()); + auto* parts = try_any_cast<std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>( + outcome.back()); size_t size = parts->size(); parts->back()->SetPartNumber(size + 2); }); @@ -992,12 +991,9 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_1) { sp->set_call_back("S3FileWriter::_complete:1", [](auto&& outcome) { // Deliberately make one upload one part task fail to test if s3 file writer could // handle io error - const std::pair<std::atomic_bool*, - std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>& points = - try_any_cast<const std::pair< - std::atomic_bool*, - std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>&>( - outcome.back()); + const auto& points = try_any_cast<const std::pair< + std::atomic_bool*, std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>&>( + outcome.back()); (*points.first) = false; points.second->pop_back(); }); @@ -1044,7 +1040,8 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_3) { sp->set_call_back("S3FileWriter::_complete:3", [](auto&& outcome) { auto pair = try_any_cast_ret<io::ObjectStorageResponse>(outcome); pair->second = true; - pair->first = io::ObjectStorageResponse {.status = Status::IOError("inject error")}; + pair->first = io::ObjectStorageResponse { + .status = convert_to_obj_response(Status::IOError<false>("inject error"))}; }); Defer defer {[&]() { sp->clear_call_back("S3FileWriter::_complete:3"); }}; auto client = s3_fs->client_holder(); diff --git a/cloud/src/recycler/obj_store_accessor.h b/cloud/src/recycler/obj_store_accessor.h index 8787841aba7..a29266133ca 100644 --- a/cloud/src/recycler/obj_store_accessor.h +++ b/cloud/src/recycler/obj_store_accessor.h @@ -17,6 +17,7 @@ #pragma once +#include <functional> #include <string> #include <vector> @@ -25,6 +26,7 @@ namespace doris::cloud { struct ObjectMeta { std::string path; // Relative path to accessor prefix int64_t size {0}; + int64_t last_modify_second {0}; }; enum class AccessorType { @@ -69,4 +71,59 @@ private: const AccessorType type_; }; +struct ObjectStoragePathOptions { + std::string bucket; // blob container in azure + std::string key; // blob name + std::string prefix; // for batch delete and recursive delete + std::string_view endpoint; +}; + +struct ObjectStorageDeleteExpiredOptions { + ObjectStoragePathOptions path_opts; + std::function<std::string(const std::string& path)> relative_path_factory; +}; + +struct ObjectCompleteMultiParts {}; + +struct ObjectStorageResponse { + ObjectStorageResponse(int r, std::string msg = "") : ret(r), error_msg(std::move(msg)) {} + // clang-format off + int ret {0}; // To unify the error handle logic with BE, we'd better use the same error code as BE + // clang-format on + std::string error_msg; +}; + +// wrapper class owned by concret fs +class ObjStorageClient { +public: + virtual ~ObjStorageClient() = default; + // To directly upload a piece of data to object storage and generate a user-visible file. 
+ // You need to clearly specify the bucket and key + virtual ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts, + std::string_view stream) = 0; + // According to the passed bucket and key, it will access whether the corresponding file exists in the object storage. + // If it exists, it will return the corresponding file size + virtual ObjectStorageResponse head_object(const ObjectStoragePathOptions& opts) = 0; + // According to the passed bucket and prefix, it traverses and retrieves all files under the prefix, and returns the name and file size of all files. + // **Attention**: The ObjectMeta contains the full key in object storage + virtual ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts, + std::vector<ObjectMeta>* files) = 0; + // According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array. + virtual ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts, + std::vector<std::string> objs) = 0; + // Delete the file named key in the object storage bucket. + virtual ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) = 0; + // According to the prefix, recursively delete all files under the prefix. + virtual ObjectStorageResponse delete_objects_recursively( + const ObjectStoragePathOptions& opts) = 0; + // Delete all the objects under the prefix which expires before the expired_time + virtual ObjectStorageResponse delete_expired(const ObjectStorageDeleteExpiredOptions& opts, + int64_t expired_time) = 0; + // Get the objects' expiration time on the bucket + virtual ObjectStorageResponse get_life_cycle(const ObjectStoragePathOptions& opts, + int64_t* expiration_days) = 0; + // Check if the objects' versioning is on or off + virtual ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) = 0; +}; + } // namespace doris::cloud diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 3f5d78b1c8f..04f642f3831 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -32,6 +32,7 @@ #include <algorithm> #include <execution> +#include <type_traits> #include <utility> #include "common/config.h" @@ -39,6 +40,7 @@ #include "common/sync_point.h" #include "rate-limiter/s3_rate_limiter.h" #include "recycler/obj_store_accessor.h" +#include "recycler/s3_obj_client.h" namespace doris::cloud { @@ -53,43 +55,28 @@ private: std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters; }; -[[maybe_unused]] static Aws::Client::AWSError<Aws::S3::S3Errors> s3_error_factory() { - return {Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false}; -} - template <typename Func> -auto do_s3_rate_limit(S3RateLimitType type, Func callback) -> decltype(callback()) { +auto s3_rate_limit(S3RateLimitType op, Func callback) -> decltype(callback()) { using T = decltype(callback()); if (!config::enable_s3_rate_limiter) { return callback(); } - auto sleep_duration = AccessorRateLimiter::instance().rate_limiter(type)->add(1); + auto sleep_duration = AccessorRateLimiter::instance().rate_limiter(op)->add(1); if (sleep_duration < 0) { - return T(s3_error_factory()); + return T(-1); } return callback(); } -#ifndef UNIT_TEST -#define HELP_MACRO(ret, req, point_name) -#else -#define HELP_MACRO(ret, req, point_name) \ - do { \ - std::pair p {&ret, &req}; \ - [[maybe_unused]] auto ret_pair = [&p]() mutable { \ - TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, &p); \ - return 
p; \ - }(); \ - return ret; \ - } while (false); -#endif -#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name, type) \ - [&]() -> decltype(auto) { \ - using T = decltype((expr)); \ - [[maybe_unused]] T t; \ - HELP_MACRO(t, request, point_name) \ - return do_s3_rate_limit(type, [&]() { return (expr); }); \ - }() +template <typename Func> +auto s3_get_rate_limit(Func callback) -> decltype(callback()) { + return s3_rate_limit(S3RateLimitType::GET, std::move(callback)); +} + +template <typename Func> +auto s3_put_rate_limit(Func callback) -> decltype(callback()) { + return s3_rate_limit(S3RateLimitType::PUT, std::move(callback)); +} AccessorRateLimiter::AccessorRateLimiter() : _rate_limiters {std::make_unique<S3RateLimiterHolder>( @@ -157,83 +144,20 @@ int S3Accessor::init() { aws_config.region = conf_.region; aws_config.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>( /*maxRetries = 10, scaleFactor = 25*/); - s3_client_ = std::make_shared<Aws::S3::S3Client>( + auto s3_client = std::make_shared<Aws::S3::S3Client>( std::move(aws_cred), std::move(aws_config), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true /* useVirtualAddressing */); + obj_client_ = std::make_shared<S3ObjClient>(std::move(s3_client)); return 0; } int S3Accessor::delete_objects_by_prefix(const std::string& relative_path) { - Aws::S3::Model::ListObjectsV2Request request; - auto prefix = get_key(relative_path); - request.WithBucket(conf_.bucket).WithPrefix(prefix); - - Aws::S3::Model::DeleteObjectsRequest delete_request; - delete_request.SetBucket(conf_.bucket); - bool is_truncated = false; - do { - auto outcome = SYNC_POINT_HOOK_RETURN_VALUE( - s3_client_->ListObjectsV2(request), std::ref(request).get(), - "s3_client::list_objects_v2", S3RateLimitType::GET); - if (!outcome.IsSuccess()) { - LOG_WARNING("failed to list objects") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", prefix) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()); - if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) { - return 1; - } - return -1; - } - const auto& result = outcome.GetResult(); - VLOG_DEBUG << "get " << result.GetContents().size() << " objects"; - Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects; - objects.reserve(result.GetContents().size()); - for (const auto& obj : result.GetContents()) { - objects.emplace_back().SetKey(obj.GetKey()); - LOG_INFO("delete object") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("key", obj.GetKey()); - } - if (!objects.empty()) { - Aws::S3::Model::Delete del; - del.WithObjects(std::move(objects)).SetQuiet(true); - delete_request.SetDelete(std::move(del)); - auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE( - s3_client_->DeleteObjects(delete_request), std::ref(delete_request).get(), - "s3_client::delete_objects", S3RateLimitType::PUT); - if (!delete_outcome.IsSuccess()) { - LOG_WARNING("failed to delete objects") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", prefix) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()); - if (delete_outcome.GetError().GetResponseCode() == - Aws::Http::HttpResponseCode::FORBIDDEN) { - return 1; - } - return -2; - } - if (!delete_outcome.GetResult().GetErrors().empty()) { - const auto& e = delete_outcome.GetResult().GetErrors().front(); - 
LOG_WARNING("failed to delete object") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("key", e.GetKey()) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", e.GetMessage()); - return -3; - } - } - is_truncated = result.GetIsTruncated(); - request.SetContinuationToken(result.GetNextContinuationToken()); - } while (is_truncated); - return 0; + return s3_get_rate_limit([&]() { + return obj_client_->delete_objects_recursively( + {.bucket = conf_.bucket, .prefix = get_key(relative_path)}); + }) + .ret; } int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) { @@ -244,11 +168,8 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) { constexpr size_t max_delete_batch = 1000; auto path_iter = relative_paths.begin(); - Aws::S3::Model::DeleteObjectsRequest delete_request; - delete_request.SetBucket(conf_.bucket); do { - Aws::S3::Model::Delete del; - Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects; + Aws::Vector<std::string> objects; auto path_begin = path_iter; for (; path_iter != relative_paths.end() && (path_iter - path_begin < max_delete_batch); ++path_iter) { @@ -258,36 +179,16 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) { .tag("bucket", conf_.bucket) .tag("key", key) .tag("size", objects.size()); - objects.emplace_back().SetKey(std::move(key)); + objects.emplace_back(std::move(key)); } if (objects.empty()) { return 0; } - del.WithObjects(std::move(objects)).SetQuiet(true); - delete_request.SetDelete(std::move(del)); - auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE( - s3_client_->DeleteObjects(delete_request), std::ref(delete_request).get(), - "s3_client::delete_objects", S3RateLimitType::PUT); - if (!delete_outcome.IsSuccess()) { - LOG_WARNING("failed to delete objects") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("key[0]", delete_request.GetDelete().GetObjects().front().GetKey()) - .tag("responseCode", - static_cast<int>(delete_outcome.GetError().GetResponseCode())) - .tag("error", delete_outcome.GetError().GetMessage()); - return -1; - } - if (!delete_outcome.GetResult().GetErrors().empty()) { - const auto& e = delete_outcome.GetResult().GetErrors().front(); - LOG_WARNING("failed to delete object") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("key", e.GetKey()) - .tag("responseCode", - static_cast<int>(delete_outcome.GetError().GetResponseCode())) - .tag("error", e.GetMessage()); - return -2; + if (auto delete_resp = s3_put_rate_limit([&]() { + return obj_client_->delete_objects({.bucket = conf_.bucket}, std::move(objects)); + }); + delete_resp.ret != 0) { + return delete_resp.ret; } } while (path_iter != relative_paths.end()); @@ -295,218 +196,64 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) { } int S3Accessor::delete_object(const std::string& relative_path) { - Aws::S3::Model::DeleteObjectRequest request; - auto key = get_key(relative_path); - request.WithBucket(conf_.bucket).WithKey(key); - auto outcome = - SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request), std::ref(request).get(), - "s3_client::delete_object", S3RateLimitType::PUT); - if (!outcome.IsSuccess()) { - LOG_WARNING("failed to delete object") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("key", key) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()) - 
.tag("exception", outcome.GetError().GetExceptionName()); - return -1; - } - return 0; + return s3_put_rate_limit([&]() { + return obj_client_->delete_object({.bucket = conf_.bucket, .key = get_key(relative_path)}) + .ret; + }); } int S3Accessor::put_object(const std::string& relative_path, const std::string& content) { - Aws::S3::Model::PutObjectRequest request; - auto key = get_key(relative_path); - request.WithBucket(conf_.bucket).WithKey(key); - auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor"); - *input << content; - request.SetBody(input); - auto outcome = - SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request), std::ref(request).get(), - "s3_client::put_object", S3RateLimitType::PUT); - if (!outcome.IsSuccess()) { - LOG_WARNING("failed to put object") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("key", key) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()); - return -1; - } - return 0; + return s3_put_rate_limit([&]() { + return obj_client_ + ->put_object({.bucket = conf_.bucket, .key = get_key(relative_path)}, content) + .ret; + }); } int S3Accessor::list(const std::string& relative_path, std::vector<ObjectMeta>* files) { - Aws::S3::Model::ListObjectsV2Request request; - auto prefix = get_key(relative_path); - request.WithBucket(conf_.bucket).WithPrefix(prefix); - - bool is_truncated = false; - do { - auto outcome = SYNC_POINT_HOOK_RETURN_VALUE( - s3_client_->ListObjectsV2(request), std::ref(request).get(), - "s3_client::list_objects_v2", S3RateLimitType::GET); - ; - if (!outcome.IsSuccess()) { - LOG_WARNING("failed to list objects") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", prefix) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()); - return -1; - } - const auto& result = outcome.GetResult(); - VLOG_DEBUG << "get " << result.GetContents().size() << " objects"; - for (const auto& obj : result.GetContents()) { - files->push_back({obj.GetKey().substr(conf_.prefix.size() + 1), obj.GetSize()}); + return s3_get_rate_limit([&]() { + auto resp = obj_client_->list_objects( + {.bucket = conf_.bucket, .prefix = get_key(relative_path)}, files); + + if (resp.ret == 0) { + auto pos = conf_.prefix.size() + 1; + for (auto&& file : *files) { + file.path = file.path.substr(pos); + } } - is_truncated = result.GetIsTruncated(); - request.SetContinuationToken(result.GetNextContinuationToken()); - } while (is_truncated); - return 0; + + return resp.ret; + }); } int S3Accessor::exist(const std::string& relative_path) { - Aws::S3::Model::HeadObjectRequest request; - auto key = get_key(relative_path); - request.WithBucket(conf_.bucket).WithKey(key); - auto outcome = - SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request), std::ref(request).get(), - "s3_client::head_object", S3RateLimitType::GET); - if (outcome.IsSuccess()) { - return 0; - } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { - return 1; - } else { - LOG_WARNING("failed to head object") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("key", key) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()); - return -1; - } + return s3_get_rate_limit([&]() { + return obj_client_->head_object({.bucket = conf_.bucket, .key = get_key(relative_path)}) + .ret; + }); } int 
S3Accessor::delete_expired_objects(const std::string& relative_path, int64_t expired_time) { - Aws::S3::Model::ListObjectsV2Request request; - auto prefix = get_key(relative_path); - request.WithBucket(conf_.bucket).WithPrefix(prefix); - - bool is_truncated = false; - do { - auto outcome = SYNC_POINT_HOOK_RETURN_VALUE( - s3_client_->ListObjectsV2(request), std::ref(request).get(), - "s3_client::list_objects_v2", S3RateLimitType::GET); - if (!outcome.IsSuccess()) { - LOG_WARNING("failed to list objects") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", prefix) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()); - return -1; - } - const auto& result = outcome.GetResult(); - std::vector<std::string> expired_keys; - for (const auto& obj : result.GetContents()) { - if (obj.GetLastModified().Seconds() < expired_time) { - auto relative_key = get_relative_path(obj.GetKey()); - if (relative_key.empty()) { - LOG_WARNING("failed get relative path") - .tag("prefix", conf_.prefix) - .tag("key", obj.GetKey()); - } else { - expired_keys.push_back(relative_key); - LOG_INFO("delete expired object") - .tag("prefix", conf_.prefix) - .tag("key", obj.GetKey()) - .tag("relative_key", relative_key) - .tag("lastModifiedTime", obj.GetLastModified().Seconds()) - .tag("expiredTime", expired_time); - } - } - } - - auto ret = delete_objects(expired_keys); - if (ret != 0) { - return ret; - } - LOG_INFO("delete expired objects") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", conf_.prefix) - .tag("num_scanned", result.GetContents().size()) - .tag("num_recycled", expired_keys.size()); - is_truncated = result.GetIsTruncated(); - request.SetContinuationToken(result.GetNextContinuationToken()); - } while (is_truncated); - return 0; + return s3_put_rate_limit([&]() { + return obj_client_ + ->delete_expired( + {.path_opts = {.bucket = conf_.bucket, .prefix = get_key(relative_path)}, + .relative_path_factory = + [&](const std::string& key) { return get_relative_path(key); }}, + expired_time) + .ret; + }); } int S3Accessor::get_bucket_lifecycle(int64_t* expiration_days) { - Aws::S3::Model::GetBucketLifecycleConfigurationRequest request; - request.SetBucket(conf_.bucket); - - auto outcome = SYNC_POINT_HOOK_RETURN_VALUE( - s3_client_->GetBucketLifecycleConfiguration(request), std::ref(request).get(), - "s3_client::get_bucket_lifecycle_configuration", S3RateLimitType::GET); - bool has_lifecycle = false; - if (outcome.IsSuccess()) { - const auto& rules = outcome.GetResult().GetRules(); - for (const auto& rule : rules) { - if (rule.NoncurrentVersionExpirationHasBeenSet()) { - has_lifecycle = true; - *expiration_days = rule.GetNoncurrentVersionExpiration().GetNoncurrentDays(); - } - } - } else { - LOG_WARNING("Err for check interval: failed to get bucket lifecycle") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", conf_.prefix) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()); - return -1; - } - - if (!has_lifecycle) { - LOG_WARNING("Err for check interval: bucket doesn't have lifecycle configuration") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", conf_.prefix); - return -1; - } - return 0; + return s3_get_rate_limit([&]() { + return obj_client_->get_life_cycle({.bucket = conf_.bucket}, expiration_days).ret; + }); } int 
S3Accessor::check_bucket_versioning() { - Aws::S3::Model::GetBucketVersioningRequest request; - request.SetBucket(conf_.bucket); - auto outcome = SYNC_POINT_HOOK_RETURN_VALUE( - s3_client_->GetBucketVersioning(request), std::ref(request).get(), - "s3_client::get_bucket_versioning", S3RateLimitType::GET); - - if (outcome.IsSuccess()) { - const auto& versioning_configuration = outcome.GetResult().GetStatus(); - if (versioning_configuration != Aws::S3::Model::BucketVersioningStatus::Enabled) { - LOG_WARNING("Err for check interval: bucket doesn't enable bucket versioning") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", conf_.prefix); - return -1; - } - } else { - LOG_WARNING("Err for check interval: failed to get status of bucket versioning") - .tag("endpoint", conf_.endpoint) - .tag("bucket", conf_.bucket) - .tag("prefix", conf_.prefix) - .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) - .tag("error", outcome.GetError().GetMessage()); - return -1; - } - return 0; + return s3_get_rate_limit( + [&]() { return obj_client_->check_versioning({.bucket = conf_.bucket}).ret; }); } int GcsAccessor::delete_objects(const std::vector<std::string>& relative_paths) { @@ -523,6 +270,4 @@ int GcsAccessor::delete_objects(const std::vector<std::string>& relative_paths) } return ret; } -#undef SYNC_POINT_HOOK_RETURN_VALUE -#undef HELP_MACRO } // namespace doris::cloud diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h index 1a44b0971cb..5221f90cafc 100644 --- a/cloud/src/recycler/s3_accessor.h +++ b/cloud/src/recycler/s3_accessor.h @@ -20,6 +20,7 @@ #include <memory> #include "recycler/obj_store_accessor.h" +#include "recycler/s3_obj_client.h" namespace Aws::S3 { class S3Client; @@ -47,7 +48,8 @@ public: const std::string& path() const override { return path_; } - const std::shared_ptr<Aws::S3::S3Client>& s3_client() const { return s3_client_; } + // TODO(ByteYue): refactor this function to suite different kind object storage + const std::shared_ptr<Aws::S3::S3Client>& s3_client() const { return obj_client_->s3_client(); } const S3Conf& conf() const { return conf_; } @@ -89,9 +91,9 @@ private: std::string get_relative_path(const std::string& key) const; private: - std::shared_ptr<Aws::S3::S3Client> s3_client_; S3Conf conf_; std::string path_; + std::shared_ptr<S3ObjClient> obj_client_; }; class GcsAccessor final : public S3Accessor { diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp new file mode 100644 index 00000000000..2b3d83cd8ce --- /dev/null +++ b/cloud/src/recycler/s3_obj_client.cpp @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "recycler/s3_obj_client.h" + +#include <aws/s3/S3Client.h> +#include <aws/s3/model/DeleteObjectRequest.h> +#include <aws/s3/model/DeleteObjectsRequest.h> +#include <aws/s3/model/GetBucketLifecycleConfigurationRequest.h> +#include <aws/s3/model/GetBucketVersioningRequest.h> +#include <aws/s3/model/HeadObjectRequest.h> +#include <aws/s3/model/ListObjectsV2Request.h> +#include <aws/s3/model/PutObjectRequest.h> + +#include "common/logging.h" +#include "common/sync_point.h" + +namespace doris::cloud { + +#ifndef UNIT_TEST +#define HELPER_MACRO(ret, req, point_name) +#else +#define HELPER_MACRO(ret, req, point_name) \ + do { \ + std::pair p {&ret, &req}; \ + [[maybe_unused]] auto ret_pair = [&p]() mutable { \ + TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, &p); \ + return p; \ + }(); \ + return ret; \ + } while (false); +#endif +#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name) \ + [&]() -> decltype(auto) { \ + using T = decltype((expr)); \ + [[maybe_unused]] T t; \ + HELPER_MACRO(t, request, point_name) \ + return (expr); \ + }() + +ObjectStorageResponse S3ObjClient::put_object(const ObjectStoragePathOptions& opts, + std::string_view stream) { + Aws::S3::Model::PutObjectRequest request; + request.WithBucket(opts.bucket).WithKey(opts.key); + auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor"); + *input << stream; + request.SetBody(input); + auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request), + std::ref(request).get(), "s3_client::put_object"); + if (!outcome.IsSuccess()) { + LOG_WARNING("failed to put object") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("key", opts.key) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()); + return -1; + } + return 0; +} +ObjectStorageResponse S3ObjClient::head_object(const ObjectStoragePathOptions& opts) { + Aws::S3::Model::HeadObjectRequest request; + request.WithBucket(opts.bucket).WithKey(opts.key); + auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request), + std::ref(request).get(), "s3_client::head_object"); + if (outcome.IsSuccess()) { + return 0; + } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + return 1; + } else { + LOG_WARNING("failed to head object") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("key", opts.key) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()); + return -1; + } +} +ObjectStorageResponse S3ObjClient::list_objects(const ObjectStoragePathOptions& opts, + std::vector<ObjectMeta>* files) { + Aws::S3::Model::ListObjectsV2Request request; + request.WithBucket(opts.bucket).WithPrefix(opts.prefix); + + bool is_truncated = false; + do { + auto outcome = + SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request), + std::ref(request).get(), "s3_client::list_objects_v2"); + if (!outcome.IsSuccess()) { + LOG_WARNING("failed to list objects") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("prefix", opts.prefix) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()); + return -1; + } + const auto& result = outcome.GetResult(); + VLOG_DEBUG << "get " << result.GetContents().size() << " objects"; + for (const auto& obj : result.GetContents()) { + files->push_back({obj.GetKey(), obj.GetSize(), 
obj.GetLastModified().Seconds()}); + } + is_truncated = result.GetIsTruncated(); + request.SetContinuationToken(result.GetNextContinuationToken()); + } while (is_truncated); + return 0; +} +ObjectStorageResponse S3ObjClient::delete_objects(const ObjectStoragePathOptions& opts, + std::vector<std::string> objs) { + Aws::S3::Model::DeleteObjectsRequest delete_request; + delete_request.SetBucket(opts.bucket); + Aws::S3::Model::Delete del; + Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects; + for (auto&& obj : objs) { + objects.emplace_back().SetKey(std::move(obj)); + } + del.WithObjects(std::move(objects)).SetQuiet(true); + delete_request.SetDelete(std::move(del)); + auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request), + std::ref(delete_request).get(), + "s3_client::delete_objects"); + if (!delete_outcome.IsSuccess()) { + LOG_WARNING("failed to delete objects") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("key[0]", delete_request.GetDelete().GetObjects().front().GetKey()) + .tag("responseCode", static_cast<int>(delete_outcome.GetError().GetResponseCode())) + .tag("error", delete_outcome.GetError().GetMessage()); + return {-1}; + } + if (!delete_outcome.GetResult().GetErrors().empty()) { + const auto& e = delete_outcome.GetResult().GetErrors().front(); + LOG_WARNING("failed to delete object") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("key", e.GetKey()) + .tag("responseCode", static_cast<int>(delete_outcome.GetError().GetResponseCode())) + .tag("error", e.GetMessage()); + return {-2}; + } + return {0}; +} + +ObjectStorageResponse S3ObjClient::delete_object(const ObjectStoragePathOptions& opts) { + Aws::S3::Model::DeleteObjectRequest request; + request.WithBucket(opts.bucket).WithKey(opts.key); + auto outcome = SYNC_POINT_HOOK_RETURN_VALUE( + s3_client_->DeleteObject(request), std::ref(request).get(), "s3_client::delete_object"); + if (!outcome.IsSuccess()) { + LOG_WARNING("failed to delete object") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("key", opts.key) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()) + .tag("exception", outcome.GetError().GetExceptionName()); + return -1; + } + return 0; +} + +ObjectStorageResponse S3ObjClient::delete_objects_recursively( + const ObjectStoragePathOptions& opts) { + Aws::S3::Model::ListObjectsV2Request request; + request.WithBucket(opts.bucket).WithPrefix(opts.prefix); + + Aws::S3::Model::DeleteObjectsRequest delete_request; + delete_request.SetBucket(opts.bucket); + bool is_truncated = false; + do { + auto outcome = + SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request), + std::ref(request).get(), "s3_client::list_objects_v2"); + + if (!outcome.IsSuccess()) { + LOG_WARNING("failed to list objects") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("prefix", opts.prefix) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()); + if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) { + return {1}; + } + return {-1}; + } + const auto& result = outcome.GetResult(); + VLOG_DEBUG << "get " << result.GetContents().size() << " objects"; + Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects; + objects.reserve(result.GetContents().size()); + for (const auto& obj : result.GetContents()) { + 
+ObjectStorageResponse S3ObjClient::list_objects(const ObjectStoragePathOptions& opts,
+                                                std::vector<ObjectMeta>* files) {
+    Aws::S3::Model::ListObjectsV2Request request;
+    request.WithBucket(opts.bucket).WithPrefix(opts.prefix);
+
+    bool is_truncated = false;
+    do {
+        auto outcome =
+                SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+                                             std::ref(request).get(), "s3_client::list_objects_v2");
+        if (!outcome.IsSuccess()) {
+            LOG_WARNING("failed to list objects")
+                    .tag("endpoint", opts.endpoint)
+                    .tag("bucket", opts.bucket)
+                    .tag("prefix", opts.prefix)
+                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
+                    .tag("error", outcome.GetError().GetMessage());
+            return -1;
+        }
+        const auto& result = outcome.GetResult();
+        VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
+        for (const auto& obj : result.GetContents()) {
+            files->push_back({obj.GetKey(), obj.GetSize(), obj.GetLastModified().Seconds()});
+        }
+        is_truncated = result.GetIsTruncated();
+        request.SetContinuationToken(result.GetNextContinuationToken());
+    } while (is_truncated);
+    return 0;
+}
+ObjectStorageResponse S3ObjClient::delete_objects(const ObjectStoragePathOptions& opts,
+                                                  std::vector<std::string> objs) {
+    Aws::S3::Model::DeleteObjectsRequest delete_request;
+    delete_request.SetBucket(opts.bucket);
+    Aws::S3::Model::Delete del;
+    Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+    for (auto&& obj : objs) {
+        objects.emplace_back().SetKey(std::move(obj));
+    }
+    del.WithObjects(std::move(objects)).SetQuiet(true);
+    delete_request.SetDelete(std::move(del));
+    auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
+                                                       std::ref(delete_request).get(),
+                                                       "s3_client::delete_objects");
+    if (!delete_outcome.IsSuccess()) {
+        LOG_WARNING("failed to delete objects")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("key[0]", delete_request.GetDelete().GetObjects().front().GetKey())
+                .tag("responseCode", static_cast<int>(delete_outcome.GetError().GetResponseCode()))
+                .tag("error", delete_outcome.GetError().GetMessage());
+        return {-1};
+    }
+    if (!delete_outcome.GetResult().GetErrors().empty()) {
+        const auto& e = delete_outcome.GetResult().GetErrors().front();
+        LOG_WARNING("failed to delete object")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("key", e.GetKey())
+                .tag("responseCode", static_cast<int>(delete_outcome.GetError().GetResponseCode()))
+                .tag("error", e.GetMessage());
+        return {-2};
+    }
+    return {0};
+}
+
+ObjectStorageResponse S3ObjClient::delete_object(const ObjectStoragePathOptions& opts) {
+    Aws::S3::Model::DeleteObjectRequest request;
+    request.WithBucket(opts.bucket).WithKey(opts.key);
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_client_->DeleteObject(request), std::ref(request).get(), "s3_client::delete_object");
+    if (!outcome.IsSuccess()) {
+        LOG_WARNING("failed to delete object")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("key", opts.key)
+                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage())
+                .tag("exception", outcome.GetError().GetExceptionName());
+        return -1;
+    }
+    return 0;
+}
+
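+// Lists everything under opts.prefix and deletes it in ListObjectsV2-sized
+// batches. Returns {1} when access is FORBIDDEN and a negative code on any
+// other list or delete failure.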
+ObjectStorageResponse S3ObjClient::delete_objects_recursively(
+        const ObjectStoragePathOptions& opts) {
+    Aws::S3::Model::ListObjectsV2Request request;
+    request.WithBucket(opts.bucket).WithPrefix(opts.prefix);
+
+    Aws::S3::Model::DeleteObjectsRequest delete_request;
+    delete_request.SetBucket(opts.bucket);
+    bool is_truncated = false;
+    do {
+        auto outcome =
+                SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+                                             std::ref(request).get(), "s3_client::list_objects_v2");
+
+        if (!outcome.IsSuccess()) {
+            LOG_WARNING("failed to list objects")
+                    .tag("endpoint", opts.endpoint)
+                    .tag("bucket", opts.bucket)
+                    .tag("prefix", opts.prefix)
+                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
+                    .tag("error", outcome.GetError().GetMessage());
+            if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
+                return {1};
+            }
+            return {-1};
+        }
+        const auto& result = outcome.GetResult();
+        VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
+        Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+        objects.reserve(result.GetContents().size());
+        for (const auto& obj : result.GetContents()) {
+            objects.emplace_back().SetKey(obj.GetKey());
+            LOG_INFO("delete object")
+                    .tag("endpoint", opts.endpoint)
+                    .tag("bucket", opts.bucket)
+                    .tag("key", obj.GetKey());
+        }
+        if (!objects.empty()) {
+            Aws::S3::Model::Delete del;
+            del.WithObjects(std::move(objects)).SetQuiet(true);
+            delete_request.SetDelete(std::move(del));
+            auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+                    s3_client_->DeleteObjects(delete_request), std::ref(delete_request).get(),
+                    "s3_client::delete_objects");
+            if (!delete_outcome.IsSuccess()) {
+                LOG_WARNING("failed to delete objects")
+                        .tag("endpoint", opts.endpoint)
+                        .tag("bucket", opts.bucket)
+                        .tag("prefix", opts.prefix)
+                        .tag("responseCode",
+                             static_cast<int>(delete_outcome.GetError().GetResponseCode()))
+                        .tag("error", delete_outcome.GetError().GetMessage());
+                if (delete_outcome.GetError().GetResponseCode() ==
+                    Aws::Http::HttpResponseCode::FORBIDDEN) {
+                    return {1};
+                }
+                return {-2};
+            }
+            if (!delete_outcome.GetResult().GetErrors().empty()) {
+                const auto& e = delete_outcome.GetResult().GetErrors().front();
+                LOG_WARNING("failed to delete object")
+                        .tag("endpoint", opts.endpoint)
+                        .tag("bucket", opts.bucket)
+                        .tag("key", e.GetKey())
+                        .tag("responseCode",
+                             static_cast<int>(delete_outcome.GetError().GetResponseCode()))
+                        .tag("error", e.GetMessage());
+                return {-3};
+            }
+        }
+        is_truncated = result.GetIsTruncated();
+        request.SetContinuationToken(result.GetNextContinuationToken());
+    } while (is_truncated);
+    return {0};
+}
+ObjectStorageResponse S3ObjClient::delete_expired(const ObjectStorageDeleteExpiredOptions& opts,
+                                                  int64_t expired_time) {
+    Aws::S3::Model::ListObjectsV2Request request;
+    request.WithBucket(opts.path_opts.bucket).WithPrefix(opts.path_opts.prefix);
+
+    bool is_truncated = false;
+    do {
+        auto outcome =
+                SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+                                             std::ref(request).get(), "s3_client::list_objects_v2");
+        if (!outcome.IsSuccess()) {
+            LOG_WARNING("failed to list objects")
+                    .tag("endpoint", opts.path_opts.endpoint)
+                    .tag("bucket", opts.path_opts.bucket)
+                    .tag("prefix", opts.path_opts.prefix)
+                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
+                    .tag("error", outcome.GetError().GetMessage());
+            return -1;
+        }
+        const auto& result = outcome.GetResult();
+        std::vector<std::string> expired_keys;
+        for (const auto& obj : result.GetContents()) {
+            if (obj.GetLastModified().Seconds() < expired_time) {
+                auto relative_key = opts.relative_path_factory(obj.GetKey());
+                if (relative_key.empty()) {
+                    LOG_WARNING("failed to get relative path")
+                            .tag("prefix", opts.path_opts.prefix)
+                            .tag("key", obj.GetKey());
+                } else {
+                    expired_keys.push_back(obj.GetKey());
+                    LOG_INFO("delete expired object")
+                            .tag("prefix", opts.path_opts.prefix)
+                            .tag("key", obj.GetKey())
+                            .tag("relative_key", relative_key)
+                            .tag("lastModifiedTime", obj.GetLastModified().Seconds())
+                            .tag("expiredTime", expired_time);
+                }
+            }
+        }
+
+        // Record the count before the keys are moved into delete_objects.
+        size_t num_expired = expired_keys.size();
+        // S3 rejects a DeleteObjects request with an empty object list, so
+        // skip the round trip when nothing in this page has expired.
+        if (!expired_keys.empty()) {
+            auto ret = delete_objects(opts.path_opts, std::move(expired_keys));
+            if (ret.ret != 0) {
+                return ret;
+            }
+        }
+        LOG_INFO("delete expired objects")
+                .tag("endpoint", opts.path_opts.endpoint)
+                .tag("bucket", opts.path_opts.bucket)
+                .tag("prefix", opts.path_opts.prefix)
+                .tag("num_scanned", result.GetContents().size())
+                .tag("num_recycled", num_expired);
+        is_truncated = result.GetIsTruncated();
+        request.SetContinuationToken(result.GetNextContinuationToken());
+    } while (is_truncated);
+    return 0;
+}
+ObjectStorageResponse S3ObjClient::get_life_cycle(const ObjectStoragePathOptions& opts,
+                                                  int64_t* expiration_days) {
+    Aws::S3::Model::GetBucketLifecycleConfigurationRequest request;
+    request.SetBucket(opts.bucket);
+
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_client_->GetBucketLifecycleConfiguration(request), std::ref(request).get(),
+            "s3_client::get_bucket_lifecycle_configuration");
+    bool has_lifecycle = false;
+    if (outcome.IsSuccess()) {
+        const auto& rules = outcome.GetResult().GetRules();
+        for (const auto& rule : rules) {
+            if (rule.NoncurrentVersionExpirationHasBeenSet()) {
+                has_lifecycle = true;
+                *expiration_days = rule.GetNoncurrentVersionExpiration().GetNoncurrentDays();
+            }
+        }
+    } else {
+        LOG_WARNING("Err for check interval: failed to get bucket lifecycle")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("prefix", opts.prefix)
+                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage());
+        return -1;
+    }
+
+    if (!has_lifecycle) {
+        LOG_WARNING("Err for check interval: bucket doesn't have lifecycle configuration")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("prefix", opts.prefix);
+        return -1;
+    }
+    return 0;
+}
+
+ObjectStorageResponse S3ObjClient::check_versioning(const ObjectStoragePathOptions& opts) {
+    Aws::S3::Model::GetBucketVersioningRequest request;
+    request.SetBucket(opts.bucket);
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketVersioning(request),
+                                                std::ref(request).get(),
+                                                "s3_client::get_bucket_versioning");
+
+    if (outcome.IsSuccess()) {
+        const auto& versioning_configuration = outcome.GetResult().GetStatus();
+        if (versioning_configuration != Aws::S3::Model::BucketVersioningStatus::Enabled) {
+            LOG_WARNING("Err for check interval: bucket doesn't enable bucket versioning")
+                    .tag("endpoint", opts.endpoint)
+                    .tag("bucket", opts.bucket)
+                    .tag("prefix", opts.prefix);
+            return -1;
+        }
+    } else {
+        LOG_WARNING("Err for check interval: failed to get status of bucket versioning")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("prefix", opts.prefix)
+                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage());
+        return -1;
+    }
+    return 0;
+}
+
+#undef SYNC_POINT_HOOK_RETURN_VALUE
+#undef HELPER_MACRO
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/be/src/io/fs/s3_obj_storage_client.h b/cloud/src/recycler/s3_obj_client.h
similarity index 51%
copy from be/src/io/fs/s3_obj_storage_client.h
copy to cloud/src/recycler/s3_obj_client.h
index 47a9ed733db..891474b5289 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/cloud/src/recycler/s3_obj_client.h
@@ -17,49 +17,41 @@
 
 #pragma once
 
-#include "io/fs/obj_storage_client.h"
-#include "io/fs/s3_file_system.h"
+#include <memory>
+
+#include "recycler/obj_store_accessor.h"
 
 namespace Aws::S3 {
 class S3Client;
-namespace Model {
-class CompletedPart;
-}
 } // namespace Aws::S3
 
-namespace doris::io {
-
-struct S3CompleteMultiParts : public ObjectCompleteMultiParts {
-    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& parts;
-};
-
-class ObjClientHolder;
+namespace doris::cloud {
 
-class S3ObjStorageClient final : public ObjStorageClient {
+class S3ObjClient : public ObjStorageClient {
 public:
-    S3ObjStorageClient(std::shared_ptr<Aws::S3::S3Client> client) : _client(std::move(client)) {}
-    ~S3ObjStorageClient() override = default;
-    ObjectStorageResponse create_multipart_upload(const ObjectStoragePathOptions& opts) override;
+    S3ObjClient(std::shared_ptr<Aws::S3::S3Client> client) : s3_client_(std::move(client)) {}
+    ~S3ObjClient() override = default;
+
     ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
                                      std::string_view stream) override;
-    ObjectStorageResponse upload_part(const ObjectStoragePathOptions& opts, std::string_view,
-                                      int partNum) override;
-    ObjectStorageResponse complete_multipart_upload(
-            const ObjectStoragePathOptions& opts,
-            const ObjectCompleteMultiParts& completed_parts) override;
-    ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override;
-    ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer,
-                                     size_t offset, size_t bytes_read,
-                                     size_t* size_return) override;
+    ObjectStorageResponse head_object(const ObjectStoragePathOptions& opts) override;
     ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
-                                       std::vector<FileInfo>* files) override;
+                                       std::vector<ObjectMeta>* files) override;
     ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts,
                                          std::vector<std::string> objs) override;
     ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) override;
     ObjectStorageResponse delete_objects_recursively(const ObjectStoragePathOptions& opts) override;
+    ObjectStorageResponse delete_expired(const ObjectStorageDeleteExpiredOptions& opts,
+                                         int64_t expired_time) override;
+    ObjectStorageResponse get_life_cycle(const ObjectStoragePathOptions& opts,
+                                         int64_t* expiration_days) override;
+
+    ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) override;
+
+    const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_; }
 
 private:
-    std::shared_ptr<Aws::S3::S3Client> _client;
+    std::shared_ptr<Aws::S3::S3Client> s3_client_;
 };
 
-} // namespace doris::io
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/test/s3_accessor_test.cpp b/cloud/test/s3_accessor_test.cpp
index 972505c3999..362a0b11cbf 100644
--- a/cloud/test/s3_accessor_test.cpp
+++ b/cloud/test/s3_accessor_test.cpp
@@ -397,6 +397,7 @@ TEST(S3AccessorTest, check_bucket_versioning) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -422,6 +423,7 @@ TEST(S3AccessorTest, check_bucket_versioning_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     return_error_for_error_s3_client = true;
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
@@ -444,6 +446,7 @@ TEST(S3AccessorTest, get_bucket_lifecycle) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -473,6 +476,7 @@ TEST(S3AccessorTest, get_bucket_lifecycle_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     return_error_for_error_s3_client = true;
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
@@ -496,6 +500,7 @@ TEST(S3AccessorTest, list) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -519,6 +524,7 @@ TEST(S3AccessorTest, list_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     return_error_for_error_s3_client = true;
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
@@ -543,6 +549,7 @@ TEST(S3AccessorTest, put) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -569,6 +576,7 @@ TEST(S3AccessorTest, put_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -601,6 +609,7 @@ TEST(S3AccessorTest, exist) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -624,6 +633,7 @@ TEST(S3AccessorTest, exist_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -649,6 +659,7 @@ TEST(S3AccessorTest, delete_object) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -675,6 +686,7 @@ TEST(S3AccessorTest, gcs_delete_objects) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -707,6 +719,7 @@ TEST(S3AccessorTest, gcs_delete_objects_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -744,6 +757,7 @@ TEST(S3AccessorTest, delete_objects) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -776,6 +790,7 @@ TEST(S3AccessorTest, delete_objects_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -817,6 +832,7 @@ TEST(S3AccessorTest, delete_expired_objects) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -871,6 +887,7 @@ TEST(S3AccessorTest, delete_object_by_prefix) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -898,6 +915,7 @@ TEST(S3AccessorTest, delete_object_by_prefix_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org