This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a6537a90cd [Enhancement] Garbage collection of unused data on remote storage backend (#10731) a6537a90cd is described below commit a6537a90cd4fda01de4a51b7abdc23a06cc8a901 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Fri Jul 29 14:38:39 2022 +0800 [Enhancement] Garbage collection of unused data on remote storage backend (#10731) * [Feature](cold_on_s3) support unused remote rowset gc * return aborted when skip drop tablet * perform unused remote rowset gc --- be/src/agent/task_worker_pool.cpp | 11 +-- be/src/common/config.h | 11 +++ be/src/common/status.h | 5 +- be/src/io/fs/local_file_reader.cpp | 11 +-- be/src/io/fs/local_file_system.cpp | 35 +++---- be/src/io/fs/local_file_writer.cpp | 21 ++-- be/src/io/fs/s3_file_reader.cpp | 13 ++- be/src/io/fs/s3_file_system.cpp | 66 +++++++++++-- be/src/io/fs/s3_file_system.h | 1 + be/src/olap/base_compaction.cpp | 20 ++++ be/src/olap/base_tablet.h | 7 +- be/src/olap/cumulative_compaction_policy.cpp | 7 +- be/src/olap/data_dir.cpp | 62 ++++++++++++ be/src/olap/data_dir.h | 4 + be/src/olap/olap_define.h | 2 + be/src/olap/rowset/beta_rowset.cpp | 6 ++ be/src/olap/rowset/beta_rowset.h | 4 + be/src/olap/snapshot_manager.cpp | 12 +++ be/src/olap/storage_engine.cpp | 8 +- be/src/olap/tablet.cpp | 110 ++++++++++++++------- be/src/olap/tablet.h | 17 +++- be/src/olap/tablet_manager.cpp | 18 +++- be/src/olap/tablet_manager.h | 12 +-- be/src/olap/tablet_meta.cpp | 8 +- be/src/olap/tablet_meta.h | 19 ++-- be/src/olap/task/engine_clone_task.cpp | 2 +- be/test/CMakeLists.txt | 1 + be/test/olap/delete_handler_test.cpp | 8 +- be/test/olap/delta_writer_test.cpp | 10 +- .../olap/engine_storage_migration_task_test.cpp | 2 +- ...et_clone_test.cpp => remote_rowset_gc_test.cpp} | 94 +++++++++--------- be/test/olap/tablet_cooldown_test.cpp | 25 ++--- be/test/olap/tablet_mgr_test.cpp | 8 +- be/test/olap/tablet_test.cpp | 2 +- .../apache/doris/clone/BackendLoadStatistic.java | 10 +- .../main/java/org/apache/doris/common/Config.java | 2 +- gensrc/proto/olap_file.proto | 7 ++ 37 files changed, 446 insertions(+), 215 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 40681ce17d..deb39a3a51 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -432,7 +432,8 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() { drop_tablet_req.tablet_id, false, &err); if (dropped_tablet != nullptr) { Status drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet( - drop_tablet_req.tablet_id, drop_tablet_req.replica_id); + drop_tablet_req.tablet_id, drop_tablet_req.replica_id, + drop_tablet_req.is_drop_table_or_partition); if (!drop_status.ok()) { LOG(WARNING) << "drop table failed! signature: " << agent_task_req.signature; error_msgs.push_back("drop table failed!"); @@ -442,11 +443,6 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() { StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns( dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id, drop_tablet_req.schema_hash, dropped_tablet->tablet_uid()); - // We remove remote rowset directly. - // TODO(cyx): do remove in background - if (drop_tablet_req.is_drop_table_or_partition) { - dropped_tablet->remove_all_remote_rowsets(); - } } } else { status_code = TStatusCode::NOT_FOUND; @@ -881,8 +877,7 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { } else { LOG(INFO) << "set tablet cooldown resource " << tablet_meta_info.storage_policy; - tablet->tablet_meta()->set_cooldown_resource( - tablet_meta_info.storage_policy); + tablet->tablet_meta()->set_storage_policy(tablet_meta_info.storage_policy); } break; } diff --git a/be/src/common/config.h b/be/src/common/config.h index d1f43eaeba..06c8bebf2c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -789,6 +789,17 @@ CONF_Int32(s3_transfer_executor_pool_size, "2"); CONF_Bool(enable_time_lut, "true"); +#ifdef BE_TEST +// test s3 +CONF_String(test_s3_resource, "resource"); +CONF_String(test_s3_ak, "ak"); +CONF_String(test_s3_sk, "sk"); +CONF_String(test_s3_endpoint, "endpoint"); +CONF_String(test_s3_region, "region"); +CONF_String(test_s3_bucket, "bucket"); +CONF_String(test_s3_prefix, "prefix"); +#endif + } // namespace config } // namespace doris diff --git a/be/src/common/status.h b/be/src/common/status.h index 7604d0aa30..bf947cb812 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -283,10 +283,11 @@ public: template <typename... Args> static Status ErrorFmt(TStatusCode::type code, const std::string& fmt, Args&&... args) { // In some cases, fmt contains '{}' but there are no args. - if (sizeof...(args) == 0) { + if constexpr (sizeof...(args) == 0) { return Status(code, fmt); + } else { + return Status(code, fmt::format(fmt, std::forward<Args>(args)...)); } - return Status(code, fmt::format(fmt, std::forward<Args>(args)...)); } template <typename... Args> diff --git a/be/src/io/fs/local_file_reader.cpp b/be/src/io/fs/local_file_reader.cpp index 984306bf17..ae5bd44ab9 100644 --- a/be/src/io/fs/local_file_reader.cpp +++ b/be/src/io/fs/local_file_reader.cpp @@ -50,9 +50,8 @@ Status LocalFileReader::close() { Status LocalFileReader::read_at(size_t offset, Slice result, size_t* bytes_read) { DCHECK(!closed()); if (offset > _file_size) { - return Status::IOError( - fmt::format("offset exceeds file size(offset: {), file size: {}, path: {})", offset, - _file_size, _path.native())); + return Status::IOError("offset exceeds file size(offset: {), file size: {}, path: {})", + offset, _file_size, _path.native()); } size_t bytes_req = result.size; char* to = result.data; @@ -62,12 +61,10 @@ Status LocalFileReader::read_at(size_t offset, Slice result, size_t* bytes_read) while (bytes_req != 0) { auto res = ::pread(_fd, to, bytes_req, offset); if (UNLIKELY(-1 == res && errno != EINTR)) { - return Status::IOError( - fmt::format("cannot read from {}: {}", _path.native(), std::strerror(errno))); + return Status::IOError("cannot read from {}: {}", _path.native(), std::strerror(errno)); } if (UNLIKELY(res == 0)) { - return Status::IOError( - fmt::format("cannot read from {}: unexpected EOF", _path.native())); + return Status::IOError("cannot read from {}: unexpected EOF", _path.native()); } if (res > 0) { to += res; diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 3ba889c94d..46098e060f 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -40,8 +40,7 @@ Status LocalFileSystem::create_file(const Path& path, FileWriterPtr* writer) { auto fs_path = absolute_path(path); int fd = ::open(fs_path.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 0666); if (-1 == fd) { - return Status::IOError( - fmt::format("cannot open {}: {}", fs_path.native(), std::strerror(errno))); + return Status::IOError("cannot open {}: {}", fs_path.native(), std::strerror(errno)); } *writer = std::make_unique<LocalFileWriter>(std::move(fs_path), fd); return Status::OK(); @@ -62,14 +61,16 @@ Status LocalFileSystem::open_file(const Path& path, FileReaderSPtr* reader) { Status LocalFileSystem::delete_file(const Path& path) { auto fs_path = absolute_path(path); + if (!std::filesystem::exists(fs_path)) { + return Status::OK(); + } if (!std::filesystem::is_regular_file(fs_path)) { - return Status::IOError(fmt::format("{} is not a file", fs_path.native())); + return Status::IOError("{} is not a file", fs_path.native()); } std::error_code ec; std::filesystem::remove(fs_path, ec); if (ec) { - return Status::IOError( - fmt::format("cannot delete {}: {}", fs_path.native(), std::strerror(ec.value()))); + return Status::IOError("cannot delete {}: {}", fs_path.native(), std::strerror(ec.value())); } return Status::OK(); } @@ -77,35 +78,36 @@ Status LocalFileSystem::delete_file(const Path& path) { Status LocalFileSystem::create_directory(const Path& path) { auto fs_path = absolute_path(path); if (std::filesystem::exists(fs_path)) { - return Status::IOError(fmt::format("{} exists", fs_path.native())); + return Status::IOError("{} exists", fs_path.native()); } std::error_code ec; std::filesystem::create_directories(fs_path, ec); if (ec) { - return Status::IOError( - fmt::format("cannot create {}: {}", fs_path.native(), std::strerror(ec.value()))); + return Status::IOError("cannot create {}: {}", fs_path.native(), std::strerror(ec.value())); } return Status::OK(); } Status LocalFileSystem::delete_directory(const Path& path) { auto fs_path = absolute_path(path); + if (!std::filesystem::exists(fs_path)) { + return Status::OK(); + } if (!std::filesystem::is_directory(fs_path)) { - return Status::IOError(fmt::format("{} is not a directory", fs_path.native())); + return Status::IOError("{} is not a directory", fs_path.native()); } std::error_code ec; std::filesystem::remove_all(fs_path, ec); if (ec) { - return Status::IOError( - fmt::format("cannot delete {}: {}", fs_path.native(), std::strerror(ec.value()))); + return Status::IOError("cannot delete {}: {}", fs_path.native(), std::strerror(ec.value())); } return Status::OK(); } Status LocalFileSystem::link_file(const Path& src, const Path& dest) { if (::link(src.c_str(), dest.c_str()) != 0) { - return Status::IOError(fmt::format("fail to create hard link: {}. from {} to {}", - std::strerror(errno), src.native(), dest.native())); + return Status::IOError("fail to create hard link: {}. from {} to {}", std::strerror(errno), + src.native(), dest.native()); } return Status::OK(); } @@ -121,8 +123,8 @@ Status LocalFileSystem::file_size(const Path& path, size_t* file_size) const { std::error_code ec; *file_size = std::filesystem::file_size(fs_path, ec); if (ec) { - return Status::IOError(fmt::format("cannot get file size {}: {}", fs_path.native(), - std::strerror(ec.value()))); + return Status::IOError("cannot get file size {}: {}", fs_path.native(), + std::strerror(ec.value())); } return Status::OK(); } @@ -135,8 +137,7 @@ Status LocalFileSystem::list(const Path& path, std::vector<Path>* files) { files->push_back(entry.path().filename()); } if (ec) { - return Status::IOError( - fmt::format("cannot list {}: {}", fs_path.native(), std::strerror(ec.value()))); + return Status::IOError("cannot list {}: {}", fs_path.native(), std::strerror(ec.value())); } return Status::OK(); } diff --git a/be/src/io/fs/local_file_writer.cpp b/be/src/io/fs/local_file_writer.cpp index 1965ed6be0..95294dbb88 100644 --- a/be/src/io/fs/local_file_writer.cpp +++ b/be/src/io/fs/local_file_writer.cpp @@ -37,12 +37,10 @@ Status sync_dir(const io::Path& dirname) { int fd; RETRY_ON_EINTR(fd, ::open(dirname.c_str(), O_DIRECTORY | O_RDONLY)); if (-1 == fd) { - return Status::IOError( - fmt::format("cannot open {}: {}", dirname.native(), std::strerror(errno))); + return Status::IOError("cannot open {}: {}", dirname.native(), std::strerror(errno)); } if (0 != ::fdatasync(fd)) { - return Status::IOError( - fmt::format("cannot fdatasync {}: {}", dirname.native(), std::strerror(errno))); + return Status::IOError("cannot fdatasync {}: {}", dirname.native(), std::strerror(errno)); } ::close(fd); return Status::OK(); @@ -102,8 +100,7 @@ Status LocalFileWriter::appendv(const Slice* data, size_t data_cnt) { ssize_t res; RETRY_ON_EINTR(res, ::writev(_fd, iov + completed_iov, iov_count)); if (UNLIKELY(res < 0)) { - return Status::IOError( - fmt::format("cannot write to {}: {}", _path.native(), std::strerror(errno))); + return Status::IOError("cannot write to {}: {}", _path.native(), std::strerror(errno)); } if (LIKELY(res == n_left)) { @@ -139,8 +136,7 @@ Status LocalFileWriter::finalize() { #if defined(__linux__) int flags = SYNC_FILE_RANGE_WRITE; if (sync_file_range(_fd, 0, 0, flags) < 0) { - return Status::IOError( - fmt::format("cannot sync {}: {}", _path.native(), std::strerror(errno))); + return Status::IOError("cannot sync {}: {}", _path.native(), std::strerror(errno)); } #endif } @@ -153,8 +149,7 @@ Status LocalFileWriter::_close(bool sync) { } if (sync && _dirty) { if (0 != ::fdatasync(_fd)) { - return Status::IOError( - fmt::format("cannot fdatasync {}: {}", _path.native(), std::strerror(errno))); + return Status::IOError("cannot fdatasync {}: {}", _path.native(), std::strerror(errno)); } RETURN_IF_ERROR(detail::sync_dir(_path.parent_path())); _dirty = false; @@ -166,8 +161,7 @@ Status LocalFileWriter::_close(bool sync) { DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended); if (0 != ::close(_fd)) { - return Status::IOError( - fmt::format("cannot close {}: {}", _path.native(), std::strerror(errno))); + return Status::IOError("cannot close {}: {}", _path.native(), std::strerror(errno)); } return Status::OK(); } @@ -182,8 +176,7 @@ Status LocalFileWriter::write_at(size_t offset, const Slice& data) { while (bytes_req != 0) { auto res = ::pwrite(_fd, from, bytes_req, offset); if (-1 == res && errno != EINTR) { - return Status::IOError( - fmt::format("cannot write to {}: {}", _path.native(), std::strerror(errno))); + return Status::IOError("cannot write to {}: {}", _path.native(), std::strerror(errno)); } if (res > 0) { from += res; diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index a75da54bb0..2f16b03db4 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -52,9 +52,8 @@ Status S3FileReader::close() { Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) { DCHECK(!closed()); if (offset > _file_size) { - return Status::IOError( - fmt::format("offset exceeds file size(offset: {), file size: {}, path: {})", offset, - _file_size, _path.native())); + return Status::IOError("offset exceeds file size(offset: {), file size: {}, path: {})", + offset, _file_size, _path.native()); } size_t bytes_req = result.size; char* to = result.data; @@ -75,13 +74,13 @@ Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) { } auto outcome = client->GetObject(request); if (!outcome.IsSuccess()) { - return Status::IOError(fmt::format("failed to read from {}: {}", _path.native(), - outcome.GetError().GetMessage())); + return Status::IOError("failed to read from {}: {}", _path.native(), + outcome.GetError().GetMessage()); } *bytes_read = outcome.GetResult().GetContentLength(); if (*bytes_read != bytes_req) { - return Status::IOError(fmt::format("failed to read from {}(bytes read: {}, bytes req: {})", - _path.native(), *bytes_read, bytes_req)); + return Status::IOError("failed to read from {}(bytes read: {}, bytes req: {})", + _path.native(), *bytes_read, bytes_req); } DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read); return Status::OK(); diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index e421863774..65d6a27de7 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -20,7 +20,9 @@ #include <aws/core/utils/threading/Executor.h> #include <aws/s3/S3Client.h> #include <aws/s3/model/DeleteObjectRequest.h> +#include <aws/s3/model/DeleteObjectsRequest.h> #include <aws/s3/model/HeadObjectRequest.h> +#include <aws/s3/model/ListObjectsV2Request.h> #include <aws/s3/model/PutObjectRequest.h> #include <aws/transfer/TransferManager.h> @@ -153,12 +155,13 @@ Status S3FileSystem::delete_file(const Path& path) { request.WithBucket(_s3_conf.bucket).WithKey(key); auto outcome = client->DeleteObject(request); - if (!outcome.IsSuccess()) { - return Status::IOError("failed to delete object(endpoint={}, bucket={}, key={}): {}", - _s3_conf.endpoint, _s3_conf.bucket, key, - outcome.GetError().GetMessage()); + if (outcome.IsSuccess() || + outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + return Status::OK(); } - return Status::OK(); + return Status::IOError("failed to delete object(endpoint={}, bucket={}, key={}): {}", + _s3_conf.endpoint, _s3_conf.bucket, key, + outcome.GetError().GetMessage()); } Status S3FileSystem::create_directory(const Path& path) { @@ -166,7 +169,54 @@ Status S3FileSystem::create_directory(const Path& path) { } Status S3FileSystem::delete_directory(const Path& path) { - return Status::NotSupported("not support"); + auto client = get_client(); + CHECK_S3_CLIENT(client); + + Aws::S3::Model::ListObjectsV2Request request; + auto prefix = get_key(path); + request.WithBucket(_s3_conf.bucket).WithPrefix(prefix); + + Aws::S3::Model::DeleteObjectsRequest delete_request; + delete_request.SetBucket(_s3_conf.bucket); + bool is_trucated = false; + do { + auto outcome = client->ListObjectsV2(request); + if (!outcome.IsSuccess()) { + return Status::IOError("failed to list objects(endpoint={}, bucket={}, prefix={}): {}", + _s3_conf.endpoint, _s3_conf.bucket, prefix, + outcome.GetError().GetMessage()); + } + const auto& result = outcome.GetResult(); + Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects; + objects.reserve(result.GetContents().size()); + for (const auto& obj : result.GetContents()) { + objects.emplace_back().SetKey(obj.GetKey()); + } + if (!objects.empty()) { + Aws::S3::Model::Delete del; + del.WithObjects(std::move(objects)).SetQuiet(true); + delete_request.SetDelete(std::move(del)); + auto delete_outcome = client->DeleteObjects(delete_request); + if (!delete_outcome.IsSuccess()) { + return Status::IOError( + "failed to delete objects(endpoint={}, bucket={}, prefix={}): {}", + _s3_conf.endpoint, _s3_conf.bucket, prefix, + delete_outcome.GetError().GetMessage()); + } + if (!delete_outcome.GetResult().GetErrors().empty()) { + const auto& e = delete_outcome.GetResult().GetErrors().front(); + return Status::IOError("fail to delete object(endpoint={}, bucket={}, key={}): {}", + _s3_conf.endpoint, _s3_conf.bucket, e.GetKey(), + e.GetMessage()); + } + VLOG_TRACE << "delete " << objects.size() + << " s3 objects, endpoint: " << _s3_conf.endpoint + << ", bucket: " << _s3_conf.bucket << ", prefix: " << _s3_conf.prefix; + } + is_trucated = result.GetIsTruncated(); + request.SetContinuationToken(result.GetNextContinuationToken()); + } while (is_trucated); + return Status::OK(); } Status S3FileSystem::link_file(const Path& src, const Path& dest) { @@ -181,7 +231,7 @@ Status S3FileSystem::exists(const Path& path, bool* res) const { auto key = get_key(path); request.WithBucket(_s3_conf.bucket).WithKey(key); - auto outcome = _client->HeadObject(request); + auto outcome = client->HeadObject(request); if (outcome.IsSuccess()) { *res = true; } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { @@ -202,7 +252,7 @@ Status S3FileSystem::file_size(const Path& path, size_t* file_size) const { auto key = get_key(path); request.WithBucket(_s3_conf.bucket).WithKey(key); - auto outcome = _client->HeadObject(request); + auto outcome = client->HeadObject(request); if (outcome.IsSuccess()) { *file_size = outcome.GetResult().GetContentLength(); } else { diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 219ecb3a0d..9eb393996f 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -46,6 +46,7 @@ public: Status create_directory(const Path& path) override; + // Delete all objects start with path. Status delete_directory(const Path& path) override; Status link_file(const Path& src, const Path& dest) override; diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 1284af6084..307fa2fee0 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -92,6 +92,26 @@ Status BaseCompaction::pick_rowsets_to_compact() { RETURN_NOT_OK(check_version_continuity(_input_rowsets)); RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets)); + // If there are delete predicate rowsets in tablet, start_version > 0 implies some rowsets before + // delete version cannot apply these delete predicates, which can cause incorrect query result. + // So we must abort this base compaction. + // A typical scenario is that some rowsets before cumulative point are on remote storage. + if (_input_rowsets.front()->start_version() > 0) { + bool has_delete_predicate = false; + for (const auto& rs : _input_rowsets) { + if (rs->rowset_meta()->has_delete_predicate()) { + has_delete_predicate = true; + break; + } + } + if (has_delete_predicate) { + LOG(WARNING) + << "Some rowsets cannot apply delete predicates in base compaction. tablet_id=" + << _tablet->tablet_id(); + return Status::OLAPInternalError(OLAP_ERR_BE_NO_SUITABLE_VERSION); + } + } + if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) { // the tablet is with rowset: [0-1], [2-y] // and [0-1] has no data. in this situation, no need to do base compaction. diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 84e336fc2e..26b44f9665 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -18,6 +18,7 @@ #pragma once #include <memory> +#include <string> #include "olap/olap_define.h" #include "olap/tablet_meta.h" @@ -58,11 +59,9 @@ public: int16_t shard_id() const; bool equal(int64_t tablet_id, int32_t schema_hash); - const io::ResourceId& cooldown_resource() const { return _tablet_meta->cooldown_resource(); } + const std::string& storage_policy() const { return _tablet_meta->storage_policy(); } - void set_cooldown_resource(io::ResourceId resource) { - _tablet_meta->set_cooldown_resource(std::move(resource)); - } + void set_storage_policy(const std::string& policy) { _tablet_meta->set_storage_policy(policy); } // properties encapsulated in TabletSchema virtual const TabletSchema& tablet_schema() const; diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index 05da215028..874ce60ddf 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -463,10 +463,9 @@ void NumBasedCumulativeCompactionPolicy::calculate_cumulative_point( void CumulativeCompactionPolicy::pick_candidate_rowsets( const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map, int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets) { - for (auto& it : rs_version_map) { - // find all rowset version greater than cumulative_point and skip the create time in skip_window_sec - if (it.first.first >= cumulative_point && it.second->is_local()) { - candidate_rowsets->push_back(it.second); + for (const auto& [version, rs] : rs_version_map) { + if (version.first >= cumulative_point && rs->is_local()) { + candidate_rowsets->push_back(rs); } } std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator); diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 0f7acc749f..baf51034e6 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -40,6 +40,7 @@ #include "io/fs/path.h" #include "olap/file_helper.h" #include "olap/olap_define.h" +#include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/storage_engine.h" #include "olap/tablet_meta_manager.h" @@ -820,4 +821,65 @@ Status DataDir::move_to_trash(const std::string& tablet_path) { return Status::OK(); } +void DataDir::perform_remote_rowset_gc() { + std::vector<std::pair<std::string, std::string>> gc_kvs; + auto traverse_remote_rowset_func = [&gc_kvs](const std::string& key, + const std::string& value) -> bool { + gc_kvs.emplace_back(key, value); + return true; + }; + _meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_ROWSET_GC_PREFIX, traverse_remote_rowset_func); + std::vector<std::string> deleted_keys; + for (auto& [key, val] : gc_kvs) { + auto rowset_id = key.substr(REMOTE_ROWSET_GC_PREFIX.size()); + RemoteRowsetGcPB gc_pb; + gc_pb.ParseFromString(val); + auto fs = io::FileSystemMap::instance()->get(gc_pb.resource_id()); + if (!fs) { + LOG(WARNING) << "Cannot get file system: " << gc_pb.resource_id(); + continue; + } + DCHECK(fs->type() != io::FileSystemType::LOCAL); + Status st; + for (int i = 0; i < gc_pb.num_segments(); ++i) { + auto seg_path = BetaRowset::remote_segment_path(gc_pb.tablet_id(), rowset_id, i); + st = fs->delete_file(seg_path); + if (!st.ok()) { + LOG(WARNING) << st.to_string(); + break; + } + } + if (st.ok()) { + deleted_keys.push_back(std::move(key)); + } + } + for (const auto& key : deleted_keys) { + _meta->remove(META_COLUMN_FAMILY_INDEX, key); + } +} + +void DataDir::perform_remote_tablet_gc() { + std::vector<std::pair<std::string, std::string>> tablet_gc_kvs; + auto traverse_remote_tablet_func = [&tablet_gc_kvs](const std::string& key, + const std::string& value) -> bool { + tablet_gc_kvs.emplace_back(key, value); + return true; + }; + _meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_TABLET_GC_PREFIX, traverse_remote_tablet_func); + std::vector<std::string> deleted_keys; + for (auto& [key, resource] : tablet_gc_kvs) { + auto tablet_id = key.substr(REMOTE_TABLET_GC_PREFIX.size()); + auto fs = io::FileSystemMap::instance()->get(resource); + auto st = fs->delete_directory(DATA_PREFIX + "/" + tablet_id); + if (st.ok()) { + deleted_keys.push_back(std::move(key)); + } else { + LOG(WARNING) << st; + } + } + for (const auto& key : deleted_keys) { + _meta->remove(META_COLUMN_FAMILY_INDEX, key); + } +} + } // namespace doris diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 642058c46f..4cf40fd1bd 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -116,6 +116,10 @@ public: void perform_path_gc_by_tablet(); + void perform_remote_rowset_gc(); + + void perform_remote_tablet_gc(); + // check if the capacity reach the limit after adding the incoming data // return true if limit reached, otherwise, return false. // TODO(cmy): for now we can not precisely calculate the capacity Doris used, diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 077ed670f9..65f2de8c8e 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -127,6 +127,8 @@ const std::string TABLET_ID_KEY = "tablet_id"; const std::string ENABLE_BYTE_TO_BASE64 = "byte_to_base64"; const std::string TABLET_ID_PREFIX = "t_"; const std::string ROWSET_ID_PREFIX = "s_"; +const std::string REMOTE_ROWSET_GC_PREFIX = "gc_"; +const std::string REMOTE_TABLET_GC_PREFIX = "tgc_"; #if defined(__GNUC__) #define OLAP_LIKELY(x) __builtin_expect((x), 1) diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 1329917a82..1e7cbde9f4 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -46,6 +46,12 @@ std::string BetaRowset::local_segment_path(const std::string& tablet_path, return fmt::format("{}/{}_{}.dat", tablet_path, rowset_id.to_string(), segment_id); } +std::string BetaRowset::remote_segment_path(int64_t tablet_id, const std::string& rowset_id, + int segment_id) { + // data/{tablet_id}/{rowset_id}_{seg_num}.dat + return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, segment_id); +} + std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id, int segment_id) { // data/{tablet_id}/{rowset_id}_{seg_num}.dat diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index 9ccaa7b2b9..8d96bfe090 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -19,6 +19,7 @@ #define DORIS_SRC_OLAP_ROWSET_BETA_ROWSET_H_ #include <cstdint> +#include <string> #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -49,6 +50,9 @@ public: static std::string remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id, int segment_id); + static std::string remote_segment_path(int64_t tablet_id, const std::string& rowset_id, + int segment_id); + Status split_range(const RowCursor& start_key, const RowCursor& end_key, uint64_t request_block_row_count, size_t key_num, std::vector<OlapTuple>* ranges) override; diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index 74cbb1aa68..390bd66206 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -28,12 +28,14 @@ #include <map> #include <set> +#include "common/status.h" #include "env/env.h" #include "gen_cpp/Types_constants.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_writer.h" #include "olap/storage_engine.h" +#include "olap/tablet_meta.h" #include "runtime/thread_context.h" using std::filesystem::path; @@ -360,6 +362,9 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet /// make the full snapshot of the tablet. { std::shared_lock rdlock(ref_tablet->get_header_lock()); + if (ref_tablet->tablet_state() == TABLET_SHUTDOWN) { + return Status::Aborted("tablet has shutdown"); + } if (request.__isset.missing_version) { for (int64_t missed_version : request.missing_version) { Version version = {missed_version, missed_version}; @@ -422,6 +427,13 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet CHECK(res.ok()) << res; ref_tablet->generate_tablet_meta_copy_unlocked(new_tablet_meta); } + { + std::unique_lock wlock(ref_tablet->get_header_lock()); + if (ref_tablet->tablet_state() == TABLET_SHUTDOWN) { + return Status::Aborted("tablet has shutdown"); + } + ref_tablet->update_self_owned_remote_rowsets(consistent_rowsets); + } std::vector<RowsetMetaSharedPtr> rs_metas; for (auto& rs : consistent_rowsets) { diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 9a7c414971..4e28ee7d8b 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -329,7 +329,7 @@ std::vector<DataDir*> StorageEngine::get_stores() { stores.reserve(_store_map.size()); std::lock_guard<std::mutex> l(_store_lock); - if (include_unused) { + if constexpr (include_unused) { for (auto& it : _store_map) { stores.push_back(it.second); } @@ -720,6 +720,12 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) { // clean unused rowset metas in OlapMeta _clean_unused_rowset_metas(); + // clean unused rowsets in remote storage backends + for (auto data_dir : get_stores()) { + data_dir->perform_remote_rowset_gc(); + data_dir->perform_remote_tablet_gc(); + } + return res; } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index c60f3c4d07..290753de9e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -18,6 +18,7 @@ #include "olap/tablet.h" #include <ctype.h> +#include <fmt/core.h> #include <glog/logging.h> #include <pthread.h> #include <rapidjson/prettywriter.h> @@ -1145,10 +1146,8 @@ void Tablet::pick_candidate_rowsets_to_cumulative_compaction( void Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* candidate_rowsets) { std::shared_lock rdlock(_meta_lock); - // FIXME(cyx): If there are delete predicate rowsets in tablet, - // remote rowsets cannot apply these delete predicate, which can cause - // incorrect query result. for (auto& it : _rs_version_map) { + // Do compaction on local rowsets only. if (it.first.first < _cumulative_point && it.second->is_local()) { candidate_rowsets->push_back(it.second); } @@ -1702,7 +1701,7 @@ Status Tablet::cooldown() { LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" << tablet_id(); return Status::OLAPInternalError(OLAP_ERR_BE_TRY_BE_LOCK_ERROR); } - auto dest_fs = io::FileSystemMap::instance()->get(cooldown_resource()); + auto dest_fs = io::FileSystemMap::instance()->get(storage_policy()); if (!dest_fs) { return Status::OLAPInternalError(OLAP_ERR_NOT_INITED); } @@ -1716,8 +1715,13 @@ Status Tablet::cooldown() { auto start = std::chrono::steady_clock::now(); - RETURN_IF_ERROR(old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()), - new_rowset_id)); + auto st = old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()), + new_rowset_id); + if (!st.ok()) { + record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(), + old_rowset->num_segments()); + return st; + } auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start); LOG(INFO) << "Upload rowset " << old_rowset->version() << " " << new_rowset_id.to_string() @@ -1732,14 +1736,30 @@ Status Tablet::cooldown() { new_rowset_meta->set_fs(dest_fs); new_rowset_meta->set_creation_time(time(nullptr)); RowsetSharedPtr new_rowset; - RowsetFactory::create_rowset(&_schema, _tablet_path, std::move(new_rowset_meta), &new_rowset); + RowsetFactory::create_rowset(&_schema, _tablet_path, new_rowset_meta, &new_rowset); std::vector to_add {std::move(new_rowset)}; std::vector to_delete {std::move(old_rowset)}; - std::unique_lock meta_wlock(_meta_lock); - modify_rowsets(to_add, to_delete); - save_meta(); + bool has_shutdown = false; + { + std::unique_lock meta_wlock(_meta_lock); + has_shutdown = tablet_state() == TABLET_SHUTDOWN; + if (!has_shutdown) { + modify_rowsets(to_add, to_delete); + if (new_rowset_meta->has_delete_predicate()) { + add_delete_predicate(new_rowset_meta->delete_predicate(), + new_rowset_meta->start_version()); + } + _self_owned_remote_rowsets.insert(to_add.front()); + save_meta(); + } + } + if (has_shutdown) { + record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(), + to_add.front()->num_segments()); + return Status::Aborted("tablet {} has shutdown", tablet_id()); + } return Status::OK(); } @@ -1763,13 +1783,13 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() { bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) { // std::shared_lock meta_rlock(_meta_lock); - if (cooldown_resource().empty()) { + if (storage_policy().empty()) { VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << tablet_id(); return false; } - auto policy = ExecEnv::GetInstance()->storage_policy_mgr()->get(cooldown_resource()); + auto policy = ExecEnv::GetInstance()->storage_policy_mgr()->get(storage_policy()); if (!policy) { - LOG(WARNING) << "Cannot get storage policy: " << cooldown_resource(); + LOG(WARNING) << "Cannot get storage policy: " << storage_policy(); return false; } auto cooldown_ttl_sec = policy->cooldown_ttl; @@ -1817,28 +1837,26 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) { return false; } -void Tablet::remove_all_remote_rowsets() { - std::unique_lock meta_wlock(_meta_lock); - DCHECK(_state == TabletState::TABLET_SHUTDOWN); - Status st; - for (auto& it : _rs_version_map) { - auto& rs = it.second; - if (!rs->is_local()) { - st = rs->remove(); - LOG_IF(WARNING, !st.ok()) << "Failed to remove rowset " << rs->version() << " " - << rs->rowset_id().to_string() << " in tablet " << tablet_id() - << ": " << st.to_string(); - } - } - for (auto& it : _stale_rs_version_map) { - auto& rs = it.second; - if (!rs->is_local()) { - st = rs->remove(); - LOG_IF(WARNING, !st.ok()) << "Failed to remove rowset " << rs->version() << " " - << rs->rowset_id().to_string() << " in tablet " << tablet_id() - << ": " << st.to_string(); - } +void Tablet::record_unused_remote_rowset(const RowsetId& rowset_id, const io::ResourceId& resource, + int64_t num_segments) { + auto gc_key = REMOTE_ROWSET_GC_PREFIX + rowset_id.to_string(); + RemoteRowsetGcPB gc_pb; + gc_pb.set_resource_id(resource); + gc_pb.set_tablet_id(tablet_id()); + gc_pb.set_num_segments(num_segments); + WARN_IF_ERROR( + _data_dir->get_meta()->put(META_COLUMN_FAMILY_INDEX, gc_key, gc_pb.SerializeAsString()), + fmt::format("Failed to record unused remote rowset(tablet id: {}, rowset id: {})", + tablet_id(), rowset_id.to_string())); +} + +Status Tablet::remove_all_remote_rowsets() { + DCHECK(_state == TABLET_SHUTDOWN); + if (storage_policy().empty()) { + return Status::OK(); } + auto tablet_gc_key = REMOTE_TABLET_GC_PREFIX + std::to_string(tablet_id()); + return _data_dir->get_meta()->put(META_COLUMN_FAMILY_INDEX, tablet_gc_key, storage_policy()); } const TabletSchema& Tablet::tablet_schema() const { @@ -1887,4 +1905,28 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, RowLocation* row_locatio return Status::NotFound("can't find key in all rowsets"); } +void Tablet::remove_self_owned_remote_rowsets() { + DCHECK(_state == TABLET_SHUTDOWN); + for (const auto& rs : _self_owned_remote_rowsets) { + DCHECK(!rs->is_local()); + record_unused_remote_rowset(rs->rowset_id(), rs->rowset_meta()->resource_id(), + rs->num_segments()); + } +} + +void Tablet::update_self_owned_remote_rowsets( + const std::vector<RowsetSharedPtr>& rowsets_in_snapshot) { + if (_self_owned_remote_rowsets.empty()) { + return; + } + for (const auto& rs : rowsets_in_snapshot) { + if (!rs->is_local()) { + auto it = _self_owned_remote_rowsets.find(rs); + if (it != _self_owned_remote_rowsets.end()) { + _self_owned_remote_rowsets.erase(it); + } + } + } +} + } // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 08e5fecd15..accc157d6a 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -22,6 +22,7 @@ #include <set> #include <string> #include <unordered_map> +#include <unordered_set> #include <vector> #include "gen_cpp/AgentService_types.h" @@ -30,6 +31,7 @@ #include "olap/base_tablet.h" #include "olap/cumulative_compaction_policy.h" #include "olap/data_dir.h" +#include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_reader.h" @@ -298,14 +300,22 @@ public: bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size); - // Physically remove remote rowsets. - void remove_all_remote_rowsets(); + Status remove_all_remote_rowsets(); // Lookup the row location of `encoded_key`, the function sets `row_location` on success. // NOTE: the method only works in unique key model with primary key index, you will got a // not supported error in other data model. Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location, uint32_t version); + void remove_self_owned_remote_rowsets(); + + // Erase entries in `_self_owned_remote_rowsets` iff they are in `rowsets_in_snapshot`. + // REQUIRES: held _meta_lock + void update_self_owned_remote_rowsets(const std::vector<RowsetSharedPtr>& rowsets_in_snapshot); + + void record_unused_remote_rowset(const RowsetId& rowset_id, const io::ResourceId& resource, + int64_t num_segments); + private: Status _init_once_action(); void _print_missed_versions(const std::vector<Version>& missed_versions) const; @@ -397,6 +407,9 @@ private: int64_t _last_missed_version; int64_t _last_missed_time_s; + // Remote rowsets not shared by other BE. We can delete them when drop tablet. + std::unordered_set<RowsetSharedPtr> _self_owned_remote_rowsets; // guarded by _meta_lock + DISALLOW_COPY_AND_ASSIGN(Tablet); public: diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 3ab3c3a99d..f5c2ebdbcf 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -194,7 +194,7 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, // If the new tablet is fresher than the existing one, then replace // the existing tablet with the new one. // Use default replica_id to ignore whether replica_id is match when drop tablet. - RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0, keep_files), + RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0, keep_files, false), strings::Substitute("failed to drop old tablet when add new tablet. " "tablet_id=$0", tablet_id)); @@ -356,7 +356,7 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( } // something is wrong, we need clear environment if (is_tablet_added) { - Status status = _drop_tablet_unlocked(new_tablet_id, request.replica_id, false); + Status status = _drop_tablet_unlocked(new_tablet_id, request.replica_id, false, false); if (!status.ok()) { LOG(WARNING) << "fail to drop tablet when create tablet failed. res=" << res; } @@ -425,7 +425,8 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( return nullptr; } -Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files) { +Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, + bool is_drop_table_or_partition) { auto& shard = _get_tablets_shard(tablet_id); std::lock_guard wrlock(shard.lock); if (shard.tablets_under_clone.count(tablet_id) > 0) { @@ -433,12 +434,12 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bo return Status::Aborted("aborted"); } SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - return _drop_tablet_unlocked(tablet_id, replica_id, keep_files); + return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition); } // Drop specified tablet. Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, - bool keep_files) { + bool keep_files, bool is_drop_table_or_partition) { LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", replica_id=" << replica_id; DorisMetrics::instance()->drop_tablet_requests_total->increment(1); @@ -472,6 +473,13 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl // and the tablet will be loaded at restart time. // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN); + // We must record unused remote rowsets path info to OlapMeta before tablet state is marked as TABLET_SHUTDOWN in OlapMeta, + // otherwise if BE shutdown after saving tablet state, these remote rowsets path info will lost. + if (is_drop_table_or_partition) { + RETURN_IF_ERROR(to_drop_tablet->remove_all_remote_rowsets()); + } else { + to_drop_tablet->remove_self_owned_remote_rowsets(); + } to_drop_tablet->save_meta(); { std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 7b17b59ce3..159b8baefd 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -61,12 +61,9 @@ public: // task to be fail, even if there is enough space on other disks Status create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores); - // Drop a tablet by description - // If set keep_files == true, files will NOT be deleted when deconstruction. - // Return OLAP_SUCCESS, if run ok - // OLAP_ERR_TABLE_DELETE_NOEXIST_ERROR, if tablet not exist - // Status::OLAPInternalError(OLAP_ERR_NOT_INITED), if not inited - Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files = false); + // Drop a tablet by description. + // If `is_drop_table_or_partition` is true, we need to remove all remote rowsets in this tablet. + Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool is_drop_table_or_partition); Status drop_tablets_on_error_root_path(const std::vector<TabletInfo>& tablet_info_vec); @@ -156,7 +153,8 @@ private: bool _check_tablet_id_exist_unlocked(TTabletId tablet_id); - Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, bool keep_files); + Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, bool keep_files, + bool is_drop_table_or_partition); TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id); TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, bool include_deleted, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 18dd73f8bc..bc88710854 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -206,7 +206,7 @@ TabletMeta::TabletMeta(const TabletMeta& b) _del_predicates(b._del_predicates), _in_restore_mode(b._in_restore_mode), _preferred_rowset_type(b._preferred_rowset_type), - _cooldown_resource(b._cooldown_resource), + _storage_policy(b._storage_policy), _delete_bitmap(b._delete_bitmap) {}; void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, @@ -461,7 +461,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { _preferred_rowset_type = tablet_meta_pb.preferred_rowset_type(); } - _cooldown_resource = tablet_meta_pb.storage_policy(); + _storage_policy = tablet_meta_pb.storage_policy(); if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) { _enable_unique_key_merge_on_write = tablet_meta_pb.enable_unique_key_merge_on_write(); } @@ -528,7 +528,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type); } - tablet_meta_pb->set_storage_policy(_cooldown_resource); + tablet_meta_pb->set_storage_policy(_storage_policy); tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write); { @@ -753,7 +753,7 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) { } if (a._in_restore_mode != b._in_restore_mode) return false; if (a._preferred_rowset_type != b._preferred_rowset_type) return false; - if (a._cooldown_resource != b._cooldown_resource) return false; + if (a._storage_policy != b._storage_policy) return false; return true; } diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 7275d0430d..34a42ed510 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -26,7 +26,6 @@ #include "common/logging.h" #include "gen_cpp/olap_file.pb.h" -#include "io/fs/file_system.h" #include "olap/delete_handler.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -186,23 +185,24 @@ public: bool all_beta() const; - const io::ResourceId& cooldown_resource() const { + const std::string& storage_policy() const { std::shared_lock<std::shared_mutex> rlock(_meta_lock); - return _cooldown_resource; + return _storage_policy; } - void set_cooldown_resource(io::ResourceId resource) { + void set_storage_policy(const std::string& policy) { std::unique_lock<std::shared_mutex> wlock(_meta_lock); - VLOG_NOTICE << "set tablet_id : " << _table_id << " cooldown resource from " - << _cooldown_resource << " to " << resource; - _cooldown_resource = std::move(resource); + VLOG_NOTICE << "set tablet_id : " << _table_id << " storage policy from " << _storage_policy + << " to " << policy; + _storage_policy = policy; } + static void init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, ColumnPB* column); DeleteBitmap& delete_bitmap() { return *_delete_bitmap; } - bool enable_unique_key_merge_on_write() { return _enable_unique_key_merge_on_write; } + bool enable_unique_key_merge_on_write() const { return _enable_unique_key_merge_on_write; } private: Status _save_meta(DataDir* data_dir); @@ -238,8 +238,7 @@ private: bool _in_restore_mode = false; RowsetTypePB _preferred_rowset_type = BETA_ROWSET; - // FIXME(cyx): Currently `cooldown_resource` is equivalent to `storage_policy`. - io::ResourceId _cooldown_resource; + std::string _storage_policy; // For unique key data model, the feature Merge-on-Write will leverage a primary // key index and a delete-bitmap to mark duplicate keys as deleted in load stage, diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 221be83850..73466cb72e 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -249,7 +249,7 @@ void EngineCloneTask::_set_tablet_info(Status status, bool is_new_tablet) { << ", signature:" << _signature << ", version:" << tablet_info.version << ", expected_version: " << _clone_req.committed_version; Status drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet( - _clone_req.tablet_id, _clone_req.replica_id); + _clone_req.tablet_id, _clone_req.replica_id, false); if (drop_status != Status::OK() && drop_status.precise_code() != OLAP_ERR_TABLE_NOT_FOUND) { // just log diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index d05fe0a976..f503e3b4dc 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -197,6 +197,7 @@ set(OLAP_TEST_FILES # olap/push_handler_test.cpp olap/tablet_cooldown_test.cpp olap/rowid_conversion_test.cpp + olap/remote_rowset_gc_test.cpp ) set(RUNTIME_TEST_FILES diff --git a/be/test/olap/delete_handler_test.cpp b/be/test/olap/delete_handler_test.cpp index b383fbc3a7..7ba23ca2c1 100644 --- a/be/test/olap/delete_handler_test.cpp +++ b/be/test/olap/delete_handler_test.cpp @@ -278,7 +278,7 @@ protected: tablet.reset(); dup_tablet.reset(); StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id, - _create_tablet.replica_id); + _create_tablet.replica_id, false); EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } @@ -443,8 +443,8 @@ protected: void TearDown() { // Remove all dir. tablet.reset(); - k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id, - _create_tablet.replica_id); + k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id, _create_tablet.replica_id, + false); EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } @@ -820,7 +820,7 @@ protected: tablet.reset(); _delete_handler.finalize(); StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id, - _create_tablet.replica_id); + _create_tablet.replica_id, false); EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 3126b30f94..eae41cb320 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -406,7 +406,7 @@ TEST_F(TestDeltaWriter, open) { EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); - res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); EXPECT_EQ(Status::OK(), res); } @@ -527,7 +527,7 @@ TEST_F(TestDeltaWriter, write) { } EXPECT_EQ(1, tablet->num_rows()); - res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); EXPECT_EQ(Status::OK(), res); delete delta_writer; } @@ -673,7 +673,7 @@ TEST_F(TestDeltaWriter, vec_write) { } ASSERT_EQ(1, tablet->num_rows()); - res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); ASSERT_TRUE(res.ok()); delete delta_writer; } @@ -740,7 +740,7 @@ TEST_F(TestDeltaWriter, sequence_col) { } EXPECT_EQ(1, tablet->num_rows()); - res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); EXPECT_EQ(Status::OK(), res); delete delta_writer; } @@ -833,7 +833,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { } ASSERT_EQ(1, tablet->num_rows()); - res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); ASSERT_TRUE(res.ok()); delete delta_writer; } diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index b5b3b4d988..a36f0ff0c4 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -259,7 +259,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { EXPECT_NE(tablet3, tablet); // test case 2 end - res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); EXPECT_EQ(Status::OK(), res); delete delta_writer; } diff --git a/be/test/olap/tablet_clone_test.cpp b/be/test/olap/remote_rowset_gc_test.cpp similarity index 73% rename from be/test/olap/tablet_clone_test.cpp rename to be/test/olap/remote_rowset_gc_test.cpp index 51124f87cf..c02b15c1c4 100644 --- a/be/test/olap/tablet_clone_test.cpp +++ b/be/test/olap/remote_rowset_gc_test.cpp @@ -19,11 +19,12 @@ #include <memory> +#include "common/config.h" #include "common/status.h" #include "io/fs/file_system_map.h" #include "io/fs/s3_file_system.h" #include "olap/delta_writer.h" -#include "olap/snapshot_manager.h" +#include "olap/rowset/beta_rowset.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "runtime/descriptor_helper.h" @@ -35,47 +36,39 @@ namespace doris { static StorageEngine* k_engine = nullptr; -static const std::string kTestDir = "./ut_dir/tablet_clone_test"; -static std::string kSnapshotDir = "./ut_dir/tablet_clone_test/snapshot"; -static const std::string kResourceId = "TabletCloneTest"; -static const int64_t kTabletId = 10005; -static const int32_t KSchemaHash = 270068377; - -static const std::string AK = "ak"; -static const std::string SK = "sk"; -static const std::string ENDPOINT = "endpoint"; -static const std::string REGION = "region"; -static const std::string BUCKET = "bucket"; -static const std::string PREFIX = "prefix"; +static const std::string kTestDir = "./ut_dir/remote_rowset_gc_test"; +static const std::string kResourceId = "RemoteRowsetGcTest"; // remove DISABLED_ when need run this test -#define TabletCloneTest DISABLED_TabletCloneTest -#define private public -class TabletCloneTest : public testing::Test { +#define RemoteRowsetGcTest DISABLED_RemoteRowsetGcTest +class RemoteRowsetGcTest : public testing::Test { public: static void SetUpTestSuite() { S3Conf s3_conf; - s3_conf.ak = AK; - s3_conf.sk = SK; - s3_conf.endpoint = ENDPOINT; - s3_conf.region = REGION; - s3_conf.bucket = BUCKET; - s3_conf.prefix = PREFIX; + s3_conf.ak = config::test_s3_ak; + s3_conf.sk = config::test_s3_sk; + s3_conf.endpoint = config::test_s3_endpoint; + s3_conf.region = config::test_s3_region; + s3_conf.bucket = config::test_s3_bucket; + s3_conf.prefix = "remote_rowset_gc_test"; auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), kResourceId); ASSERT_TRUE(s3_fs->connect().ok()); io::FileSystemMap::instance()->insert(kResourceId, s3_fs); - config::storage_root_path = kTestDir; + constexpr uint32_t MAX_PATH_LEN = 1024; + char buffer[MAX_PATH_LEN]; + EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); + config::storage_root_path = std::string(buffer) + "/" + kTestDir; config::min_file_descriptor_number = 1000; - FileUtils::remove_all(kTestDir); - FileUtils::create_dir(kTestDir); - std::vector<StorePath> paths {{kTestDir, -1}}; + FileUtils::remove_all(config::storage_root_path); + FileUtils::create_dir(config::storage_root_path); + + std::vector<StorePath> paths {{config::storage_root_path, -1}}; EngineOptions options; options.store_paths = paths; doris::StorageEngine::open(options, &k_engine); - k_engine->start_bg_threads(); } static void TearDownTestSuite() { @@ -145,9 +138,9 @@ static TDescriptorTable create_descriptor_tablet_with_sequence_col() { return desc_tbl_builder.desc_tbl(); } -TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) { +TEST_F(RemoteRowsetGcTest, normal) { TCreateTabletReq request; - create_tablet_request_with_sequence_col(kTabletId, KSchemaHash, &request); + create_tablet_request_with_sequence_col(10005, 270068377, &request); Status st = k_engine->create_tablet(request); ASSERT_EQ(Status::OK(), st); @@ -161,13 +154,14 @@ TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) { PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = {kTabletId, KSchemaHash, WriteType::LOAD, 20003, - 30003, load_id, tuple_desc, &(tuple_desc->slots())}; + WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, + 30003, load_id, tuple_desc, &(tuple_desc->slots())}; DeltaWriter* delta_writer = nullptr; DeltaWriter::open(&write_req, &delta_writer); ASSERT_NE(delta_writer, nullptr); - MemPool pool; + MemTracker tracker; + MemPool pool(&tracker); // Tuple 1 { Tuple* tuple = reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size())); @@ -199,27 +193,39 @@ TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) { write_req.txn_id, write_req.partition_id, &tablet_related_rs); for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; - rowset->rowset_meta()->set_resource_id(kResourceId); st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - tablet->tablet_id(), tablet->schema_hash(), - tablet->tablet_uid(), version); + write_req.tablet_id, write_req.schema_hash, + tablet_rs.first.tablet_uid, version); ASSERT_EQ(Status::OK(), st); st = tablet->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st); } EXPECT_EQ(1, tablet->num_rows()); - TSnapshotRequest snapshot_req; - snapshot_req.tablet_id = kTabletId; - snapshot_req.schema_hash = KSchemaHash; - bool allow_incremental_clone = false; - st = SnapshotManager::instance()->_create_snapshot_files(tablet, snapshot_req, &kSnapshotDir, - &allow_incremental_clone); + tablet->set_storage_policy(kResourceId); + st = tablet->cooldown(); // rowset [0-1] + ASSERT_EQ(Status::OK(), st); + st = tablet->cooldown(); // rowset [2-2] ASSERT_EQ(Status::OK(), st); - st = SnapshotManager::instance()->convert_rowset_ids(kTestDir, kTabletId, request.replica_id, - KSchemaHash); - ASSERT_NE(Status::OK(), st); + ASSERT_EQ(DorisMetrics::instance()->upload_rowset_count->value(), 1); + delete delta_writer; + + auto fs = io::FileSystemMap::instance()->get(kResourceId); + auto rowset = tablet->get_rowset_by_version({2, 2}); + ASSERT_TRUE(rowset); + auto seg_path = BetaRowset::remote_segment_path(10005, rowset->rowset_id(), 0); + bool exists = false; + st = fs->exists(seg_path, &exists); + ASSERT_EQ(Status::OK(), st); + ASSERT_TRUE(exists); + + st = k_engine->tablet_manager()->drop_tablet(10005, 0, true); + ASSERT_EQ(Status::OK(), st); + tablet->data_dir()->perform_remote_tablet_gc(); + st = fs->exists(seg_path, &exists); + ASSERT_EQ(Status::OK(), st); + ASSERT_FALSE(exists); } } // namespace doris diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index f20eccd376..eae932e6f0 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -25,7 +25,6 @@ #include "io/fs/s3_file_system.h" #include "olap/delta_writer.h" #include "olap/storage_engine.h" -#include "olap/storage_policy_mgr.h" #include "olap/tablet.h" #include "runtime/descriptor_helper.h" #include "runtime/tuple.h" @@ -39,25 +38,18 @@ static StorageEngine* k_engine = nullptr; static const std::string kTestDir = "./ut_dir/tablet_cooldown_test"; static const std::string kResourceId = "TabletCooldownTest"; -static const std::string AK = "ak"; -static const std::string SK = "sk"; -static const std::string ENDPOINT = "endpoint"; -static const std::string REGION = "region"; -static const std::string BUCKET = "bucket"; -static const std::string PREFIX = "tablet_cooldown_test"; - // remove DISABLED_ when need run this test #define TabletCooldownTest DISABLED_TabletCooldownTest class TabletCooldownTest : public testing::Test { public: static void SetUpTestSuite() { S3Conf s3_conf; - s3_conf.ak = AK; - s3_conf.sk = SK; - s3_conf.endpoint = ENDPOINT; - s3_conf.region = REGION; - s3_conf.bucket = BUCKET; - s3_conf.prefix = PREFIX; + s3_conf.ak = config::test_s3_ak; + s3_conf.sk = config::test_s3_sk; + s3_conf.endpoint = config::test_s3_endpoint; + s3_conf.region = config::test_s3_region; + s3_conf.bucket = config::test_s3_bucket; + s3_conf.prefix = "tablet_cooldown_test"; auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), kResourceId); ASSERT_TRUE(s3_fs->connect().ok()); io::FileSystemMap::instance()->insert(kResourceId, s3_fs); @@ -167,7 +159,8 @@ TEST_F(TabletCooldownTest, normal) { DeltaWriter::open(&write_req, &delta_writer); ASSERT_NE(delta_writer, nullptr); - MemPool pool; + MemTracker tracker; + MemPool pool(&tracker); // Tuple 1 { Tuple* tuple = reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size())); @@ -208,7 +201,7 @@ TEST_F(TabletCooldownTest, normal) { } EXPECT_EQ(1, tablet->num_rows()); - tablet->set_cooldown_resource(kResourceId); + tablet->set_storage_policy(kResourceId); st = tablet->cooldown(); // rowset [0-1] ASSERT_EQ(Status::OK(), st); st = tablet->cooldown(); // rowset [2-2] diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index a47c1ddc95..12838e9b22 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -117,7 +117,7 @@ TEST_F(TabletMgrTest, CreateTablet) { create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs); EXPECT_TRUE(create_st == Status::OK()); - Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id); + Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id, false); EXPECT_TRUE(drop_st == Status::OK()); tablet.reset(); Status trash_st = _tablet_mgr->start_trash_sweep(); @@ -172,7 +172,7 @@ TEST_F(TabletMgrTest, CreateTabletWithSequence) { Status check_meta_st = TabletMetaManager::get_meta(_data_dir, 111, 3333, new_tablet_meta); EXPECT_TRUE(check_meta_st == Status::OK()); - Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id); + Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id, false); EXPECT_TRUE(drop_st == Status::OK()); tablet.reset(); Status trash_st = _tablet_mgr->start_trash_sweep(); @@ -206,13 +206,13 @@ TEST_F(TabletMgrTest, DropTablet) { EXPECT_TRUE(tablet != nullptr); // drop unexist tablet will be success - Status drop_st = _tablet_mgr->drop_tablet(1121, create_tablet_req.replica_id); + Status drop_st = _tablet_mgr->drop_tablet(1121, create_tablet_req.replica_id, false); EXPECT_TRUE(drop_st == Status::OK()); tablet = _tablet_mgr->get_tablet(111); EXPECT_TRUE(tablet != nullptr); // drop exist tablet will be success - drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id); + drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id, false); EXPECT_TRUE(drop_st == Status::OK()); tablet = _tablet_mgr->get_tablet(111); EXPECT_TRUE(tablet == nullptr); diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp index 2ca5d4450d..077190c3c1 100644 --- a/be/test/olap/tablet_test.cpp +++ b/be/test/olap/tablet_test.cpp @@ -281,7 +281,7 @@ TEST_F(TestTablet, cooldown_policy) { TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr)); _tablet->init(); - _tablet->set_cooldown_resource("test_policy_name"); + _tablet->set_storage_policy("test_policy_name"); _tablet->_rs_version_map[ptr1->version()] = rowset1; _tablet->_rs_version_map[ptr2->version()] = rowset2; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index 85136dd17a..47befaaccb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -283,10 +283,12 @@ public class BackendLoadStatistic { // Else if capacity used percent > 75%, set capacityCoefficient to 1. // Else, capacityCoefficient changed smoothly from 0.5 to 1 with used capacity increasing // Function: (2 * usedCapacityPercent - 0.5) - loadScore.capacityCoefficient = usedCapacityPercent < 0.5 ? 0.5 - : (usedCapacityPercent > Config.capacity_used_percent_high_water ? 1.0 - : (2 * usedCapacityPercent - 0.5)); - loadScore.replicaNumCoefficient = 1 - loadScore.capacityCoefficient; + if (!Config.be_rebalancer_fuzzy_test) { + loadScore.capacityCoefficient = usedCapacityPercent < 0.5 ? 0.5 + : (usedCapacityPercent > Config.capacity_used_percent_high_water ? 1.0 + : (2 * usedCapacityPercent - 0.5)); + loadScore.replicaNumCoefficient = 1 - loadScore.capacityCoefficient; + } loadScore.score = capacityProportion * loadScore.capacityCoefficient + replicaNumProportion * loadScore.replicaNumCoefficient; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 26f180ea28..db79c6cbdb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1685,7 +1685,7 @@ public class Config extends ConfigBase { * It's used to test the reliability in single replica case when tablet scheduling are frequent. * Default is false. */ - @ConfField(mutable = false, masterOnly = true) + @ConfField(mutable = true, masterOnly = true) public static boolean be_rebalancer_fuzzy_test = false; /** diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index c1ecfac30d..1bea2eb071 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -115,6 +115,13 @@ message RowsetMetaPB { optional SegmentsOverlapPB segments_overlap_pb = 51 [default = OVERLAP_UNKNOWN]; } +// unused remote rowsets garbage collection kv value +message RemoteRowsetGcPB { + required string resource_id = 1; + required int64 tablet_id = 2; + required int64 num_segments = 3; +} + message AlphaRowsetExtraMetaPB { repeated SegmentGroupPB segment_groups = 1; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org