This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b43fb2597ed branch-3.0: [enhancement](cloud) make file cache version 
upgrade faster #50726 (#51194)
b43fb2597ed is described below

commit b43fb2597ed183c1b2f746a73e36dd796325d436
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 28 14:25:16 2025 +0800

    branch-3.0: [enhancement](cloud) make file cache version upgrade faster 
#50726 (#51194)
    
    Cherry-picked from #50726
    
    Co-authored-by: zhengyu <[email protected]>
---
 be/src/common/config.cpp                   |   9 +-
 be/src/common/config.h                     |   1 +
 be/src/common/status.h                     |   2 +
 be/src/io/cache/fs_file_cache_storage.cpp  | 267 ++++++++++++++-----
 be/src/io/cache/fs_file_cache_storage.h    |   8 +
 be/src/io/fs/err_utils.cpp                 |   2 +
 be/src/runtime/exec_env_init.cpp           |   3 +-
 be/test/io/cache/block_file_cache_test.cpp | 414 ++++++++++++++++++++++++++++-
 gensrc/thrift/Status.thrift                |   1 +
 9 files changed, 630 insertions(+), 77 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5b52532e753..61758a0b4fe 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -657,6 +657,7 @@ DEFINE_Int32(num_cores, "0");
 // When BE start, If there is a broken disk, BE process will exit by default.
 // Otherwise, we will ignore the broken disk,
 DEFINE_Bool(ignore_broken_disk, "false");
+DEFINE_Bool(ignore_file_cache_dir_upgrade_failure, "false");
 
 // Sleep time in milliseconds between memory maintenance iterations
 DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20");
@@ -1084,11 +1085,11 @@ DEFINE_Int64(file_cache_each_block_size, "1048576"); // 
1MB
 
 DEFINE_Bool(clear_file_cache, "false");
 DEFINE_Bool(enable_file_cache_query_limit, "false");
-DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
-DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
+DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
+DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "88");
 DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
-DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "78");
-DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "75");
+DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "88");
+DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "85");
 DEFINE_mInt32(file_cache_evict_in_advance_interval_ms, "1000");
 DEFINE_mInt64(file_cache_evict_in_advance_batch_bytes, "31457280"); // 30MB
 DEFINE_mInt64(file_cache_evict_in_advance_recycle_keys_num_threshold, "1000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 941b6a13777..a7e16c53d5e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -700,6 +700,7 @@ DECLARE_Int32(num_cores);
 // When BE start, If there is a broken disk, BE process will exit by default.
 // Otherwise, we will ignore the broken disk,
 DECLARE_Bool(ignore_broken_disk);
+DECLARE_Bool(ignore_file_cache_dir_upgrade_failure);
 
 // Sleep time in milliseconds between memory maintenance iterations
 DECLARE_mInt32(memory_maintenance_sleep_time_ms);
diff --git a/be/src/common/status.h b/be/src/common/status.h
index d003645b258..26877e88dc7 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -47,6 +47,7 @@ namespace ErrorCode {
     TStatusError(IO_ERROR, true);                         \
     TStatusError(NOT_FOUND, true);                        \
     TStatusError(ALREADY_EXIST, true);                    \
+    TStatusError(DIRECTORY_NOT_EMPTY, true);              \
     TStatusError(NOT_IMPLEMENTED_ERROR, false);           \
     TStatusError(END_OF_FILE, false);                     \
     TStatusError(INTERNAL_ERROR, true);                   \
@@ -471,6 +472,7 @@ public:
     ERROR_CTOR(IOError, IO_ERROR)
     ERROR_CTOR(NotFound, NOT_FOUND)
     ERROR_CTOR_NOSTACK(AlreadyExist, ALREADY_EXIST)
+    ERROR_CTOR_NOSTACK(DirectoryNotEmpty, DIRECTORY_NOT_EMPTY)
     ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
     ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE)
     ERROR_CTOR(InternalError, INTERNAL_ERROR)
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp 
b/be/src/io/cache/fs_file_cache_storage.cpp
index 01900fbd2a5..ecb594e14a2 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -31,6 +31,8 @@
 #include "io/fs/local_file_reader.h"
 #include "io/fs/local_file_writer.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
 #include "vec/common/hex.h"
 
 namespace doris::io {
@@ -101,12 +103,33 @@ size_t FDCache::file_reader_cache_size() {
 }
 
 Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
+    _iterator_dir_retry_cnt = std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), 
"file_cache_fs_storage_iterator_dir_retry_cnt");
     _cache_base_path = _mgr->_cache_base_path;
-    RETURN_IF_ERROR(upgrade_cache_dir_if_necessary());
     _cache_background_load_thread = std::thread([this, mgr = _mgr]() {
+        auto mem_tracker = 
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
+                                                            
fmt::format("FileCacheVersionReader"));
+        SCOPED_ATTACH_TASK(mem_tracker);
+        Status st = upgrade_cache_dir_if_necessary();
+        if (!st.ok()) {
+            std::string msg = fmt::format(
+                    "file cache {} upgrade done with error. upgrade version 
failed. st={}",
+                    _cache_base_path, st.to_string());
+            if (doris::config::ignore_file_cache_dir_upgrade_failure) {
+                LOG(WARNING) << msg << " be conf: 
`ignore_file_cache_dir_upgrade_failure = true`"
+                             << " so we are ignoring the error (unsuccessful 
cache files will be "
+                                "removed)";
+                remove_old_version_directories();
+            } else {
+                LOG(WARNING) << msg << " please fix error and restart BE or"
+                             << " use be conf: 
`ignore_file_cache_dir_upgrade_failure = true`"
+                             << " to skip the error (unsuccessful cache files 
will be removed)";
+                throw doris::Exception(Status::InternalError(msg));
+            }
+        }
         load_cache_info_into_memory(mgr);
         mgr->_async_open_done = true;
-        LOG_INFO("FileCache {} lazy load done.", _cache_base_path);
+        LOG_INFO("file cache {} lazy load done.", _cache_base_path);
     });
     return Status::OK();
 }
@@ -307,6 +330,99 @@ std::string 
FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& va
     }
 }
 
+void FSFileCacheStorage::remove_old_version_directories() {
+    std::error_code ec;
+    std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+    if (ec) {
+        LOG(WARNING) << "Failed to list directory: " << _cache_base_path
+                     << ", error: " << ec.message();
+        return;
+    }
+
+    std::vector<std::filesystem::path> file_list;
+    // the dir is concurrently accessed, so handle invalid iter with retry
+    bool success = false;
+    size_t retry_count = 0;
+    const size_t max_retry = 5;
+    while (!success && retry_count < max_retry) {
+        try {
+            ++retry_count;
+            for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
+                file_list.push_back(key_it->path());
+            }
+            success = true;
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "Error occurred while iterating directory: " << 
e.what();
+            file_list.clear();
+        }
+    }
+
+    if (!success) {
+        LOG_WARNING("iteration of cache dir still failed after retry {} 
times.", max_retry);
+    }
+
+    auto path_itr = file_list.begin();
+    for (; path_itr != file_list.end(); ++path_itr) {
+        if (std::filesystem::is_directory(*path_itr)) {
+            std::string cache_key = path_itr->filename().native();
+            if (cache_key.size() > KEY_PREFIX_LENGTH) {
+                // try our best to delete, not care the return
+                (void)fs->delete_directory(*path_itr);
+            }
+        }
+    }
+    auto s = fs->delete_file(get_version_path());
+    if (!s.ok()) {
+        LOG(WARNING) << "deleted old version file failed: " << s.to_string();
+        return;
+    }
+    s = write_file_cache_version();
+    if (!s.ok()) {
+        LOG(WARNING) << "write new version file failed: " << s.to_string();
+        return;
+    }
+}
+
+Status FSFileCacheStorage::collect_directory_entries(const 
std::filesystem::path& dir_path,
+                                                     std::vector<std::string>& 
file_list) const {
+    std::error_code ec;
+    bool success = false;
+    size_t retry_count = 0;
+    const size_t max_retry = 5;
+
+    while (!success && retry_count < max_retry) {
+        try {
+            ++retry_count;
+            std::filesystem::directory_iterator it {dir_path, ec};
+            
TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::collect_directory_entries");
+            if (ec) {
+                LOG(WARNING) << "Failed to list directory: " << dir_path
+                             << ", error: " << ec.message();
+                continue;
+            }
+
+            file_list.clear();
+            for (; it != std::filesystem::directory_iterator(); ++it) {
+                file_list.push_back(it->path().string());
+            }
+            success = true;
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "Error occurred while iterating directory: " << 
dir_path
+                         << " err: " << e.what();
+            file_list.clear();
+        }
+    }
+
+    *_iterator_dir_retry_cnt << retry_count;
+
+    if (!success) {
+        LOG_WARNING("iteration of cache dir still failed after retry {} 
times.", max_retry);
+        return Status::InternalError("Failed to iterate directory after 
retries.");
+    }
+
+    return Status::OK();
+}
+
 Status FSFileCacheStorage::upgrade_cache_dir_if_necessary() const {
     /*
      * If use version2 but was version 1, do upgrade:
@@ -326,91 +442,99 @@ Status 
FSFileCacheStorage::upgrade_cache_dir_if_necessary() const {
     std::string version;
     std::error_code ec;
     int rename_count = 0;
+    int failure_count = 0;
     auto start_time = std::chrono::steady_clock::now();
 
     RETURN_IF_ERROR(read_file_cache_version(&version));
+
     LOG(INFO) << "Checking cache version upgrade. Current version: " << version
               << ", target version: 2.0, need upgrade: "
               << (USE_CACHE_VERSION2 && version != "2.0");
     if (USE_CACHE_VERSION2 && version != "2.0") {
         // move directories format as version 2.0
-        std::filesystem::directory_iterator key_it {_cache_base_path, ec};
-        if (ec) {
-            LOG(WARNING) << "Failed to list directory: " << _cache_base_path
-                         << ", error: " << ec.message();
-            return Status::InternalError("Failed to list dir {}: {}", 
_cache_base_path,
-                                         ec.message());
-        }
-        for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
-            if (key_it->is_directory()) {
-                std::string cache_key = key_it->path().filename().native();
-                if (cache_key.size() > KEY_PREFIX_LENGTH) {
-                    std::string key_prefix =
-                            Path(_cache_base_path) / cache_key.substr(0, 
KEY_PREFIX_LENGTH);
-                    bool exists = false;
-                    auto exists_status = fs->exists(key_prefix, &exists);
-                    if (!exists_status.ok()) {
-                        LOG(WARNING) << "Failed to check directory existence: 
" << key_prefix
-                                     << ", error: " << 
exists_status.to_string();
-                        return exists_status;
-                    }
-                    if (!exists) {
-                        auto create_status = fs->create_directory(key_prefix);
-                        if (!create_status.ok()) {
-                            LOG(WARNING) << "Failed to create directory: " << 
key_prefix
-                                         << ", error: " << 
create_status.to_string();
-                            return create_status;
+        std::vector<std::string> file_list;
+        file_list.reserve(10000);
+        RETURN_IF_ERROR(collect_directory_entries(_cache_base_path, 
file_list));
+
+        // this directory_iterator should be a problem in concurrent access
+        for (const auto& file_path : file_list) {
+            try {
+                if (std::filesystem::is_directory(file_path)) {
+                    std::string cache_key = 
std::filesystem::path(file_path).filename().native();
+                    if (cache_key.size() > KEY_PREFIX_LENGTH) {
+                        if (cache_key.find('_') == std::string::npos) {
+                            cache_key += "_0";
+                        }
+                        std::string key_prefix =
+                                Path(_cache_base_path) / cache_key.substr(0, 
KEY_PREFIX_LENGTH);
+                        bool exists = false;
+                        auto exists_status = fs->exists(key_prefix, &exists);
+                        if (!exists_status.ok()) {
+                            LOG(WARNING) << "Failed to check directory 
existence: " << key_prefix
+                                         << ", error: " << 
exists_status.to_string();
+                            ++failure_count;
+                            continue;
+                        }
+                        if (!exists) {
+                            auto create_status = 
fs->create_directory(key_prefix);
+                            if (!create_status.ok() &&
+                                create_status.code() != 
TStatusCode::type::ALREADY_EXIST) {
+                                LOG(WARNING) << "Failed to create directory: " 
<< key_prefix
+                                             << ", error: " << 
create_status.to_string();
+                                ++failure_count;
+                                continue;
+                            }
+                        }
+                        auto rename_status = Status::OK();
+                        const std::string new_file_path = key_prefix + "/" + 
cache_key;
+                        TEST_SYNC_POINT_CALLBACK(
+                                
"FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename",
+                                &file_path, &new_file_path);
+                        rename_status = fs->rename(file_path, new_file_path);
+                        if (rename_status.ok() ||
+                            rename_status.code() == 
TStatusCode::type::DIRECTORY_NOT_EMPTY) {
+                            ++rename_count;
+                        } else {
+                            LOG(WARNING)
+                                    << "Failed to rename directory from " << 
file_path << " to "
+                                    << new_file_path << ", error: " << 
rename_status.to_string();
+                            ++failure_count;
+                            continue;
                         }
-                    }
-                    auto rename_status = fs->rename(key_it->path(), key_prefix 
/ cache_key);
-                    if (rename_status.ok()) {
-                        ++rename_count;
-                    } else {
-                        LOG(WARNING)
-                                << "Failed to rename directory from " << 
key_it->path().native()
-                                << " to " << (key_prefix / cache_key).native()
-                                << ", error: " << rename_status.to_string();
-                        return rename_status;
                     }
                 }
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "Error occurred while upgrading file cache 
directory: " << file_path
+                             << " err: " << e.what();
+                ++failure_count;
             }
         }
 
-        auto rebuild_dir = [&](std::filesystem::directory_iterator& 
upgrade_key_it) -> Status {
-            for (; upgrade_key_it != std::filesystem::directory_iterator(); 
++upgrade_key_it) {
-                if (upgrade_key_it->path().filename().native().find('_') == 
std::string::npos) {
-                    
RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
-                    auto rename_status = fs->rename(upgrade_key_it->path(),
-                                                    
upgrade_key_it->path().native() + "_0");
-                    if (rename_status.ok()) {
-                        ++rename_count;
-                    }
-                    RETURN_IF_ERROR(rename_status);
-                }
-            }
-            return Status::OK();
-        };
+        std::vector<std::string> rebuilt_file_list;
+        rebuilt_file_list.reserve(10000);
+        RETURN_IF_ERROR(collect_directory_entries(_cache_base_path, 
rebuilt_file_list));
 
-        std::filesystem::directory_iterator key_prefix_it {_cache_base_path, 
ec};
-        if (ec) [[unlikely]] {
-            LOG(WARNING) << ec.message();
-            return Status::IOError(ec.message());
-        }
-        for (; key_prefix_it != std::filesystem::directory_iterator(); 
++key_prefix_it) {
-            if (!key_prefix_it->is_directory()) {
+        for (const auto& key_it : rebuilt_file_list) {
+            if (!std::filesystem::is_directory(key_it)) {
                 // maybe version hits file
                 continue;
             }
-            if (key_prefix_it->path().filename().native().size() != 
KEY_PREFIX_LENGTH) {
-                LOG(WARNING) << "Unknown directory " << 
key_prefix_it->path().native()
-                             << ", try to remove it";
-                RETURN_IF_ERROR(fs->delete_directory(key_prefix_it->path()));
-            }
-            std::filesystem::directory_iterator key_it {key_prefix_it->path(), 
ec};
-            if (ec) [[unlikely]] {
-                return Status::IOError(ec.message());
+            try {
+                if (Path(key_it).filename().native().size() != 
KEY_PREFIX_LENGTH) {
+                    LOG(WARNING) << "Unknown directory " << key_it << ", try 
to remove it";
+                    auto delete_status = fs->delete_directory(key_it);
+                    if (!delete_status.ok()) {
+                        LOG(WARNING) << "Failed to delete unknown directory: " 
<< key_it
+                                     << ", error: " << 
delete_status.to_string();
+                        ++failure_count;
+                        continue;
+                    }
+                }
+            } catch (const std::exception& e) {
+                LOG(WARNING) << "Error occurred while upgrading file cache 
directory: " << key_it
+                             << " err: " << e.what();
+                ++failure_count;
             }
-            RETURN_IF_ERROR(rebuild_dir(key_it));
         }
         if (!write_file_cache_version().ok()) {
             return Status::InternalError("Failed to write version hints for 
file cache");
@@ -420,7 +544,8 @@ Status FSFileCacheStorage::upgrade_cache_dir_if_necessary() 
const {
     auto end_time = std::chrono::steady_clock::now();
     auto duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
     LOG(INFO) << "Cache directory upgrade completed. Total files renamed: " << 
rename_count
-              << ", Time taken: " << duration.count() << "ms";
+              << ", Time taken: " << duration.count() << "ms"
+              << ", Failure count: " << failure_count;
     return Status::OK();
 }
 
@@ -452,7 +577,9 @@ Status 
FSFileCacheStorage::read_file_cache_version(std::string* buffer) const {
     size_t bytes_read = 0;
     RETURN_IF_ERROR(version_reader->read_at(0, Slice(buffer->data(), 
file_size), &bytes_read));
     RETURN_IF_ERROR(version_reader->close());
-    return Status::OK();
+    auto st = Status::OK();
+    TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::read_file_cache_version", 
&st);
+    return st;
 }
 
 std::string FSFileCacheStorage::get_version_path() const {
diff --git a/be/src/io/cache/fs_file_cache_storage.h 
b/be/src/io/cache/fs_file_cache_storage.h
index 8a97aa109ad..114517bdf72 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <bvar/bvar.h>
+
 #include <memory>
 #include <shared_mutex>
 #include <thread>
@@ -87,6 +89,11 @@ public:
     FileCacheStorageType get_type() override { return DISK; }
 
 private:
+    void remove_old_version_directories();
+
+    Status collect_directory_entries(const std::filesystem::path& dir_path,
+                                     std::vector<std::string>& file_list) 
const;
+
     Status upgrade_cache_dir_if_necessary() const;
 
     Status read_file_cache_version(std::string* buffer) const;
@@ -111,6 +118,7 @@ private:
     // TODO(Lchangliang): use a more efficient data structure
     std::mutex _mtx;
     std::unordered_map<FileWriterMapKey, FileWriterPtr, FileWriterMapKeyHash> 
_key_to_writer;
+    std::shared_ptr<bvar::LatencyRecorder> _iterator_dir_retry_cnt;
 };
 
 } // namespace doris::io
diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
index 19c460f1618..96ac7b817e9 100644
--- a/be/src/io/fs/err_utils.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -96,6 +96,8 @@ Status localfs_error(const std::error_code& ec, 
std::string_view msg) {
         return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>(message);
     } else if (ec == std::errc::permission_denied) {
         return Status::Error<PERMISSION_DENIED, false>(message);
+    } else if (ec == std::errc::directory_not_empty) {
+        return Status::Error<DIRECTORY_NOT_EMPTY, false>(message);
     } else {
         return Status::Error<ErrorCode::INTERNAL_ERROR, false>(message);
     }
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 393d3fa3d19..a0bed40953a 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -238,6 +238,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               
.set_min_threads(config::min_s3_file_system_thread_num)
                               
.set_max_threads(config::max_s3_file_system_thread_num)
                               .build(&_s3_file_system_thread_pool));
+    RETURN_IF_ERROR(_init_mem_env());
 
     // NOTE: runtime query statistics mgr could be visited by query and daemon 
thread
     // so it should be created before all query begin and deleted after all 
query and daemon thread stoppped
@@ -304,8 +305,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
         return status;
     }
 
-    RETURN_IF_ERROR(_init_mem_env());
-
     RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit()));
     RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
     RETURN_IF_ERROR(_wal_manager->init());
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index 0e62170b760..d0546787026 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -29,6 +29,8 @@
 #endif
 
 // IWYU pragma: no_include <bits/chrono.h>
+#include <gtest/gtest.h>
+
 #include <chrono> // IWYU pragma: keep
 #include <condition_variable>
 #include <filesystem>
@@ -40,6 +42,7 @@
 #include <optional>
 #include <random>
 #include <ranges>
+#include <stdexcept>
 #include <string>
 #include <thread>
 #include <vector>
@@ -164,7 +167,7 @@ public:
         ExecEnv::GetInstance()->_file_cache_open_fd_cache = 
std::make_unique<io::FDCache>();
     }
     static void TearDownTestSuite() {
-        config::file_cache_enter_disk_resource_limit_mode_percent = 90;
+        config::file_cache_enter_disk_resource_limit_mode_percent = 99;
         ExecEnv::GetInstance()->_file_cache_open_fd_cache.reset(nullptr);
     }
 
@@ -3256,6 +3259,7 @@ TEST_F(BlockFileCacheTest, test_factory_1) {
     }
     FileCacheFactory::instance()->clear_file_caches(true);
     EXPECT_EQ(cache->_cur_cache_size, 0);
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -3855,6 +3859,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
     }
     EXPECT_TRUE(reader.close().ok());
     EXPECT_TRUE(reader.closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -3916,6 +3921,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_tail) {
         assert_range(2, blocks[1], io::FileBlock::Range(10_mb, 10_mb),
                      io::FileBlock::State::DOWNLOADED);
     }
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -4027,6 +4033,13 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_init) {
     context.stats = &rstats;
     context.query_id = query_id;
     
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    auto cache = FileCacheFactory::instance()->_caches[0].get();
+    for (int i = 0; i < 100; i++) {
+        if (cache->get_async_open_success()) {
+            break;
+        };
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
     FileReaderSPtr local_reader;
     ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
     io::FileReaderOptions opts;
@@ -4123,6 +4136,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_concurrent) {
     }
     EXPECT_TRUE(reader->close().ok());
     EXPECT_TRUE(reader->closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -4196,6 +4210,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_concurrent_2) {
     }
     EXPECT_TRUE(reader->close().ok());
     EXPECT_TRUE(reader->closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -4742,6 +4757,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_opt_lock) {
         EXPECT_EQ(buffer, std::string(10086, '0'));
         EXPECT_EQ(reader._cache_file_readers.size(), 8);
     }
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -7331,6 +7347,7 @@ TEST_F(BlockFileCacheTest, 
reader_dryrun_when_download_file_cache) {
 
     EXPECT_TRUE(reader.close().ok());
     EXPECT_TRUE(reader.closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -7340,4 +7357,399 @@ TEST_F(BlockFileCacheTest, 
reader_dryrun_when_download_file_cache) {
     config::enable_reader_dryrun_when_download_file_cache = org;
 }
 
+void move_dir_to_version1(const std::string& dirPath) {
+    try {
+        // layer 1
+        for (const auto& entry : fs::directory_iterator(dirPath)) {
+            if (fs::is_directory(entry)) {
+                std::string firstLevelDir = entry.path().string();
+
+                // layer 2
+                for (const auto& subEntry : 
fs::directory_iterator(firstLevelDir)) {
+                    if (fs::is_directory(subEntry)) {
+                        std::string secondLevelDir = subEntry.path().string();
+                        std::string newPath = dirPath + 
subEntry.path().filename().string();
+
+                        // Check if newPath ends with "_0"
+                        if (newPath.size() >= 2 && 
newPath.substr(newPath.size() - 2) == "_0") {
+                            // Remove the "_0" suffix
+                            newPath = newPath.substr(0, newPath.size() - 2);
+                        }
+
+                        // move 2 to 1
+                        fs::rename(secondLevelDir, newPath);
+                        LOG(INFO) << "Moved: " << secondLevelDir << " to " << 
newPath;
+                    }
+                }
+
+                // rm original 1
+                fs::remove_all(firstLevelDir);
+                LOG(INFO) << "Deleted: " << firstLevelDir;
+            }
+        }
+        std::fstream file(dirPath + "/version", std::ios::out | std::ios::in);
+        if (file.is_open()) {
+            file << "1.0";
+            file.close();
+            LOG(INFO) << "version 1.0 written";
+        }
+    } catch (const std::filesystem::filesystem_error& e) {
+        LOG(WARNING) << "Error: " << e.what();
+    }
+}
+
+void copy_dir(const fs::path& sourceDir, const fs::path& destinationDir) {
+    if (!fs::exists(destinationDir)) {
+        fs::create_directories(destinationDir);
+    }
+
+    for (const auto& entry : fs::directory_iterator(sourceDir)) {
+        const auto& path = entry.path();
+        if (fs::is_directory(path)) {
+            copy_dir(path, destinationDir / path.filename());
+        } else {
+            fs::copy_file(path, destinationDir / path.filename(),
+                          fs::copy_options::overwrite_existing);
+        }
+    }
+}
+
+TEST_F(BlockFileCacheTest, test_upgrade_cache_dir_version) {
+    config::enable_evict_file_cache_in_advance = false;
+    config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("FSFileCacheStorage::read_file_cache_version", [](auto&& 
args) {
+        *try_any_cast<Status*>(args[0]) = Status::IOError("inject io error");
+    });
+
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+
+    settings.ttl_queue_size = 5000000;
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000;
+    settings.max_query_cache_size = 30;
+
+    size_t limit = 1000000;
+    io::CacheContext context;
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.cache_type = io::FileCacheType::NORMAL;
+    context.query_id = query_id;
+    // int64_t cur_time = UnixSeconds();
+    // context.expiration_time = cur_time + 120;
+    LOG(INFO) << "start from empty";
+    auto key1 = io::BlockFileCache::hash("key1");
+    config::ignore_file_cache_dir_upgrade_failure = true;
+    { // the 1st cache initialize
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 100; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+        int64_t offset = 0;
+        // fill the cache to its limit
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+            download(blocks[0]);
+            assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+
+            blocks.clear();
+        }
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    LOG(INFO) << "normal no upgrade";
+    { // the 2nd cache initialize
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 100; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+        int64_t offset = 0;
+        // fill the cache to its limit
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED); // already 
downloaded in the 1st try
+            blocks.clear();
+        }
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    move_dir_to_version1(cache_base_path);
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    // try the 3rd, update dir
+    LOG(INFO) << "normal upgrade";
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 100; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+        int64_t offset = 0;
+        // fill the cache to its limit
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+            assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED); // update should 
succeed
+
+            blocks.clear();
+        }
+    }
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    move_dir_to_version1(cache_base_path);
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    // inject failure and try the 4th
+    sp->enable_processing();
+    LOG(INFO) << "error injected upgrade";
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 100; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+        int64_t offset = 0;
+        // fill the cache to its limit
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY); // inject failure 
removed the files
+            download(blocks[0]);
+            assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+            blocks.clear();
+        }
+    }
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    move_dir_to_version1(cache_base_path);
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    config::ignore_file_cache_dir_upgrade_failure = false;
+    LOG(INFO) << "error injected upgrade without ignore";
+    { // set ignore_file_cache_dir_upgrade_failure = false, inject failure and 
try the 5th cache initialize
+        io::BlockFileCache cache(cache_base_path, settings);
+        // (void)cache.initialize(); // throw exception!
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    sp->clear_call_back("FSFileCacheStorage::read_file_cache_version");
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    move_dir_to_version1(cache_base_path);
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    config::ignore_file_cache_dir_upgrade_failure = true;
+    sp->set_call_back("FSFileCacheStorage::collect_directory_entries", 
[](auto&& args) {
+        throw doris::Exception(
+                Status::InternalError("Inject exception to 
collect_directory_entries"));
+    });
+    LOG(INFO) << "collect_directory_entries exception injected upgrade";
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 1000; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+        int64_t offset = 0;
+        // fill the cache to its limit
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(7, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY); // inject failure 
removed the files
+            download(blocks[0]);
+            assert_range(8, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+            blocks.clear();
+        }
+    }
+    sp->clear_call_back("FSFileCacheStorage::collect_directory_entries");
+
+    LOG(INFO) << "upgrade_cache_dir_if_necessary_rename exception injected 
upgrade";
+    
sp->set_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename", 
[](auto&& args) {
+        throw doris::Exception(
+                Status::InternalError("Inject exception to 
upgrade_cache_dir_if_necessary_rename"));
+    });
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    move_dir_to_version1(cache_base_path);
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    config::ignore_file_cache_dir_upgrade_failure = true;
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 1000; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+        int64_t offset = 0;
+        // fill the cache to its limit
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(9, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY); // inject failure 
removed the files
+            download(blocks[0]);
+            assert_range(10, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+            blocks.clear();
+        }
+    }
+    
sp->clear_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename");
+
+    // mock upgrade when delete
+    LOG(INFO) << "upgrade_cache_dir_if_necessary_rename delete old error 
injected upgrade";
+    
sp->set_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename", 
[](auto&& args) {
+        std::string file_path = *try_any_cast<const std::string*>(args.at(0));
+        LOG(INFO) << "file_path=" << file_path;
+        std::error_code ec;
+        bool is_exist = std::filesystem::exists(file_path, ec);
+        ASSERT_TRUE(is_exist);
+        fs::remove_all(file_path);
+        is_exist = std::filesystem::exists(file_path, ec);
+        ASSERT_FALSE(is_exist);
+    });
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    move_dir_to_version1(cache_base_path);
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    config::ignore_file_cache_dir_upgrade_failure = true;
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 1000; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+        int64_t offset = 0;
+        // fill the cache to its limit
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(11, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY); // inject failure 
removed the files
+            download(blocks[0]);
+            assert_range(12, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+            blocks.clear();
+        }
+    }
+    
sp->clear_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename");
+
+    // mock concurrent query create target file while upgrading
+    LOG(INFO) << "upgrade_cache_dir_if_necessary_rename new already exists 
error injected upgrade";
+    
sp->set_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename", 
[](auto&& args) {
+        std::string file_path = *try_any_cast<const std::string*>(args.at(0));
+        LOG(INFO) << "file_path=" << file_path;
+        std::string new_file_path = *try_any_cast<const 
std::string*>(args.at(1));
+        LOG(INFO) << "new_file_path=" << new_file_path;
+        std::error_code ec;
+        bool is_exist = std::filesystem::exists(new_file_path, ec);
+        ASSERT_FALSE(is_exist);
+        is_exist = std::filesystem::exists(file_path, ec);
+        ASSERT_TRUE(is_exist);
+        copy_dir(file_path, new_file_path);
+        is_exist = std::filesystem::exists(new_file_path, ec);
+        ASSERT_TRUE(is_exist);
+        is_exist = std::filesystem::exists(file_path, ec);
+        ASSERT_TRUE(is_exist);
+    });
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    move_dir_to_version1(cache_base_path);
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    config::ignore_file_cache_dir_upgrade_failure = true;
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+        int i = 0;
+        for (; i < 1000; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+        int64_t offset = 0;
+        // fill the cache to its limit
+        for (; offset < limit; offset += 100000) {
+            auto holder = cache.get_or_set(key1, offset, 100000, context);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+            assert_range(13, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+            blocks.clear();
+        }
+    }
+    
sp->clear_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename");
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
 } // namespace doris::io
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 7bdafc59ddb..5a329b1636d 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -69,6 +69,7 @@ enum TStatusCode {
     ILLEGAL_STATE       = 37,
     NOT_AUTHORIZED      = 38,
     ABORTED             = 39,
+    DIRECTORY_NOT_EMPTY = 40,
     //REMOTE_ERROR        = 40,
     //SERVICE_UNAVAILABLE = 41, // Not used any more
     UNINITIALIZED       = 42,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to