This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c7c9de3e953 branch-3.0: [fix](cloud) fix get_or_set empty <offset, 
cell> map #49793 (#49873)
c7c9de3e953 is described below

commit c7c9de3e9536cedf6608ce9ec877fac01aebef1d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 8 17:46:06 2025 +0800

    branch-3.0: [fix](cloud) fix get_or_set empty <offset, cell> map #49793 
(#49873)
    
    Cherry-picked from #49793
    
    Co-authored-by: zhengyu <zhangzhen...@selectdb.com>
---
 be/src/io/cache/block_file_cache.cpp       |  96 +++++++++++++++----------
 be/test/io/cache/block_file_cache_test.cpp | 112 ++++++++++++++++++++++++++++-
 2 files changed, 168 insertions(+), 40 deletions(-)

diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index ad3c6e99638..52e36e42f9f 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -412,13 +412,13 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& 
hash, const CacheConte
     }
 
     auto& file_blocks = it->second;
-    DCHECK(!file_blocks.empty());
     if (file_blocks.empty()) {
         LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                      << " cache type=" << context.cache_type
                      << " cache expiration time=" << context.expiration_time
                      << " cache range=" << range.left << " " << range.right
                      << " query id=" << context.query_id;
+        DCHECK(false);
         _files.erase(hash);
         return {};
     }
@@ -759,13 +759,12 @@ BlockFileCache::FileBlockCell* 
BlockFileCache::add_cell(const UInt128Wrapper& ha
         return nullptr; /// Empty files are not cached.
     }
 
-    DCHECK_EQ(_files[hash].count(offset), 0)
+    auto& offsets = _files[hash];
+    DCHECK_EQ(offsets.count(offset), 0)
             << "Cache already exists for hash: " << hash.to_string() << ", 
offset: " << offset
             << ", size: " << size
             << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, 
cache_lock);
 
-    auto& offsets = _files[hash];
-
     FileCacheKey key;
     key.hash = hash;
     key.offset = offset;
@@ -1052,37 +1051,43 @@ bool BlockFileCache::remove_if_ttl_file_blocks(const 
UInt128Wrapper& file_key, b
     if (auto iter = _key_to_time.find(file_key);
         _key_to_time.find(file_key) != _key_to_time.end()) {
         if (!remove_directly) {
-            for (auto& [_, cell] : _files[file_key]) {
-                if (cell.file_block->cache_type() != FileCacheType::TTL) {
-                    continue;
-                }
-                Status st = cell.file_block->update_expiration_time(0);
-                if (!st.ok()) {
-                    LOG_WARNING("Failed to update expiration time to 
0").error(st);
-                }
+            auto it = _files.find(file_key);
+            if (it != _files.end()) {
+                for (auto& [_, cell] : it->second) {
+                    if (cell.file_block->cache_type() != FileCacheType::TTL) {
+                        continue;
+                    }
+                    Status st = cell.file_block->update_expiration_time(0);
+                    if (!st.ok()) {
+                        LOG_WARNING("Failed to update expiration time to 
0").error(st);
+                    }
 
-                if (cell.file_block->cache_type() == FileCacheType::NORMAL) 
continue;
-                st = cell.file_block->change_cache_type_between_ttl_and_others(
-                        FileCacheType::NORMAL);
-                if (st.ok()) {
-                    if (cell.queue_iterator) {
-                        ttl_queue.remove(cell.queue_iterator.value(), 
cache_lock);
+                    if (cell.file_block->cache_type() == 
FileCacheType::NORMAL) continue;
+                    st = 
cell.file_block->change_cache_type_between_ttl_and_others(
+                            FileCacheType::NORMAL);
+                    if (st.ok()) {
+                        if (cell.queue_iterator) {
+                            ttl_queue.remove(cell.queue_iterator.value(), 
cache_lock);
+                        }
+                        auto& queue = get_queue(FileCacheType::NORMAL);
+                        cell.queue_iterator = queue.add(
+                                cell.file_block->get_hash_value(), 
cell.file_block->offset(),
+                                cell.file_block->range().size(), cache_lock);
+                    } else {
+                        LOG_WARNING("Failed to change cache type to 
normal").error(st);
                     }
-                    auto& queue = get_queue(FileCacheType::NORMAL);
-                    cell.queue_iterator =
-                            queue.add(cell.file_block->get_hash_value(), 
cell.file_block->offset(),
-                                      cell.file_block->range().size(), 
cache_lock);
-                } else {
-                    LOG_WARNING("Failed to change cache type to 
normal").error(st);
                 }
             }
         } else {
             std::vector<FileBlockCell*> to_remove;
-            for (auto& [_, cell] : _files[file_key]) {
-                if (cell.releasable()) {
-                    to_remove.push_back(&cell);
-                } else {
-                    cell.file_block->set_deleting();
+            auto it = _files.find(file_key);
+            if (it != _files.end()) {
+                for (auto& [_, cell] : it->second) {
+                    if (cell.releasable()) {
+                        to_remove.push_back(&cell);
+                    } else {
+                        cell.file_block->set_deleting();
+                    }
                 }
             }
             std::for_each(to_remove.begin(), to_remove.end(), 
[&](FileBlockCell* cell) {
@@ -1404,10 +1409,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block, 
T& cache_lock, U& block_lo
     if (FileCacheType::TTL == type) {
         _cur_ttl_size -= file_block->range().size();
     }
-    auto& offsets = _files[hash];
-    offsets.erase(file_block->offset());
-    if (offsets.empty()) {
-        _files.erase(hash);
+    auto it = _files.find(hash);
+    if (it != _files.end()) {
+        it->second.erase(file_block->offset());
+        if (it->second.empty()) {
+            _files.erase(hash);
+        }
     }
     *_num_removed_blocks << 1;
 }
@@ -1525,7 +1532,11 @@ std::string BlockFileCache::dump_structure(const 
UInt128Wrapper& hash) {
 std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
                                                     
std::lock_guard<std::mutex>&) {
     std::stringstream result;
-    const auto& cells_by_offset = _files[hash];
+    auto it = _files.find(hash);
+    if (it == _files.end()) {
+        return std::string("");
+    }
+    const auto& cells_by_offset = it->second;
 
     for (const auto& [_, cell] : cells_by_offset) {
         result << cell.file_block->get_info_for_log() << " "
@@ -1544,7 +1555,11 @@ std::string 
BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper
                                                             size_t offset,
                                                             
std::lock_guard<std::mutex>&) {
     std::stringstream result;
-    const auto& cells_by_offset = _files[hash];
+    auto it = _files.find(hash);
+    if (it == _files.end()) {
+        return std::string("");
+    }
+    const auto& cells_by_offset = it->second;
     const auto& cell = cells_by_offset.find(offset);
 
     return cache_type_to_string(cell->second.file_block->cache_type());
@@ -1926,10 +1941,13 @@ void BlockFileCache::modify_expiration_time(const 
UInt128Wrapper& hash,
         }
         _time_to_key.insert(std::make_pair(new_expiration_time, hash));
         iter->second = new_expiration_time;
-        for (auto& [_, cell] : _files[hash]) {
-            Status st = 
cell.file_block->update_expiration_time(new_expiration_time);
-            if (!st.ok()) {
-                LOG_WARNING("Failed to modify expiration time").error(st);
+        auto it = _files.find(hash);
+        if (it != _files.end()) {
+            for (auto& [_, cell] : it->second) {
+                Status st = 
cell.file_block->update_expiration_time(new_expiration_time);
+                if (!st.ok()) {
+                    LOG_WARNING("Failed to modify expiration time").error(st);
+                }
             }
         }
 
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index d531f8cc896..6aad66de5cc 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -7077,10 +7077,120 @@ TEST_F(BlockFileCacheTest, 
test_evict_cache_in_advance_skip) {
 
     SyncPoint::get_instance()->disable_processing();
     SyncPoint::get_instance()->clear_all_call_backs();
-    fs::remove_all(cache_base_path);
+    // fs::remove_all(cache_base_path);
     config::file_cache_enter_need_evict_cache_in_advance_percent = 
origin_enter;
     config::file_cache_exit_need_evict_cache_in_advance_percent = origin_exit;
     config::file_cache_evict_in_advance_recycle_keys_num_threshold = 
origin_threshold;
 }
 
+TEST_F(BlockFileCacheTest, validate_get_or_set_crash) {
+    {
+        if (fs::exists(cache_base_path)) {
+            fs::remove_all(cache_base_path);
+        }
+        fs::create_directories(cache_base_path);
+
+        auto sp = SyncPoint::get_instance();
+        sp->enable_processing();
+
+        TUniqueId query_id;
+        query_id.hi = 1;
+        query_id.lo = 1;
+        io::FileCacheSettings settings;
+
+        settings.ttl_queue_size = 5000000;
+        settings.ttl_queue_elements = 50000;
+        settings.query_queue_size = 3000000;
+        settings.query_queue_elements = 30000;
+        settings.index_queue_size = 1000000;
+        settings.index_queue_elements = 10000;
+        settings.disposable_queue_size = 1000000;
+        settings.disposable_queue_elements = 10000;
+        settings.capacity = 10000000;
+        settings.max_file_block_size = 100000;
+        settings.max_query_cache_size = 30;
+
+        // block the async load process
+        std::atomic_bool flag1 {false};
+        SyncPoint::CallbackGuard guard1;
+        sp->set_call_back(
+                "BlockFileCache::BeforeScan",
+                [&](auto&&) {
+                    // create a tmp file in hash "key1"   
lru_cache_test/cache1/f36/f36131fb4ba563c17e727cd0cdd63689_0/0_tmp
+                    ASSERT_TRUE(global_local_filesystem()->create_directory(
+                            fs::current_path() / "lru_cache_test" / "cache1" / 
"f36" /
+                            "f36131fb4ba563c17e727cd0cdd63689_0"));
+                    FileWriterPtr writer;
+                    ASSERT_TRUE(global_local_filesystem()
+                                        
->create_file("lru_cache_test/cache1/f36/"
+                                                      
"f36131fb4ba563c17e727cd0cdd63689_0/0_tmp",
+                                                      &writer)
+                                        .ok());
+                    ASSERT_TRUE(writer->append(Slice("333", 3)).ok());
+                    ASSERT_TRUE(writer->close().ok());
+                    while (!flag1) {
+                    }
+                },
+                &guard1);
+
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_TRUE(cache.initialize());
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+        // make a get request in async open phase
+        {
+            io::CacheContext context1;
+            ReadStatistics rstats;
+            context1.stats = &rstats;
+            context1.cache_type = io::FileCacheType::DISPOSABLE;
+            context1.query_id = query_id;
+            auto key1 = io::BlockFileCache::hash("key1");
+            LOG(INFO) << key1.to_string();
+            auto holder = cache.get_or_set(key1, 0, 100000, context1);
+        }
+
+        // continue async load
+        flag1 = true;
+        int i = 0;
+        for (; i < 100; i++) {
+            if (cache.get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+        ASSERT_TRUE(cache.get_async_open_success());
+
+        io::CacheContext context1;
+        ReadStatistics rstats;
+        context1.stats = &rstats;
+        context1.cache_type = io::FileCacheType::DISPOSABLE;
+        context1.query_id = query_id;
+        auto key1 = io::BlockFileCache::hash("key1");
+
+        // get key1 again
+        int64_t offset = 0;
+        {
+            auto holder = cache.get_or_set(key1, offset, 100000, context1);
+            auto blocks = fromHolder(holder);
+            ASSERT_EQ(blocks.size(), 1);
+
+            assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+            download(blocks[0]);
+            assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                         io::FileBlock::State::DOWNLOADED);
+
+            blocks.clear();
+        }
+    }
+
+    SyncPoint::get_instance()->disable_processing();
+    SyncPoint::get_instance()->clear_all_call_backs();
+
+    //if (fs::exists(cache_base_path)) {
+    //    fs::remove_all(cache_base_path);
+    //}
+}
+
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to