This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 07ad6482dd4 [fix](file-cache) Make sync clear use safe removal (#64578)
07ad6482dd4 is described below
commit 07ad6482dd4eb73a7e75a7af6c562d34a12ca52c
Author: Gavin Chou <[email protected]>
AuthorDate: Mon Jun 22 17:16:54 2026 +0800
[fix](file-cache) Make sync clear use safe removal (#64578)
### What
- Replace the sync clear's direct storage wipe with the safe block scan/remove
path (`clear_file_cache_directly()` is replaced by `clear_file_cache_sync()`).
- Leave busy blocks to the FileBlock holder lifecycle (they are only marked
deleting and are removed when the last holder releases them), and
synchronously remove releasable downloaded blocks.
- Allow only one sync clear in flight per process; a concurrent sync clear
request is rejected with HTTP 400 (async clear is not gated).
- Add BE tests for a large sync clear and for concurrent sync clears, using a
SyncPoint to mock storage removal.
### Tests
- `sh run-be-ut.sh --run --filter
"BlockFileCacheTest.clear_file_cache_sync*:BlockFileCacheTest.clear_retains_meta_directory_and_clears_meta_entries"`
Co-authored-by: gavinchou <[email protected]>
---
be/src/io/cache/block_file_cache.cpp | 111 ++-----------
be/src/io/cache/block_file_cache.h | 10 +-
be/src/io/cache/block_file_cache_factory.cpp | 45 ++++-
be/src/io/cache/block_file_cache_factory.h | 2 +
be/src/io/cache/fs_file_cache_storage.cpp | 4 +
be/src/service/http/action/file_cache_action.cpp | 24 +--
be/test/io/cache/block_file_cache_test.cpp | 185 +++++++++++++++++++++
.../io/cache/block_file_cache_test_meta_store.cpp | 26 +--
be/test/service/http/file_cache_action_test.cpp | 10 +-
9 files changed, 285 insertions(+), 132 deletions(-)
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 5c4a275ac3d..4684961f884 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -715,7 +715,16 @@ void
BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
}
std::string BlockFileCache::clear_file_cache_async() {
- LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
+ return clear_file_cache_impl(false);
+}
+
+std::string BlockFileCache::clear_file_cache_sync() {
+ return clear_file_cache_impl(true);
+}
+
+std::string BlockFileCache::clear_file_cache_impl(bool sync_remove) {
+ const char* action = sync_remove ? "clear_file_cache_sync" :
"clear_file_cache_async";
+ LOG(INFO) << "start " << action << ", path=" << _cache_base_path;
_lru_dumper->remove_lru_dump_files();
int64_t num_cells_all = 0;
int64_t num_cells_to_delete = 0;
@@ -734,7 +743,11 @@ std::string BlockFileCache::clear_file_cache_async() {
}
}
- // we cannot delete the element in the loop above, because it will
break the iterator
+ // Do not erase while walking _files above: remove() may erase the
current map element.
+ //
+ // sync_remove only changes how already releasable DOWNLOADED blocks
are deleted from
+ // storage. Busy blocks keep the existing holder lifecycle: mark them
deleting and leave
+ // them in _files until the last holder releases them.
for (auto& cell : deleting_cells) {
if (!cell->releasable()) {
LOG(INFO) << "cell is not releasable, hash="
@@ -746,7 +759,7 @@ std::string BlockFileCache::clear_file_cache_async() {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
- remove(file_block, cache_lock, block_lock, false);
+ remove(file_block, cache_lock, block_lock, sync_remove);
++num_cells_to_delete;
}
}
@@ -754,7 +767,7 @@ std::string BlockFileCache::clear_file_cache_async() {
}
std::stringstream ss;
- ss << "finish clear_file_cache_async, path=" << _cache_base_path
+ ss << "finish " << action << ", path=" << _cache_base_path << "
sync_remove=" << sync_remove
<< " num_files_all=" << num_files_all << " num_cells_all=" <<
num_cells_all
<< " num_cells_to_delete=" << num_cells_to_delete
<< " num_cells_wait_recycle=" << num_cells_wait_recycle;
@@ -2313,96 +2326,6 @@ void BlockFileCache::clear_need_update_lru_blocks() {
*_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
-void BlockFileCache::pause_ttl_manager() {
- if (_ttl_mgr) {
- _ttl_mgr->stop();
- }
-}
-
-void BlockFileCache::resume_ttl_manager() {
- if (_ttl_mgr) {
- _ttl_mgr->resume();
- }
-}
-
-std::string BlockFileCache::clear_file_cache_directly() {
- pause_ttl_manager();
- _lru_dumper->remove_lru_dump_files();
- using namespace std::chrono;
- std::stringstream ss;
- auto start = steady_clock::now();
- std::string result;
- {
- SCOPED_CACHE_LOCK(_mutex, this);
- LOG_INFO("start clear_file_cache_directly").tag("path",
_cache_base_path);
-
- std::string clear_msg;
- auto s = _storage->clear(clear_msg);
- if (!s.ok()) {
- result = clear_msg;
- } else {
- int64_t num_files = _files.size();
- int64_t cache_size = _cur_cache_size;
- int64_t index_queue_size =
_index_queue.get_elements_num(cache_lock);
- int64_t normal_queue_size =
_normal_queue.get_elements_num(cache_lock);
- int64_t disposible_queue_size =
_disposable_queue.get_elements_num(cache_lock);
- int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
-
- int64_t clear_fd_duration = 0;
- {
- // clear FDCache to release fd
- SCOPED_RAW_TIMER(&clear_fd_duration);
- for (const auto& [file_key, file_blocks] : _files) {
- for (const auto& [offset, file_block_cell] : file_blocks) {
- AccessKeyAndOffset access_key_and_offset(file_key,
offset);
-
FDCache::instance()->remove_file_reader(access_key_and_offset);
- }
- }
- }
-
- _files.clear();
- _cur_cache_size = 0;
- _cur_ttl_size = 0;
- _time_to_key.clear();
- _key_to_time.clear();
- _index_queue.clear(cache_lock);
- _normal_queue.clear(cache_lock);
- _disposable_queue.clear(cache_lock);
- _ttl_queue.clear(cache_lock);
-
- // Update cache metrics immediately so consumers observe the
cleared state
- // without waiting for the next background monitor round.
- _cur_cache_size_metrics->set_value(0);
- _cur_ttl_cache_size_metrics->set_value(0);
- _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
- _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
- _cur_normal_queue_cache_size_metrics->set_value(0);
- _cur_normal_queue_element_count_metrics->set_value(0);
- _cur_index_queue_cache_size_metrics->set_value(0);
- _cur_index_queue_element_count_metrics->set_value(0);
- _cur_disposable_queue_cache_size_metrics->set_value(0);
- _cur_disposable_queue_element_count_metrics->set_value(0);
-
- clear_need_update_lru_blocks();
-
- ss << "finish clear_file_cache_directly"
- << " path=" << _cache_base_path << " time_elapsed_ms="
- << duration_cast<milliseconds>(steady_clock::now() -
start).count()
- << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
- << " num_files=" << num_files << " cache_size=" << cache_size
- << " index_queue_size=" << index_queue_size
- << " normal_queue_size=" << normal_queue_size
- << " disposible_queue_size=" << disposible_queue_size
- << "ttl_queue_size=" << ttl_queue_size;
- result = ss.str();
- LOG(INFO) << result;
- }
- }
- _lru_dumper->remove_lru_dump_files();
- resume_ttl_manager();
- return result;
-}
-
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const
UInt128Wrapper& hash) {
std::map<size_t, FileBlockSPtr> offset_to_block;
SCOPED_CACHE_LOCK(_mutex, this);
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index 5c7faea9ac4..a1a7ced18e3 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -248,7 +248,7 @@ public:
* @returns summary message
*/
std::string clear_file_cache_async();
- std::string clear_file_cache_directly();
+ std::string clear_file_cache_sync();
/**
* Reset the cache capacity. If the new_capacity is smaller than
_capacity, the redundant data will be remove async.
@@ -313,9 +313,6 @@ public:
void update_ttl_atime(const UInt128Wrapper& hash);
- void pause_ttl_manager();
- void resume_ttl_manager();
-
std::map<std::string, double> get_stats();
// for be UTs
@@ -398,6 +395,11 @@ public:
Status check_file_cache_consistency(InconsistencyContext&
inconsistency_context);
private:
+ // Shared scan used by both clear modes. It keeps the FileBlock holder
lifecycle intact:
+ // releasable blocks are removed immediately, while blocks held by readers
are only marked
+ // deleting and are later removed by FileBlocksHolder destruction.
+ std::string clear_file_cache_impl(bool sync_remove);
+
LRUQueue& get_queue(FileCacheType type);
const LRUQueue& get_queue(FileCacheType type) const;
diff --git a/be/src/io/cache/block_file_cache_factory.cpp
b/be/src/io/cache/block_file_cache_factory.cpp
index 03f26a0276f..0ffc9cea365 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -31,6 +31,7 @@
#endif
#include <algorithm>
+#include <atomic>
#include <execution>
#include <ostream>
#include <utility>
@@ -246,27 +247,55 @@ FileCacheFactory::get_query_context_holders(const
TUniqueId& query_id,
return holders;
}
-std::string FileCacheFactory::clear_file_caches(bool sync) {
+Status FileCacheFactory::clear_file_caches(bool sync, std::string* ret) {
+ DCHECK(ret != nullptr);
+
+ // Sync clear is an operational action and can synchronously remove many
files. Keep a single
+ // process-wide sync clear in flight, so a second HTTP request fails fast
instead of piling onto
+ // the same cache instances. Async clear keeps the previous behavior and
is not gated here.
+ static std::atomic_bool sync_clear_running {false};
+ struct SyncClearRunningGuard {
+ std::atomic_bool* running = nullptr;
+ ~SyncClearRunningGuard() {
+ if (running != nullptr) {
+ running->store(false, std::memory_order_release);
+ }
+ }
+ } sync_clear_guard;
+ if (sync) {
+ bool expected = false;
+ if (!sync_clear_running.compare_exchange_strong(expected, true,
std::memory_order_acq_rel,
+
std::memory_order_acquire)) {
+ return Status::InvalidArgument("sync clear_file_caches is already
running");
+ }
+ sync_clear_guard.running = &sync_clear_running;
+ }
+
std::vector<std::string> results(_caches.size());
#ifndef USE_LIBCPP
std::for_each(std::execution::par, _caches.begin(), _caches.end(),
[&](const auto& cache) {
size_t index = &cache - &_caches[0];
- results[index] =
- sync ? cache->clear_file_cache_directly() :
cache->clear_file_cache_async();
+ results[index] = sync ? cache->clear_file_cache_sync() :
cache->clear_file_cache_async();
});
#else
// libcpp do not support std::execution::par
std::for_each(_caches.begin(), _caches.end(), [&](const auto& cache) {
size_t index = &cache - &_caches[0];
- results[index] =
- sync ? cache->clear_file_cache_directly() :
cache->clear_file_cache_async();
+ results[index] = sync ? cache->clear_file_cache_sync() :
cache->clear_file_cache_async();
});
#endif
std::stringstream ss;
- for (const auto& result : results) {
- ss << result << "\n";
+ for (const auto& cache_result : results) {
+ ss << cache_result << "\n";
}
- return ss.str();
+ *ret = ss.str();
+ return Status::OK();
+}
+
+std::string FileCacheFactory::clear_file_caches(bool sync) {
+ std::string result;
+ auto st = clear_file_caches(sync, &result);
+ return st.ok() ? result : st.to_string();
}
void FileCacheFactory::dump_all_caches() {
diff --git a/be/src/io/cache/block_file_cache_factory.h
b/be/src/io/cache/block_file_cache_factory.h
index 2a2b84010b6..f5be334de8b 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -23,6 +23,7 @@
#include <gen_cpp/internal_service.pb.h>
#include <memory>
+#include <mutex>
#include <optional>
#include <string>
#include <string_view>
@@ -85,6 +86,7 @@ public:
* @return summary message
*/
std::string clear_file_caches(bool sync);
+ Status clear_file_caches(bool sync, std::string* result);
/**
* dump lru queue info for all file cache instances
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp
b/be/src/io/cache/fs_file_cache_storage.cpp
index 19cecbeab1e..371edcc0b35 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -289,6 +289,10 @@ Status FSFileCacheStorage::read(const FileCacheKey& key,
size_t value_offset, Sl
}
Status FSFileCacheStorage::remove(const FileCacheKey& key) {
+ // Large clear-cache tests only need to verify the synchronous remove
handoff and in-memory
+ // index cleanup. They can return early here to avoid test-only disk churn.
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("FSFileCacheStorage::remove",
Status::OK(), &key);
+
const std::string v3_dir = get_path_in_local_cache_v3(key.hash);
const std::string v3_file = get_path_in_local_cache_v3(v3_dir, key.offset);
FDCache::instance()->remove_file_reader(std::make_pair(key.hash,
key.offset));
diff --git a/be/src/service/http/action/file_cache_action.cpp
b/be/src/service/http/action/file_cache_action.cpp
index 2402f1ad572..d291bc8df1d 100644
--- a/be/src/service/http/action/file_cache_action.cpp
+++ b/be/src/service/http/action/file_cache_action.cpp
@@ -59,8 +59,6 @@ constexpr static std::string_view RELEASED_ELEMENTS =
"released_elements";
constexpr static std::string_view DUMP = "dump";
constexpr static std::string_view VALUE = "value";
constexpr static std::string_view RELOAD = "reload";
-constexpr static std::string_view SYNC_CLEAR_UNSUPPORTED_MSG =
- "sync clear_file_cache is no longer supported in http api, running
async clear instead";
Status FileCacheAction::_handle_header(HttpRequest* req, std::string*
json_metrics) {
const std::string header_json(HEADER_JSON);
@@ -87,18 +85,21 @@ Status FileCacheAction::_handle_header(HttpRequest* req,
std::string* json_metri
const std::string& sync = req->param(std::string(SYNC));
const std::string& segment_path = req->param(std::string(VALUE));
if (segment_path.empty()) {
- io::FileCacheFactory::instance()->clear_file_caches(false);
+ const bool sync_clear = to_lower(sync) == "true";
+ std::string clear_msg;
+ RETURN_IF_ERROR(
+
io::FileCacheFactory::instance()->clear_file_caches(sync_clear, &clear_msg));
+ if (sync_clear) {
+ EasyJson json;
+ json["status"] = "OK";
+ json["msg"] = clear_msg;
+ *json_metrics = json.ToString();
+ }
} else {
io::UInt128Wrapper hash = io::BlockFileCache::hash(segment_path);
io::BlockFileCache* cache =
io::FileCacheFactory::instance()->get_by_path(hash);
cache->remove_if_cached_async(hash);
}
- if (to_lower(sync) == "true") {
- EasyJson json;
- json["status"] = "OK";
- json["msg"] = std::string(SYNC_CLEAR_UNSUPPORTED_MSG);
- *json_metrics = json.ToString();
- }
} else if (operation == RESET) {
std::string capacity = req->param(std::string(CAPACITY));
int64_t new_capacity = 0;
@@ -220,7 +221,10 @@ void FileCacheAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, HttpStatus::OK,
json_metrics.empty() ? status.to_json() :
json_metrics);
} else {
- HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
status_result);
+ const auto http_status = status.is<ErrorCode::INVALID_ARGUMENT>()
+ ? HttpStatus::BAD_REQUEST
+ : HttpStatus::INTERNAL_SERVER_ERROR;
+ HttpChannel::send_reply(req, http_status, status_result);
}
}
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index b57e1285f87..dcf210e3cb1 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -18,6 +18,9 @@
//
https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp
// and modified by Doris
+#include <atomic>
+#include <future>
+
#include "io/cache/block_file_cache_test_common.h"
#include "io/fs/buffered_reader.h"
#include "storage/olap_define.h"
@@ -2768,6 +2771,188 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
delete holder;
}
+TEST_F(BlockFileCacheTest,
clear_file_cache_sync_removes_releasable_blocks_synchronously) {
+ std::string my_cache_path = caches_dir /
"clear_file_cache_sync_mixed_states" / "";
+ if (fs::exists(my_cache_path)) {
+ fs::remove_all(my_cache_path);
+ }
+ fs::create_directories(my_cache_path);
+
+ constexpr int num_blocks = 100000;
+ constexpr size_t block_size = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = num_blocks * block_size * 2;
+ settings.query_queue_elements = num_blocks + 1024;
+ settings.index_queue_size = num_blocks * block_size * 2;
+ settings.index_queue_elements = num_blocks + 1024;
+ settings.disposable_queue_size = num_blocks * block_size * 2;
+ settings.disposable_queue_elements = num_blocks + 1024;
+ settings.capacity = num_blocks * block_size * 2;
+ settings.max_file_block_size = 64;
+ settings.max_query_cache_size = num_blocks * block_size * 2;
+
+ io::BlockFileCache cache(my_cache_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ wait_until_cache_ready(cache);
+
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::NORMAL;
+ auto key = io::BlockFileCache::hash("clear_file_cache_sync_mixed_states");
+ int expected_sync_removes = 0;
+ int expected_remaining = 0;
+ std::vector<size_t> expected_remaining_offsets;
+ std::vector<io::FileBlockSPtr> busy_refs;
+ {
+ std::lock_guard<std::mutex> cache_lock(cache._mutex);
+ for (int i = 0; i < num_blocks; ++i) {
+ // Keep the scan large, but make non-releasable states sparse so
the test does not
+ // spend most of its time writing one log line per busy block.
+ const bool busy = i % 997 == 0;
+ const bool downloading = i % 991 == 0;
+ const bool empty = !downloading && i % 983 == 0;
+ const auto state =
+ empty ? io::FileBlock::State::EMPTY :
io::FileBlock::State::DOWNLOADED;
+ auto* cell =
+ cache.add_cell(key, context, i * block_size, block_size,
state, cache_lock);
+ ASSERT_NE(cell, nullptr);
+ if (downloading) {
+ FileBlockTestAccessor::set_state(*cell->file_block,
+
io::FileBlock::State::DOWNLOADING);
+ FileBlockTestAccessor::set_downloader_id(*cell->file_block,
1000 + i);
+ }
+ if (busy) {
+ busy_refs.push_back(cell->file_block);
+ }
+
+ if (busy || downloading) {
+ ++expected_remaining;
+ expected_remaining_offsets.push_back(i * block_size);
+ } else if (!empty) {
+ ++expected_sync_removes;
+ }
+ }
+ }
+
+ std::atomic<int> storage_remove_calls {0};
+ auto sp = SyncPoint::get_instance();
+ SyncPoint::CallbackGuard guard;
+ sp->set_call_back(
+ "FSFileCacheStorage::remove",
+ [&storage_remove_calls](auto&& args) {
+ storage_remove_calls.fetch_add(1);
+ try_any_cast_ret<Status>(args)->second = true;
+ },
+ &guard);
+ sp->enable_processing();
+ Defer defer {[sp] {
+ sp->disable_processing();
+ sp->clear_all_call_backs();
+ }};
+
+ auto msg = cache.clear_file_cache_sync();
+ EXPECT_NE(msg.find("clear_file_cache_sync"), std::string::npos);
+ EXPECT_NE(msg.find("sync_remove=1"), std::string::npos);
+ EXPECT_EQ(storage_remove_calls.load(), expected_sync_removes);
+ EXPECT_EQ(cache._cur_cache_size, expected_remaining * block_size);
+
+ {
+ std::lock_guard<std::mutex> cache_lock(cache._mutex);
+ for (auto offset : expected_remaining_offsets) {
+ auto* cell = cache.get_cell(key, offset, cache_lock);
+ ASSERT_NE(cell, nullptr);
+ EXPECT_TRUE(cell->file_block->is_deleting());
+ }
+ }
+
+ if (fs::exists(my_cache_path)) {
+ fs::remove_all(my_cache_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest,
clear_file_cache_sync_factory_rejects_concurrent_sync_clear) {
+ std::string my_cache_path = caches_dir /
"clear_file_cache_sync_factory_busy" / "";
+ if (fs::exists(my_cache_path)) {
+ fs::remove_all(my_cache_path);
+ }
+ fs::create_directories(my_cache_path);
+
+ auto* factory = FileCacheFactory::instance();
+ factory->_caches.clear();
+ factory->_path_to_cache.clear();
+ factory->_capacity = 0;
+
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 1024;
+ settings.query_queue_elements = 128;
+ settings.index_queue_size = 1024;
+ settings.index_queue_elements = 128;
+ settings.disposable_queue_size = 1024;
+ settings.disposable_queue_elements = 128;
+ settings.capacity = 2048;
+ settings.max_file_block_size = 64;
+ settings.max_query_cache_size = 1024;
+ ASSERT_TRUE(factory->create_file_cache(my_cache_path, settings).ok());
+ auto* cache = factory->get_by_path(my_cache_path);
+ ASSERT_NE(cache, nullptr);
+ wait_until_cache_ready(*cache);
+
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::NORMAL;
+ auto key = io::BlockFileCache::hash("clear_file_cache_sync_factory_busy");
+ {
+ std::lock_guard<std::mutex> cache_lock(cache->_mutex);
+ ASSERT_NE(cache->add_cell(key, context, 0, 4,
io::FileBlock::State::DOWNLOADED, cache_lock),
+ nullptr);
+ }
+
+ std::promise<void> remove_entered;
+ std::promise<void> release_remove;
+ auto release_remove_future = release_remove.get_future().share();
+ std::atomic_bool remove_released {false};
+ auto release_remove_once = [&]() {
+ if (!remove_released.exchange(true)) {
+ release_remove.set_value();
+ }
+ };
+ std::atomic_bool block_first_remove {true};
+ auto sp = SyncPoint::get_instance();
+ SyncPoint::CallbackGuard guard;
+ sp->set_call_back(
+ "FSFileCacheStorage::remove",
+ [&](auto&& args) {
+ try_any_cast_ret<Status>(args)->second = true;
+ if (block_first_remove.exchange(false)) {
+ remove_entered.set_value();
+ release_remove_future.wait();
+ }
+ },
+ &guard);
+ sp->enable_processing();
+ Defer defer {[&] {
+ release_remove_once();
+ sp->disable_processing();
+ sp->clear_all_call_backs();
+ factory->_caches.clear();
+ factory->_path_to_cache.clear();
+ factory->_capacity = 0;
+ if (fs::exists(my_cache_path)) {
+ fs::remove_all(my_cache_path);
+ }
+ }};
+
+ auto first_clear =
+ std::async(std::launch::async, [factory] { return
factory->clear_file_caches(true); });
+ ASSERT_EQ(remove_entered.get_future().wait_for(std::chrono::seconds(5)),
+ std::future_status::ready);
+
+ auto second_result = factory->clear_file_caches(true);
+ EXPECT_NE(second_result.find("already running"), std::string::npos);
+
+ release_remove_once();
+ auto first_result = first_clear.get();
+ EXPECT_NE(first_result.find("clear_file_cache_sync"), std::string::npos);
+}
+
TEST_F(BlockFileCacheTest, remove_directly) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
diff --git a/be/test/io/cache/block_file_cache_test_meta_store.cpp
b/be/test/io/cache/block_file_cache_test_meta_store.cpp
index 511320f1ede..b92bda1b0ae 100644
--- a/be/test/io/cache/block_file_cache_test_meta_store.cpp
+++ b/be/test/io/cache/block_file_cache_test_meta_store.cpp
@@ -628,14 +628,16 @@ TEST_F(BlockFileCacheTest,
clear_retains_meta_directory_and_clears_meta_entries)
context.tablet_id = 314;
auto key = io::BlockFileCache::hash("meta_clear_key");
- auto holder = cache.get_or_set(key, 0, 100000, context);
- auto blocks = fromHolder(holder);
- ASSERT_EQ(blocks.size(), 1);
- assert_range(1, blocks[0], io::FileBlock::Range(0, 99999),
io::FileBlock::State::EMPTY);
- ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
- download(blocks[0]);
- assert_range(2, blocks[0], io::FileBlock::Range(0, 99999),
io::FileBlock::State::DOWNLOADED);
- blocks.clear();
+ {
+ auto holder = cache.get_or_set(key, 0, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(0, 99999),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(0, 99999),
+ io::FileBlock::State::DOWNLOADED);
+ }
auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(cache._storage.get());
ASSERT_NE(fs_storage, nullptr) << "Expected FSFileCacheStorage but got
different storage type";
@@ -645,15 +647,17 @@ TEST_F(BlockFileCacheTest,
clear_retains_meta_directory_and_clears_meta_entries)
verify_meta_key(*meta_store, context.tablet_id, "meta_clear_key", 0,
FileCacheType::NORMAL, 0,
100000);
- cache.clear_file_cache_directly();
+ cache.clear_file_cache_sync();
std::string meta_dir = cache.get_base_path() + "/meta";
ASSERT_TRUE(fs::exists(meta_dir));
ASSERT_TRUE(fs::is_directory(meta_dir));
BlockMetaKey mkey(context.tablet_id, key, 0);
- auto meta = meta_store->get(mkey);
- ASSERT_FALSE(meta.has_value());
+ for (int i = 0; i < 100 && meta_store->get(mkey).has_value(); ++i) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ ASSERT_FALSE(meta_store->get(mkey).has_value());
auto iterator = meta_store->get_all();
if (iterator != nullptr) {
diff --git a/be/test/service/http/file_cache_action_test.cpp
b/be/test/service/http/file_cache_action_test.cpp
index 066d07b25e3..ce4f8b02372 100644
--- a/be/test/service/http/file_cache_action_test.cpp
+++ b/be/test/service/http/file_cache_action_test.cpp
@@ -207,8 +207,8 @@ TEST_F(FileCacheActionTest, clear_defaults_to_async) {
EXPECT_EQ(_cache->_cur_cache_size, 0);
}
-TEST_F(FileCacheActionTest, clear_sync_true_runs_async_and_warns) {
- cache_file_block("clear_sync_true_runs_async_and_warns.dat");
+TEST_F(FileCacheActionTest, clear_sync_true_returns_sync_summary_json) {
+ cache_file_block("clear_sync_true_returns_sync_summary_json.dat");
HttpRequest req(_evhttp_req);
std::string json_metrics;
@@ -218,9 +218,9 @@ TEST_F(FileCacheActionTest,
clear_sync_true_runs_async_and_warns) {
Status status = _action->_handle_header(&req, &json_metrics);
EXPECT_TRUE(status.ok());
- EXPECT_EQ(json_metrics,
- "{\"status\":\"OK\",\"msg\":\"sync clear_file_cache is no longer
supported in "
- "http api, running async clear instead\"}");
+ EXPECT_NE(json_metrics.find("\"status\":\"OK\""), std::string::npos);
+ EXPECT_NE(json_metrics.find("finish clear_file_cache_sync"),
std::string::npos);
+ EXPECT_NE(json_metrics.find("sync_remove=1"), std::string::npos);
EXPECT_EQ(_cache->_cur_cache_size, 0);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]