This is an automated email from the ASF dual-hosted git repository.

plat1ko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 45661f941e8 [feature](Cloud) Introduce obj storage client interface to recycler (#35447)
45661f941e8 is described below

commit 45661f941e80265039429b26bf8bb6ef8ec410d5
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Mon Jun 3 20:12:14 2024 +0800

    [feature](Cloud) Introduce obj storage client interface to recycler (#35447)
    
    Extract a basic interface to suit different kinds of ObjectStorage.
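    
    A minimal usage sketch of the extracted interface (not part of this
    change set; recycle_prefix is a hypothetical helper, and the types are
    the ones declared in cloud/src/recycler/obj_store_accessor.h below):
    
        #include <iostream>
        #include <string>
        
        #include "recycler/obj_store_accessor.h"
        
        namespace doris::cloud {
        
        // Works against any backend that implements ObjStorageClient
        // (S3ObjClient today; other object storages can be plugged in later).
        int recycle_prefix(ObjStorageClient& client, const std::string& bucket,
                           const std::string& prefix) {
            auto resp = client.delete_objects_recursively(
                    {.bucket = bucket, .prefix = prefix});
            if (resp.ret != 0) { // same error-code convention as BE
                std::cerr << "recycle failed: " << resp.error_msg << std::endl;
            }
            return resp.ret;
        }
        
        } // namespace doris::cloud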
---
 be/src/common/status.h                             |  18 +-
 be/src/io/fs/err_utils.cpp                         |   7 +
 be/src/io/fs/obj_storage_client.h                  |  24 +-
 be/src/io/fs/s3_file_reader.cpp                    |   5 +-
 be/src/io/fs/s3_file_system.cpp                    |  73 ++--
 be/src/io/fs/s3_file_writer.cpp                    |  16 +-
 be/src/io/fs/s3_obj_storage_client.cpp             | 112 ++++--
 be/src/io/fs/s3_obj_storage_client.h               |   7 +-
 be/test/io/fs/s3_file_writer_test.cpp              |  17 +-
 cloud/src/recycler/obj_store_accessor.h            |  57 +++
 cloud/src/recycler/s3_accessor.cpp                 | 389 ++++-----------------
 cloud/src/recycler/s3_accessor.h                   |   6 +-
 cloud/src/recycler/s3_obj_client.cpp               | 371 ++++++++++++++++++++
 .../src/recycler/s3_obj_client.h                   |  48 ++-
 cloud/test/s3_accessor_test.cpp                    |  18 +
 15 files changed, 715 insertions(+), 453 deletions(-)
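
On the BE side, ObjectStorageResponse now carries a plain {code, msg} pair
(ObjectStorageStatus) plus the HTTP code and request id instead of a full
Status, and each call site rebuilds a Status from it. A sketch of that round
trip, assuming the types in be/src/io/fs/obj_storage_client.h and
common/status.h (to_status is a hypothetical helper; the commit inlines this
conversion at each call site):

    #include <utility>

    #include "common/status.h"            // doris::Status, ErrorCode
    #include "io/fs/obj_storage_client.h" // doris::io::ObjectStorageStatus

    namespace doris {

    // Rebuild a Status from the unified {code, msg} pair, appending context
    // the way the call sites in this commit do.
    Status to_status(io::ObjectStorageStatus&& s, std::string_view ctx) {
        if (s.code == ErrorCode::OK) {
            return Status::OK();
        }
        return std::move(Status(s.code, std::move(s.msg)).append(ctx));
    }

    } // namespace doris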

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 137a268b5aa..34e13749165 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -25,6 +25,14 @@
 
 namespace doris {
 
+namespace io {
+struct ObjectStorageStatus;
+}
+
+class Status;
+
+extern io::ObjectStorageStatus convert_to_obj_response(Status st);
+
 class PStatus;
 
 namespace ErrorCode {
@@ -352,11 +360,11 @@ public:
     Status() : _code(ErrorCode::OK), _err_msg(nullptr) {}
 
     // used to convert Exception to Status
-    Status(int code, std::string msg, std::string stack) : _code(code) {
+    Status(int code, std::string msg, std::string stack = "") : _code(code) {
         _err_msg = std::make_unique<ErrMsg>();
-        _err_msg->_msg = msg;
+        _err_msg->_msg = std::move(msg);
 #ifdef ENABLE_STACKTRACE
-        _err_msg->_stack = stack;
+        _err_msg->_stack = std::move(stack);
 #endif
     }
 
@@ -529,6 +537,10 @@ public:
 
     std::string_view msg() const { return _err_msg ? _err_msg->_msg : std::string_view(""); }
 
+    std::pair<int, std::string> retrieve_error_msg() { return {_code, std::move(_err_msg->_msg)}; }
+
+    friend io::ObjectStorageStatus convert_to_obj_response(Status st);
+
 private:
     int _code;
     struct ErrMsg {
diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
index 8552c647cdd..6552d454824 100644
--- a/be/src/io/fs/err_utils.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -27,10 +27,17 @@
 
 #include "common/status.h"
 #include "io/fs/hdfs.h"
+#include "io/fs/obj_storage_client.h"
 
 namespace doris {
 using namespace ErrorCode;
 
+io::ObjectStorageStatus convert_to_obj_response(Status st) {
+    int code = st._code;
+    std::string msg = st._err_msg == nullptr ? "" : std::move(st._err_msg->_msg);
+    return io::ObjectStorageStatus {.code = code, .msg = std::move(msg)};
+}
+
 namespace io {
 
 std::string errno_to_str() {
diff --git a/be/src/io/fs/obj_storage_client.h b/be/src/io/fs/obj_storage_client.h
index 40e0ff9a8fe..3ab0a8e2dea 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -46,14 +46,26 @@ struct ObjectStoragePathOptions {
 
 struct ObjectCompleteMultiParts {};
 
+struct ObjectStorageStatus {
+    int code = 0;
+    std::string msg = std::string();
+};
+
+// We only store the error code along with err_msg instead of Status to unify BE and recycler's error handling logic
 struct ObjectStorageResponse {
-    Status status = Status::OK();
+    ObjectStorageStatus status {};
+    int http_code {200};
+    std::string request_id = std::string();
+};
+
+struct ObjectStorageUploadResponse {
+    ObjectStorageResponse resp {};
     std::optional<std::string> upload_id = std::nullopt;
     std::optional<std::string> etag = std::nullopt;
 };
 
 struct ObjectStorageHeadResponse {
-    Status status = Status::OK();
+    ObjectStorageResponse resp {};
     long long file_size {0};
 };
 
@@ -62,7 +74,8 @@ public:
     virtual ~ObjStorageClient() = default;
     // Create a multi-part upload request. On AWS-compatible systems, it will return an upload ID, but not on Azure.
     // The input parameters should include the bucket and key for the object storage.
-    virtual ObjectStorageResponse create_multipart_upload(const ObjectStoragePathOptions& opts) = 0;
+    virtual ObjectStorageUploadResponse create_multipart_upload(
+            const ObjectStoragePathOptions& opts) = 0;
     // To directly upload a piece of data to object storage and generate a user-visible file.
     // You need to clearly specify the bucket and key
     virtual ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
@@ -71,8 +84,8 @@ public:
     // The temporary file's ID is the value of the part_num passed in
     // You need to specify the bucket and key along with the upload_id if it's an AWS-compatible system
     // For the same bucket and key, as well as the same part_num, it will directly replace the original temporary file.
-    virtual ObjectStorageResponse upload_part(const ObjectStoragePathOptions& opts,
-                                              std::string_view stream, int part_num) = 0;
+    virtual ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts,
+                                                    std::string_view stream, int part_num) = 0;
     // To combine the previously uploaded multiple file parts into a complete file, the file name is the name of the key passed in.
     // If it is an AWS-compatible system, the upload_id needs to be included.
     // After a successful execution, the large file can be accessed in the object storage
@@ -88,6 +101,7 @@ public:
                                              size_t offset, size_t bytes_read,
                                              size_t* size_return) = 0;
     // According to the passed bucket and prefix, it traverses and retrieves all files under the prefix, and returns the name and file size of all files.
+    // **Notice**: The files returned by this function contain the full key in object storage.
     virtual ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
                                                std::vector<FileInfo>* files) = 0;
     // According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array.
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 1ea6d5c3c03..e7775803198 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -117,8 +117,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
             to, offset, bytes_req, bytes_read);
     // clang-format on
-    if (!resp.status.ok()) {
-        return resp.status.append(fmt::format("failed to read from {}", _path.native()));
+    if (resp.status.code != ErrorCode::OK) {
+        return std::move(Status(resp.status.code, std::move(resp.status.msg))
+                                 .append(fmt::format("failed to read from {}", _path.native())));
     }
     if (*bytes_read != bytes_req) {
         return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 2cd40ea87a6..27aff992f4c 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -131,9 +131,10 @@ Result<int64_t> ObjClientHolder::object_file_size(const std::string& bucket,
             .key = key,
     });
 
-    if (!resp.status.ok()) {
-        return ResultError(resp.status.append(
-                fmt::format("failed to head s3 file {}", full_s3_path(bucket, key))));
+    if (resp.resp.status.code != ErrorCode::OK) {
+        return ResultError(std::move(Status(resp.resp.status.code, std::move(resp.resp.status.msg))
+                                             .append(fmt::format("failed to head s3 file {}",
+                                                                 full_s3_path(bucket, key)))));
     }
 
     return resp.file_size;
@@ -207,10 +208,11 @@ Status S3FileSystem::delete_file_impl(const Path& file) {
 
     auto resp = client->delete_object({.bucket = _bucket, .key = key});
 
-    if (resp.status.ok() || resp.status.is<ErrorCode::NOT_FOUND>()) {
+    if (resp.status.code == ErrorCode::OK || resp.status.code == ErrorCode::NOT_FOUND) {
         return Status::OK();
     }
-    return resp.status.append(fmt::format("failed to delete file {}", full_s3_path(key)));
+    return std::move(Status(resp.status.code, std::move(resp.status.msg))
+                             .append(fmt::format("failed to delete file {}", full_s3_path(key))));
 }
 
 Status S3FileSystem::delete_directory_impl(const Path& dir) {
@@ -222,13 +224,12 @@ Status S3FileSystem::delete_directory_impl(const Path& dir) {
         prefix.push_back('/');
     }
 
-    return client
-            ->delete_objects_recursively({
-                    .path = full_s3_path(prefix),
-                    .bucket = _bucket,
-                    .prefix = prefix,
-            })
-            .status;
+    auto resp = client->delete_objects_recursively({
+            .path = full_s3_path(prefix),
+            .bucket = _bucket,
+            .prefix = prefix,
+    });
+    return {resp.status.code, std::move(resp.status.msg)};
 }
 
 Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) {
@@ -251,8 +252,9 @@ Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) {
             return Status::OK();
         }
         // clang-format off
-        RETURN_IF_ERROR(client->delete_objects( {.bucket = _bucket,}, std::move(objects))
-                                .status);
+        if (auto resp = client->delete_objects( {.bucket = _bucket,}, std::move(objects)); resp.status.code != ErrorCode::OK) {
+            return {resp.status.code, std::move(resp.status.msg)};
+        }
         // clang-format on
     } while (path_iter != remote_files.end());
 
@@ -266,12 +268,14 @@ Status S3FileSystem::exists_impl(const Path& path, bool* res) const {
 
     auto resp = client->head_object({.bucket = _bucket, .key = key});
 
-    if (resp.status.ok()) {
+    if (resp.resp.status.code == ErrorCode::OK) {
         *res = true;
-    } else if (resp.status.is<ErrorCode::NOT_FOUND>()) {
+    } else if (resp.resp.status.code == ErrorCode::NOT_FOUND) {
         *res = false;
     } else {
-        return resp.status.append(fmt::format("failed to check exists {}", full_s3_path(key)));
+        return std::move(
+                Status(resp.resp.status.code, std::move(resp.resp.status.msg))
+                        .append(fmt::format(" failed to check exists {}", full_s3_path(key))));
     }
     return Status::OK();
 }
@@ -297,8 +301,13 @@ Status S3FileSystem::list_impl(const Path& dir, bool only_file, std::vector<File
     // clang-format off
     auto resp = client->list_objects( {.bucket = _bucket, .prefix = prefix,}, files);
     // clang-format on
+    if (resp.status.code == ErrorCode::OK) {
+        for (auto&& file : *files) {
+            file.file_name.erase(0, prefix.size());
+        }
+    }
 
-    return resp.status;
+    return {resp.status.code, std::move(resp.status.msg)};
 }
 
 Status S3FileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
@@ -347,10 +356,13 @@ Status S3FileSystem::batch_upload_impl(const std::vector<Path>& local_files,
 
     std::vector<FileWriterPtr> obj_writers(local_files.size());
 
-    auto upload_task = [this](Path local_file, Path remote_file, FileWriterPtr* obj_writer) {
+    auto upload_task = [&, this](size_t idx) {
+        const auto& local_file = local_files[idx];
+        const auto& remote_file = remote_files[idx];
+        auto& obj_writer = obj_writers[idx];
         auto key = DORIS_TRY(get_key(remote_file));
         LOG(INFO) << "Start to upload " << local_file.native() << " to " << 
full_s3_path(key);
-        RETURN_IF_ERROR(create_file_impl(key, obj_writer, nullptr));
+        RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr));
         FileReaderSPtr local_reader;
         RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file, &local_reader));
         size_t local_buffer_size = config::s3_file_system_local_upload_buffer_size;
@@ -360,24 +372,18 @@ Status S3FileSystem::batch_upload_impl(const std::vector<Path>& local_files,
             size_t bytes_read = 0;
             RETURN_IF_ERROR(local_reader->read_at(
                     cur_read, Slice {write_buffer.get(), local_buffer_size}, &bytes_read));
-            RETURN_IF_ERROR((*obj_writer)->append({write_buffer.get(), bytes_read}));
+            RETURN_IF_ERROR((*obj_writer).append({write_buffer.get(), bytes_read}));
             cur_read += bytes_read;
         }
-        RETURN_IF_ERROR((*obj_writer)->close());
+        RETURN_IF_ERROR((*obj_writer).close());
         return Status::OK();
     };
 
     std::vector<std::future<Status>> futures;
     for (int i = 0; i < local_files.size(); ++i) {
-        std::shared_ptr<std::packaged_task<Status(Path local_file, Path remote_file,
-                                                  FileWriterPtr * obj_writer)>>
-                task = std::make_shared<std::packaged_task<Status(Path local_file, Path remote_file,
-                                                                  FileWriterPtr * obj_writer)>>(
-                        upload_task);
+        auto task = std::make_shared<std::packaged_task<Status(size_t idx)>>(upload_task);
         futures.emplace_back(task->get_future());
-        default_executor()->Submit(
-                [t = std::move(task), local = local_files[i], remote = remote_files[i],
-                 obj_writer = &obj_writers[i]]() mutable { (*t)(local, remote, obj_writer); });
+        default_executor()->Submit([t = std::move(task), idx = i]() mutable { (*t)(idx); });
     }
     Status s = Status::OK();
     for (auto&& f : futures) {
@@ -401,16 +407,15 @@ Status S3FileSystem::download_impl(const Path& remote_file, const Path& local_fi
     auto resp = client->get_object( {.bucket = _bucket, .key = key,},
             buf.get(), 0, size, &bytes_read);
     // clang-format on
-    if (!resp.status.ok()) {
-        return resp.status;
+    if (resp.status.code != ErrorCode::OK) {
+        return {resp.status.code, std::move(resp.status.msg)};
     }
     Aws::OFStream local_file_s;
     local_file_s.open(local_file, std::ios::out | std::ios::binary);
     if (local_file_s.good()) {
         local_file_s << StringViewStream(buf.get(), size).rdbuf();
     } else {
-        return Status::IOError("failed to download {}: failed to write file: {}",
-                               remote_file.native(), local_file.native());
+        return localfs_error(errno, fmt::format("failed to write file {}", local_file.native()));
     }
 
     return Status::OK();
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 32319235964..91b5aace12b 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -98,10 +98,10 @@ Status S3FileWriter::_create_multi_upload_request() {
         return Status::InternalError<false>("invalid obj storage client");
     }
     auto resp = client->create_multipart_upload(_obj_storage_path_opts);
-    if (resp.status.ok()) {
+    if (resp.resp.status.code == ErrorCode::OK) {
         _obj_storage_path_opts.upload_id = resp.upload_id;
     }
-    return resp.status;
+    return {resp.resp.status.code, std::move(resp.resp.status.msg)};
 }
 
 void S3FileWriter::_wait_until_finish(std::string_view task_name) {
@@ -304,8 +304,8 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
         return;
     }
     auto resp = client->upload_part(_obj_storage_path_opts, buf.get_string_view_data(), part_num);
-    if (!resp.status.ok()) {
-        buf.set_status(std::move(resp.status));
+    if (resp.resp.status.code != ErrorCode::OK) {
+        buf.set_status(Status(resp.resp.status.code, std::move(resp.resp.status.msg)));
         return;
     }
     s3_bytes_written_total << buf.get_size();
@@ -353,8 +353,8 @@ Status S3FileWriter::_complete() {
         TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
         auto resp = client->complete_multipart_upload(
                 _obj_storage_path_opts, S3CompleteMultiParts {.parts = _completed_parts});
-        if (!resp.status.ok()) {
-            return resp.status;
+        if (resp.status.code != ErrorCode::OK) {
+            return {resp.status.code, std::move(resp.status.msg)};
         }
     }
     s3_file_created_total << 1;
@@ -388,8 +388,8 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
     }
     TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
     auto resp = client->put_object(_obj_storage_path_opts, buf.get_string_view_data());
-    if (!resp.status.ok()) {
-        buf.set_status(std::move(resp.status));
+    if (resp.status.code != ErrorCode::OK) {
+        buf.set_status({resp.status.code, std::move(resp.status.msg)});
         return;
     }
     s3_file_created_total << 1;
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp b/be/src/io/fs/s3_obj_storage_client.cpp
index 33d15e516b4..c45074e262c 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -82,9 +82,8 @@ using Aws::S3::Model::UploadPartOutcome;
 
 namespace doris::io {
 using namespace Aws::S3::Model;
-using Aws::S3::S3Client;
 
-ObjectStorageResponse S3ObjStorageClient::create_multipart_upload(
+ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload(
         const ObjectStoragePathOptions& opts) {
     CreateMultipartUploadRequest create_request;
     create_request.WithBucket(opts.bucket).WithKey(opts.key);
@@ -97,13 +96,19 @@ ObjectStorageResponse S3ObjStorageClient::create_multipart_upload(
     SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome);
 
     if (outcome.IsSuccess()) {
-        return ObjectStorageResponse {.upload_id {outcome.GetResult().GetUploadId()}};
+        return ObjectStorageUploadResponse {.upload_id {outcome.GetResult().GetUploadId()}};
     }
-    return ObjectStorageResponse {
-            .status = s3fs_error(
-                    outcome.GetError(),
-                    fmt::format("failed to create multipart upload {} ", opts.path.native()))};
+
+    return ObjectStorageUploadResponse {
+            .resp = {convert_to_obj_response(
+                             s3fs_error(outcome.GetError(),
+                                        fmt::format("failed to create multipart upload {} ",
+                                                    opts.path.native()))),
+                     static_cast<int>(outcome.GetError().GetResponseCode()),
+                     outcome.GetError().GetRequestId()},
+    };
 }
+
 ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOptions& opts,
                                                      std::string_view stream) {
     Aws::S3::Model::PutObjectRequest request;
@@ -122,12 +127,15 @@ ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOpti
         auto st = s3fs_error(response.GetError(),
                              fmt::format("failed to put object {}", 
opts.path.native()));
         LOG(WARNING) << st;
-        return ObjectStorageResponse {.status = std::move(st)};
+        return ObjectStorageResponse {convert_to_obj_response(std::move(st)),
+                                      static_cast<int>(response.GetError().GetResponseCode()),
+                                      response.GetError().GetRequestId()};
     }
     return {};
 }
-ObjectStorageResponse S3ObjStorageClient::upload_part(const ObjectStoragePathOptions& opts,
-                                                      std::string_view stream, int part_num) {
+
+ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const ObjectStoragePathOptions& opts,
+                                                            std::string_view stream, int part_num) {
     UploadPartRequest upload_request;
     upload_request.WithBucket(opts.bucket)
             .WithKey(opts.key)
@@ -160,10 +168,14 @@ ObjectStorageResponse S3ObjStorageClient::upload_part(const ObjectStoragePathOpt
                 upload_part_outcome.GetError().GetExceptionName(),
                 upload_part_outcome.GetError().GetResponseCode());
         LOG_WARNING(s.to_string());
-        return ObjectStorageResponse {.status = std::move(s)};
+        return ObjectStorageUploadResponse {
+                .resp = {convert_to_obj_response(std::move(s)),
+                         static_cast<int>(upload_part_outcome.GetError().GetResponseCode()),
+                         upload_part_outcome.GetError().GetRequestId()}};
     }
-    return ObjectStorageResponse {.etag = upload_part_outcome.GetResult().GetETag()};
+    return ObjectStorageUploadResponse {.etag = upload_part_outcome.GetResult().GetETag()};
 }
+
 ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
         const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) {
     CompleteMultipartUploadRequest complete_request;
@@ -188,10 +200,13 @@ ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
                              fmt::format("failed to complete multi part upload {}, upload_id={}",
                                          opts.path.native(), *opts.upload_id));
         LOG(WARNING) << st;
-        return {.status = std::move(st)};
+        return {convert_to_obj_response(std::move(st)),
+                static_cast<int>(complete_outcome.GetError().GetResponseCode()),
+                complete_outcome.GetError().GetRequestId()};
     }
     return {};
 }
+
 ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
     Aws::S3::Model::HeadObjectRequest request;
     request.WithBucket(opts.bucket).WithKey(opts.key);
@@ -200,15 +215,20 @@ ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePat
     auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
             _client->HeadObject(request), "s3_file_system::head_object", std::ref(request).get());
     if (outcome.IsSuccess()) {
-        return {.status = Status::OK(), .file_size = outcome.GetResult().GetContentLength()};
+        return {.resp = {convert_to_obj_response(Status::OK())},
+                .file_size = outcome.GetResult().GetContentLength()};
     } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
-        return {.status = Status::NotFound("")};
+        return {.resp = {convert_to_obj_response(Status::NotFound(""))}};
     } else {
-        return {.status = s3fs_error(outcome.GetError(),
-                                     fmt::format("failed to check exists {}", opts.key))};
+        return {.resp = {convert_to_obj_response(
+                                 s3fs_error(outcome.GetError(),
+                                            fmt::format("failed to check exists {}", opts.key))),
+                         static_cast<int>(outcome.GetError().GetResponseCode()),
+                         outcome.GetError().GetRequestId()}};
     }
     return {};
 }
+
 ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOptions& opts,
                                                      void* buffer, size_t offset, size_t bytes_read,
                                                      size_t* size_return) {
@@ -220,17 +240,21 @@ ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOpti
     SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
     auto outcome = _client->GetObject(request);
     if (!outcome.IsSuccess()) {
-        return {.status = s3fs_error(outcome.GetError(),
-                                     fmt::format("failed to read from {}", opts.path.native()))};
+        return {convert_to_obj_response(
+                        s3fs_error(outcome.GetError(),
+                                   fmt::format("failed to read from {}", opts.path.native()))),
+                static_cast<int>(outcome.GetError().GetResponseCode()),
+                outcome.GetError().GetRequestId()};
     }
     *size_return = outcome.GetResult().GetContentLength();
     if (*size_return != bytes_read) {
-        return {.status = Status::InternalError(
-                        "failed to read from {}(bytes read: {}, bytes req: {})", opts.path.native(),
-                        *size_return, bytes_read)};
+        return {convert_to_obj_response(
+                Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
+                                      opts.path.native(), *size_return, bytes_read))};
     }
     return {};
 }
+
 ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOptions& opts,
                                                        std::vector<FileInfo>* files) {
     Aws::S3::Model::ListObjectsV2Request request;
@@ -243,8 +267,11 @@ ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOp
             outcome = _client->ListObjectsV2(request);
         }
         if (!outcome.IsSuccess()) {
-            return {.status = s3fs_error(outcome.GetError(),
-                                         fmt::format("failed to list {}", opts.prefix))};
+            files->clear();
+            return {convert_to_obj_response(s3fs_error(
+                            outcome.GetError(), fmt::format("failed to list {}", opts.prefix))),
+                    static_cast<int>(outcome.GetError().GetResponseCode()),
+                    outcome.GetError().GetRequestId()};
         }
         for (const auto& obj : outcome.GetResult().GetContents()) {
             std::string key = obj.GetKey();
@@ -260,6 +287,7 @@ ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOp
     } while (is_trucated);
     return {};
 }
+
 ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePathOptions& opts,
                                                          std::vector<std::string> objs) {
     Aws::S3::Model::DeleteObjectsRequest delete_request;
@@ -276,16 +304,20 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePath
     SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
     auto delete_outcome = _client->DeleteObjects(delete_request);
     if (!delete_outcome.IsSuccess()) {
-        return {.status = s3fs_error(delete_outcome.GetError(),
-                                     fmt::format("failed to delete dir {}", opts.key))};
+        return {convert_to_obj_response(
+                        s3fs_error(delete_outcome.GetError(),
+                                   fmt::format("failed to delete dir {}", opts.key))),
+                static_cast<int>(delete_outcome.GetError().GetResponseCode()),
+                delete_outcome.GetError().GetRequestId()};
     }
     if (!delete_outcome.GetResult().GetErrors().empty()) {
         const auto& e = delete_outcome.GetResult().GetErrors().front();
-        return {.status = Status::InternalError("failed to delete object {}: {}", e.GetKey(),
-                                                e.GetMessage())};
+        return {convert_to_obj_response(Status::InternalError("failed to delete object {}: {}",
+                                                              e.GetKey(), e.GetMessage()))};
     }
     return {};
 }
+
 ObjectStorageResponse S3ObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
     Aws::S3::Model::DeleteObjectRequest request;
     request.WithBucket(opts.bucket).WithKey(opts.key);
@@ -296,8 +328,10 @@ ObjectStorageResponse S3ObjStorageClient::delete_object(const ObjectStoragePathO
         outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
         return {};
     }
-    return {.status = s3fs_error(outcome.GetError(),
-                                 fmt::format("failed to delete file {}", opts.key))};
+    return {convert_to_obj_response(s3fs_error(outcome.GetError(),
+                                               fmt::format("failed to delete file {}", opts.key))),
+            static_cast<int>(outcome.GetError().GetResponseCode()),
+            outcome.GetError().GetRequestId()};
 }
 
 ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
@@ -314,9 +348,11 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
             outcome = _client->ListObjectsV2(request);
         }
         if (!outcome.IsSuccess()) {
-            return {.status = s3fs_error(
+            return {convert_to_obj_response(s3fs_error(
                             outcome.GetError(),
-                            fmt::format("failed to list objects when delete dir {}", opts.prefix))};
+                            fmt::format("failed to list objects when delete dir {}", opts.prefix))),
+                    static_cast<int>(outcome.GetError().GetResponseCode()),
+                    outcome.GetError().GetRequestId()};
         }
         const auto& result = outcome.GetResult();
         Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
@@ -331,13 +367,16 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
             SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
             auto delete_outcome = _client->DeleteObjects(delete_request);
             if (!delete_outcome.IsSuccess()) {
-                return {.status = s3fs_error(delete_outcome.GetError(),
-                                             fmt::format("failed to delete dir {}", opts.key))};
+                return {convert_to_obj_response(
+                                s3fs_error(delete_outcome.GetError(),
+                                           fmt::format("failed to delete dir {}", opts.key))),
+                        static_cast<int>(delete_outcome.GetError().GetResponseCode()),
+                        delete_outcome.GetError().GetRequestId()};
             }
             if (!delete_outcome.GetResult().GetErrors().empty()) {
                 const auto& e = delete_outcome.GetResult().GetErrors().front();
-                return {.status = Status::InternalError("failed to delete object {}: {}", opts.key,
-                                                        e.GetMessage())};
+                return {convert_to_obj_response(Status::InternalError(
+                        "failed to delete object {}: {}", opts.key, e.GetMessage()))};
             }
         }
         is_trucated = result.GetIsTruncated();
@@ -345,4 +384,5 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
     } while (is_trucated);
     return {};
 }
+
 } // namespace doris::io
\ No newline at end of file
diff --git a/be/src/io/fs/s3_obj_storage_client.h b/be/src/io/fs/s3_obj_storage_client.h
index 47a9ed733db..ea2b00e58df 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/be/src/io/fs/s3_obj_storage_client.h
@@ -39,11 +39,12 @@ class S3ObjStorageClient final : public ObjStorageClient {
 public:
     S3ObjStorageClient(std::shared_ptr<Aws::S3::S3Client> client) : _client(std::move(client)) {}
     ~S3ObjStorageClient() override = default;
-    ObjectStorageResponse create_multipart_upload(const ObjectStoragePathOptions& opts) override;
+    ObjectStorageUploadResponse create_multipart_upload(
+            const ObjectStoragePathOptions& opts) override;
     ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
                                      std::string_view stream) override;
-    ObjectStorageResponse upload_part(const ObjectStoragePathOptions& opts, std::string_view,
-                                      int partNum) override;
+    ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts, std::string_view,
+                                            int partNum) override;
     ObjectStorageResponse complete_multipart_upload(
             const ObjectStoragePathOptions& opts,
             const ObjectCompleteMultiParts& completed_parts) override;
diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp
index 7bad5ad8630..75a49d813a4 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -943,9 +943,8 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_2) {
     sp->set_call_back("S3FileWriter::_complete:2", [](auto&& outcome) {
         // Deliberately make one upload one part task fail to test if s3 file writer could
         // handle io error
-        std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>* parts =
-                try_any_cast<std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>(
-                        outcome.back());
+        auto* parts = try_any_cast<std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>(
+                outcome.back());
         size_t size = parts->size();
         parts->back()->SetPartNumber(size + 2);
     });
@@ -992,12 +991,9 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_1) {
     sp->set_call_back("S3FileWriter::_complete:1", [](auto&& outcome) {
         // Deliberately make one upload one part task fail to test if s3 file writer could
         // handle io error
-        const std::pair<std::atomic_bool*,
-                        std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>& points =
-                try_any_cast<const std::pair<
-                        std::atomic_bool*,
-                        std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>&>(
-                        outcome.back());
+        const auto& points = try_any_cast<const std::pair<
+                std::atomic_bool*, std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>&>(
+                outcome.back());
         (*points.first) = false;
         points.second->pop_back();
     });
@@ -1044,7 +1040,8 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_3) {
     sp->set_call_back("S3FileWriter::_complete:3", [](auto&& outcome) {
         auto pair = try_any_cast_ret<io::ObjectStorageResponse>(outcome);
         pair->second = true;
-        pair->first = io::ObjectStorageResponse {.status = Status::IOError("inject error")};
+        pair->first = io::ObjectStorageResponse {
+                .status = convert_to_obj_response(Status::IOError<false>("inject error"))};
     });
     Defer defer {[&]() { sp->clear_call_back("S3FileWriter::_complete:3"); }};
     auto client = s3_fs->client_holder();
diff --git a/cloud/src/recycler/obj_store_accessor.h b/cloud/src/recycler/obj_store_accessor.h
index 8787841aba7..a29266133ca 100644
--- a/cloud/src/recycler/obj_store_accessor.h
+++ b/cloud/src/recycler/obj_store_accessor.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <functional>
 #include <string>
 #include <vector>
 
@@ -25,6 +26,7 @@ namespace doris::cloud {
 struct ObjectMeta {
     std::string path; // Relative path to accessor prefix
     int64_t size {0};
+    int64_t last_modify_second {0};
 };
 
 enum class AccessorType {
@@ -69,4 +71,59 @@ private:
     const AccessorType type_;
 };
 
+struct ObjectStoragePathOptions {
+    std::string bucket; // blob container in azure
+    std::string key;    // blob name
+    std::string prefix; // for batch delete and recursive delete
+    std::string_view endpoint;
+};
+
+struct ObjectStorageDeleteExpiredOptions {
+    ObjectStoragePathOptions path_opts;
+    std::function<std::string(const std::string& path)> relative_path_factory;
+};
+
+struct ObjectCompleteMultiParts {};
+
+struct ObjectStorageResponse {
+    ObjectStorageResponse(int r, std::string msg = "") : ret(r), error_msg(std::move(msg)) {}
+    // clang-format off
+    int ret {0}; // To unify the error handling logic with BE, we'd better use the same error code as BE
+    // clang-format on
+    std::string error_msg;
+};
+
+// wrapper class owned by the concrete fs
+class ObjStorageClient {
+public:
+    virtual ~ObjStorageClient() = default;
+    // To directly upload a piece of data to object storage and generate a user-visible file.
+    // You need to clearly specify the bucket and key
+    virtual ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
+                                             std::string_view stream) = 0;
+    // According to the passed bucket and key, it checks whether the corresponding file exists in the object storage.
+    // If it exists, it will return the corresponding file size
+    virtual ObjectStorageResponse head_object(const ObjectStoragePathOptions& opts) = 0;
+    // According to the passed bucket and prefix, it traverses and retrieves all files under the prefix, and returns the name and file size of all files.
+    // **Attention**: The ObjectMeta contains the full key in object storage
+    virtual ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
+                                               std::vector<ObjectMeta>* files) = 0;
+    // According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array.
+    virtual ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts,
+                                                 std::vector<std::string> objs) = 0;
+    // Delete the file named key in the object storage bucket.
+    virtual ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) = 0;
+    // According to the prefix, recursively delete all files under the prefix.
+    virtual ObjectStorageResponse delete_objects_recursively(
+            const ObjectStoragePathOptions& opts) = 0;
+    // Delete all the objects under the prefix which expire before the expired_time
+    virtual ObjectStorageResponse delete_expired(const ObjectStorageDeleteExpiredOptions& opts,
+                                                 int64_t expired_time) = 0;
+    // Get the objects' expiration time on the bucket
+    virtual ObjectStorageResponse get_life_cycle(const ObjectStoragePathOptions& opts,
+                                                 int64_t* expiration_days) = 0;
+    // Check if the objects' versioning is on or off
+    virtual ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) = 0;
+};
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index 3f5d78b1c8f..04f642f3831 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -32,6 +32,7 @@
 
 #include <algorithm>
 #include <execution>
+#include <type_traits>
 #include <utility>
 
 #include "common/config.h"
@@ -39,6 +40,7 @@
 #include "common/sync_point.h"
 #include "rate-limiter/s3_rate_limiter.h"
 #include "recycler/obj_store_accessor.h"
+#include "recycler/s3_obj_client.h"
 
 namespace doris::cloud {
 
@@ -53,43 +55,28 @@ private:
     std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters;
 };
 
-[[maybe_unused]] static Aws::Client::AWSError<Aws::S3::S3Errors> s3_error_factory() {
-    return {Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false};
-}
-
 template <typename Func>
-auto do_s3_rate_limit(S3RateLimitType type, Func callback) -> decltype(callback()) {
+auto s3_rate_limit(S3RateLimitType op, Func callback) -> decltype(callback()) {
     using T = decltype(callback());
     if (!config::enable_s3_rate_limiter) {
         return callback();
     }
-    auto sleep_duration = AccessorRateLimiter::instance().rate_limiter(type)->add(1);
+    auto sleep_duration = AccessorRateLimiter::instance().rate_limiter(op)->add(1);
     if (sleep_duration < 0) {
-        return T(s3_error_factory());
+        return T(-1);
     }
     return callback();
 }
 
-#ifndef UNIT_TEST
-#define HELP_MACRO(ret, req, point_name)
-#else
-#define HELP_MACRO(ret, req, point_name)                       \
-    do {                                                       \
-        std::pair p {&ret, &req};                              \
-        [[maybe_unused]] auto ret_pair = [&p]() mutable {      \
-            TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, &p); \
-            return p;                                          \
-        }();                                                   \
-        return ret;                                            \
-    } while (false);
-#endif
-#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name, type) \
-    [&]() -> decltype(auto) {                                         \
-        using T = decltype((expr));                                   \
-        [[maybe_unused]] T t;                                         \
-        HELP_MACRO(t, request, point_name)                            \
-        return do_s3_rate_limit(type, [&]() { return (expr); });      \
-    }()
+template <typename Func>
+auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(S3RateLimitType::GET, std::move(callback));
+}
+
+template <typename Func>
+auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
+    return s3_rate_limit(S3RateLimitType::PUT, std::move(callback));
+}
 
 AccessorRateLimiter::AccessorRateLimiter()
         : _rate_limiters {std::make_unique<S3RateLimiterHolder>(
@@ -157,83 +144,20 @@ int S3Accessor::init() {
     aws_config.region = conf_.region;
     aws_config.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(
             /*maxRetries = 10, scaleFactor = 25*/);
-    s3_client_ = std::make_shared<Aws::S3::S3Client>(
+    auto s3_client = std::make_shared<Aws::S3::S3Client>(
             std::move(aws_cred), std::move(aws_config),
             Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
             true /* useVirtualAddressing */);
+    obj_client_ = std::make_shared<S3ObjClient>(std::move(s3_client));
     return 0;
 }
 
 int S3Accessor::delete_objects_by_prefix(const std::string& relative_path) {
-    Aws::S3::Model::ListObjectsV2Request request;
-    auto prefix = get_key(relative_path);
-    request.WithBucket(conf_.bucket).WithPrefix(prefix);
-
-    Aws::S3::Model::DeleteObjectsRequest delete_request;
-    delete_request.SetBucket(conf_.bucket);
-    bool is_truncated = false;
-    do {
-        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-                s3_client_->ListObjectsV2(request), std::ref(request).get(),
-                "s3_client::list_objects_v2", S3RateLimitType::GET);
-        if (!outcome.IsSuccess()) {
-            LOG_WARNING("failed to list objects")
-                    .tag("endpoint", conf_.endpoint)
-                    .tag("bucket", conf_.bucket)
-                    .tag("prefix", prefix)
-                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
-                    .tag("error", outcome.GetError().GetMessage());
-            if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
-                return 1;
-            }
-            return -1;
-        }
-        const auto& result = outcome.GetResult();
-        VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
-        Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
-        objects.reserve(result.GetContents().size());
-        for (const auto& obj : result.GetContents()) {
-            objects.emplace_back().SetKey(obj.GetKey());
-            LOG_INFO("delete object")
-                    .tag("endpoint", conf_.endpoint)
-                    .tag("bucket", conf_.bucket)
-                    .tag("key", obj.GetKey());
-        }
-        if (!objects.empty()) {
-            Aws::S3::Model::Delete del;
-            del.WithObjects(std::move(objects)).SetQuiet(true);
-            delete_request.SetDelete(std::move(del));
-            auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-                    s3_client_->DeleteObjects(delete_request), std::ref(delete_request).get(),
-                    "s3_client::delete_objects", S3RateLimitType::PUT);
-            if (!delete_outcome.IsSuccess()) {
-                LOG_WARNING("failed to delete objects")
-                        .tag("endpoint", conf_.endpoint)
-                        .tag("bucket", conf_.bucket)
-                        .tag("prefix", prefix)
-                        .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
-                        .tag("error", outcome.GetError().GetMessage());
-                if (delete_outcome.GetError().GetResponseCode() ==
-                    Aws::Http::HttpResponseCode::FORBIDDEN) {
-                    return 1;
-                }
-                return -2;
-            }
-            if (!delete_outcome.GetResult().GetErrors().empty()) {
-                const auto& e = delete_outcome.GetResult().GetErrors().front();
-                LOG_WARNING("failed to delete object")
-                        .tag("endpoint", conf_.endpoint)
-                        .tag("bucket", conf_.bucket)
-                        .tag("key", e.GetKey())
-                        .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
-                        .tag("error", e.GetMessage());
-                return -3;
-            }
-        }
-        is_truncated = result.GetIsTruncated();
-        request.SetContinuationToken(result.GetNextContinuationToken());
-    } while (is_truncated);
-    return 0;
+    return s3_get_rate_limit([&]() {
+               return obj_client_->delete_objects_recursively(
+                       {.bucket = conf_.bucket, .prefix = get_key(relative_path)});
+           })
+            .ret;
 }
 
 int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) {
@@ -244,11 +168,8 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) {
     constexpr size_t max_delete_batch = 1000;
     auto path_iter = relative_paths.begin();
 
-    Aws::S3::Model::DeleteObjectsRequest delete_request;
-    delete_request.SetBucket(conf_.bucket);
     do {
-        Aws::S3::Model::Delete del;
-        Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+        Aws::Vector<std::string> objects;
         auto path_begin = path_iter;
         for (; path_iter != relative_paths.end() && (path_iter - path_begin < max_delete_batch);
              ++path_iter) {
@@ -258,36 +179,16 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) {
                     .tag("bucket", conf_.bucket)
                     .tag("key", key)
                     .tag("size", objects.size());
-            objects.emplace_back().SetKey(std::move(key));
+            objects.emplace_back(std::move(key));
         }
         if (objects.empty()) {
             return 0;
         }
-        del.WithObjects(std::move(objects)).SetQuiet(true);
-        delete_request.SetDelete(std::move(del));
-        auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-                s3_client_->DeleteObjects(delete_request), std::ref(delete_request).get(),
-                "s3_client::delete_objects", S3RateLimitType::PUT);
-        if (!delete_outcome.IsSuccess()) {
-            LOG_WARNING("failed to delete objects")
-                    .tag("endpoint", conf_.endpoint)
-                    .tag("bucket", conf_.bucket)
-                    .tag("key[0]", delete_request.GetDelete().GetObjects().front().GetKey())
-                    .tag("responseCode",
-                         static_cast<int>(delete_outcome.GetError().GetResponseCode()))
-                    .tag("error", delete_outcome.GetError().GetMessage());
-            return -1;
-        }
-        if (!delete_outcome.GetResult().GetErrors().empty()) {
-            const auto& e = delete_outcome.GetResult().GetErrors().front();
-            LOG_WARNING("failed to delete object")
-                    .tag("endpoint", conf_.endpoint)
-                    .tag("bucket", conf_.bucket)
-                    .tag("key", e.GetKey())
-                    .tag("responseCode",
-                         static_cast<int>(delete_outcome.GetError().GetResponseCode()))
-                    .tag("error", e.GetMessage());
-            return -2;
+        if (auto delete_resp = s3_put_rate_limit([&]() {
+                return obj_client_->delete_objects({.bucket = conf_.bucket}, std::move(objects));
+            });
+            delete_resp.ret != 0) {
+            return delete_resp.ret;
         }
     } while (path_iter != relative_paths.end());
 
@@ -295,218 +196,64 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) {
 }
 
 int S3Accessor::delete_object(const std::string& relative_path) {
-    Aws::S3::Model::DeleteObjectRequest request;
-    auto key = get_key(relative_path);
-    request.WithBucket(conf_.bucket).WithKey(key);
-    auto outcome =
-            SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request), std::ref(request).get(),
-                                         "s3_client::delete_object", S3RateLimitType::PUT);
-    if (!outcome.IsSuccess()) {
-        LOG_WARNING("failed to delete object")
-                .tag("endpoint", conf_.endpoint)
-                .tag("bucket", conf_.bucket)
-                .tag("key", key)
-                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage())
-                .tag("exception", outcome.GetError().GetExceptionName());
-        return -1;
-    }
-    return 0;
+    return s3_put_rate_limit([&]() {
+        return obj_client_->delete_object({.bucket = conf_.bucket, .key = get_key(relative_path)})
+                .ret;
+    });
 }
 
 int S3Accessor::put_object(const std::string& relative_path, const std::string& content) {
-    Aws::S3::Model::PutObjectRequest request;
-    auto key = get_key(relative_path);
-    request.WithBucket(conf_.bucket).WithKey(key);
-    auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor");
-    *input << content;
-    request.SetBody(input);
-    auto outcome =
-            SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request), std::ref(request).get(),
-                                         "s3_client::put_object", S3RateLimitType::PUT);
-    if (!outcome.IsSuccess()) {
-        LOG_WARNING("failed to put object")
-                .tag("endpoint", conf_.endpoint)
-                .tag("bucket", conf_.bucket)
-                .tag("key", key)
-                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage());
-        return -1;
-    }
-    return 0;
+    return s3_put_rate_limit([&]() {
+        return obj_client_
+                ->put_object({.bucket = conf_.bucket, .key = get_key(relative_path)}, content)
+                .ret;
+    });
 }
 
 int S3Accessor::list(const std::string& relative_path, std::vector<ObjectMeta>* files) {
-    Aws::S3::Model::ListObjectsV2Request request;
-    auto prefix = get_key(relative_path);
-    request.WithBucket(conf_.bucket).WithPrefix(prefix);
-
-    bool is_truncated = false;
-    do {
-        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-                s3_client_->ListObjectsV2(request), std::ref(request).get(),
-                "s3_client::list_objects_v2", S3RateLimitType::GET);
-        ;
-        if (!outcome.IsSuccess()) {
-            LOG_WARNING("failed to list objects")
-                    .tag("endpoint", conf_.endpoint)
-                    .tag("bucket", conf_.bucket)
-                    .tag("prefix", prefix)
-                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
-                    .tag("error", outcome.GetError().GetMessage());
-            return -1;
-        }
-        const auto& result = outcome.GetResult();
-        VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
-        for (const auto& obj : result.GetContents()) {
-            files->push_back({obj.GetKey().substr(conf_.prefix.size() + 1), obj.GetSize()});
+    return s3_get_rate_limit([&]() {
+        auto resp = obj_client_->list_objects(
+                {.bucket = conf_.bucket, .prefix = get_key(relative_path)}, files);
+
+        if (resp.ret == 0) {
+            auto pos = conf_.prefix.size() + 1;
+            for (auto&& file : *files) {
+                file.path = file.path.substr(pos);
+            }
         }
-        is_truncated = result.GetIsTruncated();
-        request.SetContinuationToken(result.GetNextContinuationToken());
-    } while (is_truncated);
-    return 0;
+
+        return resp.ret;
+    });
 }
 
 int S3Accessor::exist(const std::string& relative_path) {
-    Aws::S3::Model::HeadObjectRequest request;
-    auto key = get_key(relative_path);
-    request.WithBucket(conf_.bucket).WithKey(key);
-    auto outcome =
-            SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request), std::ref(request).get(),
-                                         "s3_client::head_object", S3RateLimitType::GET);
-    if (outcome.IsSuccess()) {
-        return 0;
-    } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
-        return 1;
-    } else {
-        LOG_WARNING("failed to head object")
-                .tag("endpoint", conf_.endpoint)
-                .tag("bucket", conf_.bucket)
-                .tag("key", key)
-                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage());
-        return -1;
-    }
+    return s3_get_rate_limit([&]() {
+        return obj_client_->head_object({.bucket = conf_.bucket, .key = get_key(relative_path)})
+                .ret;
+    });
 }
 
 int S3Accessor::delete_expired_objects(const std::string& relative_path, int64_t expired_time) {
-    Aws::S3::Model::ListObjectsV2Request request;
-    auto prefix = get_key(relative_path);
-    request.WithBucket(conf_.bucket).WithPrefix(prefix);
-
-    bool is_truncated = false;
-    do {
-        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-                s3_client_->ListObjectsV2(request), std::ref(request).get(),
-                "s3_client::list_objects_v2", S3RateLimitType::GET);
-        if (!outcome.IsSuccess()) {
-            LOG_WARNING("failed to list objects")
-                    .tag("endpoint", conf_.endpoint)
-                    .tag("bucket", conf_.bucket)
-                    .tag("prefix", prefix)
-                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
-                    .tag("error", outcome.GetError().GetMessage());
-            return -1;
-        }
-        const auto& result = outcome.GetResult();
-        std::vector<std::string> expired_keys;
-        for (const auto& obj : result.GetContents()) {
-            if (obj.GetLastModified().Seconds() < expired_time) {
-                auto relative_key = get_relative_path(obj.GetKey());
-                if (relative_key.empty()) {
-                    LOG_WARNING("failed get relative path")
-                            .tag("prefix", conf_.prefix)
-                            .tag("key", obj.GetKey());
-                } else {
-                    expired_keys.push_back(relative_key);
-                    LOG_INFO("delete expired object")
-                            .tag("prefix", conf_.prefix)
-                            .tag("key", obj.GetKey())
-                            .tag("relative_key", relative_key)
-                            .tag("lastModifiedTime", obj.GetLastModified().Seconds())
-                            .tag("expiredTime", expired_time);
-                }
-            }
-        }
-
-        auto ret = delete_objects(expired_keys);
-        if (ret != 0) {
-            return ret;
-        }
-        LOG_INFO("delete expired objects")
-                .tag("endpoint", conf_.endpoint)
-                .tag("bucket", conf_.bucket)
-                .tag("prefix", conf_.prefix)
-                .tag("num_scanned", result.GetContents().size())
-                .tag("num_recycled", expired_keys.size());
-        is_truncated = result.GetIsTruncated();
-        request.SetContinuationToken(result.GetNextContinuationToken());
-    } while (is_truncated);
-    return 0;
+    return s3_put_rate_limit([&]() {
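+        // Deletes are mutating requests, so they are throttled by the PUT limiter.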
+        return obj_client_
+                ->delete_expired(
+                        {.path_opts = {.bucket = conf_.bucket, .prefix = 
get_key(relative_path)},
+                         .relative_path_factory =
+                                 [&](const std::string& key) { return 
get_relative_path(key); }},
+                        expired_time)
+                .ret;
+    });
 }
 
 int S3Accessor::get_bucket_lifecycle(int64_t* expiration_days) {
-    Aws::S3::Model::GetBucketLifecycleConfigurationRequest request;
-    request.SetBucket(conf_.bucket);
-
-    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-            s3_client_->GetBucketLifecycleConfiguration(request), 
std::ref(request).get(),
-            "s3_client::get_bucket_lifecycle_configuration", 
S3RateLimitType::GET);
-    bool has_lifecycle = false;
-    if (outcome.IsSuccess()) {
-        const auto& rules = outcome.GetResult().GetRules();
-        for (const auto& rule : rules) {
-            if (rule.NoncurrentVersionExpirationHasBeenSet()) {
-                has_lifecycle = true;
-                *expiration_days = 
rule.GetNoncurrentVersionExpiration().GetNoncurrentDays();
-            }
-        }
-    } else {
-        LOG_WARNING("Err for check interval: failed to get bucket lifecycle")
-                .tag("endpoint", conf_.endpoint)
-                .tag("bucket", conf_.bucket)
-                .tag("prefix", conf_.prefix)
-                .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage());
-        return -1;
-    }
-
-    if (!has_lifecycle) {
-        LOG_WARNING("Err for check interval: bucket doesn't have lifecycle 
configuration")
-                .tag("endpoint", conf_.endpoint)
-                .tag("bucket", conf_.bucket)
-                .tag("prefix", conf_.prefix);
-        return -1;
-    }
-    return 0;
+    return s3_get_rate_limit([&]() {
+        return obj_client_->get_life_cycle({.bucket = conf_.bucket}, 
expiration_days).ret;
+    });
 }
 
 int S3Accessor::check_bucket_versioning() {
-    Aws::S3::Model::GetBucketVersioningRequest request;
-    request.SetBucket(conf_.bucket);
-    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
-            s3_client_->GetBucketVersioning(request), std::ref(request).get(),
-            "s3_client::get_bucket_versioning", S3RateLimitType::GET);
-
-    if (outcome.IsSuccess()) {
-        const auto& versioning_configuration = outcome.GetResult().GetStatus();
-        if (versioning_configuration != 
Aws::S3::Model::BucketVersioningStatus::Enabled) {
-            LOG_WARNING("Err for check interval: bucket doesn't enable bucket 
versioning")
-                    .tag("endpoint", conf_.endpoint)
-                    .tag("bucket", conf_.bucket)
-                    .tag("prefix", conf_.prefix);
-            return -1;
-        }
-    } else {
-        LOG_WARNING("Err for check interval: failed to get status of bucket 
versioning")
-                .tag("endpoint", conf_.endpoint)
-                .tag("bucket", conf_.bucket)
-                .tag("prefix", conf_.prefix)
-                .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
-                .tag("error", outcome.GetError().GetMessage());
-        return -1;
-    }
-    return 0;
+    return s3_get_rate_limit(
+            [&]() { return obj_client_->check_versioning({.bucket = 
conf_.bucket}).ret; });
 }
 
 int GcsAccessor::delete_objects(const std::vector<std::string>& 
relative_paths) {
@@ -523,6 +270,4 @@ int GcsAccessor::delete_objects(const 
std::vector<std::string>& relative_paths)
     }
     return ret;
 }
-#undef SYNC_POINT_HOOK_RETURN_VALUE
-#undef HELP_MACRO
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 1a44b0971cb..5221f90cafc 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -20,6 +20,7 @@
 #include <memory>
 
 #include "recycler/obj_store_accessor.h"
+#include "recycler/s3_obj_client.h"
 
 namespace Aws::S3 {
 class S3Client;
@@ -47,7 +48,8 @@ public:
 
     const std::string& path() const override { return path_; }
 
-    const std::shared_ptr<Aws::S3::S3Client>& s3_client() const { return 
s3_client_; }
+    // TODO(ByteYue): refactor this function to suit different kinds of object storage
+    const std::shared_ptr<Aws::S3::S3Client>& s3_client() const { return 
obj_client_->s3_client(); }
 
     const S3Conf& conf() const { return conf_; }
 
@@ -89,9 +91,9 @@ private:
     std::string get_relative_path(const std::string& key) const;
 
 private:
-    std::shared_ptr<Aws::S3::S3Client> s3_client_;
     S3Conf conf_;
     std::string path_;
+    std::shared_ptr<S3ObjClient> obj_client_;
 };
 
 class GcsAccessor final : public S3Accessor {
diff --git a/cloud/src/recycler/s3_obj_client.cpp 
b/cloud/src/recycler/s3_obj_client.cpp
new file mode 100644
index 00000000000..2b3d83cd8ce
--- /dev/null
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/s3_obj_client.h"
+
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/DeleteObjectRequest.h>
+#include <aws/s3/model/DeleteObjectsRequest.h>
+#include <aws/s3/model/GetBucketLifecycleConfigurationRequest.h>
+#include <aws/s3/model/GetBucketVersioningRequest.h>
+#include <aws/s3/model/HeadObjectRequest.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
+#include <aws/s3/model/PutObjectRequest.h>
+
+#include "common/logging.h"
+#include "common/sync_point.h"
+
+namespace doris::cloud {
+
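+// SYNC_POINT_HOOK_RETURN_VALUE wraps each S3 call for testability. In
+// UNIT_TEST builds the real request is skipped: a sync point receives
+// pointers to a default-constructed outcome and the request, may fill in a
+// mocked result, and that outcome is returned. In regular builds the macro
+// reduces to the plain call.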
+#ifndef UNIT_TEST
+#define HELPER_MACRO(ret, req, point_name)
+#else
+#define HELPER_MACRO(ret, req, point_name)                     \
+    do {                                                       \
+        std::pair p {&ret, &req};                              \
+        [[maybe_unused]] auto ret_pair = [&p]() mutable {      \
+            TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, &p); \
+            return p;                                          \
+        }();                                                   \
+        return ret;                                            \
+    } while (false);
+#endif
+#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name) \
+    [&]() -> decltype(auto) {                                   \
+        using T = decltype((expr));                             \
+        [[maybe_unused]] T t;                                   \
+        HELPER_MACRO(t, request, point_name)                    \
+        return (expr);                                          \
+    }()
+
+ObjectStorageResponse S3ObjClient::put_object(const ObjectStoragePathOptions& 
opts,
+                                              std::string_view stream) {
+    Aws::S3::Model::PutObjectRequest request;
+    request.WithBucket(opts.bucket).WithKey(opts.key);
+    auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor");
+    *input << stream;
+    request.SetBody(input);
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request),
+                                                std::ref(request).get(), 
"s3_client::put_object");
+    if (!outcome.IsSuccess()) {
+        LOG_WARNING("failed to put object")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("key", opts.key)
+                .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage());
+        return -1;
+    }
+    return 0;
+}
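+// Returns 0 if the object exists, 1 if S3 answers 404 (NOT_FOUND), and -1 on
+// any other error.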
+ObjectStorageResponse S3ObjClient::head_object(const ObjectStoragePathOptions& 
opts) {
+    Aws::S3::Model::HeadObjectRequest request;
+    request.WithBucket(opts.bucket).WithKey(opts.key);
+    auto outcome = 
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request),
+                                                std::ref(request).get(), 
"s3_client::head_object");
+    if (outcome.IsSuccess()) {
+        return 0;
+    } else if (outcome.GetError().GetResponseCode() == 
Aws::Http::HttpResponseCode::NOT_FOUND) {
+        return 1;
+    } else {
+        LOG_WARNING("failed to head object")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("key", opts.key)
+                .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage());
+        return -1;
+    }
+}
+ObjectStorageResponse S3ObjClient::list_objects(const 
ObjectStoragePathOptions& opts,
+                                                std::vector<ObjectMeta>* 
files) {
+    Aws::S3::Model::ListObjectsV2Request request;
+    request.WithBucket(opts.bucket).WithPrefix(opts.prefix);
+
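+    // ListObjectsV2 pages its results (at most 1000 keys per page by default);
+    // follow the continuation token until the response is no longer truncated.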
+    bool is_truncated = false;
+    do {
+        auto outcome =
+                
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+                                             std::ref(request).get(), 
"s3_client::list_objects_v2");
+        if (!outcome.IsSuccess()) {
+            LOG_WARNING("failed to list objects")
+                    .tag("endpoint", opts.endpoint)
+                    .tag("bucket", opts.bucket)
+                    .tag("prefix", opts.prefix)
+                    .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                    .tag("error", outcome.GetError().GetMessage());
+            return -1;
+        }
+        const auto& result = outcome.GetResult();
+        VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
+        for (const auto& obj : result.GetContents()) {
+            files->push_back({obj.GetKey(), obj.GetSize(), 
obj.GetLastModified().Seconds()});
+        }
+        is_truncated = result.GetIsTruncated();
+        request.SetContinuationToken(result.GetNextContinuationToken());
+    } while (is_truncated);
+    return 0;
+}
+ObjectStorageResponse S3ObjClient::delete_objects(const 
ObjectStoragePathOptions& opts,
+                                                  std::vector<std::string> 
objs) {
+    Aws::S3::Model::DeleteObjectsRequest delete_request;
+    delete_request.SetBucket(opts.bucket);
+    Aws::S3::Model::Delete del;
+    Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+    for (auto&& obj : objs) {
+        objects.emplace_back().SetKey(std::move(obj));
+    }
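+    // Quiet mode suppresses per-key success entries; only failed deletions
+    // come back via GetResult().GetErrors().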
+    del.WithObjects(std::move(objects)).SetQuiet(true);
+    delete_request.SetDelete(std::move(del));
+    auto delete_outcome = 
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
+                                                       
std::ref(delete_request).get(),
+                                                       
"s3_client::delete_objects");
+    if (!delete_outcome.IsSuccess()) {
+        LOG_WARNING("failed to delete objects")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("key[0]", 
delete_request.GetDelete().GetObjects().front().GetKey())
+                .tag("responseCode", 
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
+                .tag("error", delete_outcome.GetError().GetMessage());
+        return {-1};
+    }
+    if (!delete_outcome.GetResult().GetErrors().empty()) {
+        const auto& e = delete_outcome.GetResult().GetErrors().front();
+        LOG_WARNING("failed to delete object")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("key", e.GetKey())
+                .tag("responseCode", 
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
+                .tag("error", e.GetMessage());
+        return {-2};
+    }
+    return {0};
+}
+
+ObjectStorageResponse S3ObjClient::delete_object(const 
ObjectStoragePathOptions& opts) {
+    Aws::S3::Model::DeleteObjectRequest request;
+    request.WithBucket(opts.bucket).WithKey(opts.key);
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_client_->DeleteObject(request), std::ref(request).get(), 
"s3_client::delete_object");
+    if (!outcome.IsSuccess()) {
+        LOG_WARNING("failed to delete object")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("key", opts.key)
+                .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage())
+                .tag("exception", outcome.GetError().GetExceptionName());
+        return -1;
+    }
+    return 0;
+}
+
+ObjectStorageResponse S3ObjClient::delete_objects_recursively(
+        const ObjectStoragePathOptions& opts) {
+    Aws::S3::Model::ListObjectsV2Request request;
+    request.WithBucket(opts.bucket).WithPrefix(opts.prefix);
+
+    Aws::S3::Model::DeleteObjectsRequest delete_request;
+    delete_request.SetBucket(opts.bucket);
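+    // Recycle the prefix page by page: each batch of listed keys is removed
+    // with a single DeleteObjects call before the next page is fetched. A
+    // FORBIDDEN response maps to ret 1 so permission errors stay distinguishable.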
+    bool is_truncated = false;
+    do {
+        auto outcome =
+                
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+                                             std::ref(request).get(), 
"s3_client::list_objects_v2");
+
+        if (!outcome.IsSuccess()) {
+            LOG_WARNING("failed to list objects")
+                    .tag("endpoint", opts.endpoint)
+                    .tag("bucket", opts.bucket)
+                    .tag("prefix", opts.prefix)
+                    .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                    .tag("error", outcome.GetError().GetMessage());
+            if (outcome.GetError().GetResponseCode() == 
Aws::Http::HttpResponseCode::FORBIDDEN) {
+                return {1};
+            }
+            return {-1};
+        }
+        const auto& result = outcome.GetResult();
+        VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
+        Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+        objects.reserve(result.GetContents().size());
+        for (const auto& obj : result.GetContents()) {
+            objects.emplace_back().SetKey(obj.GetKey());
+            LOG_INFO("delete object")
+                    .tag("endpoint", opts.endpoint)
+                    .tag("bucket", opts.bucket)
+                    .tag("key", obj.GetKey());
+        }
+        if (!objects.empty()) {
+            Aws::S3::Model::Delete del;
+            del.WithObjects(std::move(objects)).SetQuiet(true);
+            delete_request.SetDelete(std::move(del));
+            auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+                    s3_client_->DeleteObjects(delete_request), 
std::ref(delete_request).get(),
+                    "s3_client::delete_objects");
+            if (!delete_outcome.IsSuccess()) {
+                LOG_WARNING("failed to delete objects")
+                        .tag("endpoint", opts.endpoint)
+                        .tag("bucket", opts.bucket)
+                        .tag("prefix", opts.prefix)
+                        .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                        .tag("error", outcome.GetError().GetMessage());
+                if (delete_outcome.GetError().GetResponseCode() ==
+                    Aws::Http::HttpResponseCode::FORBIDDEN) {
+                    return {1};
+                }
+                return {-2};
+            }
+            if (!delete_outcome.GetResult().GetErrors().empty()) {
+                const auto& e = delete_outcome.GetResult().GetErrors().front();
+                LOG_WARNING("failed to delete object")
+                        .tag("endpoint", opts.endpoint)
+                        .tag("bucket", opts.bucket)
+                        .tag("key", e.GetKey())
+                        .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                        .tag("error", e.GetMessage());
+                return {-3};
+            }
+        }
+        is_truncated = result.GetIsTruncated();
+        request.SetContinuationToken(result.GetNextContinuationToken());
+    } while (is_truncated);
+    return {0};
+}
+ObjectStorageResponse S3ObjClient::delete_expired(const 
ObjectStorageDeleteExpiredOptions& opts,
+                                                  int64_t expired_time) {
+    Aws::S3::Model::ListObjectsV2Request request;
+    
request.WithBucket(opts.path_opts.bucket).WithPrefix(opts.path_opts.prefix);
+
+    bool is_truncated = false;
+    do {
+        auto outcome =
+                
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+                                             std::ref(request).get(), 
"s3_client::list_objects_v2");
+        if (!outcome.IsSuccess()) {
+            LOG_WARNING("failed to list objects")
+                    .tag("endpoint", opts.path_opts.endpoint)
+                    .tag("bucket", opts.path_opts.bucket)
+                    .tag("prefix", opts.path_opts.prefix)
+                    .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                    .tag("error", outcome.GetError().GetMessage());
+            return -1;
+        }
+        const auto& result = outcome.GetResult();
+        std::vector<std::string> expired_keys;
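+        // relative_path_factory only derives the key for validation and
+        // logging; the full object key is what actually gets deleted.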
+        for (const auto& obj : result.GetContents()) {
+            if (obj.GetLastModified().Seconds() < expired_time) {
+                auto relative_key = opts.relative_path_factory(obj.GetKey());
+                if (relative_key.empty()) {
+                    LOG_WARNING("failed get relative path")
+                            .tag("prefix", opts.path_opts.prefix)
+                            .tag("key", obj.GetKey());
+                } else {
+                    expired_keys.push_back(obj.GetKey());
+                    LOG_INFO("delete expired object")
+                            .tag("prefix", opts.path_opts.prefix)
+                            .tag("key", obj.GetKey())
+                            .tag("relative_key", relative_key)
+                            .tag("lastModifiedTime", 
obj.GetLastModified().Seconds())
+                            .tag("expiredTime", expired_time);
+                }
+            }
+        }
+
+        auto ret = delete_objects(opts.path_opts, std::move(expired_keys));
+        if (ret.ret != 0) {
+            return ret;
+        }
+        LOG_INFO("delete expired objects")
+                .tag("endpoint", opts.path_opts.endpoint)
+                .tag("bucket", opts.path_opts.bucket)
+                .tag("prefix", opts.path_opts.prefix)
+                .tag("num_scanned", result.GetContents().size())
+                .tag("num_recycled", expired_keys.size());
+        is_truncated = result.GetIsTruncated();
+        request.SetContinuationToken(result.GetNextContinuationToken());
+    } while (is_truncated);
+    return 0;
+}
+ObjectStorageResponse S3ObjClient::get_life_cycle(const 
ObjectStoragePathOptions& opts,
+                                                  int64_t* expiration_days) {
+    Aws::S3::Model::GetBucketLifecycleConfigurationRequest request;
+    request.SetBucket(opts.bucket);
+
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_client_->GetBucketLifecycleConfiguration(request), 
std::ref(request).get(),
+            "s3_client::get_bucket_lifecycle_configuration");
+    bool has_lifecycle = false;
+    if (outcome.IsSuccess()) {
+        const auto& rules = outcome.GetResult().GetRules();
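+        // Note: the loop does not break, so if several rules set a noncurrent
+        // version expiration, the last one wins.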
+        for (const auto& rule : rules) {
+            if (rule.NoncurrentVersionExpirationHasBeenSet()) {
+                has_lifecycle = true;
+                *expiration_days = 
rule.GetNoncurrentVersionExpiration().GetNoncurrentDays();
+            }
+        }
+    } else {
+        LOG_WARNING("Err for check interval: failed to get bucket lifecycle")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("prefix", opts.prefix)
+                .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage());
+        return -1;
+    }
+
+    if (!has_lifecycle) {
+        LOG_WARNING("Err for check interval: bucket doesn't have lifecycle 
configuration")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("prefix", opts.prefix);
+        return -1;
+    }
+    return 0;
+}
+
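+// Only BucketVersioningStatus::Enabled passes; a Suspended or unset status is
+// reported as an error (-1).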
+ObjectStorageResponse S3ObjClient::check_versioning(const 
ObjectStoragePathOptions& opts) {
+    Aws::S3::Model::GetBucketVersioningRequest request;
+    request.SetBucket(opts.bucket);
+    auto outcome = 
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketVersioning(request),
+                                                std::ref(request).get(),
+                                                
"s3_client::get_bucket_versioning");
+
+    if (outcome.IsSuccess()) {
+        const auto& versioning_configuration = outcome.GetResult().GetStatus();
+        if (versioning_configuration != 
Aws::S3::Model::BucketVersioningStatus::Enabled) {
+            LOG_WARNING("Err for check interval: bucket doesn't enable bucket 
versioning")
+                    .tag("endpoint", opts.endpoint)
+                    .tag("bucket", opts.bucket)
+                    .tag("prefix", opts.prefix);
+            return -1;
+        }
+    } else {
+        LOG_WARNING("Err for check interval: failed to get status of bucket 
versioning")
+                .tag("endpoint", opts.endpoint)
+                .tag("bucket", opts.bucket)
+                .tag("prefix", opts.prefix)
+                .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage());
+        return -1;
+    }
+    return 0;
+}
+
+#undef SYNC_POINT_HOOK_RETURN_VALUE
+#undef HELPER_MACRO
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/be/src/io/fs/s3_obj_storage_client.h 
b/cloud/src/recycler/s3_obj_client.h
similarity index 51%
copy from be/src/io/fs/s3_obj_storage_client.h
copy to cloud/src/recycler/s3_obj_client.h
index 47a9ed733db..891474b5289 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/cloud/src/recycler/s3_obj_client.h
@@ -17,49 +17,41 @@
 
 #pragma once
 
-#include "io/fs/obj_storage_client.h"
-#include "io/fs/s3_file_system.h"
+#include <memory>
+
+#include "recycler/obj_store_accessor.h"
 
 namespace Aws::S3 {
 class S3Client;
-namespace Model {
-class CompletedPart;
-}
 } // namespace Aws::S3
 
-namespace doris::io {
-
-struct S3CompleteMultiParts : public ObjectCompleteMultiParts {
-    std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& parts;
-};
-
-class ObjClientHolder;
+namespace doris::cloud {
 
-class S3ObjStorageClient final : public ObjStorageClient {
+class S3ObjClient : public ObjStorageClient {
 public:
-    S3ObjStorageClient(std::shared_ptr<Aws::S3::S3Client> client) : 
_client(std::move(client)) {}
-    ~S3ObjStorageClient() override = default;
-    ObjectStorageResponse create_multipart_upload(const 
ObjectStoragePathOptions& opts) override;
+    S3ObjClient(std::shared_ptr<Aws::S3::S3Client> client) : 
s3_client_(std::move(client)) {}
+    ~S3ObjClient() override = default;
+
     ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
                                      std::string_view stream) override;
-    ObjectStorageResponse upload_part(const ObjectStoragePathOptions& opts, 
std::string_view,
-                                      int partNum) override;
-    ObjectStorageResponse complete_multipart_upload(
-            const ObjectStoragePathOptions& opts,
-            const ObjectCompleteMultiParts& completed_parts) override;
-    ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& 
opts) override;
-    ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, 
void* buffer,
-                                     size_t offset, size_t bytes_read,
-                                     size_t* size_return) override;
+    ObjectStorageResponse head_object(const ObjectStoragePathOptions& opts) 
override;
     ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
-                                       std::vector<FileInfo>* files) override;
+                                       std::vector<ObjectMeta>* files) 
override;
     ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts,
                                          std::vector<std::string> objs) 
override;
     ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) 
override;
     ObjectStorageResponse delete_objects_recursively(const 
ObjectStoragePathOptions& opts) override;
+    ObjectStorageResponse delete_expired(const 
ObjectStorageDeleteExpiredOptions& opts,
+                                         int64_t expired_time) override;
+    ObjectStorageResponse get_life_cycle(const ObjectStoragePathOptions& opts,
+                                         int64_t* expiration_days) override;
+
+    ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& 
opts) override;
+
+    const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_; 
}
 
 private:
-    std::shared_ptr<Aws::S3::S3Client> _client;
+    std::shared_ptr<Aws::S3::S3Client> s3_client_;
 };
 
-} // namespace doris::io
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/test/s3_accessor_test.cpp b/cloud/test/s3_accessor_test.cpp
index 972505c3999..362a0b11cbf 100644
--- a/cloud/test/s3_accessor_test.cpp
+++ b/cloud/test/s3_accessor_test.cpp
@@ -397,6 +397,7 @@ TEST(S3AccessorTest, check_bucket_versioning) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -422,6 +423,7 @@ TEST(S3AccessorTest, check_bucket_versioning_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = 
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     return_error_for_error_s3_client = true;
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
@@ -444,6 +446,7 @@ TEST(S3AccessorTest, get_bucket_lifecycle) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -473,6 +476,7 @@ TEST(S3AccessorTest, get_bucket_lifecycle_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = 
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     return_error_for_error_s3_client = true;
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
@@ -496,6 +500,7 @@ TEST(S3AccessorTest, list) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -519,6 +524,7 @@ TEST(S3AccessorTest, list_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = 
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     return_error_for_error_s3_client = true;
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
@@ -543,6 +549,7 @@ TEST(S3AccessorTest, put) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -569,6 +576,7 @@ TEST(S3AccessorTest, put_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = 
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -601,6 +609,7 @@ TEST(S3AccessorTest, exist) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -624,6 +633,7 @@ TEST(S3AccessorTest, exist_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = 
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -649,6 +659,7 @@ TEST(S3AccessorTest, delete_object) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -675,6 +686,7 @@ TEST(S3AccessorTest, gcs_delete_objects) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -707,6 +719,7 @@ TEST(S3AccessorTest, gcs_delete_objects_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = 
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -744,6 +757,7 @@ TEST(S3AccessorTest, delete_objects) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -776,6 +790,7 @@ TEST(S3AccessorTest, delete_objects_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = 
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -817,6 +832,7 @@ TEST(S3AccessorTest, delete_expired_objects) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -871,6 +887,7 @@ TEST(S3AccessorTest, delete_object_by_prefix) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -898,6 +915,7 @@ TEST(S3AccessorTest, delete_object_by_prefix_error) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = 
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+    ASSERT_EQ(0, accessor->init());
     auto sp = SyncPoint::get_instance();
     std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& 
mock_callback) {
         sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org