This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 07ad6482dd4 [fix](file-cache) Make sync clear use safe removal (#64578)
07ad6482dd4 is described below

commit 07ad6482dd4eb73a7e75a7af6c562d34a12ca52c
Author: Gavin Chou <[email protected]>
AuthorDate: Mon Jun 22 17:16:54 2026 +0800

    [fix](file-cache) Make sync clear use safe removal (#64578)
    
    ### What
    - Replace the sync clear's direct storage wipe with the safe block
      scan/remove path that async clear already uses.
    - Leave busy blocks to the FileBlocksHolder lifecycle (they are only
      marked deleting) and synchronously remove releasable downloaded blocks.
    - Reject concurrent sync clear requests with HTTP 400 (Bad Request).
    - Add large sync-clear and concurrent-clear BE tests that mock storage
      removal with a SyncPoint.
    
    ### Tests
    - `sh run-be-ut.sh --run --filter "BlockFileCacheTest.clear_file_cache_sync*:BlockFileCacheTest.clear_retains_meta_directory_and_clears_meta_entries"`
    
    Co-authored-by: gavinchou <[email protected]>
---
 be/src/io/cache/block_file_cache.cpp               | 111 ++-----------
 be/src/io/cache/block_file_cache.h                 |  10 +-
 be/src/io/cache/block_file_cache_factory.cpp       |  45 ++++-
 be/src/io/cache/block_file_cache_factory.h         |   2 +
 be/src/io/cache/fs_file_cache_storage.cpp          |   4 +
 be/src/service/http/action/file_cache_action.cpp   |  24 +--
 be/test/io/cache/block_file_cache_test.cpp         | 185 +++++++++++++++++++++
 .../io/cache/block_file_cache_test_meta_store.cpp  |  26 +--
 be/test/service/http/file_cache_action_test.cpp    |  10 +-
 9 files changed, 285 insertions(+), 132 deletions(-)

diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index 5c4a275ac3d..4684961f884 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -715,7 +715,16 @@ void 
BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
 }
 
 std::string BlockFileCache::clear_file_cache_async() {
-    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
+    return clear_file_cache_impl(false);
+}
+
+std::string BlockFileCache::clear_file_cache_sync() {
+    return clear_file_cache_impl(true);
+}
+
+std::string BlockFileCache::clear_file_cache_impl(bool sync_remove) {
+    const char* action = sync_remove ? "clear_file_cache_sync" : 
"clear_file_cache_async";
+    LOG(INFO) << "start " << action << ", path=" << _cache_base_path;
     _lru_dumper->remove_lru_dump_files();
     int64_t num_cells_all = 0;
     int64_t num_cells_to_delete = 0;
@@ -734,7 +743,11 @@ std::string BlockFileCache::clear_file_cache_async() {
             }
         }
 
-        // we cannot delete the element in the loop above, because it will 
break the iterator
+        // Do not erase while walking _files above: remove() may erase the 
current map element.
+        //
+        // sync_remove only changes how already releasable DOWNLOADED blocks 
are deleted from
+        // storage. Busy blocks keep the existing holder lifecycle: mark them 
deleting and leave
+        // them in _files until the last holder releases them.
         for (auto& cell : deleting_cells) {
             if (!cell->releasable()) {
                 LOG(INFO) << "cell is not releasable, hash="
@@ -746,7 +759,7 @@ std::string BlockFileCache::clear_file_cache_async() {
             FileBlockSPtr file_block = cell->file_block;
             if (file_block) {
                 std::lock_guard block_lock(file_block->_mutex);
-                remove(file_block, cache_lock, block_lock, false);
+                remove(file_block, cache_lock, block_lock, sync_remove);
                 ++num_cells_to_delete;
             }
         }
@@ -754,7 +767,7 @@ std::string BlockFileCache::clear_file_cache_async() {
     }
 
     std::stringstream ss;
-    ss << "finish clear_file_cache_async, path=" << _cache_base_path
+    ss << "finish " << action << ", path=" << _cache_base_path << " 
sync_remove=" << sync_remove
        << " num_files_all=" << num_files_all << " num_cells_all=" << 
num_cells_all
        << " num_cells_to_delete=" << num_cells_to_delete
        << " num_cells_wait_recycle=" << num_cells_wait_recycle;
@@ -2313,96 +2326,6 @@ void BlockFileCache::clear_need_update_lru_blocks() {
     *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
 }
 
-void BlockFileCache::pause_ttl_manager() {
-    if (_ttl_mgr) {
-        _ttl_mgr->stop();
-    }
-}
-
-void BlockFileCache::resume_ttl_manager() {
-    if (_ttl_mgr) {
-        _ttl_mgr->resume();
-    }
-}
-
-std::string BlockFileCache::clear_file_cache_directly() {
-    pause_ttl_manager();
-    _lru_dumper->remove_lru_dump_files();
-    using namespace std::chrono;
-    std::stringstream ss;
-    auto start = steady_clock::now();
-    std::string result;
-    {
-        SCOPED_CACHE_LOCK(_mutex, this);
-        LOG_INFO("start clear_file_cache_directly").tag("path", 
_cache_base_path);
-
-        std::string clear_msg;
-        auto s = _storage->clear(clear_msg);
-        if (!s.ok()) {
-            result = clear_msg;
-        } else {
-            int64_t num_files = _files.size();
-            int64_t cache_size = _cur_cache_size;
-            int64_t index_queue_size = 
_index_queue.get_elements_num(cache_lock);
-            int64_t normal_queue_size = 
_normal_queue.get_elements_num(cache_lock);
-            int64_t disposible_queue_size = 
_disposable_queue.get_elements_num(cache_lock);
-            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
-
-            int64_t clear_fd_duration = 0;
-            {
-                // clear FDCache to release fd
-                SCOPED_RAW_TIMER(&clear_fd_duration);
-                for (const auto& [file_key, file_blocks] : _files) {
-                    for (const auto& [offset, file_block_cell] : file_blocks) {
-                        AccessKeyAndOffset access_key_and_offset(file_key, 
offset);
-                        
FDCache::instance()->remove_file_reader(access_key_and_offset);
-                    }
-                }
-            }
-
-            _files.clear();
-            _cur_cache_size = 0;
-            _cur_ttl_size = 0;
-            _time_to_key.clear();
-            _key_to_time.clear();
-            _index_queue.clear(cache_lock);
-            _normal_queue.clear(cache_lock);
-            _disposable_queue.clear(cache_lock);
-            _ttl_queue.clear(cache_lock);
-
-            // Update cache metrics immediately so consumers observe the 
cleared state
-            // without waiting for the next background monitor round.
-            _cur_cache_size_metrics->set_value(0);
-            _cur_ttl_cache_size_metrics->set_value(0);
-            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
-            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
-            _cur_normal_queue_cache_size_metrics->set_value(0);
-            _cur_normal_queue_element_count_metrics->set_value(0);
-            _cur_index_queue_cache_size_metrics->set_value(0);
-            _cur_index_queue_element_count_metrics->set_value(0);
-            _cur_disposable_queue_cache_size_metrics->set_value(0);
-            _cur_disposable_queue_element_count_metrics->set_value(0);
-
-            clear_need_update_lru_blocks();
-
-            ss << "finish clear_file_cache_directly"
-               << " path=" << _cache_base_path << " time_elapsed_ms="
-               << duration_cast<milliseconds>(steady_clock::now() - 
start).count()
-               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
-               << " num_files=" << num_files << " cache_size=" << cache_size
-               << " index_queue_size=" << index_queue_size
-               << " normal_queue_size=" << normal_queue_size
-               << " disposible_queue_size=" << disposible_queue_size
-               << "ttl_queue_size=" << ttl_queue_size;
-            result = ss.str();
-            LOG(INFO) << result;
-        }
-    }
-    _lru_dumper->remove_lru_dump_files();
-    resume_ttl_manager();
-    return result;
-}
-
 std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const 
UInt128Wrapper& hash) {
     std::map<size_t, FileBlockSPtr> offset_to_block;
     SCOPED_CACHE_LOCK(_mutex, this);
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index 5c7faea9ac4..a1a7ced18e3 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -248,7 +248,7 @@ public:
      * @returns summary message
      */
     std::string clear_file_cache_async();
-    std::string clear_file_cache_directly();
+    std::string clear_file_cache_sync();
 
     /**
      * Reset the cache capacity. If the new_capacity is smaller than 
_capacity, the redundant data will be remove async.
@@ -313,9 +313,6 @@ public:
 
     void update_ttl_atime(const UInt128Wrapper& hash);
 
-    void pause_ttl_manager();
-    void resume_ttl_manager();
-
     std::map<std::string, double> get_stats();
 
     // for be UTs
@@ -398,6 +395,11 @@ public:
     Status check_file_cache_consistency(InconsistencyContext& 
inconsistency_context);
 
 private:
+    // Shared scan used by both clear modes. It keeps the FileBlock holder 
lifecycle intact:
+    // releasable blocks are removed immediately, while blocks held by readers 
are only marked
+    // deleting and are later removed by FileBlocksHolder destruction.
+    std::string clear_file_cache_impl(bool sync_remove);
+
     LRUQueue& get_queue(FileCacheType type);
     const LRUQueue& get_queue(FileCacheType type) const;
 
diff --git a/be/src/io/cache/block_file_cache_factory.cpp 
b/be/src/io/cache/block_file_cache_factory.cpp
index 03f26a0276f..0ffc9cea365 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -31,6 +31,7 @@
 #endif
 
 #include <algorithm>
+#include <atomic>
 #include <execution>
 #include <ostream>
 #include <utility>
@@ -246,27 +247,55 @@ FileCacheFactory::get_query_context_holders(const 
TUniqueId& query_id,
     return holders;
 }
 
-std::string FileCacheFactory::clear_file_caches(bool sync) {
+Status FileCacheFactory::clear_file_caches(bool sync, std::string* ret) {
+    DCHECK(ret != nullptr);
+
+    // Sync clear is an operational action and can synchronously remove many 
files. Keep a single
+    // process-wide sync clear in flight, so a second HTTP request fails fast 
instead of piling onto
+    // the same cache instances. Async clear keeps the previous behavior and 
is not gated here.
+    static std::atomic_bool sync_clear_running {false};
+    struct SyncClearRunningGuard {
+        std::atomic_bool* running = nullptr;
+        ~SyncClearRunningGuard() {
+            if (running != nullptr) {
+                running->store(false, std::memory_order_release);
+            }
+        }
+    } sync_clear_guard;
+    if (sync) {
+        bool expected = false;
+        if (!sync_clear_running.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel,
+                                                        
std::memory_order_acquire)) {
+            return Status::InvalidArgument("sync clear_file_caches is already 
running");
+        }
+        sync_clear_guard.running = &sync_clear_running;
+    }
+
     std::vector<std::string> results(_caches.size());
 #ifndef USE_LIBCPP
     std::for_each(std::execution::par, _caches.begin(), _caches.end(), 
[&](const auto& cache) {
         size_t index = &cache - &_caches[0];
-        results[index] =
-                sync ? cache->clear_file_cache_directly() : 
cache->clear_file_cache_async();
+        results[index] = sync ? cache->clear_file_cache_sync() : 
cache->clear_file_cache_async();
     });
 #else
     // libcpp do not support std::execution::par
     std::for_each(_caches.begin(), _caches.end(), [&](const auto& cache) {
         size_t index = &cache - &_caches[0];
-        results[index] =
-                sync ? cache->clear_file_cache_directly() : 
cache->clear_file_cache_async();
+        results[index] = sync ? cache->clear_file_cache_sync() : 
cache->clear_file_cache_async();
     });
 #endif
     std::stringstream ss;
-    for (const auto& result : results) {
-        ss << result << "\n";
+    for (const auto& cache_result : results) {
+        ss << cache_result << "\n";
     }
-    return ss.str();
+    *ret = ss.str();
+    return Status::OK();
+}
+
+std::string FileCacheFactory::clear_file_caches(bool sync) {
+    std::string result;
+    auto st = clear_file_caches(sync, &result);
+    return st.ok() ? result : st.to_string();
 }
 
 void FileCacheFactory::dump_all_caches() {
diff --git a/be/src/io/cache/block_file_cache_factory.h 
b/be/src/io/cache/block_file_cache_factory.h
index 2a2b84010b6..f5be334de8b 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -23,6 +23,7 @@
 #include <gen_cpp/internal_service.pb.h>
 
 #include <memory>
+#include <mutex>
 #include <optional>
 #include <string>
 #include <string_view>
@@ -85,6 +86,7 @@ public:
      * @return summary message
      */
     std::string clear_file_caches(bool sync);
+    Status clear_file_caches(bool sync, std::string* result);
 
     /**
      * dump lru queue info for all file cache instances
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp 
b/be/src/io/cache/fs_file_cache_storage.cpp
index 19cecbeab1e..371edcc0b35 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -289,6 +289,10 @@ Status FSFileCacheStorage::read(const FileCacheKey& key, 
size_t value_offset, Sl
 }
 
 Status FSFileCacheStorage::remove(const FileCacheKey& key) {
+    // Large clear-cache tests only need to verify the synchronous remove 
handoff and in-memory
+    // index cleanup. They can return early here to avoid test-only disk churn.
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("FSFileCacheStorage::remove", 
Status::OK(), &key);
+
     const std::string v3_dir = get_path_in_local_cache_v3(key.hash);
     const std::string v3_file = get_path_in_local_cache_v3(v3_dir, key.offset);
     FDCache::instance()->remove_file_reader(std::make_pair(key.hash, 
key.offset));
diff --git a/be/src/service/http/action/file_cache_action.cpp 
b/be/src/service/http/action/file_cache_action.cpp
index 2402f1ad572..d291bc8df1d 100644
--- a/be/src/service/http/action/file_cache_action.cpp
+++ b/be/src/service/http/action/file_cache_action.cpp
@@ -59,8 +59,6 @@ constexpr static std::string_view RELEASED_ELEMENTS = 
"released_elements";
 constexpr static std::string_view DUMP = "dump";
 constexpr static std::string_view VALUE = "value";
 constexpr static std::string_view RELOAD = "reload";
-constexpr static std::string_view SYNC_CLEAR_UNSUPPORTED_MSG =
-        "sync clear_file_cache is no longer supported in http api, running 
async clear instead";
 
 Status FileCacheAction::_handle_header(HttpRequest* req, std::string* 
json_metrics) {
     const std::string header_json(HEADER_JSON);
@@ -87,18 +85,21 @@ Status FileCacheAction::_handle_header(HttpRequest* req, 
std::string* json_metri
         const std::string& sync = req->param(std::string(SYNC));
         const std::string& segment_path = req->param(std::string(VALUE));
         if (segment_path.empty()) {
-            io::FileCacheFactory::instance()->clear_file_caches(false);
+            const bool sync_clear = to_lower(sync) == "true";
+            std::string clear_msg;
+            RETURN_IF_ERROR(
+                    
io::FileCacheFactory::instance()->clear_file_caches(sync_clear, &clear_msg));
+            if (sync_clear) {
+                EasyJson json;
+                json["status"] = "OK";
+                json["msg"] = clear_msg;
+                *json_metrics = json.ToString();
+            }
         } else {
             io::UInt128Wrapper hash = io::BlockFileCache::hash(segment_path);
             io::BlockFileCache* cache = 
io::FileCacheFactory::instance()->get_by_path(hash);
             cache->remove_if_cached_async(hash);
         }
-        if (to_lower(sync) == "true") {
-            EasyJson json;
-            json["status"] = "OK";
-            json["msg"] = std::string(SYNC_CLEAR_UNSUPPORTED_MSG);
-            *json_metrics = json.ToString();
-        }
     } else if (operation == RESET) {
         std::string capacity = req->param(std::string(CAPACITY));
         int64_t new_capacity = 0;
@@ -220,7 +221,10 @@ void FileCacheAction::handle(HttpRequest* req) {
         HttpChannel::send_reply(req, HttpStatus::OK,
                                 json_metrics.empty() ? status.to_json() : 
json_metrics);
     } else {
-        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, 
status_result);
+        const auto http_status = status.is<ErrorCode::INVALID_ARGUMENT>()
+                                         ? HttpStatus::BAD_REQUEST
+                                         : HttpStatus::INTERNAL_SERVER_ERROR;
+        HttpChannel::send_reply(req, http_status, status_result);
     }
 }
 
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index b57e1285f87..dcf210e3cb1 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -18,6 +18,9 @@
 // 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp
 // and modified by Doris
 
+#include <atomic>
+#include <future>
+
 #include "io/cache/block_file_cache_test_common.h"
 #include "io/fs/buffered_reader.h"
 #include "storage/olap_define.h"
@@ -2768,6 +2771,188 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
     delete holder;
 }
 
+TEST_F(BlockFileCacheTest, 
clear_file_cache_sync_removes_releasable_blocks_synchronously) {
+    std::string my_cache_path = caches_dir / 
"clear_file_cache_sync_mixed_states" / "";
+    if (fs::exists(my_cache_path)) {
+        fs::remove_all(my_cache_path);
+    }
+    fs::create_directories(my_cache_path);
+
+    constexpr int num_blocks = 100000;
+    constexpr size_t block_size = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = num_blocks * block_size * 2;
+    settings.query_queue_elements = num_blocks + 1024;
+    settings.index_queue_size = num_blocks * block_size * 2;
+    settings.index_queue_elements = num_blocks + 1024;
+    settings.disposable_queue_size = num_blocks * block_size * 2;
+    settings.disposable_queue_elements = num_blocks + 1024;
+    settings.capacity = num_blocks * block_size * 2;
+    settings.max_file_block_size = 64;
+    settings.max_query_cache_size = num_blocks * block_size * 2;
+
+    io::BlockFileCache cache(my_cache_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    wait_until_cache_ready(cache);
+
+    io::CacheContext context;
+    context.cache_type = io::FileCacheType::NORMAL;
+    auto key = io::BlockFileCache::hash("clear_file_cache_sync_mixed_states");
+    int expected_sync_removes = 0;
+    int expected_remaining = 0;
+    std::vector<size_t> expected_remaining_offsets;
+    std::vector<io::FileBlockSPtr> busy_refs;
+    {
+        std::lock_guard<std::mutex> cache_lock(cache._mutex);
+        for (int i = 0; i < num_blocks; ++i) {
+            // Keep the scan large, but make non-releasable states sparse so 
the test does not
+            // spend most of its time writing one log line per busy block.
+            const bool busy = i % 997 == 0;
+            const bool downloading = i % 991 == 0;
+            const bool empty = !downloading && i % 983 == 0;
+            const auto state =
+                    empty ? io::FileBlock::State::EMPTY : 
io::FileBlock::State::DOWNLOADED;
+            auto* cell =
+                    cache.add_cell(key, context, i * block_size, block_size, 
state, cache_lock);
+            ASSERT_NE(cell, nullptr);
+            if (downloading) {
+                FileBlockTestAccessor::set_state(*cell->file_block,
+                                                 
io::FileBlock::State::DOWNLOADING);
+                FileBlockTestAccessor::set_downloader_id(*cell->file_block, 
1000 + i);
+            }
+            if (busy) {
+                busy_refs.push_back(cell->file_block);
+            }
+
+            if (busy || downloading) {
+                ++expected_remaining;
+                expected_remaining_offsets.push_back(i * block_size);
+            } else if (!empty) {
+                ++expected_sync_removes;
+            }
+        }
+    }
+
+    std::atomic<int> storage_remove_calls {0};
+    auto sp = SyncPoint::get_instance();
+    SyncPoint::CallbackGuard guard;
+    sp->set_call_back(
+            "FSFileCacheStorage::remove",
+            [&storage_remove_calls](auto&& args) {
+                storage_remove_calls.fetch_add(1);
+                try_any_cast_ret<Status>(args)->second = true;
+            },
+            &guard);
+    sp->enable_processing();
+    Defer defer {[sp] {
+        sp->disable_processing();
+        sp->clear_all_call_backs();
+    }};
+
+    auto msg = cache.clear_file_cache_sync();
+    EXPECT_NE(msg.find("clear_file_cache_sync"), std::string::npos);
+    EXPECT_NE(msg.find("sync_remove=1"), std::string::npos);
+    EXPECT_EQ(storage_remove_calls.load(), expected_sync_removes);
+    EXPECT_EQ(cache._cur_cache_size, expected_remaining * block_size);
+
+    {
+        std::lock_guard<std::mutex> cache_lock(cache._mutex);
+        for (auto offset : expected_remaining_offsets) {
+            auto* cell = cache.get_cell(key, offset, cache_lock);
+            ASSERT_NE(cell, nullptr);
+            EXPECT_TRUE(cell->file_block->is_deleting());
+        }
+    }
+
+    if (fs::exists(my_cache_path)) {
+        fs::remove_all(my_cache_path);
+    }
+}
+
+TEST_F(BlockFileCacheTest, 
clear_file_cache_sync_factory_rejects_concurrent_sync_clear) {
+    std::string my_cache_path = caches_dir / 
"clear_file_cache_sync_factory_busy" / "";
+    if (fs::exists(my_cache_path)) {
+        fs::remove_all(my_cache_path);
+    }
+    fs::create_directories(my_cache_path);
+
+    auto* factory = FileCacheFactory::instance();
+    factory->_caches.clear();
+    factory->_path_to_cache.clear();
+    factory->_capacity = 0;
+
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 1024;
+    settings.query_queue_elements = 128;
+    settings.index_queue_size = 1024;
+    settings.index_queue_elements = 128;
+    settings.disposable_queue_size = 1024;
+    settings.disposable_queue_elements = 128;
+    settings.capacity = 2048;
+    settings.max_file_block_size = 64;
+    settings.max_query_cache_size = 1024;
+    ASSERT_TRUE(factory->create_file_cache(my_cache_path, settings).ok());
+    auto* cache = factory->get_by_path(my_cache_path);
+    ASSERT_NE(cache, nullptr);
+    wait_until_cache_ready(*cache);
+
+    io::CacheContext context;
+    context.cache_type = io::FileCacheType::NORMAL;
+    auto key = io::BlockFileCache::hash("clear_file_cache_sync_factory_busy");
+    {
+        std::lock_guard<std::mutex> cache_lock(cache->_mutex);
+        ASSERT_NE(cache->add_cell(key, context, 0, 4, 
io::FileBlock::State::DOWNLOADED, cache_lock),
+                  nullptr);
+    }
+
+    std::promise<void> remove_entered;
+    std::promise<void> release_remove;
+    auto release_remove_future = release_remove.get_future().share();
+    std::atomic_bool remove_released {false};
+    auto release_remove_once = [&]() {
+        if (!remove_released.exchange(true)) {
+            release_remove.set_value();
+        }
+    };
+    std::atomic_bool block_first_remove {true};
+    auto sp = SyncPoint::get_instance();
+    SyncPoint::CallbackGuard guard;
+    sp->set_call_back(
+            "FSFileCacheStorage::remove",
+            [&](auto&& args) {
+                try_any_cast_ret<Status>(args)->second = true;
+                if (block_first_remove.exchange(false)) {
+                    remove_entered.set_value();
+                    release_remove_future.wait();
+                }
+            },
+            &guard);
+    sp->enable_processing();
+    Defer defer {[&] {
+        release_remove_once();
+        sp->disable_processing();
+        sp->clear_all_call_backs();
+        factory->_caches.clear();
+        factory->_path_to_cache.clear();
+        factory->_capacity = 0;
+        if (fs::exists(my_cache_path)) {
+            fs::remove_all(my_cache_path);
+        }
+    }};
+
+    auto first_clear =
+            std::async(std::launch::async, [factory] { return 
factory->clear_file_caches(true); });
+    ASSERT_EQ(remove_entered.get_future().wait_for(std::chrono::seconds(5)),
+              std::future_status::ready);
+
+    auto second_result = factory->clear_file_caches(true);
+    EXPECT_NE(second_result.find("already running"), std::string::npos);
+
+    release_remove_once();
+    auto first_result = first_clear.get();
+    EXPECT_NE(first_result.find("clear_file_cache_sync"), std::string::npos);
+}
+
 TEST_F(BlockFileCacheTest, remove_directly) {
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
diff --git a/be/test/io/cache/block_file_cache_test_meta_store.cpp 
b/be/test/io/cache/block_file_cache_test_meta_store.cpp
index 511320f1ede..b92bda1b0ae 100644
--- a/be/test/io/cache/block_file_cache_test_meta_store.cpp
+++ b/be/test/io/cache/block_file_cache_test_meta_store.cpp
@@ -628,14 +628,16 @@ TEST_F(BlockFileCacheTest, 
clear_retains_meta_directory_and_clears_meta_entries)
     context.tablet_id = 314;
     auto key = io::BlockFileCache::hash("meta_clear_key");
 
-    auto holder = cache.get_or_set(key, 0, 100000, context);
-    auto blocks = fromHolder(holder);
-    ASSERT_EQ(blocks.size(), 1);
-    assert_range(1, blocks[0], io::FileBlock::Range(0, 99999), 
io::FileBlock::State::EMPTY);
-    ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-    download(blocks[0]);
-    assert_range(2, blocks[0], io::FileBlock::Range(0, 99999), 
io::FileBlock::State::DOWNLOADED);
-    blocks.clear();
+    {
+        auto holder = cache.get_or_set(key, 0, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+        assert_range(1, blocks[0], io::FileBlock::Range(0, 99999), 
io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(0, 99999),
+                     io::FileBlock::State::DOWNLOADED);
+    }
 
     auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(cache._storage.get());
     ASSERT_NE(fs_storage, nullptr) << "Expected FSFileCacheStorage but got 
different storage type";
@@ -645,15 +647,17 @@ TEST_F(BlockFileCacheTest, 
clear_retains_meta_directory_and_clears_meta_entries)
     verify_meta_key(*meta_store, context.tablet_id, "meta_clear_key", 0, 
FileCacheType::NORMAL, 0,
                     100000);
 
-    cache.clear_file_cache_directly();
+    cache.clear_file_cache_sync();
 
     std::string meta_dir = cache.get_base_path() + "/meta";
     ASSERT_TRUE(fs::exists(meta_dir));
     ASSERT_TRUE(fs::is_directory(meta_dir));
 
     BlockMetaKey mkey(context.tablet_id, key, 0);
-    auto meta = meta_store->get(mkey);
-    ASSERT_FALSE(meta.has_value());
+    for (int i = 0; i < 100 && meta_store->get(mkey).has_value(); ++i) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    ASSERT_FALSE(meta_store->get(mkey).has_value());
 
     auto iterator = meta_store->get_all();
     if (iterator != nullptr) {
diff --git a/be/test/service/http/file_cache_action_test.cpp 
b/be/test/service/http/file_cache_action_test.cpp
index 066d07b25e3..ce4f8b02372 100644
--- a/be/test/service/http/file_cache_action_test.cpp
+++ b/be/test/service/http/file_cache_action_test.cpp
@@ -207,8 +207,8 @@ TEST_F(FileCacheActionTest, clear_defaults_to_async) {
     EXPECT_EQ(_cache->_cur_cache_size, 0);
 }
 
-TEST_F(FileCacheActionTest, clear_sync_true_runs_async_and_warns) {
-    cache_file_block("clear_sync_true_runs_async_and_warns.dat");
+TEST_F(FileCacheActionTest, clear_sync_true_returns_sync_summary_json) {
+    cache_file_block("clear_sync_true_returns_sync_summary_json.dat");
     HttpRequest req(_evhttp_req);
     std::string json_metrics;
 
@@ -218,9 +218,9 @@ TEST_F(FileCacheActionTest, 
clear_sync_true_runs_async_and_warns) {
     Status status = _action->_handle_header(&req, &json_metrics);
 
     EXPECT_TRUE(status.ok());
-    EXPECT_EQ(json_metrics,
-              "{\"status\":\"OK\",\"msg\":\"sync clear_file_cache is no longer 
supported in "
-              "http api, running async clear instead\"}");
+    EXPECT_NE(json_metrics.find("\"status\":\"OK\""), std::string::npos);
+    EXPECT_NE(json_metrics.find("finish clear_file_cache_sync"), 
std::string::npos);
+    EXPECT_NE(json_metrics.find("sync_remove=1"), std::string::npos);
     EXPECT_EQ(_cache->_cur_cache_size, 0);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to