This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b43fb2597ed branch-3.0: [enhancement](cloud) make file cache version
upgrade faster #50726 (#51194)
b43fb2597ed is described below
commit b43fb2597ed183c1b2f746a73e36dd796325d436
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 28 14:25:16 2025 +0800
branch-3.0: [enhancement](cloud) make file cache version upgrade faster
#50726 (#51194)
Cherry-picked from #50726
Co-authored-by: zhengyu <[email protected]>
---
be/src/common/config.cpp | 9 +-
be/src/common/config.h | 1 +
be/src/common/status.h | 2 +
be/src/io/cache/fs_file_cache_storage.cpp | 267 ++++++++++++++-----
be/src/io/cache/fs_file_cache_storage.h | 8 +
be/src/io/fs/err_utils.cpp | 2 +
be/src/runtime/exec_env_init.cpp | 3 +-
be/test/io/cache/block_file_cache_test.cpp | 414 ++++++++++++++++++++++++++++-
gensrc/thrift/Status.thrift | 1 +
9 files changed, 630 insertions(+), 77 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5b52532e753..61758a0b4fe 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -657,6 +657,7 @@ DEFINE_Int32(num_cores, "0");
// When BE start, If there is a broken disk, BE process will exit by default.
// Otherwise, we will ignore the broken disk,
DEFINE_Bool(ignore_broken_disk, "false");
+DEFINE_Bool(ignore_file_cache_dir_upgrade_failure, "false");
// Sleep time in milliseconds between memory maintenance iterations
DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20");
@@ -1084,11 +1085,11 @@ DEFINE_Int64(file_cache_each_block_size, "1048576"); //
1MB
DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
-DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
-DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
+DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
+DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "88");
DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
-DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "78");
-DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "75");
+DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "88");
+DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "85");
DEFINE_mInt32(file_cache_evict_in_advance_interval_ms, "1000");
DEFINE_mInt64(file_cache_evict_in_advance_batch_bytes, "31457280"); // 30MB
DEFINE_mInt64(file_cache_evict_in_advance_recycle_keys_num_threshold, "1000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 941b6a13777..a7e16c53d5e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -700,6 +700,7 @@ DECLARE_Int32(num_cores);
// When BE start, If there is a broken disk, BE process will exit by default.
// Otherwise, we will ignore the broken disk,
DECLARE_Bool(ignore_broken_disk);
+DECLARE_Bool(ignore_file_cache_dir_upgrade_failure);
// Sleep time in milliseconds between memory maintenance iterations
DECLARE_mInt32(memory_maintenance_sleep_time_ms);
diff --git a/be/src/common/status.h b/be/src/common/status.h
index d003645b258..26877e88dc7 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -47,6 +47,7 @@ namespace ErrorCode {
TStatusError(IO_ERROR, true); \
TStatusError(NOT_FOUND, true); \
TStatusError(ALREADY_EXIST, true); \
+ TStatusError(DIRECTORY_NOT_EMPTY, true); \
TStatusError(NOT_IMPLEMENTED_ERROR, false); \
TStatusError(END_OF_FILE, false); \
TStatusError(INTERNAL_ERROR, true); \
@@ -471,6 +472,7 @@ public:
ERROR_CTOR(IOError, IO_ERROR)
ERROR_CTOR(NotFound, NOT_FOUND)
ERROR_CTOR_NOSTACK(AlreadyExist, ALREADY_EXIST)
+ ERROR_CTOR_NOSTACK(DirectoryNotEmpty, DIRECTORY_NOT_EMPTY)
ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE)
ERROR_CTOR(InternalError, INTERNAL_ERROR)
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp
b/be/src/io/cache/fs_file_cache_storage.cpp
index 01900fbd2a5..ecb594e14a2 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -31,6 +31,8 @@
#include "io/fs/local_file_reader.h"
#include "io/fs/local_file_writer.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
#include "vec/common/hex.h"
namespace doris::io {
@@ -101,12 +103,33 @@ size_t FDCache::file_reader_cache_size() {
}
Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
+ _iterator_dir_retry_cnt = std::make_shared<bvar::LatencyRecorder>(
+ _cache_base_path.c_str(),
"file_cache_fs_storage_iterator_dir_retry_cnt");
_cache_base_path = _mgr->_cache_base_path;
- RETURN_IF_ERROR(upgrade_cache_dir_if_necessary());
_cache_background_load_thread = std::thread([this, mgr = _mgr]() {
+ auto mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
+
fmt::format("FileCacheVersionReader"));
+ SCOPED_ATTACH_TASK(mem_tracker);
+ Status st = upgrade_cache_dir_if_necessary();
+ if (!st.ok()) {
+ std::string msg = fmt::format(
+ "file cache {} upgrade done with error. upgrade version
failed. st={}",
+ _cache_base_path, st.to_string());
+ if (doris::config::ignore_file_cache_dir_upgrade_failure) {
+ LOG(WARNING) << msg << " be conf:
`ignore_file_cache_dir_upgrade_failure = true`"
+ << " so we are ignoring the error (unsuccessful
cache files will be "
+ "removed)";
+ remove_old_version_directories();
+ } else {
+ LOG(WARNING) << msg << " please fix error and restart BE or"
+ << " use be conf:
`ignore_file_cache_dir_upgrade_failure = true`"
+ << " to skip the error (unsuccessful cache files
will be removed)";
+ throw doris::Exception(Status::InternalError(msg));
+ }
+ }
load_cache_info_into_memory(mgr);
mgr->_async_open_done = true;
- LOG_INFO("FileCache {} lazy load done.", _cache_base_path);
+ LOG_INFO("file cache {} lazy load done.", _cache_base_path);
});
return Status::OK();
}
@@ -307,6 +330,99 @@ std::string
FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& va
}
}
+void FSFileCacheStorage::remove_old_version_directories() {
+ std::error_code ec;
+ std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+ if (ec) {
+ LOG(WARNING) << "Failed to list directory: " << _cache_base_path
+ << ", error: " << ec.message();
+ return;
+ }
+
+ std::vector<std::filesystem::path> file_list;
+ // the dir is concurrently accessed, so handle invalid iter with retry
+ bool success = false;
+ size_t retry_count = 0;
+ const size_t max_retry = 5;
+ while (!success && retry_count < max_retry) {
+ try {
+ ++retry_count;
+ for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
+ file_list.push_back(key_it->path());
+ }
+ success = true;
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "Error occurred while iterating directory: " <<
e.what();
+ file_list.clear();
+ }
+ }
+
+ if (!success) {
+ LOG_WARNING("iteration of cache dir still failed after retry {}
times.", max_retry);
+ }
+
+ auto path_itr = file_list.begin();
+ for (; path_itr != file_list.end(); ++path_itr) {
+ if (std::filesystem::is_directory(*path_itr)) {
+ std::string cache_key = path_itr->filename().native();
+ if (cache_key.size() > KEY_PREFIX_LENGTH) {
+ // best-effort delete; the returned status is intentionally ignored
+ (void)fs->delete_directory(*path_itr);
+ }
+ }
+ }
+ auto s = fs->delete_file(get_version_path());
+ if (!s.ok()) {
+ LOG(WARNING) << "deleted old version file failed: " << s.to_string();
+ return;
+ }
+ s = write_file_cache_version();
+ if (!s.ok()) {
+ LOG(WARNING) << "write new version file failed: " << s.to_string();
+ return;
+ }
+}
+
+Status FSFileCacheStorage::collect_directory_entries(const
std::filesystem::path& dir_path,
+ std::vector<std::string>&
file_list) const {
+ std::error_code ec;
+ bool success = false;
+ size_t retry_count = 0;
+ const size_t max_retry = 5;
+
+ while (!success && retry_count < max_retry) {
+ try {
+ ++retry_count;
+ std::filesystem::directory_iterator it {dir_path, ec};
+
TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::collect_directory_entries");
+ if (ec) {
+ LOG(WARNING) << "Failed to list directory: " << dir_path
+ << ", error: " << ec.message();
+ continue;
+ }
+
+ file_list.clear();
+ for (; it != std::filesystem::directory_iterator(); ++it) {
+ file_list.push_back(it->path().string());
+ }
+ success = true;
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "Error occurred while iterating directory: " <<
dir_path
+ << " err: " << e.what();
+ file_list.clear();
+ }
+ }
+
+ *_iterator_dir_retry_cnt << retry_count;
+
+ if (!success) {
+ LOG_WARNING("iteration of cache dir still failed after retry {}
times.", max_retry);
+ return Status::InternalError("Failed to iterate directory after
retries.");
+ }
+
+ return Status::OK();
+}
+
Status FSFileCacheStorage::upgrade_cache_dir_if_necessary() const {
/*
* If use version2 but was version 1, do upgrade:
@@ -326,91 +442,99 @@ Status
FSFileCacheStorage::upgrade_cache_dir_if_necessary() const {
std::string version;
std::error_code ec;
int rename_count = 0;
+ int failure_count = 0;
auto start_time = std::chrono::steady_clock::now();
RETURN_IF_ERROR(read_file_cache_version(&version));
+
LOG(INFO) << "Checking cache version upgrade. Current version: " << version
<< ", target version: 2.0, need upgrade: "
<< (USE_CACHE_VERSION2 && version != "2.0");
if (USE_CACHE_VERSION2 && version != "2.0") {
// move directories format as version 2.0
- std::filesystem::directory_iterator key_it {_cache_base_path, ec};
- if (ec) {
- LOG(WARNING) << "Failed to list directory: " << _cache_base_path
- << ", error: " << ec.message();
- return Status::InternalError("Failed to list dir {}: {}",
_cache_base_path,
- ec.message());
- }
- for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
- if (key_it->is_directory()) {
- std::string cache_key = key_it->path().filename().native();
- if (cache_key.size() > KEY_PREFIX_LENGTH) {
- std::string key_prefix =
- Path(_cache_base_path) / cache_key.substr(0,
KEY_PREFIX_LENGTH);
- bool exists = false;
- auto exists_status = fs->exists(key_prefix, &exists);
- if (!exists_status.ok()) {
- LOG(WARNING) << "Failed to check directory existence:
" << key_prefix
- << ", error: " <<
exists_status.to_string();
- return exists_status;
- }
- if (!exists) {
- auto create_status = fs->create_directory(key_prefix);
- if (!create_status.ok()) {
- LOG(WARNING) << "Failed to create directory: " <<
key_prefix
- << ", error: " <<
create_status.to_string();
- return create_status;
+ std::vector<std::string> file_list;
+ file_list.reserve(10000);
+ RETURN_IF_ERROR(collect_directory_entries(_cache_base_path,
file_list));
+
+ // iterate over the collected snapshot; a live directory_iterator is a problem under concurrent access
+ for (const auto& file_path : file_list) {
+ try {
+ if (std::filesystem::is_directory(file_path)) {
+ std::string cache_key =
std::filesystem::path(file_path).filename().native();
+ if (cache_key.size() > KEY_PREFIX_LENGTH) {
+ if (cache_key.find('_') == std::string::npos) {
+ cache_key += "_0";
+ }
+ std::string key_prefix =
+ Path(_cache_base_path) / cache_key.substr(0,
KEY_PREFIX_LENGTH);
+ bool exists = false;
+ auto exists_status = fs->exists(key_prefix, &exists);
+ if (!exists_status.ok()) {
+ LOG(WARNING) << "Failed to check directory
existence: " << key_prefix
+ << ", error: " <<
exists_status.to_string();
+ ++failure_count;
+ continue;
+ }
+ if (!exists) {
+ auto create_status =
fs->create_directory(key_prefix);
+ if (!create_status.ok() &&
+ create_status.code() !=
TStatusCode::type::ALREADY_EXIST) {
+ LOG(WARNING) << "Failed to create directory: "
<< key_prefix
+ << ", error: " <<
create_status.to_string();
+ ++failure_count;
+ continue;
+ }
+ }
+ auto rename_status = Status::OK();
+ const std::string new_file_path = key_prefix + "/" +
cache_key;
+ TEST_SYNC_POINT_CALLBACK(
+
"FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename",
+ &file_path, &new_file_path);
+ rename_status = fs->rename(file_path, new_file_path);
+ if (rename_status.ok() ||
+ rename_status.code() ==
TStatusCode::type::DIRECTORY_NOT_EMPTY) {
+ ++rename_count;
+ } else {
+ LOG(WARNING)
+ << "Failed to rename directory from " <<
file_path << " to "
+ << new_file_path << ", error: " <<
rename_status.to_string();
+ ++failure_count;
+ continue;
}
- }
- auto rename_status = fs->rename(key_it->path(), key_prefix
/ cache_key);
- if (rename_status.ok()) {
- ++rename_count;
- } else {
- LOG(WARNING)
- << "Failed to rename directory from " <<
key_it->path().native()
- << " to " << (key_prefix / cache_key).native()
- << ", error: " << rename_status.to_string();
- return rename_status;
}
}
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "Error occurred while upgrading file cache
directory: " << file_path
+ << " err: " << e.what();
+ ++failure_count;
}
}
- auto rebuild_dir = [&](std::filesystem::directory_iterator&
upgrade_key_it) -> Status {
- for (; upgrade_key_it != std::filesystem::directory_iterator();
++upgrade_key_it) {
- if (upgrade_key_it->path().filename().native().find('_') ==
std::string::npos) {
-
RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
- auto rename_status = fs->rename(upgrade_key_it->path(),
-
upgrade_key_it->path().native() + "_0");
- if (rename_status.ok()) {
- ++rename_count;
- }
- RETURN_IF_ERROR(rename_status);
- }
- }
- return Status::OK();
- };
+ std::vector<std::string> rebuilt_file_list;
+ rebuilt_file_list.reserve(10000);
+ RETURN_IF_ERROR(collect_directory_entries(_cache_base_path,
rebuilt_file_list));
- std::filesystem::directory_iterator key_prefix_it {_cache_base_path,
ec};
- if (ec) [[unlikely]] {
- LOG(WARNING) << ec.message();
- return Status::IOError(ec.message());
- }
- for (; key_prefix_it != std::filesystem::directory_iterator();
++key_prefix_it) {
- if (!key_prefix_it->is_directory()) {
+ for (const auto& key_it : rebuilt_file_list) {
+ if (!std::filesystem::is_directory(key_it)) {
// maybe the version hints file
continue;
}
- if (key_prefix_it->path().filename().native().size() !=
KEY_PREFIX_LENGTH) {
- LOG(WARNING) << "Unknown directory " <<
key_prefix_it->path().native()
- << ", try to remove it";
- RETURN_IF_ERROR(fs->delete_directory(key_prefix_it->path()));
- }
- std::filesystem::directory_iterator key_it {key_prefix_it->path(),
ec};
- if (ec) [[unlikely]] {
- return Status::IOError(ec.message());
+ try {
+ if (Path(key_it).filename().native().size() !=
KEY_PREFIX_LENGTH) {
+ LOG(WARNING) << "Unknown directory " << key_it << ", try
to remove it";
+ auto delete_status = fs->delete_directory(key_it);
+ if (!delete_status.ok()) {
+ LOG(WARNING) << "Failed to delete unknown directory: "
<< key_it
+ << ", error: " <<
delete_status.to_string();
+ ++failure_count;
+ continue;
+ }
+ }
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "Error occurred while upgrading file cache
directory: " << key_it
+ << " err: " << e.what();
+ ++failure_count;
}
- RETURN_IF_ERROR(rebuild_dir(key_it));
}
if (!write_file_cache_version().ok()) {
return Status::InternalError("Failed to write version hints for
file cache");
@@ -420,7 +544,8 @@ Status FSFileCacheStorage::upgrade_cache_dir_if_necessary()
const {
auto end_time = std::chrono::steady_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
LOG(INFO) << "Cache directory upgrade completed. Total files renamed: " <<
rename_count
- << ", Time taken: " << duration.count() << "ms";
+ << ", Time taken: " << duration.count() << "ms"
+ << ", Failure count: " << failure_count;
return Status::OK();
}
@@ -452,7 +577,9 @@ Status
FSFileCacheStorage::read_file_cache_version(std::string* buffer) const {
size_t bytes_read = 0;
RETURN_IF_ERROR(version_reader->read_at(0, Slice(buffer->data(),
file_size), &bytes_read));
RETURN_IF_ERROR(version_reader->close());
- return Status::OK();
+ auto st = Status::OK();
+ TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::read_file_cache_version",
&st);
+ return st;
}
std::string FSFileCacheStorage::get_version_path() const {
diff --git a/be/src/io/cache/fs_file_cache_storage.h
b/be/src/io/cache/fs_file_cache_storage.h
index 8a97aa109ad..114517bdf72 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -17,6 +17,8 @@
#pragma once
+#include <bvar/bvar.h>
+
#include <memory>
#include <shared_mutex>
#include <thread>
@@ -87,6 +89,11 @@ public:
FileCacheStorageType get_type() override { return DISK; }
private:
+ void remove_old_version_directories();
+
+ Status collect_directory_entries(const std::filesystem::path& dir_path,
+ std::vector<std::string>& file_list)
const;
+
Status upgrade_cache_dir_if_necessary() const;
Status read_file_cache_version(std::string* buffer) const;
@@ -111,6 +118,7 @@ private:
// TODO(Lchangliang): use a more efficient data structure
std::mutex _mtx;
std::unordered_map<FileWriterMapKey, FileWriterPtr, FileWriterMapKeyHash>
_key_to_writer;
+ std::shared_ptr<bvar::LatencyRecorder> _iterator_dir_retry_cnt;
};
} // namespace doris::io
diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
index 19c460f1618..96ac7b817e9 100644
--- a/be/src/io/fs/err_utils.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -96,6 +96,8 @@ Status localfs_error(const std::error_code& ec,
std::string_view msg) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>(message);
} else if (ec == std::errc::permission_denied) {
return Status::Error<PERMISSION_DENIED, false>(message);
+ } else if (ec == std::errc::directory_not_empty) {
+ return Status::Error<DIRECTORY_NOT_EMPTY, false>(message);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR, false>(message);
}
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 393d3fa3d19..a0bed40953a 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -238,6 +238,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
.set_min_threads(config::min_s3_file_system_thread_num)
.set_max_threads(config::max_s3_file_system_thread_num)
.build(&_s3_file_system_thread_pool));
+ RETURN_IF_ERROR(_init_mem_env());
// NOTE: runtime query statistics mgr could be visited by query and daemon
thread
// so it should be created before all query begin and deleted after all
query and daemon thread stopped
@@ -304,8 +305,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
return status;
}
- RETURN_IF_ERROR(_init_mem_env());
-
RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit()));
RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
RETURN_IF_ERROR(_wal_manager->init());
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 0e62170b760..d0546787026 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -29,6 +29,8 @@
#endif
// IWYU pragma: no_include <bits/chrono.h>
+#include <gtest/gtest.h>
+
#include <chrono> // IWYU pragma: keep
#include <condition_variable>
#include <filesystem>
@@ -40,6 +42,7 @@
#include <optional>
#include <random>
#include <ranges>
+#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
@@ -164,7 +167,7 @@ public:
ExecEnv::GetInstance()->_file_cache_open_fd_cache =
std::make_unique<io::FDCache>();
}
static void TearDownTestSuite() {
- config::file_cache_enter_disk_resource_limit_mode_percent = 90;
+ config::file_cache_enter_disk_resource_limit_mode_percent = 99;
ExecEnv::GetInstance()->_file_cache_open_fd_cache.reset(nullptr);
}
@@ -3256,6 +3259,7 @@ TEST_F(BlockFileCacheTest, test_factory_1) {
}
FileCacheFactory::instance()->clear_file_caches(true);
EXPECT_EQ(cache->_cur_cache_size, 0);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3855,6 +3859,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
}
EXPECT_TRUE(reader.close().ok());
EXPECT_TRUE(reader.closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3916,6 +3921,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_tail) {
assert_range(2, blocks[1], io::FileBlock::Range(10_mb, 10_mb),
io::FileBlock::State::DOWNLOADED);
}
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -4027,6 +4033,13 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_init) {
context.stats = &rstats;
context.query_id = query_id;
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ auto cache = FileCacheFactory::instance()->_caches[0].get();
+ for (int i = 0; i < 100; i++) {
+ if (cache->get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
FileReaderSPtr local_reader;
ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
io::FileReaderOptions opts;
@@ -4123,6 +4136,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_concurrent) {
}
EXPECT_TRUE(reader->close().ok());
EXPECT_TRUE(reader->closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -4196,6 +4210,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_concurrent_2) {
}
EXPECT_TRUE(reader->close().ok());
EXPECT_TRUE(reader->closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -4742,6 +4757,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_opt_lock) {
EXPECT_EQ(buffer, std::string(10086, '0'));
EXPECT_EQ(reader._cache_file_readers.size(), 8);
}
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -7331,6 +7347,7 @@ TEST_F(BlockFileCacheTest,
reader_dryrun_when_download_file_cache) {
EXPECT_TRUE(reader.close().ok());
EXPECT_TRUE(reader.closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -7340,4 +7357,399 @@ TEST_F(BlockFileCacheTest,
reader_dryrun_when_download_file_cache) {
config::enable_reader_dryrun_when_download_file_cache = org;
}
+void move_dir_to_version1(const std::string& dirPath) {
+ try {
+ // layer 1
+ for (const auto& entry : fs::directory_iterator(dirPath)) {
+ if (fs::is_directory(entry)) {
+ std::string firstLevelDir = entry.path().string();
+
+ // layer 2
+ for (const auto& subEntry :
fs::directory_iterator(firstLevelDir)) {
+ if (fs::is_directory(subEntry)) {
+ std::string secondLevelDir = subEntry.path().string();
+ std::string newPath = dirPath +
subEntry.path().filename().string();
+
+ // Check if newPath ends with "_0"
+ if (newPath.size() >= 2 &&
newPath.substr(newPath.size() - 2) == "_0") {
+ // Remove the "_0" suffix
+ newPath = newPath.substr(0, newPath.size() - 2);
+ }
+
+ // move 2 to 1
+ fs::rename(secondLevelDir, newPath);
+ LOG(INFO) << "Moved: " << secondLevelDir << " to " <<
newPath;
+ }
+ }
+
+ // rm original 1
+ fs::remove_all(firstLevelDir);
+ LOG(INFO) << "Deleted: " << firstLevelDir;
+ }
+ }
+ std::fstream file(dirPath + "/version", std::ios::out | std::ios::in);
+ if (file.is_open()) {
+ file << "1.0";
+ file.close();
+ LOG(INFO) << "version 1.0 written";
+ }
+ } catch (const std::filesystem::filesystem_error& e) {
+ LOG(WARNING) << "Error: " << e.what();
+ }
+}
+
+void copy_dir(const fs::path& sourceDir, const fs::path& destinationDir) {
+ if (!fs::exists(destinationDir)) {
+ fs::create_directories(destinationDir);
+ }
+
+ for (const auto& entry : fs::directory_iterator(sourceDir)) {
+ const auto& path = entry.path();
+ if (fs::is_directory(path)) {
+ copy_dir(path, destinationDir / path.filename());
+ } else {
+ fs::copy_file(path, destinationDir / path.filename(),
+ fs::copy_options::overwrite_existing);
+ }
+ }
+}
+
+TEST_F(BlockFileCacheTest, test_upgrade_cache_dir_version) {
+ config::enable_evict_file_cache_in_advance = false;
+ config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+
+ auto sp = SyncPoint::get_instance();
+ sp->set_call_back("FSFileCacheStorage::read_file_cache_version", [](auto&&
args) {
+ *try_any_cast<Status*>(args[0]) = Status::IOError("inject io error");
+ });
+
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ size_t limit = 1000000;
+ io::CacheContext context;
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.cache_type = io::FileCacheType::NORMAL;
+ context.query_id = query_id;
+ // int64_t cur_time = UnixSeconds();
+ // context.expiration_time = cur_time + 120;
+ LOG(INFO) << "start from empty";
+ auto key1 = io::BlockFileCache::hash("key1");
+ config::ignore_file_cache_dir_upgrade_failure = true;
+ { // the 1st cache initialize
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ LOG(INFO) << "normal no upgrade";
+ { // the 2nd cache initialize
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED); // already
downloaded in the 1st try
+ blocks.clear();
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ move_dir_to_version1(cache_base_path);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ // try the 3rd, update dir
+ LOG(INFO) << "normal upgrade";
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED); // update should
succeed
+
+ blocks.clear();
+ }
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ move_dir_to_version1(cache_base_path);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ // inject failure and try the 4th
+ sp->enable_processing();
+ LOG(INFO) << "error injected upgrade";
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(5, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY); // inject failure
removed the files
+ download(blocks[0]);
+ assert_range(6, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+ blocks.clear();
+ }
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ move_dir_to_version1(cache_base_path);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ config::ignore_file_cache_dir_upgrade_failure = false;
+ LOG(INFO) << "error injected upgrade without ignore";
+ { // set ignore_file_cache_dir_upgrade_failure = false, inject failure and
try the 5th cache initialize
+ io::BlockFileCache cache(cache_base_path, settings);
+ // (void)cache.initialize(); // throw exception!
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ sp->clear_call_back("FSFileCacheStorage::read_file_cache_version");
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ move_dir_to_version1(cache_base_path);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ config::ignore_file_cache_dir_upgrade_failure = true;
+ sp->set_call_back("FSFileCacheStorage::collect_directory_entries",
[](auto&& args) {
+ throw doris::Exception(
+ Status::InternalError("Inject exception to
collect_directory_entries"));
+ });
+ LOG(INFO) << "collect_directory_entries exception injected upgrade";
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 1000; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(7, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY); // inject failure
removed the files
+ download(blocks[0]);
+ assert_range(8, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+ blocks.clear();
+ }
+ }
+ sp->clear_call_back("FSFileCacheStorage::collect_directory_entries");
+
+ LOG(INFO) << "upgrade_cache_dir_if_necessary_rename exception injected
upgrade";
+
sp->set_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename",
[](auto&& args) {
+ throw doris::Exception(
+ Status::InternalError("Inject exception to
upgrade_cache_dir_if_necessary_rename"));
+ });
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ move_dir_to_version1(cache_base_path);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ config::ignore_file_cache_dir_upgrade_failure = true;
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 1000; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(9, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY); // inject failure
removed the files
+ download(blocks[0]);
+ assert_range(10, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+ blocks.clear();
+ }
+ }
+
sp->clear_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename");
+
+ // mock upgrade when delete
+ LOG(INFO) << "upgrade_cache_dir_if_necessary_rename delete old error
injected upgrade";
+
sp->set_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename",
[](auto&& args) {
+ std::string file_path = *try_any_cast<const std::string*>(args.at(0));
+ LOG(INFO) << "file_path=" << file_path;
+ std::error_code ec;
+ bool is_exist = std::filesystem::exists(file_path, ec);
+ ASSERT_TRUE(is_exist);
+ fs::remove_all(file_path);
+ is_exist = std::filesystem::exists(file_path, ec);
+ ASSERT_FALSE(is_exist);
+ });
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ move_dir_to_version1(cache_base_path);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ config::ignore_file_cache_dir_upgrade_failure = true;
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 1000; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(11, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY); // inject failure
removed the files
+ download(blocks[0]);
+ assert_range(12, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+ blocks.clear();
+ }
+ }
+
sp->clear_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename");
+
+ // mock concurrent query create target file while upgrading
+ LOG(INFO) << "upgrade_cache_dir_if_necessary_rename new already exists
error injected upgrade";
+
sp->set_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename",
[](auto&& args) {
+ std::string file_path = *try_any_cast<const std::string*>(args.at(0));
+ LOG(INFO) << "file_path=" << file_path;
+ std::string new_file_path = *try_any_cast<const
std::string*>(args.at(1));
+ LOG(INFO) << "new_file_path=" << new_file_path;
+ std::error_code ec;
+ bool is_exist = std::filesystem::exists(new_file_path, ec);
+ ASSERT_FALSE(is_exist);
+ is_exist = std::filesystem::exists(file_path, ec);
+ ASSERT_TRUE(is_exist);
+ copy_dir(file_path, new_file_path);
+ is_exist = std::filesystem::exists(new_file_path, ec);
+ ASSERT_TRUE(is_exist);
+ is_exist = std::filesystem::exists(file_path, ec);
+ ASSERT_TRUE(is_exist);
+ });
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ move_dir_to_version1(cache_base_path);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ config::ignore_file_cache_dir_upgrade_failure = true;
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 1000; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(13, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+ blocks.clear();
+ }
+ }
+
sp->clear_call_back("FSFileCacheStorage::upgrade_cache_dir_if_necessary_rename");
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
} // namespace doris::io
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 7bdafc59ddb..5a329b1636d 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -69,6 +69,7 @@ enum TStatusCode {
ILLEGAL_STATE = 37,
NOT_AUTHORIZED = 38,
ABORTED = 39,
+ DIRECTORY_NOT_EMPTY = 40,
//REMOTE_ERROR = 40,
//SERVICE_UNAVAILABLE = 41, // Not used any more
UNINITIALIZED = 42,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]