freemandealer commented on code in PR #47473: URL: https://github.com/apache/doris/pull/47473#discussion_r1940970243
########## be/test/io/cache/block_file_cache_test.cpp: ########## @@ -6742,4 +6746,124 @@ TEST_F(BlockFileCacheTest, evict_privilege_order_for_ttl) { } } +TEST_F(BlockFileCacheTest, evict_in_advance) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + auto sp = SyncPoint::get_instance(); + SyncPoint::CallbackGuard guard1; + sp->set_call_back( + "BlockFileCache::set_sleep_time", + [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; }, &guard1); + sp->enable_processing(); + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + size_t limit = 1000000; + size_t cache_max = 10000000; + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.cache_type = io::FileCacheType::NORMAL; + context.query_id = query_id; + // int64_t cur_time = UnixSeconds(); + // context.expiration_time = cur_time + 120; + auto key1 = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + int64_t offset = 0; + // fill the cache to its limit + for (; offset < limit; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], 
io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + // grab more exceed the limit to max cache capacity + for (; offset < cache_max; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(), 0); + + // grab more exceed the cache capacity + size_t exceed = 2000000; + for (; offset < (cache_max + exceed); offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + 
ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); + + config::file_cache_evict_in_advance_batch_bytes = 200000; // evict 2 100000 blocks + config::enable_evict_file_cache_in_advance = true; // enable evict in advance + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // wait for clear + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max - 200000); + + if (fs::exists(cache_base_path)) { Review Comment: sure, it will take some time -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org