This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4bc9ddcb271 branch-3.0: [Fix](recycler) Fix transaction label recycling to prevent key cleanup failures and 'key not found' errors #50766 (#50814)
4bc9ddcb271 is described below

commit 4bc9ddcb271b6aeaa5d2f6cb0bd534cdff97163f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 13 21:39:00 2025 +0800

    branch-3.0: [Fix](recycler) Fix transaction label recycling to prevent key cleanup failures and 'key not found' errors #50766 (#50814)
    
    Cherry-picked from #50766
    
    Co-authored-by: abmdocrt <[email protected]>
---
 cloud/src/recycler/recycler.cpp |  8 +++++-
 cloud/test/recycler_test.cpp    | 55 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 0cee75994e6..307beab63d4 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2635,6 +2635,11 @@ int InstanceRecycler::recycle_expired_txn_label() {
     };
 
     auto loop_done = [&]() -> int {
+        std::unique_ptr<int, std::function<void(int*)>> defer(
+                (int*)0x01, [&](int*) { recycle_txn_info_keys.clear(); });
+        TEST_SYNC_POINT_CALLBACK(
+                "InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys",
+                &recycle_txn_info_keys);
         for (const auto& k : recycle_txn_info_keys) {
             concurrent_delete_executor.add([&]() {
                 if (delete_recycle_txn_kv(k) != 0) {
@@ -2656,6 +2661,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
 
         ret = finished ? ret : -1;
 
+        TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_expired_txn_label.failure", &ret);
+
         if (ret != 0) {
             LOG_WARNING("recycle txn kv ret!=0")
                     .tag("finished", finished)
@@ -2663,7 +2670,6 @@ int InstanceRecycler::recycle_expired_txn_label() {
                     .tag("instance_id", instance_id_);
             return ret;
         }
-        recycle_txn_info_keys.clear();
         return ret;
     };
 
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 6ef782b9c5f..bcd7dd39160 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -4285,4 +4285,59 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_test) {
               << "ms" << std::endl;
     check_multiple_txn_info_kvs(txn_kv, 2000);
 }
+
+TEST(RecyclerTest, concurrent_recycle_txn_label_failure_test) {
+    config::label_keep_max_second = 259200;
+    doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group;
+    config::recycle_pool_parallelism = 40;
+    auto recycle_txn_label_s3_producer_pool =
+            std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_s3_producer_pool->start();
+    auto recycle_txn_label_recycle_tablet_pool =
+            std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_recycle_tablet_pool->start();
+    auto recycle_txn_label_group_recycle_function_pool =
+            std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_group_recycle_function_pool->start();
+    recycle_txn_label_thread_group =
+            RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool),
+                                    std::move(recycle_txn_label_recycle_tablet_pool),
+                                    std::move(recycle_txn_label_group_recycle_function_pool));
+
+    auto mem_txn_kv = std::make_shared<MemTxnKv>();
+
+    auto txn_kv = mem_txn_kv;
+    ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl;
+    make_multiple_txn_info_kvs(txn_kv, 20000, 15000);
+    check_multiple_txn_info_kvs(txn_kv, 20000);
+
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer(
+            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
+    sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys",
+                      [](auto&& args) {
+                          auto* recycle_txn_info_keys =
+                                  try_any_cast<std::vector<std::string>*>(args[0]);
+
+                          ASSERT_LE(recycle_txn_info_keys->size(), 10000);
+                      });
+    sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.failure", [](auto&& args) {
+        auto* ret = try_any_cast<int*>(args[0]);
+        *ret = -1;
+    });
+    sp->enable_processing();
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+    auto start = std::chrono::steady_clock::now();
+    ASSERT_EQ(recycler.recycle_expired_txn_label(), -1);
+    auto finish = std::chrono::steady_clock::now();
+    std::cout << "recycle expired txn label cost="
+              << std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
+              << "ms" << std::endl;
+    check_multiple_txn_info_kvs(txn_kv, 5000);
+}
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to