This is an automated email from the ASF dual-hosted git repository.

pengxiangyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ec218108d [enhancement](remote) support local cache GC at the granularity of cache files (#14920)
0ec218108d is described below

commit 0ec218108d7000175f24f75a32d3740e841ff617
Author: luozenglin <37725793+luozeng...@users.noreply.github.com>
AuthorDate: Fri Dec 9 17:35:23 2022 +0800

    [enhancement](remote) support local cache GC at the granularity of cache files (#14920)
    
    * [enhancement](remote) support local cache GC at the granularity of cache files
    
    * update
    
    * update
    
    * update
---
 be/src/io/cache/dummy_file_cache.cpp   | 59 +++++++++++------------
 be/src/io/cache/dummy_file_cache.h     | 21 ++++++--
 be/src/io/cache/file_cache.cpp         | 29 +++++++++++
 be/src/io/cache/file_cache.h           | 25 +++++++---
 be/src/io/cache/file_cache_manager.cpp | 49 ++++++++++++++-----
 be/src/io/cache/file_cache_manager.h   |  4 +-
 be/src/io/cache/sub_file_cache.cpp     | 88 ++++++++++++++++++++--------------
 be/src/io/cache/sub_file_cache.h       | 21 +++++++-
 be/src/io/cache/whole_file_cache.cpp   | 33 ++++---------
 be/src/io/cache/whole_file_cache.h     | 12 ++++-
 10 files changed, 224 insertions(+), 117 deletions(-)

diff --git a/be/src/io/cache/dummy_file_cache.cpp b/be/src/io/cache/dummy_file_cache.cpp
index 7e94bb388e..166c8b55e4 100644
--- a/be/src/io/cache/dummy_file_cache.cpp
+++ b/be/src/io/cache/dummy_file_cache.cpp
@@ -28,21 +28,15 @@ namespace io {
 DummyFileCache::DummyFileCache(const Path& cache_dir, int64_t alive_time_sec)
         : _cache_dir(cache_dir), _alive_time_sec(alive_time_sec) {}
 
-DummyFileCache::~DummyFileCache() {}
-
-void DummyFileCache::_update_last_mtime(const Path& done_file) {
-    Path cache_done_file = _cache_dir / done_file;
-    time_t m_time;
-    if (FileUtils::mtime(cache_done_file.native(), &m_time).ok() && m_time > 
_last_match_time) {
-        _last_match_time = m_time;
-    }
-}
+DummyFileCache::~DummyFileCache() = default;
 
 void DummyFileCache::_add_file_cache(const Path& data_file) {
     Path cache_file = _cache_dir / data_file;
     size_t file_size = 0;
-    if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok()) 
{
-        _file_sizes[cache_file] = file_size;
+    time_t m_time = 0;
+    if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok() 
&&
+        FileUtils::mtime(cache_file.native(), &m_time).ok()) {
+        _gc_lru_queue.push({cache_file, m_time});
         _cache_file_size += file_size;
     } else {
         _unfinished_files.push_back(cache_file);
@@ -72,7 +66,6 @@ void DummyFileCache::_load() {
         Path cache_filename = StringReplace(iter->native(), 
CACHE_DONE_FILE_SUFFIX, "", true);
         if (cache_names.find(cache_filename) != cache_names.end()) {
             cache_names.erase(cache_filename);
-            _update_last_mtime(*iter);
             _add_file_cache(cache_filename);
         } else {
             // not data file, but with DONE file
@@ -110,35 +103,39 @@ Status DummyFileCache::load_and_clean() {
 }
 
 Status DummyFileCache::clean_timeout_cache() {
-    if (time(nullptr) - _last_match_time > _alive_time_sec) {
-        return _clean_cache_internal();
+    while (!_gc_lru_queue.empty() &&
+           time(nullptr) - _gc_lru_queue.top().last_match_time > 
_alive_time_sec) {
+        size_t cleaned_size = 0;
+        RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file, 
&cleaned_size));
+        _cache_file_size -= cleaned_size;
+        _gc_lru_queue.pop();
     }
     return Status::OK();
 }
 
 Status DummyFileCache::clean_all_cache() {
-    return _clean_cache_internal();
+    while (!_gc_lru_queue.empty()) {
+        RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file, 
nullptr));
+        _gc_lru_queue.pop();
+    }
+    _cache_file_size = 0;
+    return Status::OK();
 }
 
-Status DummyFileCache::_clean_cache_internal() {
-    for (const auto& iter : _file_sizes) {
-        const auto cache_file_path = iter.first;
-        Path done_file_path = cache_file_path.native() + 
CACHE_DONE_FILE_SUFFIX;
-        LOG(INFO) << "Delete unused done_cache_path: " << 
done_file_path.native()
-                  << ", cache_file_path: " << cache_file_path.native();
-        if (!io::global_local_filesystem()->delete_file(done_file_path).ok()) {
-            LOG(ERROR) << "delete_file failed: " << done_file_path.native();
-            continue;
-        }
-        if (!io::global_local_filesystem()->delete_file(cache_file_path).ok()) 
{
-            LOG(ERROR) << "delete_file failed: " << cache_file_path.native();
-            continue;
-        }
+Status DummyFileCache::clean_one_cache(size_t* cleaned_size) {
+    if (!_gc_lru_queue.empty()) {
+        const auto& cache = _gc_lru_queue.top();
+        RETURN_IF_ERROR(_clean_cache_internal(cache.file, cleaned_size));
+        _cache_file_size -= *cleaned_size;
+        _gc_lru_queue.pop();
     }
-    _file_sizes.clear();
-    _cache_file_size = 0;
     return Status::OK();
 }
 
+Status DummyFileCache::_clean_cache_internal(const Path& cache_file_path, 
size_t* cleaned_size) {
+    Path done_file_path = cache_file_path.native() + CACHE_DONE_FILE_SUFFIX;
+    return _remove_file(cache_file_path, done_file_path, cleaned_size);
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/cache/dummy_file_cache.h b/be/src/io/cache/dummy_file_cache.h
index e59d867a54..4d604cdb6a 100644
--- a/be/src/io/cache/dummy_file_cache.h
+++ b/be/src/io/cache/dummy_file_cache.h
@@ -19,6 +19,7 @@
 
 #include <future>
 #include <memory>
+#include <queue>
 
 #include "common/status.h"
 #include "io/cache/file_cache.h"
@@ -55,22 +56,36 @@ public:
 
     Status clean_all_cache() override;
 
+    Status clean_one_cache(size_t* cleaned_size) override;
+
     Status load_and_clean();
 
     bool is_dummy_file_cache() override { return true; }
 
+    int64_t get_oldest_match_time() const override {
+        return _gc_lru_queue.empty() ? 0 : _gc_lru_queue.top().last_match_time;
+    };
+
+    bool is_gc_finish() const override { return _gc_lru_queue.empty(); }
+
 private:
     Status _clean_unfinished_cache();
-    void _update_last_mtime(const Path& done_file);
     void _add_file_cache(const Path& data_file);
     void _load();
-    Status _clean_cache_internal();
+    Status _clean_cache_internal(const Path&, size_t*);
 
 private:
+    struct DummyFileInfo {
+        Path file;
+        int64_t last_match_time;
+    };
+    using DummyGcQueue = std::priority_queue<DummyFileInfo, 
std::vector<DummyFileInfo>,
+                                             
SubFileLRUComparator<DummyFileInfo>>;
+    DummyGcQueue _gc_lru_queue;
+
     Path _cache_dir;
     int64_t _alive_time_sec;
 
-    std::map<Path, int64_t> _file_sizes;
     std::list<Path> _unfinished_files;
 };
 
diff --git a/be/src/io/cache/file_cache.cpp b/be/src/io/cache/file_cache.cpp
index 72f96c3776..b016ee2aa5 100644
--- a/be/src/io/cache/file_cache.cpp
+++ b/be/src/io/cache/file_cache.cpp
@@ -86,5 +86,34 @@ Status FileCache::download_cache_to_local(const Path& 
cache_file, const Path& ca
     return Status::OK();
 }
 
+Status FileCache::_remove_file(const Path& cache_file, const Path& 
cache_done_file,
+                               size_t* cleaned_size) {
+    bool done_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(cache_done_file, 
&done_file_exist),
+            "Check local done file exist failed.");
+    if (done_file_exist) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->delete_file(cache_done_file),
+                fmt::format("Delete local done file failed: {}", 
cache_done_file.native()));
+    }
+    bool cache_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(cache_file, 
&cache_file_exist),
+            "Check local cache file exist failed.");
+    if (cache_file_exist) {
+        if (cleaned_size) {
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    io::global_local_filesystem()->file_size(cache_file, 
cleaned_size),
+                    fmt::format("get local cache file size failed: {}", 
cache_file.native()));
+        }
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->delete_file(cache_file),
+                fmt::format("Delete local cache file failed: {}", 
cache_file.native()));
+    }
+    LOG(INFO) << "Delete local cache file successfully: " << 
cache_file.native();
+    return Status::OK();
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h
index 8e0cb0a679..0d7afb5754 100644
--- a/be/src/io/cache/file_cache.h
+++ b/be/src/io/cache/file_cache.h
@@ -18,7 +18,9 @@
 #pragma once
 
 #include <memory>
+#include <queue>
 #include <shared_mutex>
+#include <string>
 
 #include "common/status.h"
 #include "io/fs/file_reader.h"
@@ -31,8 +33,8 @@ const std::string CACHE_DONE_FILE_SUFFIX = "_DONE";
 
 class FileCache : public FileReader {
 public:
-    FileCache() : _last_match_time(time(nullptr)), _cache_file_size(0) {}
-    virtual ~FileCache() = default;
+    FileCache() : _cache_file_size(0) {}
+    ~FileCache() override = default;
 
     DISALLOW_COPY_AND_ASSIGN(FileCache);
 
@@ -46,17 +48,28 @@ public:
 
     virtual Status clean_all_cache() = 0;
 
+    virtual Status clean_one_cache(size_t* cleaned_size) = 0;
+
+    virtual bool is_gc_finish() const = 0;
+
     virtual bool is_dummy_file_cache() { return false; }
 
     Status download_cache_to_local(const Path& cache_file, const Path& 
cache_done_file,
                                    io::FileReaderSPtr remote_file_reader, 
size_t req_size,
                                    size_t offset = 0);
 
-    void update_last_match_time() { _last_match_time = time(nullptr); }
-    int64_t get_last_match_time() const { return _last_match_time; }
+    virtual int64_t get_oldest_match_time() const = 0;
 
 protected:
-    int64_t _last_match_time;
+    Status _remove_file(const Path& cache_file, const Path& cache_done_file, 
size_t* cleaned_size);
+
+    template <typename T>
+    struct SubFileLRUComparator {
+        bool operator()(const T& lhs, const T& rhs) const {
+            return lhs.last_match_time > rhs.last_match_time;
+        };
+    };
+
     size_t _cache_file_size;
 };
 
@@ -64,7 +77,7 @@ using FileCachePtr = std::shared_ptr<FileCache>;
 
 struct FileCacheLRUComparator {
     bool operator()(const FileCachePtr& lhs, const FileCachePtr& rhs) const {
-        return lhs->get_last_match_time() > rhs->get_last_match_time();
+        return lhs->get_oldest_match_time() > rhs->get_oldest_match_time();
     }
 };
 
diff --git a/be/src/io/cache/file_cache_manager.cpp b/be/src/io/cache/file_cache_manager.cpp
index 840c39f454..6d34434190 100644
--- a/be/src/io/cache/file_cache_manager.cpp
+++ b/be/src/io/cache/file_cache_manager.cpp
@@ -44,13 +44,31 @@ bool GCContextPerDisk::try_add_file_cache(FileCachePtr 
cache, int64_t file_size)
     return false;
 }
 
-void GCContextPerDisk::get_gc_file_caches(std::list<FileCachePtr>& result) {
-    while (!_lru_queue.empty() && _used_size > _conf_max_size) {
+FileCachePtr GCContextPerDisk::top() {
+    if (!_lru_queue.empty() && _used_size > _conf_max_size) {
+        return _lru_queue.top();
+    }
+    return nullptr;
+}
+
+void GCContextPerDisk::pop() {
+    if (!_lru_queue.empty()) {
+        _lru_queue.pop();
+    }
+}
+
+Status GCContextPerDisk::gc_top() {
+    if (!_lru_queue.empty() && _used_size > _conf_max_size) {
         auto file_cache = _lru_queue.top();
-        _used_size -= file_cache->cache_file_size();
-        result.push_back(file_cache);
+        size_t cleaned_size = 0;
+        RETURN_IF_ERROR(file_cache->clean_one_cache(&cleaned_size));
+        _used_size -= cleaned_size;
         _lru_queue.pop();
+        if (!file_cache->is_gc_finish()) {
+            _lru_queue.push(file_cache);
+        }
     }
+    return Status::OK();
 }
 
 void FileCacheManager::add_file_cache(const std::string& cache_path, 
FileCachePtr file_cache) {
@@ -175,16 +193,21 @@ void FileCacheManager::gc_file_caches() {
     // policy2: GC file cache by disk size
     if (gc_conf_size > 0) {
         for (size_t i = 0; i < contexts.size(); ++i) {
-            std::list<FileCachePtr> gc_file_list;
-            contexts[i].get_gc_file_caches(gc_file_list);
-            for (auto item : gc_file_list) {
-                std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
-                // for dummy file cache, check already used or not again
-                if (item->is_dummy_file_cache() &&
-                    _file_cache_map.find(item->cache_dir().native()) != 
_file_cache_map.end()) {
-                    continue;
+            auto& context = contexts[i];
+            FileCachePtr file_cache;
+            while ((file_cache = context.top()) != nullptr) {
+                {
+                    std::shared_lock<std::shared_mutex> 
rdlock(_cache_map_lock);
+                    // for dummy file cache, check already used or not again
+                    if (file_cache->is_dummy_file_cache() &&
+                        _file_cache_map.find(file_cache->cache_dir().native()) 
!=
+                                _file_cache_map.end()) {
+                        context.pop();
+                        continue;
+                    }
                 }
-                item->clean_all_cache();
+                WARN_IF_ERROR(context.gc_top(),
+                              fmt::format("gc {} error", 
file_cache->cache_dir().native()));
             }
         }
     }
diff --git a/be/src/io/cache/file_cache_manager.h b/be/src/io/cache/file_cache_manager.h
index c25d1cfb5c..9332b324fc 100644
--- a/be/src/io/cache/file_cache_manager.h
+++ b/be/src/io/cache/file_cache_manager.h
@@ -33,7 +33,9 @@ public:
     GCContextPerDisk() : _conf_max_size(0), _used_size(0) {}
     void init(const std::string& path, int64_t max_size);
     bool try_add_file_cache(FileCachePtr cache, int64_t file_size);
-    void get_gc_file_caches(std::list<FileCachePtr>&);
+    FileCachePtr top();
+    Status gc_top();
+    void pop();
 
 private:
     std::string _disk_path;
diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp
index 292f442ddd..34b509bc9e 100644
--- a/be/src/io/cache/sub_file_cache.cpp
+++ b/be/src/io/cache/sub_file_cache.cpp
@@ -17,6 +17,12 @@
 
 #include "io/cache/sub_file_cache.h"
 
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <utility>
+#include <vector>
+
 #include "common/config.h"
 #include "io/fs/local_file_system.h"
 #include "olap/iterators.h"
@@ -115,14 +121,17 @@ Status SubFileCache::read_at(size_t offset, Slice result, 
const IOContext& io_ct
             _last_match_times[*iter] = time(nullptr);
         }
     }
-    update_last_match_time();
     return Status::OK();
 }
 
+std::pair<Path, Path> SubFileCache::_cache_path(size_t offset) {
+    return {_cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset),
+            _cache_dir /
+                    fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, 
CACHE_DONE_FILE_SUFFIX)};
+}
+
 Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
-    Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, 
offset);
-    Path cache_done_file = _cache_dir / fmt::format("{}_{}{}", 
SUB_FILE_CACHE_PREFIX, offset,
-                                                    CACHE_DONE_FILE_SUFFIX);
+    auto [cache_file, cache_done_file] = _cache_path(offset);
     bool done_file_exist = false;
     RETURN_NOT_OK_STATUS_WITH_WARN(
             io::global_local_filesystem()->exists(cache_done_file, 
&done_file_exist),
@@ -134,8 +143,9 @@ Status SubFileCache::_generate_cache_reader(size_t offset, 
size_t req_size) {
         ThreadPoolToken* thread_token =
                 
ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
         if (thread_token != nullptr) {
-            auto st = thread_token->submit_func([this, &download_st, 
cache_done_file, cache_file,
-                                                 offset, req_size] {
+            auto st = thread_token->submit_func([this, &download_st,
+                                                 cache_done_file = 
cache_done_file,
+                                                 cache_file = cache_file, 
offset, req_size] {
                 auto func = [this, cache_done_file, cache_file, offset, 
req_size] {
                     bool done_file_exist = false;
                     // Judge again whether cache_done_file exists, it is 
possible that the cache
@@ -180,7 +190,6 @@ Status SubFileCache::_generate_cache_reader(size_t offset, 
size_t req_size) {
     RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, 
&cache_reader));
     _cache_file_readers.emplace(offset, cache_reader);
     _last_match_times.emplace(offset, time(nullptr));
-    update_last_match_time();
     LOG(INFO) << "Create cache file from remote file successfully: "
               << _remote_file_reader->path().native() << "(" << offset << ", " 
<< req_size
               << ") -> " << cache_file.native();
@@ -199,6 +208,8 @@ Status SubFileCache::_get_need_cache_offsets(size_t offset, 
size_t req_size,
 }
 
 Status SubFileCache::clean_timeout_cache() {
+    SubGcQueue gc_queue;
+    _gc_lru_queue.swap(gc_queue);
     std::vector<size_t> timeout_keys;
     {
         std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
@@ -206,6 +217,9 @@ Status SubFileCache::clean_timeout_cache() {
              iter != _last_match_times.cend(); ++iter) {
             if (time(nullptr) - iter->second > _alive_time_sec) {
                 timeout_keys.emplace_back(iter->first);
+            } else {
+                auto [cache_file, done_file] = _cache_path(iter->first);
+                _gc_lru_queue.push({iter->first, iter->second});
             }
         }
     }
@@ -213,9 +227,10 @@ Status SubFileCache::clean_timeout_cache() {
         std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
         for (std::vector<size_t>::const_iterator iter = timeout_keys.cbegin();
              iter != timeout_keys.cend(); ++iter) {
-            RETURN_IF_ERROR(_clean_cache_internal(*iter));
+            size_t cleaned_size = 0;
+            RETURN_IF_ERROR(_clean_cache_internal(*iter, &cleaned_size));
+            _cache_file_size -= cleaned_size;
         }
-        _cache_file_size = _calc_cache_file_size();
     }
     return Status::OK();
 }
@@ -224,40 +239,41 @@ Status SubFileCache::clean_all_cache() {
     std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
     for (std::map<size_t, int64_t>::const_iterator iter = 
_last_match_times.cbegin();
          iter != _last_match_times.cend(); ++iter) {
-        RETURN_IF_ERROR(_clean_cache_internal(iter->first));
+        RETURN_IF_ERROR(_clean_cache_internal(iter->first, nullptr));
+    }
+    _cache_file_size = 0;
+    return Status::OK();
+}
+
+Status SubFileCache::clean_one_cache(size_t* cleaned_size) {
+    if (!_gc_lru_queue.empty()) {
+        const auto& cache = _gc_lru_queue.top();
+        {
+            std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
+            if (auto it = _last_match_times.find(cache.offset);
+                it != _last_match_times.end() && it->second == 
cache.last_match_time) {
+                RETURN_IF_ERROR(_clean_cache_internal(cache.offset, 
cleaned_size));
+                _cache_file_size -= *cleaned_size;
+                _gc_lru_queue.pop();
+            }
+        }
+        decltype(_last_match_times.begin()) it;
+        while (!_gc_lru_queue.empty() &&
+               (it = _last_match_times.find(_gc_lru_queue.top().offset)) !=
+                       _last_match_times.end() &&
+               it->second != _gc_lru_queue.top().last_match_time) {
+            _gc_lru_queue.pop();
+        }
     }
-    _cache_file_size = _calc_cache_file_size();
     return Status::OK();
 }
 
-Status SubFileCache::_clean_cache_internal(size_t offset) {
+Status SubFileCache::_clean_cache_internal(size_t offset, size_t* 
cleaned_size) {
     if (_cache_file_readers.find(offset) != _cache_file_readers.end()) {
         _cache_file_readers.erase(offset);
     }
-    _cache_file_size = 0;
-    Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, 
offset);
-    Path done_file = _cache_dir /
-                     fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset, 
CACHE_DONE_FILE_SUFFIX);
-    bool done_file_exist = false;
-    RETURN_NOT_OK_STATUS_WITH_WARN(
-            io::global_local_filesystem()->exists(done_file, &done_file_exist),
-            "Check local done file exist failed.");
-    if (done_file_exist) {
-        RETURN_NOT_OK_STATUS_WITH_WARN(
-                io::global_local_filesystem()->delete_file(done_file),
-                fmt::format("Delete local done file failed: {}", 
done_file.native()));
-    }
-    bool cache_file_exist = false;
-    RETURN_NOT_OK_STATUS_WITH_WARN(
-            io::global_local_filesystem()->exists(cache_file, 
&cache_file_exist),
-            "Check local cache file exist failed.");
-    if (cache_file_exist) {
-        RETURN_NOT_OK_STATUS_WITH_WARN(
-                io::global_local_filesystem()->delete_file(cache_file),
-                fmt::format("Delete local cache file failed: {}", 
cache_file.native()));
-    }
-    LOG(INFO) << "Delete local cache file successfully: " << 
cache_file.native();
-    return Status::OK();
+    auto [cache_file, done_file] = _cache_path(offset);
+    return _remove_file(cache_file, done_file, cleaned_size);
 }
 
 size_t SubFileCache::_calc_cache_file_size() {
diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h
index 0567a2c4bd..5e26134f11 100644
--- a/be/src/io/cache/sub_file_cache.h
+++ b/be/src/io/cache/sub_file_cache.h
@@ -52,17 +52,36 @@ public:
 
     Status clean_all_cache() override;
 
+    Status clean_one_cache(size_t* cleaned_size) override;
+
+    int64_t get_oldest_match_time() const override {
+        return _gc_lru_queue.empty() ? 0 : _gc_lru_queue.top().last_match_time;
+    };
+
+    bool is_gc_finish() const override { return _gc_lru_queue.empty(); }
+
 private:
     Status _generate_cache_reader(size_t offset, size_t req_size);
 
-    Status _clean_cache_internal(size_t offset);
+    Status _clean_cache_internal(size_t offset, size_t* cleaned_size);
 
     Status _get_need_cache_offsets(size_t offset, size_t req_size,
                                    std::vector<size_t>* cache_offsets);
 
     size_t _calc_cache_file_size();
 
+    std::pair<Path, Path> _cache_path(size_t offset);
+
 private:
+    struct SubFileInfo {
+        size_t offset;
+        int64_t last_match_time;
+    };
+    using SubGcQueue = std::priority_queue<SubFileInfo, 
std::vector<SubFileInfo>,
+                                           SubFileLRUComparator<SubFileInfo>>;
+    // used by gc thread, and currently has no lock protection
+    SubGcQueue _gc_lru_queue;
+
     Path _cache_dir;
     int64_t _alive_time_sec;
     io::FileReaderSPtr _remote_file_reader;
diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp
index e5c5dc7541..613215c93c 100644
--- a/be/src/io/cache/whole_file_cache.cpp
+++ b/be/src/io/cache/whole_file_cache.cpp
@@ -51,7 +51,6 @@ Status WholeFileCache::read_at(size_t offset, Slice result, 
const IOContext& io_
                    << ", bytes read: " << bytes_read << " vs required size: " 
<< result.size;
         return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
     }
-    update_last_match_time();
     return Status::OK();
 }
 
@@ -135,43 +134,27 @@ Status WholeFileCache::_generate_cache_reader(size_t 
offset, size_t req_size) {
 
 Status WholeFileCache::clean_timeout_cache() {
     if (time(nullptr) - _last_match_time > _alive_time_sec) {
-        _clean_cache_internal();
+        _clean_cache_internal(nullptr);
     }
     return Status::OK();
 }
 
 Status WholeFileCache::clean_all_cache() {
-    _clean_cache_internal();
-    return Status::OK();
+    return _clean_cache_internal(nullptr);
 }
 
-Status WholeFileCache::_clean_cache_internal() {
+Status WholeFileCache::clean_one_cache(size_t* cleaned_size) {
+    return _clean_cache_internal(cleaned_size);
+}
+
+Status WholeFileCache::_clean_cache_internal(size_t* cleaned_size) {
     std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
     _cache_file_reader.reset();
     _cache_file_size = 0;
     Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
     Path done_file =
             _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME, 
CACHE_DONE_FILE_SUFFIX);
-    bool done_file_exist = false;
-    RETURN_NOT_OK_STATUS_WITH_WARN(
-            io::global_local_filesystem()->exists(done_file, &done_file_exist),
-            "Check local done file exist failed.");
-    if (done_file_exist) {
-        RETURN_NOT_OK_STATUS_WITH_WARN(
-                io::global_local_filesystem()->delete_file(done_file),
-                fmt::format("Delete local done file failed: {}", 
done_file.native()));
-    }
-    bool cache_file_exist = false;
-    RETURN_NOT_OK_STATUS_WITH_WARN(
-            io::global_local_filesystem()->exists(cache_file, 
&cache_file_exist),
-            "Check local cache file exist failed.");
-    if (cache_file_exist) {
-        RETURN_NOT_OK_STATUS_WITH_WARN(
-                io::global_local_filesystem()->delete_file(cache_file),
-                fmt::format("Delete local cache file failed: {}", 
cache_file.native()));
-    }
-    LOG(INFO) << "Delete local cache file successfully: " << 
cache_file.native();
-    return Status::OK();
+    return _remove_file(cache_file, done_file, cleaned_size);
 }
 
 } // namespace io
diff --git a/be/src/io/cache/whole_file_cache.h b/be/src/io/cache/whole_file_cache.h
index e4a7628ff6..b1e27791b0 100644
--- a/be/src/io/cache/whole_file_cache.h
+++ b/be/src/io/cache/whole_file_cache.h
@@ -52,16 +52,26 @@ public:
 
     Status clean_all_cache() override;
 
+    Status clean_one_cache(size_t* cleaned_size) override;
+
+    int64_t get_oldest_match_time() const override { return _last_match_time; 
};
+
+    bool is_gc_finish() const override { return _cache_file_reader == nullptr; 
}
+
 private:
     Status _generate_cache_reader(size_t offset, size_t req_size);
 
-    Status _clean_cache_internal();
+    Status _clean_cache_internal(size_t* cleaned_size);
+
+    void update_last_match_time() { _last_match_time = time(nullptr); }
 
 private:
     Path _cache_dir;
     int64_t _alive_time_sec;
     io::FileReaderSPtr _remote_file_reader;
 
+    int64_t _last_match_time;
+
     std::shared_mutex _cache_lock;
     io::FileReaderSPtr _cache_file_reader;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to