This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 330f369764 [enhancement](file-cache) limit the file cache handle num 
and init the file cache concurrently (#22919)
330f369764 is described below

commit 330f369764d3518b9da6bea8dad2a854c08b8002
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Thu Aug 17 16:52:08 2023 +0800

    [enhancement](file-cache) limit the file cache handle num and init the file 
cache concurrently (#22919)
    
    1. the real value of BE config `file_cache_max_file_reader_cache_size` will 
be capped at 1/3 of the process's max open file number.
    2. use thread pool to create or init the file cache concurrently.
        This solves the issue that when there are lots of files in the file cache dir, 
the BE startup time will be very long because
        it traverses all file cache dirs sequentially.
---
 be/src/io/cache/block/block_file_cache.cpp         | 20 +++++++++++++-
 be/src/io/cache/block/block_file_cache.h           |  4 +++
 be/src/io/cache/block/block_file_cache_factory.cpp | 23 ++++++++++++----
 be/src/io/cache/block/block_file_cache_factory.h   |  6 ++--
 be/src/io/cache/block/block_lru_file_cache.cpp     |  8 ++++--
 be/src/service/doris_main.cpp                      | 32 +++++++++++++++++++---
 be/test/io/cache/file_block_cache_test.cpp         |  1 +
 .../test_truncate_char_or_varchar_columns.groovy   | 12 ++++----
 8 files changed, 85 insertions(+), 21 deletions(-)

diff --git a/be/src/io/cache/block/block_file_cache.cpp 
b/be/src/io/cache/block/block_file_cache.cpp
index 0cd546aeaa..2b7b8cb134 100644
--- a/be/src/io/cache/block/block_file_cache.cpp
+++ b/be/src/io/cache/block/block_file_cache.cpp
@@ -22,6 +22,8 @@
 
 #include <glog/logging.h>
 // IWYU pragma: no_include <bits/chrono.h>
+#include <sys/resource.h>
+
 #include <chrono> // IWYU pragma: keep
 #include <filesystem>
 #include <utility>
@@ -176,7 +178,7 @@ std::weak_ptr<FileReader> 
IFileCache::cache_file_reader(const AccessKeyAndOffset
     std::weak_ptr<FileReader> wp;
     if (!s_read_only) [[likely]] {
         std::lock_guard lock(s_file_reader_cache_mtx);
-        if (config::file_cache_max_file_reader_cache_size == 
s_file_reader_cache.size()) {
+        if (s_file_reader_cache.size() >= _max_file_reader_cache_size) {
             s_file_name_to_reader.erase(s_file_reader_cache.back().first);
             s_file_reader_cache.pop_back();
         }
@@ -205,5 +207,21 @@ size_t IFileCache::file_reader_cache_size() {
     return s_file_name_to_reader.size();
 }
 
+void IFileCache::init() {
+    struct rlimit limit;
+    if (getrlimit(RLIMIT_NOFILE, &limit) != 0) {
+        LOG(FATAL) << "getrlimit() failed with errno: " << errno;
+        return;
+    }
+
+    _max_file_reader_cache_size =
+            std::min((uint64_t)config::file_cache_max_file_reader_cache_size, 
limit.rlim_max / 3);
+    LOG(INFO) << "max file reader cache size is: " << 
_max_file_reader_cache_size
+              << ", resource hard limit is: " << limit.rlim_max
+              << ", config file_cache_max_file_reader_cache_size is: "
+              << config::file_cache_max_file_reader_cache_size;
+    return;
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/cache/block/block_file_cache.h 
b/be/src/io/cache/block/block_file_cache.h
index 5d0dd1f06f..9ac1a464e8 100644
--- a/be/src/io/cache/block/block_file_cache.h
+++ b/be/src/io/cache/block/block_file_cache.h
@@ -303,8 +303,12 @@ private:
             s_file_name_to_reader;
     static inline std::mutex s_file_reader_cache_mtx;
     static inline std::atomic_bool s_read_only {false};
+    static inline uint64_t _max_file_reader_cache_size = 65533;
 
 public:
+    // should be called when BE starts
+    static void init();
+
     static void set_read_only(bool read_only);
 
     static bool read_only() { return s_read_only; }
diff --git a/be/src/io/cache/block/block_file_cache_factory.cpp 
b/be/src/io/cache/block/block_file_cache_factory.cpp
index fc60469ada..8741ced5c0 100644
--- a/be/src/io/cache/block/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block/block_file_cache_factory.cpp
@@ -58,8 +58,9 @@ size_t FileCacheFactory::try_release(const std::string& 
base_path) {
     return 0;
 }
 
-Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
-                                           const FileCacheSettings& 
file_cache_settings) {
+void FileCacheFactory::create_file_cache(const std::string& cache_base_path,
+                                         const FileCacheSettings& 
file_cache_settings,
+                                         Status* status) {
     if (config::clear_file_cache) {
         auto fs = global_local_filesystem();
         bool res = false;
@@ -71,12 +72,22 @@ Status FileCacheFactory::create_file_cache(const 
std::string& cache_base_path,
 
     std::unique_ptr<IFileCache> cache =
             std::make_unique<LRUFileCache>(cache_base_path, 
file_cache_settings);
-    RETURN_IF_ERROR(cache->initialize());
-    _path_to_cache[cache_base_path] = cache.get();
-    _caches.push_back(std::move(cache));
+    *status = cache->initialize();
+    if (!status->ok()) {
+        return;
+    }
+
+    {
+        // the create_file_cache() may be called concurrently,
+        // so need to protect it with lock
+        std::lock_guard<std::mutex> lock(_cache_mutex);
+        _path_to_cache[cache_base_path] = cache.get();
+        _caches.push_back(std::move(cache));
+    }
     LOG(INFO) << "[FileCache] path: " << cache_base_path
               << " total_size: " << file_cache_settings.total_size;
-    return Status::OK();
+    *status = Status::OK();
+    return;
 }
 
 CloudFileCachePtr FileCacheFactory::get_by_path(const IFileCache::Key& key) {
diff --git a/be/src/io/cache/block/block_file_cache_factory.h 
b/be/src/io/cache/block/block_file_cache_factory.h
index 97bffb7193..c8ed2893f1 100644
--- a/be/src/io/cache/block/block_file_cache_factory.h
+++ b/be/src/io/cache/block/block_file_cache_factory.h
@@ -39,8 +39,8 @@ class FileCacheFactory {
 public:
     static FileCacheFactory& instance();
 
-    Status create_file_cache(const std::string& cache_base_path,
-                             const FileCacheSettings& file_cache_settings);
+    void create_file_cache(const std::string& cache_base_path,
+                           const FileCacheSettings& file_cache_settings, 
Status* status);
 
     size_t try_release();
 
@@ -55,6 +55,8 @@ public:
     FileCacheFactory(const FileCacheFactory&) = delete;
 
 private:
+    // to protect following containers
+    std::mutex _cache_mutex;
     std::vector<std::unique_ptr<IFileCache>> _caches;
     std::unordered_map<std::string, CloudFileCachePtr> _path_to_cache;
 };
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp 
b/be/src/io/cache/block/block_lru_file_cache.cpp
index 62e9a3b31d..53518653ac 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -51,6 +51,7 @@
 #include "io/fs/path.h"
 #include "util/doris_metrics.h"
 #include "util/slice.h"
+#include "util/stopwatch.hpp"
 #include "vec/common/hex.h"
 
 namespace fs = std::filesystem;
@@ -118,6 +119,8 @@ LRUFileCache::LRUFileCache(const std::string& 
cache_base_path,
 }
 
 Status LRUFileCache::initialize() {
+    MonotonicStopWatch watch;
+    watch.start();
     std::lock_guard cache_lock(_mutex);
     if (!_is_initialized) {
         if (fs::exists(_cache_base_path)) {
@@ -134,17 +137,18 @@ Status LRUFileCache::initialize() {
     }
     _is_initialized = true;
     _cache_background_thread = 
std::thread(&LRUFileCache::run_background_operation, this);
+    int64_t cost = watch.elapsed_time() / 1000 / 1000;
     LOG(INFO) << fmt::format(
             "After initialize file cache path={}, disposable queue size={} 
elements={}, index "
             "queue size={} "
             "elements={}, query queue "
-            "size={} elements={}",
+            "size={} elements={}, init cost(ms)={}",
             _cache_base_path, 
_disposable_queue.get_total_cache_size(cache_lock),
             _disposable_queue.get_elements_num(cache_lock),
             _index_queue.get_total_cache_size(cache_lock),
             _index_queue.get_elements_num(cache_lock),
             _normal_queue.get_total_cache_size(cache_lock),
-            _normal_queue.get_elements_num(cache_lock));
+            _normal_queue.get_elements_num(cache_lock), cost);
     return Status::OK();
 }
 
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 45dc46e32d..4829b45cc7 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -392,6 +392,7 @@ int main(int argc, char** argv) {
     }
 
     if (doris::config::enable_file_cache) {
+        doris::io::IFileCache::init();
         std::unordered_set<std::string> cache_path_set;
         std::vector<doris::CachePath> cache_paths;
         olap_res = 
doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
@@ -400,16 +401,39 @@ int main(int argc, char** argv) {
                        << doris::config::file_cache_path;
             exit(-1);
         }
+
+        std::unique_ptr<doris::ThreadPool> file_cache_init_pool;
+        doris::ThreadPoolBuilder("FileCacheInitThreadPool")
+                .set_min_threads(cache_paths.size())
+                .set_max_threads(cache_paths.size())
+                .build(&file_cache_init_pool);
+
+        std::vector<doris::Status> cache_status;
         for (auto& cache_path : cache_paths) {
             if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
                 LOG(WARNING) << fmt::format("cache path {} is duplicate", 
cache_path.path);
                 continue;
             }
+
+            cache_status.push_back(Status::OK());
+            RETURN_IF_ERROR(file_cache_init_pool->submit_func(
+                    std::bind(&doris::io::FileCacheFactory::create_file_cache,
+                              &(doris::io::FileCacheFactory::instance()), 
cache_path.path,
+                              cache_path.init_settings(), 
&(cache_status.back()))));
+
             cache_path_set.emplace(cache_path.path);
-            Status st = 
doris::io::FileCacheFactory::instance().create_file_cache(
-                    cache_path.path, cache_path.init_settings());
-            if (!st) {
-                LOG(FATAL) << st;
+            // Status st = 
doris::io::FileCacheFactory::instance().create_file_cache(
+            //         cache_path.path, cache_path.init_settings());
+            // if (!st) {
+            //     LOG(FATAL) << st;
+            //     exit(-1);
+            // }
+        }
+
+        file_cache_init_pool->wait();
+        for (int i = 0; i < cache_status.size(); ++i) {
+            if (!cache_status[i].ok()) {
+                LOG(FATAL) << "failed to init file cache: " << i << ", err: " 
<< cache_status[i];
                 exit(-1);
             }
         }
diff --git a/be/test/io/cache/file_block_cache_test.cpp 
b/be/test/io/cache/file_block_cache_test.cpp
index 1c60447035..f8ce21dd27 100644
--- a/be/test/io/cache/file_block_cache_test.cpp
+++ b/be/test/io/cache/file_block_cache_test.cpp
@@ -916,6 +916,7 @@ TEST(LRUFileCache, fd_cache_evict) {
     context.cache_type = io::CacheType::NORMAL;
     auto key = io::LRUFileCache::hash("key1");
     config::file_cache_max_file_reader_cache_size = 2;
+    IFileCache::init();
     {
         auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 
8]
         auto segments = fromHolder(holder);
diff --git 
a/regression-test/suites/external_table_p2/hive/test_truncate_char_or_varchar_columns.groovy
 
b/regression-test/suites/external_table_p2/hive/test_truncate_char_or_varchar_columns.groovy
index 4ed6814e03..b597e3d457 100644
--- 
a/regression-test/suites/external_table_p2/hive/test_truncate_char_or_varchar_columns.groovy
+++ 
b/regression-test/suites/external_table_p2/hive/test_truncate_char_or_varchar_columns.groovy
@@ -35,17 +35,17 @@ suite("test_truncate_char_or_varchar_columns", 
"p2,external,hive,external_remote
         // test parquet format
         def q01_parquet = {
             qt_q01 """ select * from 
multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """
-            qt_q02 """ select city, concat("at ", city, " in ", country) from 
regression.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by 
id """
+            qt_q02 """ select city, concat("at ", city, " in ", country) from 
${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_parquet 
order by id """
         }
         // test orc format
         def q01_orc = {
             qt_q01 """ select * from 
multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """
-            qt_q02 """ select city, concat("at ", city, " in ", country) from 
regression.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id 
"""
+            qt_q02 """ select city, concat("at ", city, " in ", country) from 
${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_orc order 
by id """
         }
         // test text format
         def q01_text = {
             qt_q01 """ select * from 
multi_catalog.test_truncate_char_or_varchar_columns_text order by id """
-            qt_q02 """ select city, concat("at ", city, " in ", country) from 
regression.multi_catalog.test_truncate_char_or_varchar_columns_text order by id 
"""
+            qt_q02 """ select city, concat("at ", city, " in ", country) from 
${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_text order 
by id """
         }
         sql """ use `multi_catalog`; """
         q01_parquet()
@@ -57,17 +57,17 @@ suite("test_truncate_char_or_varchar_columns", 
"p2,external,hive,external_remote
         // test parquet format
         def q02_parquet = {
             qt_q01 """ select * from 
multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """
-            qt_q02 """ select city, concat("at ", city, " in ", country) from 
regression.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by 
id """
+            qt_q02 """ select city, concat("at ", city, " in ", country) from 
${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_parquet 
order by id """
         }
         // test orc format
         def q02_orc = {
             qt_q01 """ select * from 
multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """
-            qt_q02 """ select city, concat("at ", city, " in ", country) from 
regression.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id 
"""
+            qt_q02 """ select city, concat("at ", city, " in ", country) from 
${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_orc order 
by id """
         }
         // test text format
         def q02_text = {
             qt_q01 """ select * from 
multi_catalog.test_truncate_char_or_varchar_columns_text order by id """
-            qt_q02 """ select city, concat("at ", city, " in ", country) from 
regression.multi_catalog.test_truncate_char_or_varchar_columns_text order by id 
"""
+            qt_q02 """ select city, concat("at ", city, " in ", country) from 
${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_text order 
by id """
         }
         sql """ use `multi_catalog`; """
         q02_parquet()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to