This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a6537a90cd [Enhancement] Garbage collection of unused data on remote storage backend (#10731)
a6537a90cd is described below

commit a6537a90cd4fda01de4a51b7abdc23a06cc8a901
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Fri Jul 29 14:38:39 2022 +0800

    [Enhancement] Garbage collection of unused data on remote storage backend (#10731)
    
    * [Feature](cold_on_s3) support unused remote rowset gc
    
    * return Aborted status when skipping a tablet drop
    
    * perform unused remote rowset gc
---
 be/src/agent/task_worker_pool.cpp                  |  11 +--
 be/src/common/config.h                             |  11 +++
 be/src/common/status.h                             |   5 +-
 be/src/io/fs/local_file_reader.cpp                 |  11 +--
 be/src/io/fs/local_file_system.cpp                 |  35 +++----
 be/src/io/fs/local_file_writer.cpp                 |  21 ++--
 be/src/io/fs/s3_file_reader.cpp                    |  13 ++-
 be/src/io/fs/s3_file_system.cpp                    |  66 +++++++++++--
 be/src/io/fs/s3_file_system.h                      |   1 +
 be/src/olap/base_compaction.cpp                    |  20 ++++
 be/src/olap/base_tablet.h                          |   7 +-
 be/src/olap/cumulative_compaction_policy.cpp       |   7 +-
 be/src/olap/data_dir.cpp                           |  62 ++++++++++++
 be/src/olap/data_dir.h                             |   4 +
 be/src/olap/olap_define.h                          |   2 +
 be/src/olap/rowset/beta_rowset.cpp                 |   6 ++
 be/src/olap/rowset/beta_rowset.h                   |   4 +
 be/src/olap/snapshot_manager.cpp                   |  12 +++
 be/src/olap/storage_engine.cpp                     |   8 +-
 be/src/olap/tablet.cpp                             | 110 ++++++++++++++-------
 be/src/olap/tablet.h                               |  17 +++-
 be/src/olap/tablet_manager.cpp                     |  18 +++-
 be/src/olap/tablet_manager.h                       |  12 +--
 be/src/olap/tablet_meta.cpp                        |   8 +-
 be/src/olap/tablet_meta.h                          |  19 ++--
 be/src/olap/task/engine_clone_task.cpp             |   2 +-
 be/test/CMakeLists.txt                             |   1 +
 be/test/olap/delete_handler_test.cpp               |   8 +-
 be/test/olap/delta_writer_test.cpp                 |  10 +-
 .../olap/engine_storage_migration_task_test.cpp    |   2 +-
 ...et_clone_test.cpp => remote_rowset_gc_test.cpp} |  94 +++++++++---------
 be/test/olap/tablet_cooldown_test.cpp              |  25 ++---
 be/test/olap/tablet_mgr_test.cpp                   |   8 +-
 be/test/olap/tablet_test.cpp                       |   2 +-
 .../apache/doris/clone/BackendLoadStatistic.java   |  10 +-
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 gensrc/proto/olap_file.proto                       |   7 ++
 37 files changed, 446 insertions(+), 215 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 40681ce17d..deb39a3a51 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -432,7 +432,8 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
                 drop_tablet_req.tablet_id, false, &err);
         if (dropped_tablet != nullptr) {
             Status drop_status = 
StorageEngine::instance()->tablet_manager()->drop_tablet(
-                    drop_tablet_req.tablet_id, drop_tablet_req.replica_id);
+                    drop_tablet_req.tablet_id, drop_tablet_req.replica_id,
+                    drop_tablet_req.is_drop_table_or_partition);
             if (!drop_status.ok()) {
                 LOG(WARNING) << "drop table failed! signature: " << 
agent_task_req.signature;
                 error_msgs.push_back("drop table failed!");
@@ -442,11 +443,6 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() 
{
                 
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
                         dropped_tablet->data_dir()->get_meta(), 
drop_tablet_req.tablet_id,
                         drop_tablet_req.schema_hash, 
dropped_tablet->tablet_uid());
-                // We remove remote rowset directly.
-                // TODO(cyx): do remove in background
-                if (drop_tablet_req.is_drop_table_or_partition) {
-                    dropped_tablet->remove_all_remote_rowsets();
-                }
             }
         } else {
             status_code = TStatusCode::NOT_FOUND;
@@ -881,8 +877,7 @@ void 
TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
                     } else {
                         LOG(INFO) << "set tablet cooldown resource "
                                   << tablet_meta_info.storage_policy;
-                        tablet->tablet_meta()->set_cooldown_resource(
-                                tablet_meta_info.storage_policy);
+                        
tablet->tablet_meta()->set_storage_policy(tablet_meta_info.storage_policy);
                     }
                     break;
                 }
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d1f43eaeba..06c8bebf2c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -789,6 +789,17 @@ CONF_Int32(s3_transfer_executor_pool_size, "2");
 
 CONF_Bool(enable_time_lut, "true");
 
+#ifdef BE_TEST
+// test s3
+CONF_String(test_s3_resource, "resource");
+CONF_String(test_s3_ak, "ak");
+CONF_String(test_s3_sk, "sk");
+CONF_String(test_s3_endpoint, "endpoint");
+CONF_String(test_s3_region, "region");
+CONF_String(test_s3_bucket, "bucket");
+CONF_String(test_s3_prefix, "prefix");
+#endif
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 7604d0aa30..bf947cb812 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -283,10 +283,11 @@ public:
     template <typename... Args>
     static Status ErrorFmt(TStatusCode::type code, const std::string& fmt, 
Args&&... args) {
         // In some cases, fmt contains '{}' but there are no args.
-        if (sizeof...(args) == 0) {
+        if constexpr (sizeof...(args) == 0) {
             return Status(code, fmt);
+        } else {
+            return Status(code, fmt::format(fmt, std::forward<Args>(args)...));
         }
-        return Status(code, fmt::format(fmt, std::forward<Args>(args)...));
     }
 
     template <typename... Args>
diff --git a/be/src/io/fs/local_file_reader.cpp 
b/be/src/io/fs/local_file_reader.cpp
index 984306bf17..ae5bd44ab9 100644
--- a/be/src/io/fs/local_file_reader.cpp
+++ b/be/src/io/fs/local_file_reader.cpp
@@ -50,9 +50,8 @@ Status LocalFileReader::close() {
 Status LocalFileReader::read_at(size_t offset, Slice result, size_t* 
bytes_read) {
     DCHECK(!closed());
     if (offset > _file_size) {
-        return Status::IOError(
-                fmt::format("offset exceeds file size(offset: {), file size: 
{}, path: {})", offset,
-                            _file_size, _path.native()));
+        return Status::IOError("offset exceeds file size(offset: {), file 
size: {}, path: {})",
+                               offset, _file_size, _path.native());
     }
     size_t bytes_req = result.size;
     char* to = result.data;
@@ -62,12 +61,10 @@ Status LocalFileReader::read_at(size_t offset, Slice 
result, size_t* bytes_read)
     while (bytes_req != 0) {
         auto res = ::pread(_fd, to, bytes_req, offset);
         if (UNLIKELY(-1 == res && errno != EINTR)) {
-            return Status::IOError(
-                    fmt::format("cannot read from {}: {}", _path.native(), 
std::strerror(errno)));
+            return Status::IOError("cannot read from {}: {}", _path.native(), 
std::strerror(errno));
         }
         if (UNLIKELY(res == 0)) {
-            return Status::IOError(
-                    fmt::format("cannot read from {}: unexpected EOF", 
_path.native()));
+            return Status::IOError("cannot read from {}: unexpected EOF", 
_path.native());
         }
         if (res > 0) {
             to += res;
diff --git a/be/src/io/fs/local_file_system.cpp 
b/be/src/io/fs/local_file_system.cpp
index 3ba889c94d..46098e060f 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -40,8 +40,7 @@ Status LocalFileSystem::create_file(const Path& path, 
FileWriterPtr* writer) {
     auto fs_path = absolute_path(path);
     int fd = ::open(fs_path.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 
0666);
     if (-1 == fd) {
-        return Status::IOError(
-                fmt::format("cannot open {}: {}", fs_path.native(), 
std::strerror(errno)));
+        return Status::IOError("cannot open {}: {}", fs_path.native(), 
std::strerror(errno));
     }
     *writer = std::make_unique<LocalFileWriter>(std::move(fs_path), fd);
     return Status::OK();
@@ -62,14 +61,16 @@ Status LocalFileSystem::open_file(const Path& path, 
FileReaderSPtr* reader) {
 
 Status LocalFileSystem::delete_file(const Path& path) {
     auto fs_path = absolute_path(path);
+    if (!std::filesystem::exists(fs_path)) {
+        return Status::OK();
+    }
     if (!std::filesystem::is_regular_file(fs_path)) {
-        return Status::IOError(fmt::format("{} is not a file", 
fs_path.native()));
+        return Status::IOError("{} is not a file", fs_path.native());
     }
     std::error_code ec;
     std::filesystem::remove(fs_path, ec);
     if (ec) {
-        return Status::IOError(
-                fmt::format("cannot delete {}: {}", fs_path.native(), 
std::strerror(ec.value())));
+        return Status::IOError("cannot delete {}: {}", fs_path.native(), 
std::strerror(ec.value()));
     }
     return Status::OK();
 }
@@ -77,35 +78,36 @@ Status LocalFileSystem::delete_file(const Path& path) {
 Status LocalFileSystem::create_directory(const Path& path) {
     auto fs_path = absolute_path(path);
     if (std::filesystem::exists(fs_path)) {
-        return Status::IOError(fmt::format("{} exists", fs_path.native()));
+        return Status::IOError("{} exists", fs_path.native());
     }
     std::error_code ec;
     std::filesystem::create_directories(fs_path, ec);
     if (ec) {
-        return Status::IOError(
-                fmt::format("cannot create {}: {}", fs_path.native(), 
std::strerror(ec.value())));
+        return Status::IOError("cannot create {}: {}", fs_path.native(), 
std::strerror(ec.value()));
     }
     return Status::OK();
 }
 
 Status LocalFileSystem::delete_directory(const Path& path) {
     auto fs_path = absolute_path(path);
+    if (!std::filesystem::exists(fs_path)) {
+        return Status::OK();
+    }
     if (!std::filesystem::is_directory(fs_path)) {
-        return Status::IOError(fmt::format("{} is not a directory", 
fs_path.native()));
+        return Status::IOError("{} is not a directory", fs_path.native());
     }
     std::error_code ec;
     std::filesystem::remove_all(fs_path, ec);
     if (ec) {
-        return Status::IOError(
-                fmt::format("cannot delete {}: {}", fs_path.native(), 
std::strerror(ec.value())));
+        return Status::IOError("cannot delete {}: {}", fs_path.native(), 
std::strerror(ec.value()));
     }
     return Status::OK();
 }
 
 Status LocalFileSystem::link_file(const Path& src, const Path& dest) {
     if (::link(src.c_str(), dest.c_str()) != 0) {
-        return Status::IOError(fmt::format("fail to create hard link: {}. from 
{} to {}",
-                                           std::strerror(errno), src.native(), 
dest.native()));
+        return Status::IOError("fail to create hard link: {}. from {} to {}", 
std::strerror(errno),
+                               src.native(), dest.native());
     }
     return Status::OK();
 }
@@ -121,8 +123,8 @@ Status LocalFileSystem::file_size(const Path& path, size_t* 
file_size) const {
     std::error_code ec;
     *file_size = std::filesystem::file_size(fs_path, ec);
     if (ec) {
-        return Status::IOError(fmt::format("cannot get file size {}: {}", 
fs_path.native(),
-                                           std::strerror(ec.value())));
+        return Status::IOError("cannot get file size {}: {}", fs_path.native(),
+                               std::strerror(ec.value()));
     }
     return Status::OK();
 }
@@ -135,8 +137,7 @@ Status LocalFileSystem::list(const Path& path, 
std::vector<Path>* files) {
         files->push_back(entry.path().filename());
     }
     if (ec) {
-        return Status::IOError(
-                fmt::format("cannot list {}: {}", fs_path.native(), 
std::strerror(ec.value())));
+        return Status::IOError("cannot list {}: {}", fs_path.native(), 
std::strerror(ec.value()));
     }
     return Status::OK();
 }
diff --git a/be/src/io/fs/local_file_writer.cpp 
b/be/src/io/fs/local_file_writer.cpp
index 1965ed6be0..95294dbb88 100644
--- a/be/src/io/fs/local_file_writer.cpp
+++ b/be/src/io/fs/local_file_writer.cpp
@@ -37,12 +37,10 @@ Status sync_dir(const io::Path& dirname) {
     int fd;
     RETRY_ON_EINTR(fd, ::open(dirname.c_str(), O_DIRECTORY | O_RDONLY));
     if (-1 == fd) {
-        return Status::IOError(
-                fmt::format("cannot open {}: {}", dirname.native(), 
std::strerror(errno)));
+        return Status::IOError("cannot open {}: {}", dirname.native(), 
std::strerror(errno));
     }
     if (0 != ::fdatasync(fd)) {
-        return Status::IOError(
-                fmt::format("cannot fdatasync {}: {}", dirname.native(), 
std::strerror(errno)));
+        return Status::IOError("cannot fdatasync {}: {}", dirname.native(), 
std::strerror(errno));
     }
     ::close(fd);
     return Status::OK();
@@ -102,8 +100,7 @@ Status LocalFileWriter::appendv(const Slice* data, size_t 
data_cnt) {
         ssize_t res;
         RETRY_ON_EINTR(res, ::writev(_fd, iov + completed_iov, iov_count));
         if (UNLIKELY(res < 0)) {
-            return Status::IOError(
-                    fmt::format("cannot write to {}: {}", _path.native(), 
std::strerror(errno)));
+            return Status::IOError("cannot write to {}: {}", _path.native(), 
std::strerror(errno));
         }
 
         if (LIKELY(res == n_left)) {
@@ -139,8 +136,7 @@ Status LocalFileWriter::finalize() {
 #if defined(__linux__)
         int flags = SYNC_FILE_RANGE_WRITE;
         if (sync_file_range(_fd, 0, 0, flags) < 0) {
-            return Status::IOError(
-                    fmt::format("cannot sync {}: {}", _path.native(), 
std::strerror(errno)));
+            return Status::IOError("cannot sync {}: {}", _path.native(), 
std::strerror(errno));
         }
 #endif
     }
@@ -153,8 +149,7 @@ Status LocalFileWriter::_close(bool sync) {
     }
     if (sync && _dirty) {
         if (0 != ::fdatasync(_fd)) {
-            return Status::IOError(
-                    fmt::format("cannot fdatasync {}: {}", _path.native(), 
std::strerror(errno)));
+            return Status::IOError("cannot fdatasync {}: {}", _path.native(), 
std::strerror(errno));
         }
         RETURN_IF_ERROR(detail::sync_dir(_path.parent_path()));
         _dirty = false;
@@ -166,8 +161,7 @@ Status LocalFileWriter::_close(bool sync) {
     
DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended);
 
     if (0 != ::close(_fd)) {
-        return Status::IOError(
-                fmt::format("cannot close {}: {}", _path.native(), 
std::strerror(errno)));
+        return Status::IOError("cannot close {}: {}", _path.native(), 
std::strerror(errno));
     }
     return Status::OK();
 }
@@ -182,8 +176,7 @@ Status LocalFileWriter::write_at(size_t offset, const 
Slice& data) {
     while (bytes_req != 0) {
         auto res = ::pwrite(_fd, from, bytes_req, offset);
         if (-1 == res && errno != EINTR) {
-            return Status::IOError(
-                    fmt::format("cannot write to {}: {}", _path.native(), 
std::strerror(errno)));
+            return Status::IOError("cannot write to {}: {}", _path.native(), 
std::strerror(errno));
         }
         if (res > 0) {
             from += res;
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index a75da54bb0..2f16b03db4 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -52,9 +52,8 @@ Status S3FileReader::close() {
 Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) {
     DCHECK(!closed());
     if (offset > _file_size) {
-        return Status::IOError(
-                fmt::format("offset exceeds file size(offset: {), file size: 
{}, path: {})", offset,
-                            _file_size, _path.native()));
+        return Status::IOError("offset exceeds file size(offset: {), file 
size: {}, path: {})",
+                               offset, _file_size, _path.native());
     }
     size_t bytes_req = result.size;
     char* to = result.data;
@@ -75,13 +74,13 @@ Status S3FileReader::read_at(size_t offset, Slice result, 
size_t* bytes_read) {
     }
     auto outcome = client->GetObject(request);
     if (!outcome.IsSuccess()) {
-        return Status::IOError(fmt::format("failed to read from {}: {}", 
_path.native(),
-                                           outcome.GetError().GetMessage()));
+        return Status::IOError("failed to read from {}: {}", _path.native(),
+                               outcome.GetError().GetMessage());
     }
     *bytes_read = outcome.GetResult().GetContentLength();
     if (*bytes_read != bytes_req) {
-        return Status::IOError(fmt::format("failed to read from {}(bytes read: 
{}, bytes req: {})",
-                                           _path.native(), *bytes_read, 
bytes_req));
+        return Status::IOError("failed to read from {}(bytes read: {}, bytes 
req: {})",
+                               _path.native(), *bytes_read, bytes_req);
     }
     DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
     return Status::OK();
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index e421863774..65d6a27de7 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -20,7 +20,9 @@
 #include <aws/core/utils/threading/Executor.h>
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/DeleteObjectRequest.h>
+#include <aws/s3/model/DeleteObjectsRequest.h>
 #include <aws/s3/model/HeadObjectRequest.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
 #include <aws/s3/model/PutObjectRequest.h>
 #include <aws/transfer/TransferManager.h>
 
@@ -153,12 +155,13 @@ Status S3FileSystem::delete_file(const Path& path) {
     request.WithBucket(_s3_conf.bucket).WithKey(key);
 
     auto outcome = client->DeleteObject(request);
-    if (!outcome.IsSuccess()) {
-        return Status::IOError("failed to delete object(endpoint={}, 
bucket={}, key={}): {}",
-                               _s3_conf.endpoint, _s3_conf.bucket, key,
-                               outcome.GetError().GetMessage());
+    if (outcome.IsSuccess() ||
+        outcome.GetError().GetResponseCode() == 
Aws::Http::HttpResponseCode::NOT_FOUND) {
+        return Status::OK();
     }
-    return Status::OK();
+    return Status::IOError("failed to delete object(endpoint={}, bucket={}, 
key={}): {}",
+                           _s3_conf.endpoint, _s3_conf.bucket, key,
+                           outcome.GetError().GetMessage());
 }
 
 Status S3FileSystem::create_directory(const Path& path) {
@@ -166,7 +169,54 @@ Status S3FileSystem::create_directory(const Path& path) {
 }
 
 Status S3FileSystem::delete_directory(const Path& path) {
-    return Status::NotSupported("not support");
+    auto client = get_client();
+    CHECK_S3_CLIENT(client);
+
+    Aws::S3::Model::ListObjectsV2Request request;
+    auto prefix = get_key(path);
+    request.WithBucket(_s3_conf.bucket).WithPrefix(prefix);
+
+    Aws::S3::Model::DeleteObjectsRequest delete_request;
+    delete_request.SetBucket(_s3_conf.bucket);
+    bool is_trucated = false;
+    do {
+        auto outcome = client->ListObjectsV2(request);
+        if (!outcome.IsSuccess()) {
+            return Status::IOError("failed to list objects(endpoint={}, 
bucket={}, prefix={}): {}",
+                                   _s3_conf.endpoint, _s3_conf.bucket, prefix,
+                                   outcome.GetError().GetMessage());
+        }
+        const auto& result = outcome.GetResult();
+        Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+        objects.reserve(result.GetContents().size());
+        for (const auto& obj : result.GetContents()) {
+            objects.emplace_back().SetKey(obj.GetKey());
+        }
+        if (!objects.empty()) {
+            Aws::S3::Model::Delete del;
+            del.WithObjects(std::move(objects)).SetQuiet(true);
+            delete_request.SetDelete(std::move(del));
+            auto delete_outcome = client->DeleteObjects(delete_request);
+            if (!delete_outcome.IsSuccess()) {
+                return Status::IOError(
+                        "failed to delete objects(endpoint={}, bucket={}, 
prefix={}): {}",
+                        _s3_conf.endpoint, _s3_conf.bucket, prefix,
+                        delete_outcome.GetError().GetMessage());
+            }
+            if (!delete_outcome.GetResult().GetErrors().empty()) {
+                const auto& e = delete_outcome.GetResult().GetErrors().front();
+                return Status::IOError("fail to delete object(endpoint={}, 
bucket={}, key={}): {}",
+                                       _s3_conf.endpoint, _s3_conf.bucket, 
e.GetKey(),
+                                       e.GetMessage());
+            }
+            VLOG_TRACE << "delete " << objects.size()
+                       << " s3 objects, endpoint: " << _s3_conf.endpoint
+                       << ", bucket: " << _s3_conf.bucket << ", prefix: " << 
_s3_conf.prefix;
+        }
+        is_trucated = result.GetIsTruncated();
+        request.SetContinuationToken(result.GetNextContinuationToken());
+    } while (is_trucated);
+    return Status::OK();
 }
 
 Status S3FileSystem::link_file(const Path& src, const Path& dest) {
@@ -181,7 +231,7 @@ Status S3FileSystem::exists(const Path& path, bool* res) 
const {
     auto key = get_key(path);
     request.WithBucket(_s3_conf.bucket).WithKey(key);
 
-    auto outcome = _client->HeadObject(request);
+    auto outcome = client->HeadObject(request);
     if (outcome.IsSuccess()) {
         *res = true;
     } else if (outcome.GetError().GetResponseCode() == 
Aws::Http::HttpResponseCode::NOT_FOUND) {
@@ -202,7 +252,7 @@ Status S3FileSystem::file_size(const Path& path, size_t* 
file_size) const {
     auto key = get_key(path);
     request.WithBucket(_s3_conf.bucket).WithKey(key);
 
-    auto outcome = _client->HeadObject(request);
+    auto outcome = client->HeadObject(request);
     if (outcome.IsSuccess()) {
         *file_size = outcome.GetResult().GetContentLength();
     } else {
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index 219ecb3a0d..9eb393996f 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -46,6 +46,7 @@ public:
 
     Status create_directory(const Path& path) override;
 
+    // Delete all objects whose keys start with path.
     Status delete_directory(const Path& path) override;
 
     Status link_file(const Path& src, const Path& dest) override;
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 1284af6084..307fa2fee0 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -92,6 +92,26 @@ Status BaseCompaction::pick_rowsets_to_compact() {
     RETURN_NOT_OK(check_version_continuity(_input_rowsets));
     RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets));
 
+    // If there are delete predicate rowsets in tablet, start_version > 0 
implies some rowsets before
+    // delete version cannot apply these delete predicates, which can cause 
incorrect query result.
+    // So we must abort this base compaction.
+    // A typical scenario is that some rowsets before cumulative point are on 
remote storage.
+    if (_input_rowsets.front()->start_version() > 0) {
+        bool has_delete_predicate = false;
+        for (const auto& rs : _input_rowsets) {
+            if (rs->rowset_meta()->has_delete_predicate()) {
+                has_delete_predicate = true;
+                break;
+            }
+        }
+        if (has_delete_predicate) {
+            LOG(WARNING)
+                    << "Some rowsets cannot apply delete predicates in base 
compaction. tablet_id="
+                    << _tablet->tablet_id();
+            return Status::OLAPInternalError(OLAP_ERR_BE_NO_SUITABLE_VERSION);
+        }
+    }
+
     if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) {
         // the tablet is with rowset: [0-1], [2-y]
         // and [0-1] has no data. in this situation, no need to do base 
compaction.
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 84e336fc2e..26b44f9665 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <memory>
+#include <string>
 
 #include "olap/olap_define.h"
 #include "olap/tablet_meta.h"
@@ -58,11 +59,9 @@ public:
     int16_t shard_id() const;
     bool equal(int64_t tablet_id, int32_t schema_hash);
 
-    const io::ResourceId& cooldown_resource() const { return 
_tablet_meta->cooldown_resource(); }
+    const std::string& storage_policy() const { return 
_tablet_meta->storage_policy(); }
 
-    void set_cooldown_resource(io::ResourceId resource) {
-        _tablet_meta->set_cooldown_resource(std::move(resource));
-    }
+    void set_storage_policy(const std::string& policy) { 
_tablet_meta->set_storage_policy(policy); }
 
     // properties encapsulated in TabletSchema
     virtual const TabletSchema& tablet_schema() const;
diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
index 05da215028..874ce60ddf 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -463,10 +463,9 @@ void 
NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(
 void CumulativeCompactionPolicy::pick_candidate_rowsets(
         const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
         int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets) {
-    for (auto& it : rs_version_map) {
-        // find all rowset version greater than cumulative_point and skip the 
create time in skip_window_sec
-        if (it.first.first >= cumulative_point && it.second->is_local()) {
-            candidate_rowsets->push_back(it.second);
+    for (const auto& [version, rs] : rs_version_map) {
+        if (version.first >= cumulative_point && rs->is_local()) {
+            candidate_rowsets->push_back(rs);
         }
     }
     std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), 
Rowset::comparator);
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 0f7acc749f..baf51034e6 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -40,6 +40,7 @@
 #include "io/fs/path.h"
 #include "olap/file_helper.h"
 #include "olap/olap_define.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset_meta_manager.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_meta_manager.h"
@@ -820,4 +821,65 @@ Status DataDir::move_to_trash(const std::string& 
tablet_path) {
     return Status::OK();
 }
 
+void DataDir::perform_remote_rowset_gc() {
+    std::vector<std::pair<std::string, std::string>> gc_kvs;
+    auto traverse_remote_rowset_func = [&gc_kvs](const std::string& key,
+                                                 const std::string& value) -> 
bool {
+        gc_kvs.emplace_back(key, value);
+        return true;
+    };
+    _meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_ROWSET_GC_PREFIX, 
traverse_remote_rowset_func);
+    std::vector<std::string> deleted_keys;
+    for (auto& [key, val] : gc_kvs) {
+        auto rowset_id = key.substr(REMOTE_ROWSET_GC_PREFIX.size());
+        RemoteRowsetGcPB gc_pb;
+        gc_pb.ParseFromString(val);
+        auto fs = io::FileSystemMap::instance()->get(gc_pb.resource_id());
+        if (!fs) {
+            LOG(WARNING) << "Cannot get file system: " << gc_pb.resource_id();
+            continue;
+        }
+        DCHECK(fs->type() != io::FileSystemType::LOCAL);
+        Status st;
+        for (int i = 0; i < gc_pb.num_segments(); ++i) {
+            auto seg_path = BetaRowset::remote_segment_path(gc_pb.tablet_id(), 
rowset_id, i);
+            st = fs->delete_file(seg_path);
+            if (!st.ok()) {
+                LOG(WARNING) << st.to_string();
+                break;
+            }
+        }
+        if (st.ok()) {
+            deleted_keys.push_back(std::move(key));
+        }
+    }
+    for (const auto& key : deleted_keys) {
+        _meta->remove(META_COLUMN_FAMILY_INDEX, key);
+    }
+}
+
+void DataDir::perform_remote_tablet_gc() {
+    std::vector<std::pair<std::string, std::string>> tablet_gc_kvs;
+    auto traverse_remote_tablet_func = [&tablet_gc_kvs](const std::string& key,
+                                                        const std::string& 
value) -> bool {
+        tablet_gc_kvs.emplace_back(key, value);
+        return true;
+    };
+    _meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_TABLET_GC_PREFIX, 
traverse_remote_tablet_func);
+    std::vector<std::string> deleted_keys;
+    for (auto& [key, resource] : tablet_gc_kvs) {
+        auto tablet_id = key.substr(REMOTE_TABLET_GC_PREFIX.size());
+        auto fs = io::FileSystemMap::instance()->get(resource);
+        auto st = fs->delete_directory(DATA_PREFIX + "/" + tablet_id);
+        if (st.ok()) {
+            deleted_keys.push_back(std::move(key));
+        } else {
+            LOG(WARNING) << st;
+        }
+    }
+    for (const auto& key : deleted_keys) {
+        _meta->remove(META_COLUMN_FAMILY_INDEX, key);
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index 642058c46f..4cf40fd1bd 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -116,6 +116,10 @@ public:
 
     void perform_path_gc_by_tablet();
 
+    void perform_remote_rowset_gc();
+
+    void perform_remote_tablet_gc();
+
     // check if the capacity reach the limit after adding the incoming data
     // return true if limit reached, otherwise, return false.
     // TODO(cmy): for now we can not precisely calculate the capacity Doris 
used,
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 077ed670f9..65f2de8c8e 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -127,6 +127,8 @@ const std::string TABLET_ID_KEY = "tablet_id";
 const std::string ENABLE_BYTE_TO_BASE64 = "byte_to_base64";
 const std::string TABLET_ID_PREFIX = "t_";
 const std::string ROWSET_ID_PREFIX = "s_";
+const std::string REMOTE_ROWSET_GC_PREFIX = "gc_";
+const std::string REMOTE_TABLET_GC_PREFIX = "tgc_";
 
 #if defined(__GNUC__)
 #define OLAP_LIKELY(x) __builtin_expect((x), 1)
diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index 1329917a82..1e7cbde9f4 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -46,6 +46,12 @@ std::string BetaRowset::local_segment_path(const 
std::string& tablet_path,
     return fmt::format("{}/{}_{}.dat", tablet_path, rowset_id.to_string(), 
segment_id);
 }
 
+std::string BetaRowset::remote_segment_path(int64_t tablet_id, const 
std::string& rowset_id,
+                                            int segment_id) {
+    // data/{tablet_id}/{rowset_id}_{seg_num}.dat
+    return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, 
segment_id);
+}
+
 std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& 
rowset_id,
                                             int segment_id) {
     // data/{tablet_id}/{rowset_id}_{seg_num}.dat
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 9ccaa7b2b9..8d96bfe090 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -19,6 +19,7 @@
 #define DORIS_SRC_OLAP_ROWSET_BETA_ROWSET_H_
 
 #include <cstdint>
+#include <string>
 
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -49,6 +50,9 @@ public:
     static std::string remote_segment_path(int64_t tablet_id, const RowsetId& 
rowset_id,
                                            int segment_id);
 
+    static std::string remote_segment_path(int64_t tablet_id, const 
std::string& rowset_id,
+                                           int segment_id);
+
     Status split_range(const RowCursor& start_key, const RowCursor& end_key,
                        uint64_t request_block_row_count, size_t key_num,
                        std::vector<OlapTuple>* ranges) override;
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 74cbb1aa68..390bd66206 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -28,12 +28,14 @@
 #include <map>
 #include <set>
 
+#include "common/status.h"
 #include "env/env.h"
 #include "gen_cpp/Types_constants.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/storage_engine.h"
+#include "olap/tablet_meta.h"
 #include "runtime/thread_context.h"
 
 using std::filesystem::path;
@@ -360,6 +362,9 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
         /// make the full snapshot of the tablet.
         {
             std::shared_lock rdlock(ref_tablet->get_header_lock());
+            if (ref_tablet->tablet_state() == TABLET_SHUTDOWN) {
+                return Status::Aborted("tablet has shutdown");
+            }
             if (request.__isset.missing_version) {
                 for (int64_t missed_version : request.missing_version) {
                     Version version = {missed_version, missed_version};
@@ -422,6 +427,13 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
             CHECK(res.ok()) << res;
             ref_tablet->generate_tablet_meta_copy_unlocked(new_tablet_meta);
         }
+        {
+            std::unique_lock wlock(ref_tablet->get_header_lock());
+            if (ref_tablet->tablet_state() == TABLET_SHUTDOWN) {
+                return Status::Aborted("tablet has shutdown");
+            }
+            ref_tablet->update_self_owned_remote_rowsets(consistent_rowsets);
+        }
 
         std::vector<RowsetMetaSharedPtr> rs_metas;
         for (auto& rs : consistent_rowsets) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 9a7c414971..4e28ee7d8b 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -329,7 +329,7 @@ std::vector<DataDir*> StorageEngine::get_stores() {
     stores.reserve(_store_map.size());
 
     std::lock_guard<std::mutex> l(_store_lock);
-    if (include_unused) {
+    if constexpr (include_unused) {
         for (auto& it : _store_map) {
             stores.push_back(it.second);
         }
@@ -720,6 +720,12 @@ Status StorageEngine::start_trash_sweep(double* usage, 
bool ignore_guard) {
     // clean unused rowset metas in OlapMeta
     _clean_unused_rowset_metas();
 
+    // clean unused rowsets in remote storage backends
+    for (auto data_dir : get_stores()) {
+        data_dir->perform_remote_rowset_gc();
+        data_dir->perform_remote_tablet_gc();
+    }
+
     return res;
 }
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index c60f3c4d07..290753de9e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -18,6 +18,7 @@
 #include "olap/tablet.h"
 
 #include <ctype.h>
+#include <fmt/core.h>
 #include <glog/logging.h>
 #include <pthread.h>
 #include <rapidjson/prettywriter.h>
@@ -1145,10 +1146,8 @@ void 
Tablet::pick_candidate_rowsets_to_cumulative_compaction(
 
 void 
Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* 
candidate_rowsets) {
     std::shared_lock rdlock(_meta_lock);
-    // FIXME(cyx): If there are delete predicate rowsets in tablet,
-    // remote rowsets cannot apply these delete predicate, which can cause
-    // incorrect query result.
     for (auto& it : _rs_version_map) {
+        // Do compaction on local rowsets only.
         if (it.first.first < _cumulative_point && it.second->is_local()) {
             candidate_rowsets->push_back(it.second);
         }
@@ -1702,7 +1701,7 @@ Status Tablet::cooldown() {
         LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" << 
tablet_id();
         return Status::OLAPInternalError(OLAP_ERR_BE_TRY_BE_LOCK_ERROR);
     }
-    auto dest_fs = io::FileSystemMap::instance()->get(cooldown_resource());
+    auto dest_fs = io::FileSystemMap::instance()->get(storage_policy());
     if (!dest_fs) {
         return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
     }
@@ -1716,8 +1715,13 @@ Status Tablet::cooldown() {
 
     auto start = std::chrono::steady_clock::now();
 
-    
RETURN_IF_ERROR(old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()),
-                                          new_rowset_id));
+    auto st = 
old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()),
+                                    new_rowset_id);
+    if (!st.ok()) {
+        record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(),
+                                    old_rowset->num_segments());
+        return st;
+    }
 
     auto duration = 
std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
     LOG(INFO) << "Upload rowset " << old_rowset->version() << " " << 
new_rowset_id.to_string()
@@ -1732,14 +1736,30 @@ Status Tablet::cooldown() {
     new_rowset_meta->set_fs(dest_fs);
     new_rowset_meta->set_creation_time(time(nullptr));
     RowsetSharedPtr new_rowset;
-    RowsetFactory::create_rowset(&_schema, _tablet_path, 
std::move(new_rowset_meta), &new_rowset);
+    RowsetFactory::create_rowset(&_schema, _tablet_path, new_rowset_meta, 
&new_rowset);
 
     std::vector to_add {std::move(new_rowset)};
     std::vector to_delete {std::move(old_rowset)};
 
-    std::unique_lock meta_wlock(_meta_lock);
-    modify_rowsets(to_add, to_delete);
-    save_meta();
+    bool has_shutdown = false;
+    {
+        std::unique_lock meta_wlock(_meta_lock);
+        has_shutdown = tablet_state() == TABLET_SHUTDOWN;
+        if (!has_shutdown) {
+            modify_rowsets(to_add, to_delete);
+            if (new_rowset_meta->has_delete_predicate()) {
+                add_delete_predicate(new_rowset_meta->delete_predicate(),
+                                     new_rowset_meta->start_version());
+            }
+            _self_owned_remote_rowsets.insert(to_add.front());
+            save_meta();
+        }
+    }
+    if (has_shutdown) {
+        record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(),
+                                    to_add.front()->num_segments());
+        return Status::Aborted("tablet {} has shutdown", tablet_id());
+    }
     return Status::OK();
 }
 
@@ -1763,13 +1783,13 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() {
 
 bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) {
     // std::shared_lock meta_rlock(_meta_lock);
-    if (cooldown_resource().empty()) {
+    if (storage_policy().empty()) {
         VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << 
tablet_id();
         return false;
     }
-    auto policy = 
ExecEnv::GetInstance()->storage_policy_mgr()->get(cooldown_resource());
+    auto policy = 
ExecEnv::GetInstance()->storage_policy_mgr()->get(storage_policy());
     if (!policy) {
-        LOG(WARNING) << "Cannot get storage policy: " << cooldown_resource();
+        LOG(WARNING) << "Cannot get storage policy: " << storage_policy();
         return false;
     }
     auto cooldown_ttl_sec = policy->cooldown_ttl;
@@ -1817,28 +1837,26 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, 
size_t* file_size) {
     return false;
 }
 
-void Tablet::remove_all_remote_rowsets() {
-    std::unique_lock meta_wlock(_meta_lock);
-    DCHECK(_state == TabletState::TABLET_SHUTDOWN);
-    Status st;
-    for (auto& it : _rs_version_map) {
-        auto& rs = it.second;
-        if (!rs->is_local()) {
-            st = rs->remove();
-            LOG_IF(WARNING, !st.ok()) << "Failed to remove rowset " << 
rs->version() << " "
-                                      << rs->rowset_id().to_string() << " in 
tablet " << tablet_id()
-                                      << ": " << st.to_string();
-        }
-    }
-    for (auto& it : _stale_rs_version_map) {
-        auto& rs = it.second;
-        if (!rs->is_local()) {
-            st = rs->remove();
-            LOG_IF(WARNING, !st.ok()) << "Failed to remove rowset " << 
rs->version() << " "
-                                      << rs->rowset_id().to_string() << " in 
tablet " << tablet_id()
-                                      << ": " << st.to_string();
-        }
+void Tablet::record_unused_remote_rowset(const RowsetId& rowset_id, const 
io::ResourceId& resource,
+                                         int64_t num_segments) {
+    auto gc_key = REMOTE_ROWSET_GC_PREFIX + rowset_id.to_string();
+    RemoteRowsetGcPB gc_pb;
+    gc_pb.set_resource_id(resource);
+    gc_pb.set_tablet_id(tablet_id());
+    gc_pb.set_num_segments(num_segments);
+    WARN_IF_ERROR(
+            _data_dir->get_meta()->put(META_COLUMN_FAMILY_INDEX, gc_key, 
gc_pb.SerializeAsString()),
+            fmt::format("Failed to record unused remote rowset(tablet id: {}, 
rowset id: {})",
+                        tablet_id(), rowset_id.to_string()));
+}
+
+Status Tablet::remove_all_remote_rowsets() {
+    DCHECK(_state == TABLET_SHUTDOWN);
+    if (storage_policy().empty()) {
+        return Status::OK();
     }
+    auto tablet_gc_key = REMOTE_TABLET_GC_PREFIX + std::to_string(tablet_id());
+    return _data_dir->get_meta()->put(META_COLUMN_FAMILY_INDEX, tablet_gc_key, 
storage_policy());
 }
 
 const TabletSchema& Tablet::tablet_schema() const {
@@ -1887,4 +1905,28 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, 
RowLocation* row_locatio
     return Status::NotFound("can't find key in all rowsets");
 }
 
+void Tablet::remove_self_owned_remote_rowsets() {
+    DCHECK(_state == TABLET_SHUTDOWN);
+    for (const auto& rs : _self_owned_remote_rowsets) {
+        DCHECK(!rs->is_local());
+        record_unused_remote_rowset(rs->rowset_id(), 
rs->rowset_meta()->resource_id(),
+                                    rs->num_segments());
+    }
+}
+
+void Tablet::update_self_owned_remote_rowsets(
+        const std::vector<RowsetSharedPtr>& rowsets_in_snapshot) {
+    if (_self_owned_remote_rowsets.empty()) {
+        return;
+    }
+    for (const auto& rs : rowsets_in_snapshot) {
+        if (!rs->is_local()) {
+            auto it = _self_owned_remote_rowsets.find(rs);
+            if (it != _self_owned_remote_rowsets.end()) {
+                _self_owned_remote_rowsets.erase(it);
+            }
+        }
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 08e5fecd15..accc157d6a 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -22,6 +22,7 @@
 #include <set>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "gen_cpp/AgentService_types.h"
@@ -30,6 +31,7 @@
 #include "olap/base_tablet.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/data_dir.h"
+#include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
@@ -298,14 +300,22 @@ public:
 
     bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
 
-    // Physically remove remote rowsets.
-    void remove_all_remote_rowsets();
+    Status remove_all_remote_rowsets();
 
     // Lookup the row location of `encoded_key`, the function sets 
`row_location` on success.
     // NOTE: the method only works in unique key model with primary key index, 
you will got a
     //       not supported error in other data model.
     Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location, 
uint32_t version);
 
+    void remove_self_owned_remote_rowsets();
+
+    // Erase entries in `_self_owned_remote_rowsets` iff they are in 
`rowsets_in_snapshot`.
+    // REQUIRES: held _meta_lock
+    void update_self_owned_remote_rowsets(const std::vector<RowsetSharedPtr>& 
rowsets_in_snapshot);
+
+    void record_unused_remote_rowset(const RowsetId& rowset_id, const 
io::ResourceId& resource,
+                                     int64_t num_segments);
+
 private:
     Status _init_once_action();
     void _print_missed_versions(const std::vector<Version>& missed_versions) 
const;
@@ -397,6 +407,9 @@ private:
     int64_t _last_missed_version;
     int64_t _last_missed_time_s;
 
+    // Remote rowsets not shared by other BE. We can delete them when drop 
tablet.
+    std::unordered_set<RowsetSharedPtr> _self_owned_remote_rowsets; // guarded 
by _meta_lock
+
     DISALLOW_COPY_AND_ASSIGN(Tablet);
 
 public:
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 3ab3c3a99d..f5c2ebdbcf 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -194,7 +194,7 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId 
tablet_id,
         // If the new tablet is fresher than the existing one, then replace
         // the existing tablet with the new one.
         // Use default replica_id to ignore whether replica_id is match when 
drop tablet.
-        RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0, 
keep_files),
+        RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0, 
keep_files, false),
                           strings::Substitute("failed to drop old tablet when 
add new tablet. "
                                               "tablet_id=$0",
                                               tablet_id));
@@ -356,7 +356,7 @@ TabletSharedPtr 
TabletManager::_internal_create_tablet_unlocked(
     }
     // something is wrong, we need clear environment
     if (is_tablet_added) {
-        Status status = _drop_tablet_unlocked(new_tablet_id, 
request.replica_id, false);
+        Status status = _drop_tablet_unlocked(new_tablet_id, 
request.replica_id, false, false);
         if (!status.ok()) {
             LOG(WARNING) << "fail to drop tablet when create tablet failed. 
res=" << res;
         }
@@ -425,7 +425,8 @@ TabletSharedPtr 
TabletManager::_create_tablet_meta_and_dir_unlocked(
     return nullptr;
 }
 
-Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, 
bool keep_files) {
+Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
+                                  bool is_drop_table_or_partition) {
     auto& shard = _get_tablets_shard(tablet_id);
     std::lock_guard wrlock(shard.lock);
     if (shard.tablets_under_clone.count(tablet_id) > 0) {
@@ -433,12 +434,12 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, 
TReplicaId replica_id, bo
         return Status::Aborted("aborted");
     }
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-    return _drop_tablet_unlocked(tablet_id, replica_id, keep_files);
+    return _drop_tablet_unlocked(tablet_id, replica_id, false, 
is_drop_table_or_partition);
 }
 
 // Drop specified tablet.
 Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId 
replica_id,
-                                            bool keep_files) {
+                                            bool keep_files, bool 
is_drop_table_or_partition) {
     LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", 
replica_id=" << replica_id;
     DorisMetrics::instance()->drop_tablet_requests_total->increment(1);
 
@@ -472,6 +473,13 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId 
tablet_id, TReplicaId repl
         // and the tablet will be loaded at restart time.
         // To avoid this exception, we first set the state of the tablet to 
`SHUTDOWN`.
         to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN);
+        // We must record unused remote rowsets path info to OlapMeta before 
tablet state is marked as TABLET_SHUTDOWN in OlapMeta,
+        // otherwise if BE shutdown after saving tablet state, these remote 
rowsets path info will lost.
+        if (is_drop_table_or_partition) {
+            RETURN_IF_ERROR(to_drop_tablet->remove_all_remote_rowsets());
+        } else {
+            to_drop_tablet->remove_self_owned_remote_rowsets();
+        }
         to_drop_tablet->save_meta();
         {
             std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock);
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 7b17b59ce3..159b8baefd 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -61,12 +61,9 @@ public:
     // task to be fail, even if there is enough space on other disks
     Status create_tablet(const TCreateTabletReq& request, 
std::vector<DataDir*> stores);
 
-    // Drop a tablet by description
-    // If set keep_files == true, files will NOT be deleted when 
deconstruction.
-    // Return OLAP_SUCCESS, if run ok
-    //        OLAP_ERR_TABLE_DELETE_NOEXIST_ERROR, if tablet not exist
-    //        Status::OLAPInternalError(OLAP_ERR_NOT_INITED), if not inited
-    Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool 
keep_files = false);
+    // Drop a tablet by description.
+    // If `is_drop_table_or_partition` is true, we need to remove all remote 
rowsets in this tablet.
+    Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool 
is_drop_table_or_partition);
 
     Status drop_tablets_on_error_root_path(const std::vector<TabletInfo>& 
tablet_info_vec);
 
@@ -156,7 +153,8 @@ private:
 
     bool _check_tablet_id_exist_unlocked(TTabletId tablet_id);
 
-    Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, 
bool keep_files);
+    Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, 
bool keep_files,
+                                 bool is_drop_table_or_partition);
 
     TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id);
     TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, bool 
include_deleted,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 18dd73f8bc..bc88710854 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -206,7 +206,7 @@ TabletMeta::TabletMeta(const TabletMeta& b)
           _del_predicates(b._del_predicates),
           _in_restore_mode(b._in_restore_mode),
           _preferred_rowset_type(b._preferred_rowset_type),
-          _cooldown_resource(b._cooldown_resource),
+          _storage_policy(b._storage_policy),
           _delete_bitmap(b._delete_bitmap) {};
 
 void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& 
tcolumn,
@@ -461,7 +461,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
         _preferred_rowset_type = tablet_meta_pb.preferred_rowset_type();
     }
 
-    _cooldown_resource = tablet_meta_pb.storage_policy();
+    _storage_policy = tablet_meta_pb.storage_policy();
     if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
         _enable_unique_key_merge_on_write = 
tablet_meta_pb.enable_unique_key_merge_on_write();
     }
@@ -528,7 +528,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
         tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type);
     }
 
-    tablet_meta_pb->set_storage_policy(_cooldown_resource);
+    tablet_meta_pb->set_storage_policy(_storage_policy);
     
tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write);
 
     {
@@ -753,7 +753,7 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
     }
     if (a._in_restore_mode != b._in_restore_mode) return false;
     if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
-    if (a._cooldown_resource != b._cooldown_resource) return false;
+    if (a._storage_policy != b._storage_policy) return false;
     return true;
 }
 
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 7275d0430d..34a42ed510 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -26,7 +26,6 @@
 
 #include "common/logging.h"
 #include "gen_cpp/olap_file.pb.h"
-#include "io/fs/file_system.h"
 #include "olap/delete_handler.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -186,23 +185,24 @@ public:
 
     bool all_beta() const;
 
-    const io::ResourceId& cooldown_resource() const {
+    const std::string& storage_policy() const {
         std::shared_lock<std::shared_mutex> rlock(_meta_lock);
-        return _cooldown_resource;
+        return _storage_policy;
     }
 
-    void set_cooldown_resource(io::ResourceId resource) {
+    void set_storage_policy(const std::string& policy) {
         std::unique_lock<std::shared_mutex> wlock(_meta_lock);
-        VLOG_NOTICE << "set tablet_id : " << _table_id << " cooldown resource 
from "
-                    << _cooldown_resource << " to " << resource;
-        _cooldown_resource = std::move(resource);
+        VLOG_NOTICE << "set tablet_id : " << _table_id << " storage policy 
from " << _storage_policy
+                    << " to " << policy;
+        _storage_policy = policy;
     }
+
     static void init_column_from_tcolumn(uint32_t unique_id, const TColumn& 
tcolumn,
                                          ColumnPB* column);
 
     DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
 
-    bool enable_unique_key_merge_on_write() { return 
_enable_unique_key_merge_on_write; }
+    bool enable_unique_key_merge_on_write() const { return 
_enable_unique_key_merge_on_write; }
 
 private:
     Status _save_meta(DataDir* data_dir);
@@ -238,8 +238,7 @@ private:
     bool _in_restore_mode = false;
     RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
 
-    // FIXME(cyx): Currently `cooldown_resource` is equivalent to 
`storage_policy`.
-    io::ResourceId _cooldown_resource;
+    std::string _storage_policy;
 
     // For unique key data model, the feature Merge-on-Write will leverage a 
primary
     // key index and a delete-bitmap to mark duplicate keys as deleted in load 
stage,
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 221be83850..73466cb72e 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -249,7 +249,7 @@ void EngineCloneTask::_set_tablet_info(Status status, bool 
is_new_tablet) {
                              << ", signature:" << _signature << ", version:" 
<< tablet_info.version
                              << ", expected_version: " << 
_clone_req.committed_version;
                 Status drop_status = 
StorageEngine::instance()->tablet_manager()->drop_tablet(
-                        _clone_req.tablet_id, _clone_req.replica_id);
+                        _clone_req.tablet_id, _clone_req.replica_id, false);
                 if (drop_status != Status::OK() &&
                     drop_status.precise_code() != OLAP_ERR_TABLE_NOT_FOUND) {
                     // just log
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index d05fe0a976..f503e3b4dc 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -197,6 +197,7 @@ set(OLAP_TEST_FILES
     # olap/push_handler_test.cpp
     olap/tablet_cooldown_test.cpp
     olap/rowid_conversion_test.cpp
+    olap/remote_rowset_gc_test.cpp
 )
 
 set(RUNTIME_TEST_FILES
diff --git a/be/test/olap/delete_handler_test.cpp 
b/be/test/olap/delete_handler_test.cpp
index b383fbc3a7..7ba23ca2c1 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -278,7 +278,7 @@ protected:
         tablet.reset();
         dup_tablet.reset();
         
StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
-                                                                 
_create_tablet.replica_id);
+                                                                 
_create_tablet.replica_id, false);
         EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
     }
 
@@ -443,8 +443,8 @@ protected:
     void TearDown() {
         // Remove all dir.
         tablet.reset();
-        k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
-                                                _create_tablet.replica_id);
+        k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id, 
_create_tablet.replica_id,
+                                                false);
         EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
     }
 
@@ -820,7 +820,7 @@ protected:
         tablet.reset();
         _delete_handler.finalize();
         
StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
-                                                                 
_create_tablet.replica_id);
+                                                                 
_create_tablet.replica_id, false);
         EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
     }
 
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index 3126b30f94..eae41cb320 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -406,7 +406,7 @@ TEST_F(TestDeltaWriter, open) {
     EXPECT_EQ(Status::OK(), res);
     SAFE_DELETE(delta_writer);
 
-    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     EXPECT_EQ(Status::OK(), res);
 }
 
@@ -527,7 +527,7 @@ TEST_F(TestDeltaWriter, write) {
     }
     EXPECT_EQ(1, tablet->num_rows());
 
-    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     EXPECT_EQ(Status::OK(), res);
     delete delta_writer;
 }
@@ -673,7 +673,7 @@ TEST_F(TestDeltaWriter, vec_write) {
     }
     ASSERT_EQ(1, tablet->num_rows());
 
-    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     ASSERT_TRUE(res.ok());
     delete delta_writer;
 }
@@ -740,7 +740,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
     }
     EXPECT_EQ(1, tablet->num_rows());
 
-    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     EXPECT_EQ(Status::OK(), res);
     delete delta_writer;
 }
@@ -833,7 +833,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     }
     ASSERT_EQ(1, tablet->num_rows());
 
-    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     ASSERT_TRUE(res.ok());
     delete delta_writer;
 }
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index b5b3b4d988..a36f0ff0c4 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -259,7 +259,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
     EXPECT_NE(tablet3, tablet);
     // test case 2 end
 
-    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     EXPECT_EQ(Status::OK(), res);
     delete delta_writer;
 }
diff --git a/be/test/olap/tablet_clone_test.cpp 
b/be/test/olap/remote_rowset_gc_test.cpp
similarity index 73%
rename from be/test/olap/tablet_clone_test.cpp
rename to be/test/olap/remote_rowset_gc_test.cpp
index 51124f87cf..c02b15c1c4 100644
--- a/be/test/olap/tablet_clone_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -19,11 +19,12 @@
 
 #include <memory>
 
+#include "common/config.h"
 #include "common/status.h"
 #include "io/fs/file_system_map.h"
 #include "io/fs/s3_file_system.h"
 #include "olap/delta_writer.h"
-#include "olap/snapshot_manager.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "runtime/descriptor_helper.h"
@@ -35,47 +36,39 @@ namespace doris {
 
 static StorageEngine* k_engine = nullptr;
 
-static const std::string kTestDir = "./ut_dir/tablet_clone_test";
-static std::string kSnapshotDir = "./ut_dir/tablet_clone_test/snapshot";
-static const std::string kResourceId = "TabletCloneTest";
-static const int64_t kTabletId = 10005;
-static const int32_t KSchemaHash = 270068377;
-
-static const std::string AK = "ak";
-static const std::string SK = "sk";
-static const std::string ENDPOINT = "endpoint";
-static const std::string REGION = "region";
-static const std::string BUCKET = "bucket";
-static const std::string PREFIX = "prefix";
+static const std::string kTestDir = "./ut_dir/remote_rowset_gc_test";
+static const std::string kResourceId = "RemoteRowsetGcTest";
 
 // remove DISABLED_ when need run this test
-#define TabletCloneTest DISABLED_TabletCloneTest
-#define private public
-class TabletCloneTest : public testing::Test {
+#define RemoteRowsetGcTest DISABLED_RemoteRowsetGcTest
+class RemoteRowsetGcTest : public testing::Test {
 public:
     static void SetUpTestSuite() {
         S3Conf s3_conf;
-        s3_conf.ak = AK;
-        s3_conf.sk = SK;
-        s3_conf.endpoint = ENDPOINT;
-        s3_conf.region = REGION;
-        s3_conf.bucket = BUCKET;
-        s3_conf.prefix = PREFIX;
+        s3_conf.ak = config::test_s3_ak;
+        s3_conf.sk = config::test_s3_sk;
+        s3_conf.endpoint = config::test_s3_endpoint;
+        s3_conf.region = config::test_s3_region;
+        s3_conf.bucket = config::test_s3_bucket;
+        s3_conf.prefix = "remote_rowset_gc_test";
         auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), 
kResourceId);
         ASSERT_TRUE(s3_fs->connect().ok());
         io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
 
-        config::storage_root_path = kTestDir;
+        constexpr uint32_t MAX_PATH_LEN = 1024;
+        char buffer[MAX_PATH_LEN];
+        EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+        config::storage_root_path = std::string(buffer) + "/" + kTestDir;
         config::min_file_descriptor_number = 1000;
-        FileUtils::remove_all(kTestDir);
-        FileUtils::create_dir(kTestDir);
 
-        std::vector<StorePath> paths {{kTestDir, -1}};
+        FileUtils::remove_all(config::storage_root_path);
+        FileUtils::create_dir(config::storage_root_path);
+
+        std::vector<StorePath> paths {{config::storage_root_path, -1}};
 
         EngineOptions options;
         options.store_paths = paths;
         doris::StorageEngine::open(options, &k_engine);
-        k_engine->start_bg_threads();
     }
 
     static void TearDownTestSuite() {
@@ -145,9 +138,9 @@ static TDescriptorTable 
create_descriptor_tablet_with_sequence_col() {
     return desc_tbl_builder.desc_tbl();
 }
 
-TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) {
+TEST_F(RemoteRowsetGcTest, normal) {
     TCreateTabletReq request;
-    create_tablet_request_with_sequence_col(kTabletId, KSchemaHash, &request);
+    create_tablet_request_with_sequence_col(10005, 270068377, &request);
     Status st = k_engine->create_tablet(request);
     ASSERT_EQ(Status::OK(), st);
 
@@ -161,13 +154,14 @@ TEST_F(TabletCloneTest, 
convert_rowset_ids_has_file_in_s3) {
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {kTabletId, KSchemaHash, WriteType::LOAD, 20003,
-                              30003,     load_id,     tuple_desc,      
&(tuple_desc->slots())};
+    WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
+                              30003, load_id,   tuple_desc,      
&(tuple_desc->slots())};
     DeltaWriter* delta_writer = nullptr;
     DeltaWriter::open(&write_req, &delta_writer);
     ASSERT_NE(delta_writer, nullptr);
 
-    MemPool pool;
+    MemTracker tracker;
+    MemPool pool(&tracker);
     // Tuple 1
     {
         Tuple* tuple = 
reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
@@ -199,27 +193,39 @@ TEST_F(TabletCloneTest, 
convert_rowset_ids_has_file_in_s3) {
             write_req.txn_id, write_req.partition_id, &tablet_related_rs);
     for (auto& tablet_rs : tablet_related_rs) {
         RowsetSharedPtr rowset = tablet_rs.second;
-        rowset->rowset_meta()->set_resource_id(kResourceId);
         st = k_engine->txn_manager()->publish_txn(meta, 
write_req.partition_id, write_req.txn_id,
-                                                  tablet->tablet_id(), 
tablet->schema_hash(),
-                                                  tablet->tablet_uid(), 
version);
+                                                  write_req.tablet_id, 
write_req.schema_hash,
+                                                  tablet_rs.first.tablet_uid, 
version);
         ASSERT_EQ(Status::OK(), st);
         st = tablet->add_inc_rowset(rowset);
         ASSERT_EQ(Status::OK(), st);
     }
     EXPECT_EQ(1, tablet->num_rows());
 
-    TSnapshotRequest snapshot_req;
-    snapshot_req.tablet_id = kTabletId;
-    snapshot_req.schema_hash = KSchemaHash;
-    bool allow_incremental_clone = false;
-    st = SnapshotManager::instance()->_create_snapshot_files(tablet, 
snapshot_req, &kSnapshotDir,
-                                                             
&allow_incremental_clone);
+    tablet->set_storage_policy(kResourceId);
+    st = tablet->cooldown(); // rowset [0-1]
+    ASSERT_EQ(Status::OK(), st);
+    st = tablet->cooldown(); // rowset [2-2]
     ASSERT_EQ(Status::OK(), st);
-    st = SnapshotManager::instance()->convert_rowset_ids(kTestDir, kTabletId, 
request.replica_id,
-                                                         KSchemaHash);
-    ASSERT_NE(Status::OK(), st);
+    ASSERT_EQ(DorisMetrics::instance()->upload_rowset_count->value(), 1);
+
     delete delta_writer;
+
+    auto fs = io::FileSystemMap::instance()->get(kResourceId);
+    auto rowset = tablet->get_rowset_by_version({2, 2});
+    ASSERT_TRUE(rowset);
+    auto seg_path = BetaRowset::remote_segment_path(10005, 
rowset->rowset_id(), 0);
+    bool exists = false;
+    st = fs->exists(seg_path, &exists);
+    ASSERT_EQ(Status::OK(), st);
+    ASSERT_TRUE(exists);
+
+    st = k_engine->tablet_manager()->drop_tablet(10005, 0, true);
+    ASSERT_EQ(Status::OK(), st);
+    tablet->data_dir()->perform_remote_tablet_gc();
+    st = fs->exists(seg_path, &exists);
+    ASSERT_EQ(Status::OK(), st);
+    ASSERT_FALSE(exists);
 }
 
 } // namespace doris
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index f20eccd376..eae932e6f0 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -25,7 +25,6 @@
 #include "io/fs/s3_file_system.h"
 #include "olap/delta_writer.h"
 #include "olap/storage_engine.h"
-#include "olap/storage_policy_mgr.h"
 #include "olap/tablet.h"
 #include "runtime/descriptor_helper.h"
 #include "runtime/tuple.h"
@@ -39,25 +38,18 @@ static StorageEngine* k_engine = nullptr;
 static const std::string kTestDir = "./ut_dir/tablet_cooldown_test";
 static const std::string kResourceId = "TabletCooldownTest";
 
-static const std::string AK = "ak";
-static const std::string SK = "sk";
-static const std::string ENDPOINT = "endpoint";
-static const std::string REGION = "region";
-static const std::string BUCKET = "bucket";
-static const std::string PREFIX = "tablet_cooldown_test";
-
 // remove DISABLED_ when need run this test
 #define TabletCooldownTest DISABLED_TabletCooldownTest
 class TabletCooldownTest : public testing::Test {
 public:
     static void SetUpTestSuite() {
         S3Conf s3_conf;
-        s3_conf.ak = AK;
-        s3_conf.sk = SK;
-        s3_conf.endpoint = ENDPOINT;
-        s3_conf.region = REGION;
-        s3_conf.bucket = BUCKET;
-        s3_conf.prefix = PREFIX;
+        s3_conf.ak = config::test_s3_ak;
+        s3_conf.sk = config::test_s3_sk;
+        s3_conf.endpoint = config::test_s3_endpoint;
+        s3_conf.region = config::test_s3_region;
+        s3_conf.bucket = config::test_s3_bucket;
+        s3_conf.prefix = "tablet_cooldown_test";
         auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), 
kResourceId);
         ASSERT_TRUE(s3_fs->connect().ok());
         io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
@@ -167,7 +159,8 @@ TEST_F(TabletCooldownTest, normal) {
     DeltaWriter::open(&write_req, &delta_writer);
     ASSERT_NE(delta_writer, nullptr);
 
-    MemPool pool;
+    MemTracker tracker;
+    MemPool pool(&tracker);
     // Tuple 1
     {
         Tuple* tuple = 
reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
@@ -208,7 +201,7 @@ TEST_F(TabletCooldownTest, normal) {
     }
     EXPECT_EQ(1, tablet->num_rows());
 
-    tablet->set_cooldown_resource(kResourceId);
+    tablet->set_storage_policy(kResourceId);
     st = tablet->cooldown(); // rowset [0-1]
     ASSERT_EQ(Status::OK(), st);
     st = tablet->cooldown(); // rowset [2-2]
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index a47c1ddc95..12838e9b22 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -117,7 +117,7 @@ TEST_F(TabletMgrTest, CreateTablet) {
     create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs);
     EXPECT_TRUE(create_st == Status::OK());
 
-    Status drop_st = _tablet_mgr->drop_tablet(111, 
create_tablet_req.replica_id);
+    Status drop_st = _tablet_mgr->drop_tablet(111, 
create_tablet_req.replica_id, false);
     EXPECT_TRUE(drop_st == Status::OK());
     tablet.reset();
     Status trash_st = _tablet_mgr->start_trash_sweep();
@@ -172,7 +172,7 @@ TEST_F(TabletMgrTest, CreateTabletWithSequence) {
     Status check_meta_st = TabletMetaManager::get_meta(_data_dir, 111, 3333, 
new_tablet_meta);
     EXPECT_TRUE(check_meta_st == Status::OK());
 
-    Status drop_st = _tablet_mgr->drop_tablet(111, 
create_tablet_req.replica_id);
+    Status drop_st = _tablet_mgr->drop_tablet(111, 
create_tablet_req.replica_id, false);
     EXPECT_TRUE(drop_st == Status::OK());
     tablet.reset();
     Status trash_st = _tablet_mgr->start_trash_sweep();
@@ -206,13 +206,13 @@ TEST_F(TabletMgrTest, DropTablet) {
     EXPECT_TRUE(tablet != nullptr);
 
     // drop unexist tablet will be success
-    Status drop_st = _tablet_mgr->drop_tablet(1121, 
create_tablet_req.replica_id);
+    Status drop_st = _tablet_mgr->drop_tablet(1121, 
create_tablet_req.replica_id, false);
     EXPECT_TRUE(drop_st == Status::OK());
     tablet = _tablet_mgr->get_tablet(111);
     EXPECT_TRUE(tablet != nullptr);
 
     // drop exist tablet will be success
-    drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id);
+    drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id, 
false);
     EXPECT_TRUE(drop_st == Status::OK());
     tablet = _tablet_mgr->get_tablet(111);
     EXPECT_TRUE(tablet == nullptr);
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index 2ca5d4450d..077190c3c1 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -281,7 +281,7 @@ TEST_F(TestTablet, cooldown_policy) {
 
     TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr));
     _tablet->init();
-    _tablet->set_cooldown_resource("test_policy_name");
+    _tablet->set_storage_policy("test_policy_name");
 
     _tablet->_rs_version_map[ptr1->version()] = rowset1;
     _tablet->_rs_version_map[ptr2->version()] = rowset2;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index 85136dd17a..47befaaccb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -283,10 +283,12 @@ public class BackendLoadStatistic {
         // Else if capacity used percent > 75%, set capacityCoefficient to 1.
         // Else, capacityCoefficient changed smoothly from 0.5 to 1 with used 
capacity increasing
         // Function: (2 * usedCapacityPercent - 0.5)
-        loadScore.capacityCoefficient = usedCapacityPercent < 0.5 ? 0.5
-                : (usedCapacityPercent > 
Config.capacity_used_percent_high_water ? 1.0
-                        : (2 * usedCapacityPercent - 0.5));
-        loadScore.replicaNumCoefficient = 1 - loadScore.capacityCoefficient;
+        if (!Config.be_rebalancer_fuzzy_test) {
+            loadScore.capacityCoefficient = usedCapacityPercent < 0.5 ? 0.5
+                    : (usedCapacityPercent > 
Config.capacity_used_percent_high_water ? 1.0
+                            : (2 * usedCapacityPercent - 0.5));
+            loadScore.replicaNumCoefficient = 1 - 
loadScore.capacityCoefficient;
+        }
         loadScore.score = capacityProportion * loadScore.capacityCoefficient
                 + replicaNumProportion * loadScore.replicaNumCoefficient;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 26f180ea28..db79c6cbdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1685,7 +1685,7 @@ public class Config extends ConfigBase {
      * It's used to test the reliability in single replica case when tablet 
scheduling are frequent.
      * Default is false.
      */
-    @ConfField(mutable = false, masterOnly = true)
+    @ConfField(mutable = true, masterOnly = true)
     public static boolean be_rebalancer_fuzzy_test = false;
 
     /**
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index c1ecfac30d..1bea2eb071 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -115,6 +115,13 @@ message RowsetMetaPB {
     optional SegmentsOverlapPB segments_overlap_pb = 51 [default = 
OVERLAP_UNKNOWN];
 }
 
+// unused remote rowsets garbage collection kv value
+message RemoteRowsetGcPB {
+    required string resource_id = 1;
+    required int64 tablet_id = 2;
+    required int64 num_segments = 3;
+}
+
 message AlphaRowsetExtraMetaPB {
     repeated SegmentGroupPB segment_groups = 1;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to