This is an automated email from the ASF dual-hosted git repository. pengxiangyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0ec218108d [enhancement](remote) support local cache GC at the granularity of cache files (#14920) 0ec218108d is described below commit 0ec218108d7000175f24f75a32d3740e841ff617 Author: luozenglin <37725793+luozeng...@users.noreply.github.com> AuthorDate: Fri Dec 9 17:35:23 2022 +0800 [enhancement](remote) support local cache GC at the granularity of cache files (#14920) * [enhancement](remote) support local cache GC at the granularity of cache files * update * update * update --- be/src/io/cache/dummy_file_cache.cpp | 59 +++++++++++------------ be/src/io/cache/dummy_file_cache.h | 21 ++++++-- be/src/io/cache/file_cache.cpp | 29 +++++++++++ be/src/io/cache/file_cache.h | 25 +++++++--- be/src/io/cache/file_cache_manager.cpp | 49 ++++++++++++++----- be/src/io/cache/file_cache_manager.h | 4 +- be/src/io/cache/sub_file_cache.cpp | 88 ++++++++++++++++++++-------------- be/src/io/cache/sub_file_cache.h | 21 +++++++- be/src/io/cache/whole_file_cache.cpp | 33 ++++--------- be/src/io/cache/whole_file_cache.h | 12 ++++- 10 files changed, 224 insertions(+), 117 deletions(-) diff --git a/be/src/io/cache/dummy_file_cache.cpp b/be/src/io/cache/dummy_file_cache.cpp index 7e94bb388e..166c8b55e4 100644 --- a/be/src/io/cache/dummy_file_cache.cpp +++ b/be/src/io/cache/dummy_file_cache.cpp @@ -28,21 +28,15 @@ namespace io { DummyFileCache::DummyFileCache(const Path& cache_dir, int64_t alive_time_sec) : _cache_dir(cache_dir), _alive_time_sec(alive_time_sec) {} -DummyFileCache::~DummyFileCache() {} - -void DummyFileCache::_update_last_mtime(const Path& done_file) { - Path cache_done_file = _cache_dir / done_file; - time_t m_time; - if (FileUtils::mtime(cache_done_file.native(), &m_time).ok() && m_time > _last_match_time) { - _last_match_time = m_time; - } -} +DummyFileCache::~DummyFileCache() = default; void DummyFileCache::_add_file_cache(const Path& data_file) { Path cache_file = _cache_dir / data_file; size_t 
file_size = 0; - if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok()) { - _file_sizes[cache_file] = file_size; + time_t m_time = 0; + if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok() && + FileUtils::mtime(cache_file.native(), &m_time).ok()) { + _gc_lru_queue.push({cache_file, m_time}); _cache_file_size += file_size; } else { _unfinished_files.push_back(cache_file); @@ -72,7 +66,6 @@ void DummyFileCache::_load() { Path cache_filename = StringReplace(iter->native(), CACHE_DONE_FILE_SUFFIX, "", true); if (cache_names.find(cache_filename) != cache_names.end()) { cache_names.erase(cache_filename); - _update_last_mtime(*iter); _add_file_cache(cache_filename); } else { // not data file, but with DONE file @@ -110,35 +103,39 @@ Status DummyFileCache::load_and_clean() { } Status DummyFileCache::clean_timeout_cache() { - if (time(nullptr) - _last_match_time > _alive_time_sec) { - return _clean_cache_internal(); + while (!_gc_lru_queue.empty() && + time(nullptr) - _gc_lru_queue.top().last_match_time > _alive_time_sec) { + size_t cleaned_size = 0; + RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file, &cleaned_size)); + _cache_file_size -= cleaned_size; + _gc_lru_queue.pop(); } return Status::OK(); } Status DummyFileCache::clean_all_cache() { - return _clean_cache_internal(); + while (!_gc_lru_queue.empty()) { + RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file, nullptr)); + _gc_lru_queue.pop(); + } + _cache_file_size = 0; + return Status::OK(); } -Status DummyFileCache::_clean_cache_internal() { - for (const auto& iter : _file_sizes) { - const auto cache_file_path = iter.first; - Path done_file_path = cache_file_path.native() + CACHE_DONE_FILE_SUFFIX; - LOG(INFO) << "Delete unused done_cache_path: " << done_file_path.native() - << ", cache_file_path: " << cache_file_path.native(); - if (!io::global_local_filesystem()->delete_file(done_file_path).ok()) { - LOG(ERROR) << "delete_file failed: " << 
done_file_path.native(); - continue; - } - if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) { - LOG(ERROR) << "delete_file failed: " << cache_file_path.native(); - continue; - } +Status DummyFileCache::clean_one_cache(size_t* cleaned_size) { + if (!_gc_lru_queue.empty()) { + const auto& cache = _gc_lru_queue.top(); + RETURN_IF_ERROR(_clean_cache_internal(cache.file, cleaned_size)); + _cache_file_size -= *cleaned_size; + _gc_lru_queue.pop(); } - _file_sizes.clear(); - _cache_file_size = 0; return Status::OK(); } +Status DummyFileCache::_clean_cache_internal(const Path& cache_file_path, size_t* cleaned_size) { + Path done_file_path = cache_file_path.native() + CACHE_DONE_FILE_SUFFIX; + return _remove_file(cache_file_path, done_file_path, cleaned_size); +} + } // namespace io } // namespace doris diff --git a/be/src/io/cache/dummy_file_cache.h b/be/src/io/cache/dummy_file_cache.h index e59d867a54..4d604cdb6a 100644 --- a/be/src/io/cache/dummy_file_cache.h +++ b/be/src/io/cache/dummy_file_cache.h @@ -19,6 +19,7 @@ #include <future> #include <memory> +#include <queue> #include "common/status.h" #include "io/cache/file_cache.h" @@ -55,22 +56,36 @@ public: Status clean_all_cache() override; + Status clean_one_cache(size_t* cleaned_size) override; + Status load_and_clean(); bool is_dummy_file_cache() override { return true; } + int64_t get_oldest_match_time() const override { + return _gc_lru_queue.empty() ? 
0 : _gc_lru_queue.top().last_match_time; + }; + + bool is_gc_finish() const override { return _gc_lru_queue.empty(); } + private: Status _clean_unfinished_cache(); - void _update_last_mtime(const Path& done_file); void _add_file_cache(const Path& data_file); void _load(); - Status _clean_cache_internal(); + Status _clean_cache_internal(const Path&, size_t*); private: + struct DummyFileInfo { + Path file; + int64_t last_match_time; + }; + using DummyGcQueue = std::priority_queue<DummyFileInfo, std::vector<DummyFileInfo>, + SubFileLRUComparator<DummyFileInfo>>; + DummyGcQueue _gc_lru_queue; + Path _cache_dir; int64_t _alive_time_sec; - std::map<Path, int64_t> _file_sizes; std::list<Path> _unfinished_files; }; diff --git a/be/src/io/cache/file_cache.cpp b/be/src/io/cache/file_cache.cpp index 72f96c3776..b016ee2aa5 100644 --- a/be/src/io/cache/file_cache.cpp +++ b/be/src/io/cache/file_cache.cpp @@ -86,5 +86,34 @@ Status FileCache::download_cache_to_local(const Path& cache_file, const Path& ca return Status::OK(); } +Status FileCache::_remove_file(const Path& cache_file, const Path& cache_done_file, + size_t* cleaned_size) { + bool done_file_exist = false; + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->exists(cache_done_file, &done_file_exist), + "Check local done file exist failed."); + if (done_file_exist) { + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->delete_file(cache_done_file), + fmt::format("Delete local done file failed: {}", cache_done_file.native())); + } + bool cache_file_exist = false; + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->exists(cache_file, &cache_file_exist), + "Check local cache file exist failed."); + if (cache_file_exist) { + if (cleaned_size) { + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->file_size(cache_file, cleaned_size), + fmt::format("get local cache file size failed: {}", cache_file.native())); + } + RETURN_NOT_OK_STATUS_WITH_WARN( + 
io::global_local_filesystem()->delete_file(cache_file), + fmt::format("Delete local cache file failed: {}", cache_file.native())); + } + LOG(INFO) << "Delete local cache file successfully: " << cache_file.native(); + return Status::OK(); +} + } // namespace io } // namespace doris diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h index 8e0cb0a679..0d7afb5754 100644 --- a/be/src/io/cache/file_cache.h +++ b/be/src/io/cache/file_cache.h @@ -18,7 +18,9 @@ #pragma once #include <memory> +#include <queue> #include <shared_mutex> +#include <string> #include "common/status.h" #include "io/fs/file_reader.h" @@ -31,8 +33,8 @@ const std::string CACHE_DONE_FILE_SUFFIX = "_DONE"; class FileCache : public FileReader { public: - FileCache() : _last_match_time(time(nullptr)), _cache_file_size(0) {} - virtual ~FileCache() = default; + FileCache() : _cache_file_size(0) {} + ~FileCache() override = default; DISALLOW_COPY_AND_ASSIGN(FileCache); @@ -46,17 +48,28 @@ public: virtual Status clean_all_cache() = 0; + virtual Status clean_one_cache(size_t* cleaned_size) = 0; + + virtual bool is_gc_finish() const = 0; + virtual bool is_dummy_file_cache() { return false; } Status download_cache_to_local(const Path& cache_file, const Path& cache_done_file, io::FileReaderSPtr remote_file_reader, size_t req_size, size_t offset = 0); - void update_last_match_time() { _last_match_time = time(nullptr); } - int64_t get_last_match_time() const { return _last_match_time; } + virtual int64_t get_oldest_match_time() const = 0; protected: - int64_t _last_match_time; + Status _remove_file(const Path& cache_file, const Path& cache_done_file, size_t* cleaned_size); + + template <typename T> + struct SubFileLRUComparator { + bool operator()(const T& lhs, const T& rhs) const { + return lhs.last_match_time > rhs.last_match_time; + }; + }; + size_t _cache_file_size; }; @@ -64,7 +77,7 @@ using FileCachePtr = std::shared_ptr<FileCache>; struct FileCacheLRUComparator { bool operator()(const 
FileCachePtr& lhs, const FileCachePtr& rhs) const { - return lhs->get_last_match_time() > rhs->get_last_match_time(); + return lhs->get_oldest_match_time() > rhs->get_oldest_match_time(); } }; diff --git a/be/src/io/cache/file_cache_manager.cpp b/be/src/io/cache/file_cache_manager.cpp index 840c39f454..6d34434190 100644 --- a/be/src/io/cache/file_cache_manager.cpp +++ b/be/src/io/cache/file_cache_manager.cpp @@ -44,13 +44,31 @@ bool GCContextPerDisk::try_add_file_cache(FileCachePtr cache, int64_t file_size) return false; } -void GCContextPerDisk::get_gc_file_caches(std::list<FileCachePtr>& result) { - while (!_lru_queue.empty() && _used_size > _conf_max_size) { +FileCachePtr GCContextPerDisk::top() { + if (!_lru_queue.empty() && _used_size > _conf_max_size) { + return _lru_queue.top(); + } + return nullptr; +} + +void GCContextPerDisk::pop() { + if (!_lru_queue.empty()) { + _lru_queue.pop(); + } +} + +Status GCContextPerDisk::gc_top() { + if (!_lru_queue.empty() && _used_size > _conf_max_size) { auto file_cache = _lru_queue.top(); - _used_size -= file_cache->cache_file_size(); - result.push_back(file_cache); + size_t cleaned_size = 0; + RETURN_IF_ERROR(file_cache->clean_one_cache(&cleaned_size)); + _used_size -= cleaned_size; _lru_queue.pop(); + if (!file_cache->is_gc_finish()) { + _lru_queue.push(file_cache); + } } + return Status::OK(); } void FileCacheManager::add_file_cache(const std::string& cache_path, FileCachePtr file_cache) { @@ -175,16 +193,21 @@ void FileCacheManager::gc_file_caches() { // policy2: GC file cache by disk size if (gc_conf_size > 0) { for (size_t i = 0; i < contexts.size(); ++i) { - std::list<FileCachePtr> gc_file_list; - contexts[i].get_gc_file_caches(gc_file_list); - for (auto item : gc_file_list) { - std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock); - // for dummy file cache, check already used or not again - if (item->is_dummy_file_cache() && - _file_cache_map.find(item->cache_dir().native()) != _file_cache_map.end()) { - 
continue; + auto& context = contexts[i]; + FileCachePtr file_cache; + while ((file_cache = context.top()) != nullptr) { + { + std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock); + // for dummy file cache, check already used or not again + if (file_cache->is_dummy_file_cache() && + _file_cache_map.find(file_cache->cache_dir().native()) != + _file_cache_map.end()) { + context.pop(); + continue; + } } - item->clean_all_cache(); + WARN_IF_ERROR(context.gc_top(), + fmt::format("gc {} error", file_cache->cache_dir().native())); } } } diff --git a/be/src/io/cache/file_cache_manager.h b/be/src/io/cache/file_cache_manager.h index c25d1cfb5c..9332b324fc 100644 --- a/be/src/io/cache/file_cache_manager.h +++ b/be/src/io/cache/file_cache_manager.h @@ -33,7 +33,9 @@ public: GCContextPerDisk() : _conf_max_size(0), _used_size(0) {} void init(const std::string& path, int64_t max_size); bool try_add_file_cache(FileCachePtr cache, int64_t file_size); - void get_gc_file_caches(std::list<FileCachePtr>&); + FileCachePtr top(); + Status gc_top(); + void pop(); private: std::string _disk_path; diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp index 292f442ddd..34b509bc9e 100644 --- a/be/src/io/cache/sub_file_cache.cpp +++ b/be/src/io/cache/sub_file_cache.cpp @@ -17,6 +17,12 @@ #include "io/cache/sub_file_cache.h" +#include <glog/logging.h> + +#include <algorithm> +#include <utility> +#include <vector> + #include "common/config.h" #include "io/fs/local_file_system.h" #include "olap/iterators.h" @@ -115,14 +121,17 @@ Status SubFileCache::read_at(size_t offset, Slice result, const IOContext& io_ct _last_match_times[*iter] = time(nullptr); } } - update_last_match_time(); return Status::OK(); } +std::pair<Path, Path> SubFileCache::_cache_path(size_t offset) { + return {_cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset), + _cache_dir / + fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, CACHE_DONE_FILE_SUFFIX)}; +} + Status 
SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) { - Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset); - Path cache_done_file = _cache_dir / fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, - CACHE_DONE_FILE_SUFFIX); + auto [cache_file, cache_done_file] = _cache_path(offset); bool done_file_exist = false; RETURN_NOT_OK_STATUS_WITH_WARN( io::global_local_filesystem()->exists(cache_done_file, &done_file_exist), @@ -134,8 +143,9 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) { ThreadPoolToken* thread_token = ExecEnv::GetInstance()->get_serial_download_cache_thread_token(); if (thread_token != nullptr) { - auto st = thread_token->submit_func([this, &download_st, cache_done_file, cache_file, - offset, req_size] { + auto st = thread_token->submit_func([this, &download_st, + cache_done_file = cache_done_file, + cache_file = cache_file, offset, req_size] { auto func = [this, cache_done_file, cache_file, offset, req_size] { bool done_file_exist = false; // Judge again whether cache_done_file exists, it is possible that the cache @@ -180,7 +190,6 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) { RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader)); _cache_file_readers.emplace(offset, cache_reader); _last_match_times.emplace(offset, time(nullptr)); - update_last_match_time(); LOG(INFO) << "Create cache file from remote file successfully: " << _remote_file_reader->path().native() << "(" << offset << ", " << req_size << ") -> " << cache_file.native(); @@ -199,6 +208,8 @@ Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size, } Status SubFileCache::clean_timeout_cache() { + SubGcQueue gc_queue; + _gc_lru_queue.swap(gc_queue); std::vector<size_t> timeout_keys; { std::shared_lock<std::shared_mutex> rlock(_cache_map_lock); @@ -206,6 +217,9 @@ Status SubFileCache::clean_timeout_cache() { iter != 
_last_match_times.cend(); ++iter) { if (time(nullptr) - iter->second > _alive_time_sec) { timeout_keys.emplace_back(iter->first); + } else { + auto [cache_file, done_file] = _cache_path(iter->first); + _gc_lru_queue.push({iter->first, iter->second}); } } } @@ -213,9 +227,10 @@ Status SubFileCache::clean_timeout_cache() { std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock); for (std::vector<size_t>::const_iterator iter = timeout_keys.cbegin(); iter != timeout_keys.cend(); ++iter) { - RETURN_IF_ERROR(_clean_cache_internal(*iter)); + size_t cleaned_size = 0; + RETURN_IF_ERROR(_clean_cache_internal(*iter, &cleaned_size)); + _cache_file_size -= cleaned_size; } - _cache_file_size = _calc_cache_file_size(); } return Status::OK(); } @@ -224,40 +239,41 @@ Status SubFileCache::clean_all_cache() { std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock); for (std::map<size_t, int64_t>::const_iterator iter = _last_match_times.cbegin(); iter != _last_match_times.cend(); ++iter) { - RETURN_IF_ERROR(_clean_cache_internal(iter->first)); + RETURN_IF_ERROR(_clean_cache_internal(iter->first, nullptr)); + } + _cache_file_size = 0; + return Status::OK(); +} + +Status SubFileCache::clean_one_cache(size_t* cleaned_size) { + if (!_gc_lru_queue.empty()) { + const auto& cache = _gc_lru_queue.top(); + { + std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock); + if (auto it = _last_match_times.find(cache.offset); + it != _last_match_times.end() && it->second == cache.last_match_time) { + RETURN_IF_ERROR(_clean_cache_internal(cache.offset, cleaned_size)); + _cache_file_size -= *cleaned_size; + _gc_lru_queue.pop(); + } + } + decltype(_last_match_times.begin()) it; + while (!_gc_lru_queue.empty() && + (it = _last_match_times.find(_gc_lru_queue.top().offset)) != + _last_match_times.end() && + it->second != _gc_lru_queue.top().last_match_time) { + _gc_lru_queue.pop(); + } } - _cache_file_size = _calc_cache_file_size(); return Status::OK(); } -Status 
SubFileCache::_clean_cache_internal(size_t offset) { +Status SubFileCache::_clean_cache_internal(size_t offset, size_t* cleaned_size) { if (_cache_file_readers.find(offset) != _cache_file_readers.end()) { _cache_file_readers.erase(offset); } - _cache_file_size = 0; - Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset); - Path done_file = _cache_dir / - fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, CACHE_DONE_FILE_SUFFIX); - bool done_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(done_file, &done_file_exist), - "Check local done file exist failed."); - if (done_file_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->delete_file(done_file), - fmt::format("Delete local done file failed: {}", done_file.native())); - } - bool cache_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(cache_file, &cache_file_exist), - "Check local cache file exist failed."); - if (cache_file_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->delete_file(cache_file), - fmt::format("Delete local cache file failed: {}", cache_file.native())); - } - LOG(INFO) << "Delete local cache file successfully: " << cache_file.native(); - return Status::OK(); + auto [cache_file, done_file] = _cache_path(offset); + return _remove_file(cache_file, done_file, cleaned_size); } size_t SubFileCache::_calc_cache_file_size() { diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h index 0567a2c4bd..5e26134f11 100644 --- a/be/src/io/cache/sub_file_cache.h +++ b/be/src/io/cache/sub_file_cache.h @@ -52,17 +52,36 @@ public: Status clean_all_cache() override; + Status clean_one_cache(size_t* cleaned_size) override; + + int64_t get_oldest_match_time() const override { + return _gc_lru_queue.empty() ? 
0 : _gc_lru_queue.top().last_match_time; + }; + + bool is_gc_finish() const override { return _gc_lru_queue.empty(); } + private: Status _generate_cache_reader(size_t offset, size_t req_size); - Status _clean_cache_internal(size_t offset); + Status _clean_cache_internal(size_t offset, size_t* cleaned_size); Status _get_need_cache_offsets(size_t offset, size_t req_size, std::vector<size_t>* cache_offsets); size_t _calc_cache_file_size(); + std::pair<Path, Path> _cache_path(size_t offset); + private: + struct SubFileInfo { + size_t offset; + int64_t last_match_time; + }; + using SubGcQueue = std::priority_queue<SubFileInfo, std::vector<SubFileInfo>, + SubFileLRUComparator<SubFileInfo>>; + // used by gc thread, and currently has no lock protection + SubGcQueue _gc_lru_queue; + Path _cache_dir; int64_t _alive_time_sec; io::FileReaderSPtr _remote_file_reader; diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp index e5c5dc7541..613215c93c 100644 --- a/be/src/io/cache/whole_file_cache.cpp +++ b/be/src/io/cache/whole_file_cache.cpp @@ -51,7 +51,6 @@ Status WholeFileCache::read_at(size_t offset, Slice result, const IOContext& io_ << ", bytes read: " << bytes_read << " vs required size: " << result.size; return Status::OLAPInternalError(OLAP_ERR_OS_ERROR); } - update_last_match_time(); return Status::OK(); } @@ -135,43 +134,27 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) { Status WholeFileCache::clean_timeout_cache() { if (time(nullptr) - _last_match_time > _alive_time_sec) { - _clean_cache_internal(); + _clean_cache_internal(nullptr); } return Status::OK(); } Status WholeFileCache::clean_all_cache() { - _clean_cache_internal(); - return Status::OK(); + return _clean_cache_internal(nullptr); } -Status WholeFileCache::_clean_cache_internal() { +Status WholeFileCache::clean_one_cache(size_t* cleaned_size) { + return _clean_cache_internal(cleaned_size); +} + +Status 
WholeFileCache::_clean_cache_internal(size_t* cleaned_size) { std::unique_lock<std::shared_mutex> wrlock(_cache_lock); _cache_file_reader.reset(); _cache_file_size = 0; Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME; Path done_file = _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, CACHE_DONE_FILE_SUFFIX); - bool done_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(done_file, &done_file_exist), - "Check local done file exist failed."); - if (done_file_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->delete_file(done_file), - fmt::format("Delete local done file failed: {}", done_file.native())); - } - bool cache_file_exist = false; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->exists(cache_file, &cache_file_exist), - "Check local cache file exist failed."); - if (cache_file_exist) { - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->delete_file(cache_file), - fmt::format("Delete local cache file failed: {}", cache_file.native())); - } - LOG(INFO) << "Delete local cache file successfully: " << cache_file.native(); - return Status::OK(); + return _remove_file(cache_file, done_file, cleaned_size); } } // namespace io diff --git a/be/src/io/cache/whole_file_cache.h b/be/src/io/cache/whole_file_cache.h index e4a7628ff6..b1e27791b0 100644 --- a/be/src/io/cache/whole_file_cache.h +++ b/be/src/io/cache/whole_file_cache.h @@ -52,16 +52,26 @@ public: Status clean_all_cache() override; + Status clean_one_cache(size_t* cleaned_size) override; + + int64_t get_oldest_match_time() const override { return _last_match_time; }; + + bool is_gc_finish() const override { return _cache_file_reader == nullptr; } + private: Status _generate_cache_reader(size_t offset, size_t req_size); - Status _clean_cache_internal(); + Status _clean_cache_internal(size_t* cleaned_size); + + void update_last_match_time() { _last_match_time = time(nullptr); } private: Path _cache_dir; 
int64_t _alive_time_sec; io::FileReaderSPtr _remote_file_reader; + int64_t _last_match_time; + std::shared_mutex _cache_lock; io::FileReaderSPtr _cache_file_reader; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org