This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e49027a5e66 [feature](cloud) Add CloudTabletMgr (#30089) e49027a5e66 is described below commit e49027a5e6657d27c066f87094984dd0d0462f61 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Fri Jan 19 00:09:06 2024 +0800 [feature](cloud) Add CloudTabletMgr (#30089) --- be/src/cloud/cloud_storage_engine.h | 3 + be/src/cloud/cloud_tablet.cpp | 13 ++ be/src/cloud/cloud_tablet.h | 11 ++ be/src/cloud/cloud_tablet_mgr.cpp | 356 +++++++++++++++++++++++++++++++++++ be/src/cloud/cloud_tablet_mgr.h | 75 ++++++++ be/src/cloud/config.cpp | 9 + be/src/cloud/config.h | 12 ++ be/src/runtime/memory/cache_policy.h | 5 +- 8 files changed, 483 insertions(+), 1 deletion(-) diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 87e3ed52d39..eb76d5e94ef 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -24,6 +24,8 @@ namespace cloud { class CloudMetaMgr; } +class CloudTabletMgr; + class CloudStorageEngine { public: CloudStorageEngine(); @@ -34,6 +36,7 @@ public: private: std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr; + std::unique_ptr<CloudTabletMgr> _tablet_mgr; }; } // namespace doris diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 03670df78d1..6beb1e45d94 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -23,6 +23,8 @@ #include <rapidjson/rapidjson.h> #include <rapidjson/stringbuffer.h> +#include <atomic> + #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_storage_engine.h" #include "io/cache/block/block_file_cache_factory.h" @@ -365,6 +367,17 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer( Status::NotSupported("CloudTablet::create_rowset_writer is not implemented")); } +int64_t CloudTablet::get_cloud_base_compaction_score() const { + return _approximate_num_rowsets.load(std::memory_order_relaxed) - + 
_approximate_cumu_num_rowsets.load(std::memory_order_relaxed); +} + +int64_t CloudTablet::get_cloud_cumu_compaction_score() const { + // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets, + // number of tablet versions simultaneously. + return _approximate_cumu_num_deltas.load(std::memory_order_relaxed); +} + // return a json string to show the compaction status of this tablet void CloudTablet::get_compaction_status(std::string* json_result) { rapidjson::Document root; diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 537c8fe134d..bf8db3c9451 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -93,6 +93,17 @@ public: // Return number of deleted stale rowsets int delete_expired_stale_rowsets(); + bool has_stale_rowsets() const { return !_stale_rs_version_map.empty(); } + + int64_t get_cloud_base_compaction_score() const; + int64_t get_cloud_cumu_compaction_score() const; + + int64_t last_sync_time_s = 0; + int64_t last_load_time_ms = 0; + int64_t last_base_compaction_success_time_ms = 0; + int64_t last_cumu_compaction_success_time_ms = 0; + int64_t last_cumu_no_suitable_version_ms = 0; + private: Versions calc_missed_versions(int64_t spec_version); diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp new file mode 100644 index 00000000000..b3ad35257cc --- /dev/null +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -0,0 +1,356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_mgr.h" + +#include <bthread/countdown_event.h> + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/config.h" +#include "common/status.h" +#include "olap/lru_cache.h" +#include "runtime/memory/cache_policy.h" + +namespace doris { +namespace { + +// port from +// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go +template <typename Key, typename Val> +class SingleFlight { +public: + SingleFlight() = default; + + SingleFlight(const SingleFlight&) = delete; + void operator=(const SingleFlight&) = delete; + + using Loader = std::function<Val(const Key&)>; + + // Do executes and returns the results of the given function, making + // sure that only one execution is in-flight for a given key at a + // time. If a duplicate comes in, the duplicate caller waits for the + // original to complete and receives the same results. 
+ Val load(const Key& key, Loader loader) { + std::unique_lock lock(_call_map_mtx); + + auto it = _call_map.find(key); + if (it != _call_map.end()) { + auto call = it->second; + lock.unlock(); + if (int ec = call->event.wait(); ec != 0) { + throw std::system_error(std::error_code(ec, std::system_category()), + "CountdownEvent wait failed"); + } + return call->val; + } + auto call = std::make_shared<Call>(); + _call_map.emplace(key, call); + lock.unlock(); + + call->val = loader(key); + call->event.signal(); + + lock.lock(); + _call_map.erase(key); + lock.unlock(); + + return call->val; + } + +private: + // `Call` is an in-flight or completed `load` call + struct Call { + bthread::CountdownEvent event; + Val val; + }; + + std::mutex _call_map_mtx; + std::unordered_map<Key, std::shared_ptr<Call>> _call_map; +}; + +SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet; + +} // namespace + +// tablet_id -> cached tablet +// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. +// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. +// TODO(plat1ko): multi shard to increase concurrency +class CloudTabletMgr::TabletMap { +public: + void put(std::shared_ptr<CloudTablet> tablet) { + std::lock_guard lock(_mtx); + _map[tablet->tablet_id()] = std::move(tablet); + } + + void erase(CloudTablet* tablet) { + std::lock_guard lock(_mtx); + auto it = _map.find(tablet->tablet_id()); + // According to the implementation of `LRUCache`, `deleter` may be called after a tablet + // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet + // instance to be erased is the same one in the map. 
+ if (it != _map.end() && it->second.get() == tablet) { + _map.erase(it); + } + } + + std::shared_ptr<CloudTablet> get(int64_t tablet_id) { + std::lock_guard lock(_mtx); + if (auto it = _map.find(tablet_id); it != _map.end()) { + return it->second; + } + return nullptr; + } + + size_t size() { return _map.size(); } + + void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) { + std::lock_guard lock(_mtx); + for (auto& [_, tablet] : _map) { + visitor(tablet); + } + } + +private: + std::mutex _mtx; + std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map; +}; + +// TODO(plat1ko): Prune cache +CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) + : _engine(engine), + _tablet_map(std::make_unique<TabletMap>()), + _cache(std::make_unique<LRUCachePolicy>( + CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, + LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {} + +CloudTabletMgr::~CloudTabletMgr() = default; + +Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, + bool warmup_data) { + // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` + struct Value { + // FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value` + // only requires a reference. 
+ std::shared_ptr<CloudTablet> tablet; + TabletMap& tablet_map; + }; + + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str); + auto* cache = _cache->cache(); + auto* handle = cache->lookup(key); + if (handle == nullptr) { + auto load_tablet = [this, cache, &key, + warmup_data](int64_t tablet_id) -> std::shared_ptr<CloudTablet> { + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); + if (!st.ok()) { + LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta)); + auto value = std::make_unique<Value>(Value { + .tablet = tablet, + .tablet_map = *_tablet_map, + }); + // MUST sync stats to let compaction scheduler work correctly + st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data); + if (!st.ok()) { + LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st; + return nullptr; + } + + auto deleter = [](const CacheKey& key, void* value) { + auto* value1 = reinterpret_cast<Value*>(value); + // tablet has been evicted, release it from `tablet_map` + value1->tablet_map.erase(value1->tablet.get()); + delete value1; + }; + + auto* handle = cache->insert(key, value.release(), 1, deleter); + auto ret = std::shared_ptr<CloudTablet>( + tablet.get(), [cache, handle](...) { cache->release(handle); }); + _tablet_map->put(std::move(tablet)); + return ret; + }; + + auto tablet = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet)); + if (tablet == nullptr) { + return ResultError(Status::InternalError("failed to get tablet {}", tablet_id)); + } + return tablet; + } + + CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(cache->value(handle))->tablet.get(); + auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, + [cache, handle](...) 
{ cache->release(handle); }); + return tablet; +} + +void CloudTabletMgr::erase_tablet(int64_t tablet_id) { + auto tablet_id_str = std::to_string(tablet_id); + CacheKey key(tablet_id_str.data(), tablet_id_str.size()); + _cache->cache()->erase(key); +} + +void CloudTabletMgr::vacuum_stale_rowsets() { + LOG_INFO("begin to vacuum stale rowsets"); + std::vector<std::shared_ptr<CloudTablet>> tablets_to_vacuum; + tablets_to_vacuum.reserve(_tablet_map->size()); + _tablet_map->traverse([&tablets_to_vacuum](auto&& t) { + if (t->has_stale_rowsets()) { + tablets_to_vacuum.push_back(t); + } + }); + int num_vacuumed = 0; + for (auto& t : tablets_to_vacuum) { + num_vacuumed += t->delete_expired_stale_rowsets(); + } + LOG_INFO("finish vacuum stale rowsets").tag("num_vacuumed", num_vacuumed); +} + +std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() { + std::vector<std::weak_ptr<CloudTablet>> weak_tablets; + weak_tablets.reserve(_tablet_map->size()); + _tablet_map->traverse([&weak_tablets](auto& t) { weak_tablets.push_back(t); }); + return weak_tablets; +} + +void CloudTabletMgr::sync_tablets() { + LOG_INFO("begin to sync tablets"); + int64_t last_sync_time_bound = ::time(nullptr) - config::tablet_sync_interval_seconds; + + auto weak_tablets = get_weak_tablets(); + + // sort by last_sync_time + static auto cmp = [](const auto& a, const auto& b) { return a.first < b.first; }; + std::multiset<std::pair<int64_t, std::weak_ptr<CloudTablet>>, decltype(cmp)> + sync_time_tablet_set(cmp); + + for (auto& weak_tablet : weak_tablets) { + if (auto tablet = weak_tablet.lock()) { + if (tablet->tablet_state() != TABLET_RUNNING) { + continue; + } + int64_t last_sync_time = tablet->last_sync_time_s; + if (last_sync_time <= last_sync_time_bound) { + sync_time_tablet_set.emplace(last_sync_time, weak_tablet); + } + } + } + + int num_sync = 0; + for (auto&& [_, weak_tablet] : sync_time_tablet_set) { + if (auto tablet = weak_tablet.lock()) { + if (tablet->last_sync_time_s > 
last_sync_time_bound) { + continue; + } + + ++num_sync; + auto st = tablet->sync_meta(); + if (!st) { + LOG_WARNING("failed to sync tablet meta {}", tablet->tablet_id()).error(st); + if (st.is<ErrorCode::NOT_FOUND>()) { + continue; + } + } + + st = tablet->sync_rowsets(-1); + if (!st) { + LOG_WARNING("failed to sync tablet rowsets {}", tablet->tablet_id()).error(st); + } + } + } + LOG_INFO("finish sync tablets").tag("num_sync", num_sync); +} + +Status CloudTabletMgr::get_topn_tablets_to_compact( + int n, CompactionType compaction_type, const std::function<bool(CloudTablet*)>& filter_out, + std::vector<std::shared_ptr<CloudTablet>>* tablets, int64_t* max_score) { + DCHECK(compaction_type == CompactionType::BASE_COMPACTION || + compaction_type == CompactionType::CUMULATIVE_COMPACTION); + *max_score = 0; + int64_t max_score_tablet_id = 0; + // clang-format off + auto score = [compaction_type](CloudTablet* t) { + return compaction_type == CompactionType::BASE_COMPACTION ? t->get_cloud_base_compaction_score() + : compaction_type == CompactionType::CUMULATIVE_COMPACTION ? t->get_cloud_cumu_compaction_score() + : 0; + }; + + using namespace std::chrono; + auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + auto skip = [now, compaction_type](CloudTablet* t) { + if (compaction_type == CompactionType::BASE_COMPACTION) { + return now - t->last_base_compaction_success_time_ms < config::base_compaction_freeze_interval_s * 1000; + } + // If tablet has too many rowsets but not be compacted for a long time, compaction should be performed + // regardless of whether there is a load job recently. 
+ return now - t->last_cumu_no_suitable_version_ms < config::min_compaction_failure_interval_ms || + (now - t->last_load_time_ms > config::cu_compaction_freeze_interval_s * 1000 + && now - t->last_cumu_compaction_success_time_ms < config::cumu_compaction_interval_s * 1000 + && t->fetch_add_approximate_num_rowsets(0) < config::max_tablet_version_num / 2); + }; + // We don't schedule tablets that are disabled for compaction + auto disable = [](CloudTablet* t) { return t->tablet_meta()->tablet_schema()->disable_auto_compaction(); }; + + auto [num_filtered, num_disabled, num_skipped] = std::make_tuple(0, 0, 0); + + auto weak_tablets = get_weak_tablets(); + std::vector<std::pair<std::shared_ptr<CloudTablet>, int64_t>> buf; + buf.reserve(n + 1); + for (auto& weak_tablet : weak_tablets) { + auto t = weak_tablet.lock(); + if (t == nullptr) { continue; } + + int64_t s = score(t.get()); + if (s > *max_score) { + max_score_tablet_id = t->tablet_id(); + *max_score = s; + } + + if (filter_out(t.get())) { ++num_filtered; continue; } + if (disable(t.get())) { ++num_disabled; continue; } + if (skip(t.get())) { ++num_skipped; continue; } + + buf.emplace_back(std::move(t), s); + std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; }); + if (buf.size() > n) { buf.pop_back(); } + } + + LOG_EVERY_N(INFO, 1000) << "get_topn_compaction_score, n=" << n << " type=" << compaction_type + << " num_tablets=" << weak_tablets.size() << " num_skipped=" << num_skipped + << " num_disabled=" << num_disabled << " num_filtered=" << num_filtered + << " max_score=" << *max_score << " max_score_tablet=" << max_score_tablet_id + << " tablets=[" << [&buf] { std::stringstream ss; for (auto& i : buf) ss << i.first->tablet_id() << ":" << i.second << ","; return ss.str(); }() << "]" + ; + // clang-format on + + tablets->clear(); + tablets->reserve(n + 1); + for (auto& [t, _] : buf) { + tablets->emplace_back(std::move(t)); + } + + return Status::OK(); +} + +} // namespace doris 
diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h new file mode 100644 index 00000000000..6e8ae2c17b0 --- /dev/null +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <functional> +#include <memory> +#include <vector> + +#include "common/status.h" +#include "olap/olap_common.h" + +namespace doris { + +class CloudTablet; +class CloudStorageEngine; +class LRUCachePolicy; + +class CloudTabletMgr { +public: + CloudTabletMgr(CloudStorageEngine& engine); + ~CloudTabletMgr(); + + // If the tablet is in cache, return this tablet directly; otherwise get the tablet meta first, + // then sync rowsets, and download segment data in the background if `warmup_data` is true. + Result<std::shared_ptr<CloudTablet>> get_tablet(int64_t tablet_id, bool warmup_data = false); + + void erase_tablet(int64_t tablet_id); + + void vacuum_stale_rowsets(); + + // Return weak ptrs of all cached tablets. + // We return weak ptrs to avoid extending the lifetime of tablets that are no longer cached. 
+ std::vector<std::weak_ptr<CloudTablet>> get_weak_tablets(); + + void sync_tablets(); + + /** + * Gets top N tablets that are considered to be compacted first + * + * @param n max number of tablets to get, all of them are compaction enabled + * @param filter_out a filter that takes a tablet and returns a bool telling + * whether to skip the tablet (true means skip) + * @param tablets output param + * @param max_score output param, max score among the existing tablets + * @return status of this call + */ + Status get_topn_tablets_to_compact(int n, CompactionType compaction_type, + const std::function<bool(CloudTablet*)>& filter_out, + std::vector<std::shared_ptr<CloudTablet>>* tablets, + int64_t* max_score); + +private: + CloudStorageEngine& _engine; + + class TabletMap; + std::unique_ptr<TabletMap> _tablet_map; + std::unique_ptr<LRUCachePolicy> _cache; +}; + +} // namespace doris diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index d7513574037..b222ec9d517 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -30,4 +30,13 @@ DEFINE_mInt32(meta_service_idle_connection_timeout_ms, "0"); DEFINE_mInt32(meta_service_rpc_retry_times, "200"); DEFINE_mInt32(meta_service_brpc_timeout_ms, "10000"); +DEFINE_Int64(tablet_cache_capacity, "100000"); +DEFINE_Int64(tablet_cache_shards, "16"); +DEFINE_mInt64(tablet_sync_interval_seconds, "1800"); + +DEFINE_mInt64(min_compaction_failure_interval_ms, "5000"); +DEFINE_mInt64(base_compaction_freeze_interval_s, "86400"); +DEFINE_mInt64(cu_compaction_freeze_interval_s, "1200"); +DEFINE_mInt64(cumu_compaction_interval_s, "1800"); + } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 0a2ceab3e5a..ccb7b834f9d 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -41,4 +41,16 @@ DECLARE_mInt32(meta_service_rpc_retry_times); // default brpc timeout DECLARE_mInt32(meta_service_brpc_timeout_ms); +// CloudTabletMgr config +DECLARE_Int64(tablet_cache_capacity); 
+DECLARE_Int64(tablet_cache_shards); +DECLARE_mInt64(tablet_sync_interval_seconds); + +// Cloud compaction config +DECLARE_mInt64(min_compaction_failure_interval_ms); +// For cloud read/write separate mode +DECLARE_mInt64(base_compaction_freeze_interval_s); +DECLARE_mInt64(cu_compaction_freeze_interval_s); +DECLARE_mInt64(cumu_compaction_interval_s); + } // namespace doris::config diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index 9a9f2c36e84..c19066b0be2 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -42,7 +42,8 @@ public: COMMON_OBJ_LRU_CACHE = 12, FOR_UT = 13, TABLET_SCHEMA_CACHE = 14, - CREATE_TABLET_RR_IDX_CACHE = 15 + CREATE_TABLET_RR_IDX_CACHE = 15, + CLOUD_TABLET_CACHE = 16, }; static std::string type_string(CacheType type) { @@ -79,6 +80,8 @@ public: return "TabletSchemaCache"; case CacheType::CREATE_TABLET_RR_IDX_CACHE: return "CreateTabletRRIdxCache"; + case CacheType::CLOUD_TABLET_CACHE: + return "CloudTabletCache"; default: LOG(FATAL) << "not match type of cache policy :" << static_cast<int>(type); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org