This is an automated email from the ASF dual-hosted git repository. ashingau pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 330f369764 [enhancement](file-cache) limit the file cache handle num and init the file cache concurrently (#22919) 330f369764 is described below commit 330f369764d3518b9da6bea8dad2a854c08b8002 Author: Mingyu Chen <morning...@163.com> AuthorDate: Thu Aug 17 16:52:08 2023 +0800 [enhancement](file-cache) limit the file cache handle num and init the file cache concurrently (#22919) 1. The effective value of the BE config `file_cache_max_file_reader_cache_size` is now capped at one third of the process's hard limit on open files (RLIMIT_NOFILE); the smaller of the two values is used. 2. Use a thread pool to create and initialize the file caches concurrently. This fixes slow BE startup when the file cache directories contain many files, which was caused by traversing all file cache directories sequentially. --- be/src/io/cache/block/block_file_cache.cpp | 20 +++++++++++++- be/src/io/cache/block/block_file_cache.h | 4 +++ be/src/io/cache/block/block_file_cache_factory.cpp | 23 ++++++++++++---- be/src/io/cache/block/block_file_cache_factory.h | 6 ++-- be/src/io/cache/block/block_lru_file_cache.cpp | 8 ++++-- be/src/service/doris_main.cpp | 32 +++++++++++++++++++--- be/test/io/cache/file_block_cache_test.cpp | 1 + .../test_truncate_char_or_varchar_columns.groovy | 12 ++++---- 8 files changed, 85 insertions(+), 21 deletions(-) diff --git a/be/src/io/cache/block/block_file_cache.cpp b/be/src/io/cache/block/block_file_cache.cpp index 0cd546aeaa..2b7b8cb134 100644 --- a/be/src/io/cache/block/block_file_cache.cpp +++ b/be/src/io/cache/block/block_file_cache.cpp @@ -22,6 +22,8 @@ #include <glog/logging.h> // IWYU pragma: no_include <bits/chrono.h> +#include <sys/resource.h> + #include <chrono> // IWYU pragma: keep #include <filesystem> #include <utility> @@ -176,7 +178,7 @@ std::weak_ptr<FileReader> IFileCache::cache_file_reader(const AccessKeyAndOffset std::weak_ptr<FileReader> wp; if (!s_read_only) [[likely]] { std::lock_guard lock(s_file_reader_cache_mtx); - if 
(config::file_cache_max_file_reader_cache_size == s_file_reader_cache.size()) { + if (s_file_reader_cache.size() >= _max_file_reader_cache_size) { s_file_name_to_reader.erase(s_file_reader_cache.back().first); s_file_reader_cache.pop_back(); } @@ -205,5 +207,21 @@ size_t IFileCache::file_reader_cache_size() { return s_file_name_to_reader.size(); } +void IFileCache::init() { + struct rlimit limit; + if (getrlimit(RLIMIT_NOFILE, &limit) != 0) { + LOG(FATAL) << "getrlimit() failed with errno: " << errno; + return; + } + + _max_file_reader_cache_size = + std::min((uint64_t)config::file_cache_max_file_reader_cache_size, limit.rlim_max / 3); + LOG(INFO) << "max file reader cache size is: " << _max_file_reader_cache_size + << ", resource hard limit is: " << limit.rlim_max + << ", config file_cache_max_file_reader_cache_size is: " + << config::file_cache_max_file_reader_cache_size; + return; +} + } // namespace io } // namespace doris diff --git a/be/src/io/cache/block/block_file_cache.h b/be/src/io/cache/block/block_file_cache.h index 5d0dd1f06f..9ac1a464e8 100644 --- a/be/src/io/cache/block/block_file_cache.h +++ b/be/src/io/cache/block/block_file_cache.h @@ -303,8 +303,12 @@ private: s_file_name_to_reader; static inline std::mutex s_file_reader_cache_mtx; static inline std::atomic_bool s_read_only {false}; + static inline uint64_t _max_file_reader_cache_size = 65533; public: + // should be call when BE start + static void init(); + static void set_read_only(bool read_only); static bool read_only() { return s_read_only; } diff --git a/be/src/io/cache/block/block_file_cache_factory.cpp b/be/src/io/cache/block/block_file_cache_factory.cpp index fc60469ada..8741ced5c0 100644 --- a/be/src/io/cache/block/block_file_cache_factory.cpp +++ b/be/src/io/cache/block/block_file_cache_factory.cpp @@ -58,8 +58,9 @@ size_t FileCacheFactory::try_release(const std::string& base_path) { return 0; } -Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, - const 
FileCacheSettings& file_cache_settings) { +void FileCacheFactory::create_file_cache(const std::string& cache_base_path, + const FileCacheSettings& file_cache_settings, + Status* status) { if (config::clear_file_cache) { auto fs = global_local_filesystem(); bool res = false; @@ -71,12 +72,22 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, std::unique_ptr<IFileCache> cache = std::make_unique<LRUFileCache>(cache_base_path, file_cache_settings); - RETURN_IF_ERROR(cache->initialize()); - _path_to_cache[cache_base_path] = cache.get(); - _caches.push_back(std::move(cache)); + *status = cache->initialize(); + if (!status->ok()) { + return; + } + + { + // the create_file_cache() may be called concurrently, + // so need to protect it with lock + std::lock_guard<std::mutex> lock(_cache_mutex); + _path_to_cache[cache_base_path] = cache.get(); + _caches.push_back(std::move(cache)); + } LOG(INFO) << "[FileCache] path: " << cache_base_path << " total_size: " << file_cache_settings.total_size; - return Status::OK(); + *status = Status::OK(); + return; } CloudFileCachePtr FileCacheFactory::get_by_path(const IFileCache::Key& key) { diff --git a/be/src/io/cache/block/block_file_cache_factory.h b/be/src/io/cache/block/block_file_cache_factory.h index 97bffb7193..c8ed2893f1 100644 --- a/be/src/io/cache/block/block_file_cache_factory.h +++ b/be/src/io/cache/block/block_file_cache_factory.h @@ -39,8 +39,8 @@ class FileCacheFactory { public: static FileCacheFactory& instance(); - Status create_file_cache(const std::string& cache_base_path, - const FileCacheSettings& file_cache_settings); + void create_file_cache(const std::string& cache_base_path, + const FileCacheSettings& file_cache_settings, Status* status); size_t try_release(); @@ -55,6 +55,8 @@ public: FileCacheFactory(const FileCacheFactory&) = delete; private: + // to protect following containers + std::mutex _cache_mutex; std::vector<std::unique_ptr<IFileCache>> _caches; 
std::unordered_map<std::string, CloudFileCachePtr> _path_to_cache; }; diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 62e9a3b31d..53518653ac 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -51,6 +51,7 @@ #include "io/fs/path.h" #include "util/doris_metrics.h" #include "util/slice.h" +#include "util/stopwatch.hpp" #include "vec/common/hex.h" namespace fs = std::filesystem; @@ -118,6 +119,8 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path, } Status LRUFileCache::initialize() { + MonotonicStopWatch watch; + watch.start(); std::lock_guard cache_lock(_mutex); if (!_is_initialized) { if (fs::exists(_cache_base_path)) { @@ -134,17 +137,18 @@ Status LRUFileCache::initialize() { } _is_initialized = true; _cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this); + int64_t cost = watch.elapsed_time() / 1000 / 1000; LOG(INFO) << fmt::format( "After initialize file cache path={}, disposable queue size={} elements={}, index " "queue size={} " "elements={}, query queue " - "size={} elements={}", + "size={} elements={}, init cost(ms)={}", _cache_base_path, _disposable_queue.get_total_cache_size(cache_lock), _disposable_queue.get_elements_num(cache_lock), _index_queue.get_total_cache_size(cache_lock), _index_queue.get_elements_num(cache_lock), _normal_queue.get_total_cache_size(cache_lock), - _normal_queue.get_elements_num(cache_lock)); + _normal_queue.get_elements_num(cache_lock), cost); return Status::OK(); } diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 45dc46e32d..4829b45cc7 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -392,6 +392,7 @@ int main(int argc, char** argv) { } if (doris::config::enable_file_cache) { + doris::io::IFileCache::init(); std::unordered_set<std::string> cache_path_set; std::vector<doris::CachePath> 
cache_paths; olap_res = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths); @@ -400,16 +401,39 @@ int main(int argc, char** argv) { << doris::config::file_cache_path; exit(-1); } + + std::unique_ptr<doris::ThreadPool> file_cache_init_pool; + doris::ThreadPoolBuilder("FileCacheInitThreadPool") + .set_min_threads(cache_paths.size()) + .set_max_threads(cache_paths.size()) + .build(&file_cache_init_pool); + + std::vector<doris::Status> cache_status; for (auto& cache_path : cache_paths) { if (cache_path_set.find(cache_path.path) != cache_path_set.end()) { LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path); continue; } + + cache_status.push_back(Status::OK()); + RETURN_IF_ERROR(file_cache_init_pool->submit_func( + std::bind(&doris::io::FileCacheFactory::create_file_cache, + &(doris::io::FileCacheFactory::instance()), cache_path.path, + cache_path.init_settings(), &(cache_status.back())))); + cache_path_set.emplace(cache_path.path); - Status st = doris::io::FileCacheFactory::instance().create_file_cache( - cache_path.path, cache_path.init_settings()); - if (!st) { - LOG(FATAL) << st; + // Status st = doris::io::FileCacheFactory::instance().create_file_cache( + // cache_path.path, cache_path.init_settings()); + // if (!st) { + // LOG(FATAL) << st; + // exit(-1); + // } + } + + file_cache_init_pool->wait(); + for (int i = 0; i < cache_status.size(); ++i) { + if (!cache_status[i].ok()) { + LOG(FATAL) << "failed to init file cache: " << i << ", err: " << cache_status[i]; exit(-1); } } diff --git a/be/test/io/cache/file_block_cache_test.cpp b/be/test/io/cache/file_block_cache_test.cpp index 1c60447035..f8ce21dd27 100644 --- a/be/test/io/cache/file_block_cache_test.cpp +++ b/be/test/io/cache/file_block_cache_test.cpp @@ -916,6 +916,7 @@ TEST(LRUFileCache, fd_cache_evict) { context.cache_type = io::CacheType::NORMAL; auto key = io::LRUFileCache::hash("key1"); config::file_cache_max_file_reader_cache_size = 2; + IFileCache::init(); 
{ auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 8] auto segments = fromHolder(holder); diff --git a/regression-test/suites/external_table_p2/hive/test_truncate_char_or_varchar_columns.groovy b/regression-test/suites/external_table_p2/hive/test_truncate_char_or_varchar_columns.groovy index 4ed6814e03..b597e3d457 100644 --- a/regression-test/suites/external_table_p2/hive/test_truncate_char_or_varchar_columns.groovy +++ b/regression-test/suites/external_table_p2/hive/test_truncate_char_or_varchar_columns.groovy @@ -35,17 +35,17 @@ suite("test_truncate_char_or_varchar_columns", "p2,external,hive,external_remote // test parquet format def q01_parquet = { qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """ - qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """ + qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """ } // test orc format def q01_orc = { qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """ - qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """ + qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """ } // test text format def q01_text = { qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_text order by id """ - qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_text order by id """ + qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_text order by id """ } sql """ use 
`multi_catalog`; """ q01_parquet() @@ -57,17 +57,17 @@ suite("test_truncate_char_or_varchar_columns", "p2,external,hive,external_remote // test parquet format def q02_parquet = { qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """ - qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """ + qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """ } // test orc format def q02_orc = { qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """ - qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """ + qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """ } // test text format def q02_text = { qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_text order by id """ - qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_text order by id """ + qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_text order by id """ } sql """ use `multi_catalog`; """ q02_parquet() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org