This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new c7c9de3e953 branch-3.0: [fix](cloud) fix get_or_set emptry <offset, cell> map #49793 (#49873) c7c9de3e953 is described below commit c7c9de3e9536cedf6608ce9ec877fac01aebef1d Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Tue Apr 8 17:46:06 2025 +0800 branch-3.0: [fix](cloud) fix get_or_set emptry <offset, cell> map #49793 (#49873) Cherry-picked from #49793 Co-authored-by: zhengyu <zhangzhen...@selectdb.com> --- be/src/io/cache/block_file_cache.cpp | 96 +++++++++++++++---------- be/test/io/cache/block_file_cache_test.cpp | 112 ++++++++++++++++++++++++++++- 2 files changed, 168 insertions(+), 40 deletions(-) diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index ad3c6e99638..52e36e42f9f 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -412,13 +412,13 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte } auto& file_blocks = it->second; - DCHECK(!file_blocks.empty()); if (file_blocks.empty()) { LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string() << " cache type=" << context.cache_type << " cache expiration time=" << context.expiration_time << " cache range=" << range.left << " " << range.right << " query id=" << context.query_id; + DCHECK(false); _files.erase(hash); return {}; } @@ -759,13 +759,12 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha return nullptr; /// Empty files are not cached. } - DCHECK_EQ(_files[hash].count(offset), 0) + auto& offsets = _files[hash]; + DCHECK_EQ(offsets.count(offset), 0) << "Cache already exists for hash: " << hash.to_string() << ", offset: " << offset << ", size: " << size << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock); - auto& offsets = _files[hash]; - FileCacheKey key; key.hash = hash; key.offset = offset; @@ -1052,37 +1051,43 @@ bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, b if (auto iter = _key_to_time.find(file_key); _key_to_time.find(file_key) != _key_to_time.end()) { if (!remove_directly) { - for (auto& [_, cell] : _files[file_key]) { - if (cell.file_block->cache_type() != FileCacheType::TTL) { - continue; - } - Status st = cell.file_block->update_expiration_time(0); - if (!st.ok()) { - LOG_WARNING("Failed to update expiration time to 0").error(st); - } + auto it = _files.find(file_key); + if (it != _files.end()) { + for (auto& [_, cell] : it->second) { + if (cell.file_block->cache_type() != FileCacheType::TTL) { + continue; + } + Status st = cell.file_block->update_expiration_time(0); + if (!st.ok()) { + LOG_WARNING("Failed to update expiration time to 0").error(st); + } - if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue; - st = cell.file_block->change_cache_type_between_ttl_and_others( - FileCacheType::NORMAL); - if (st.ok()) { - if (cell.queue_iterator) { - ttl_queue.remove(cell.queue_iterator.value(), cache_lock); + if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue; + st = cell.file_block->change_cache_type_between_ttl_and_others( + FileCacheType::NORMAL); + if (st.ok()) { + if (cell.queue_iterator) { + ttl_queue.remove(cell.queue_iterator.value(), cache_lock); + } + auto& queue = get_queue(FileCacheType::NORMAL); + cell.queue_iterator = queue.add( + cell.file_block->get_hash_value(), cell.file_block->offset(), + cell.file_block->range().size(), cache_lock); + } else { + LOG_WARNING("Failed to change cache type to normal").error(st); } - auto& queue = get_queue(FileCacheType::NORMAL); - cell.queue_iterator = - queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(), - cell.file_block->range().size(), cache_lock); - } else { - LOG_WARNING("Failed to change cache type to normal").error(st); } } } else { std::vector<FileBlockCell*> to_remove; - for (auto& [_, cell] : _files[file_key]) { - if (cell.releasable()) { - to_remove.push_back(&cell); - } else { - cell.file_block->set_deleting(); + auto it = _files.find(file_key); + if (it != _files.end()) { + for (auto& [_, cell] : it->second) { + if (cell.releasable()) { + to_remove.push_back(&cell); + } else { + cell.file_block->set_deleting(); + } } } std::for_each(to_remove.begin(), to_remove.end(), [&](FileBlockCell* cell) { @@ -1404,10 +1409,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo if (FileCacheType::TTL == type) { _cur_ttl_size -= file_block->range().size(); } - auto& offsets = _files[hash]; - offsets.erase(file_block->offset()); - if (offsets.empty()) { - _files.erase(hash); + auto it = _files.find(hash); + if (it != _files.end()) { + it->second.erase(file_block->offset()); + if (it->second.empty()) { + _files.erase(hash); + } } *_num_removed_blocks << 1; } @@ -1525,7 +1532,11 @@ std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) { std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash, std::lock_guard<std::mutex>&) { std::stringstream result; - const auto& cells_by_offset = _files[hash]; + auto it = _files.find(hash); + if (it == _files.end()) { + return std::string(""); + } + const auto& cells_by_offset = it->second; for (const auto& [_, cell] : cells_by_offset) { result << cell.file_block->get_info_for_log() << " " @@ -1544,7 +1555,11 @@ std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper size_t offset, std::lock_guard<std::mutex>&) { std::stringstream result; - const auto& cells_by_offset = _files[hash]; + auto it = _files.find(hash); + if (it == _files.end()) { + return std::string(""); + } + const auto& cells_by_offset = it->second; const auto& cell = cells_by_offset.find(offset); return cache_type_to_string(cell->second.file_block->cache_type()); @@ -1926,10 +1941,13 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, } _time_to_key.insert(std::make_pair(new_expiration_time, hash)); iter->second = new_expiration_time; - for (auto& [_, cell] : _files[hash]) { - Status st = cell.file_block->update_expiration_time(new_expiration_time); - if (!st.ok()) { - LOG_WARNING("Failed to modify expiration time").error(st); + auto it = _files.find(hash); + if (it != _files.end()) { + for (auto& [_, cell] : it->second) { + Status st = cell.file_block->update_expiration_time(new_expiration_time); + if (!st.ok()) { + LOG_WARNING("Failed to modify expiration time").error(st); + } } } diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index d531f8cc896..6aad66de5cc 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -7077,10 +7077,120 @@ TEST_F(BlockFileCacheTest, test_evict_cache_in_advance_skip) { SyncPoint::get_instance()->disable_processing(); SyncPoint::get_instance()->clear_all_call_backs(); - fs::remove_all(cache_base_path); + // fs::remove_all(cache_base_path); config::file_cache_enter_need_evict_cache_in_advance_percent = origin_enter; config::file_cache_exit_need_evict_cache_in_advance_percent = origin_exit; config::file_cache_evict_in_advance_recycle_keys_num_threshold = origin_threshold; } +TEST_F(BlockFileCacheTest, validate_get_or_set_crash) { + { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + + auto sp = SyncPoint::get_instance(); + sp->enable_processing(); + + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + // block the async load process + std::atomic_bool flag1 {false}; + SyncPoint::CallbackGuard guard1; + sp->set_call_back( + "BlockFileCache::BeforeScan", + [&](auto&&) { + // create a tmp file in hash "key1" lru_cache_test/cache1/f36/f36131fb4ba563c17e727cd0cdd63689_0/0_tmp + ASSERT_TRUE(global_local_filesystem()->create_directory( + fs::current_path() / "lru_cache_test" / "cache1" / "f36" / + "f36131fb4ba563c17e727cd0cdd63689_0")); + FileWriterPtr writer; + ASSERT_TRUE(global_local_filesystem() + ->create_file("lru_cache_test/cache1/f36/" + "f36131fb4ba563c17e727cd0cdd63689_0/0_tmp", + &writer) + .ok()); + ASSERT_TRUE(writer->append(Slice("333", 3)).ok()); + ASSERT_TRUE(writer->close().ok()); + while (!flag1) { + } + }, + &guard1); + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + // make a get request in async open phase + { + io::CacheContext context1; + ReadStatistics rstats; + context1.stats = &rstats; + context1.cache_type = io::FileCacheType::DISPOSABLE; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + LOG(INFO) << key1.to_string(); + auto holder = cache.get_or_set(key1, 0, 100000, context1); + } + + // continue async load + flag1 = true; + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + io::CacheContext context1; + ReadStatistics rstats; + context1.stats = &rstats; + context1.cache_type = io::FileCacheType::DISPOSABLE; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + + // get key1 againqq + int64_t offset = 0; + { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + } + + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + + //if (fs::exists(cache_base_path)) { + // fs::remove_all(cache_base_path); + //} +} + } // namespace doris::io --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org