This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e100936a2a [opt](recycler) Speed up recycling txn info by introducing 
parallelism (#50037)
3e100936a2a is described below

commit 3e100936a2acd55f7e4dc0d1c8372261c39b60b3
Author: abmdocrt <[email protected]>
AuthorDate: Wed Apr 23 15:06:56 2025 +0800

    [opt](recycler) Speed up recycling txn info by introducing parallelism 
(#50037)
---
 cloud/src/recycler/recycler.cpp |  52 +++++++-
 cloud/test/recycler_test.cpp    | 278 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 324 insertions(+), 6 deletions(-)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 581ee4067d4..6f11455ce00 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2500,6 +2500,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
     int64_t num_scanned = 0;
     int64_t num_expired = 0;
     int64_t num_recycled = 0;
+    int ret = 0;
 
     RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0};
     RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX, 
INT64_MAX};
@@ -2507,6 +2508,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
     std::string end_recycle_txn_key;
     recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key);
     recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key);
+    std::vector<std::string> recycle_txn_info_keys;
 
     LOG_INFO("begin to recycle expired txn").tag("instance_id", instance_id_);
 
@@ -2534,12 +2536,15 @@ int InstanceRecycler::recycle_expired_txn_label() {
         return final_expiration;
     };
 
+    SyncExecutor<int> concurrent_delete_executor(
+            _thread_pool_group.s3_producer_pool,
+            fmt::format("recycle expired txn label, instance id {}", 
instance_id_),
+            [](const int& ret) { return ret != 0; });
+
     int64_t current_time_ms =
             
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
 
-    auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled, 
&current_time_ms,
-                                  &calc_expiration,
-                                  this](std::string_view k, std::string_view 
v) -> int {
+    auto handle_recycle_txn_kv = [&](std::string_view k, std::string_view v) 
-> int {
         ++num_scanned;
         RecycleTxnPB recycle_txn_pb;
         if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
@@ -2551,10 +2556,12 @@ int InstanceRecycler::recycle_expired_txn_label() {
             (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
             VLOG_DEBUG << "found recycle txn, key=" << hex(k);
             num_expired++;
-        } else {
-            return 0;
+            recycle_txn_info_keys.emplace_back(k);
         }
+        return 0;
+    };
 
+    auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
         std::string_view k1 = k;
         //RecycleTxnKeyInfo 0:instance_id  1:db_id  2:txn_id
         k1.remove_prefix(1); // Remove key space
@@ -2638,8 +2645,41 @@ int InstanceRecycler::recycle_expired_txn_label() {
         return 0;
     };
 
+    // Batch hook passed to scan_and_recycle: deletes the txn-related kvs of every expired
+    // recycle-txn key collected by handle_recycle_txn_kv in the current batch, in parallel on
+    // the s3 producer pool, and waits for all of them.
+    // Returns 0 if every deletion of this batch succeeded, -1 otherwise.
+    auto loop_done = [&]() -> int {
+        for (const auto& k : recycle_txn_info_keys) {
+            // `k` refers into recycle_txn_info_keys, which is not modified until when_all()
+            // below has returned, so capturing it by reference is safe.
+            concurrent_delete_executor.add([this, &delete_recycle_txn_kv, &k]() {
+                if (delete_recycle_txn_kv(k) != 0) {
+                    LOG_WARNING("failed to delete recycle txn kv")
+                            .tag("instance id", instance_id_)
+                            .tag("key", hex(k));
+                    return -1;
+                }
+                return 0;
+            });
+        }
+        bool finished = true;
+        std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
+        // Status of THIS batch only. The function-scope `ret` is never reset, so using it here
+        // would make every batch after the first failure report failure as well.
+        int batch_ret = finished ? 0 : -1;
+        for (int r : rets) {
+            if (r != 0) {
+                batch_ret = -1;
+            }
+        }
+
+        // Always drop the processed batch, even on failure. Keys whose deletion failed still
+        // exist in the kv store and are rescanned by a later recycle round; keeping them here
+        // would resubmit them (together with already-deleted keys) on every following batch.
+        recycle_txn_info_keys.clear();
+
+        if (batch_ret != 0) {
+            ret = -1; // remember that at least one batch failed
+            LOG_WARNING("recycle txn kv ret!=0")
+                    .tag("finished", finished)
+                    .tag("ret", batch_ret)
+                    .tag("instance_id", instance_id_);
+        }
+        return batch_ret;
+    };
+
     return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
-                            std::move(handle_recycle_txn_kv));
+                            std::move(handle_recycle_txn_kv), 
std::move(loop_done));
 }
 
 struct CopyJobIdTuple {
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 5b9203b1d4e..2383b71b210 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -17,11 +17,13 @@
 
 #include "recycler/recycler.h"
 
+#include <fmt/core.h>
 #include <gen_cpp/cloud.pb.h>
 #include <gen_cpp/olap_file.pb.h>
 #include <gtest/gtest.h>
 
 #include <chrono>
+#include <cstdint>
 #include <memory>
 #include <random>
 #include <string>
@@ -35,11 +37,13 @@
 #include "meta-service/keys.h"
 #include "meta-service/mem_txn_kv.h"
 #include "meta-service/meta_service.h"
+#include "meta-service/txn_kv.h"
 #include "meta-service/txn_kv_error.h"
 #include "mock_accessor.h"
 #include "mock_resource_manager.h"
 #include "rate-limiter/rate_limiter.h"
 #include "recycler/checker.h"
+#include "recycler/recycler.cpp"
 #include "recycler/storage_vault_accessor.h"
 #include "recycler/util.h"
 #include "recycler/white_black_list.h"
@@ -4051,4 +4055,278 @@ TEST(RecyclerTest, 
delete_tmp_rowset_without_resource_id) {
     }
 }
 
+/**
+ * Builds a string of `length` characters, each picked uniformly at random from
+ * [a-zA-Z0-9]. A non-positive `length` yields an empty string.
+ */
+static std::string generate_random_string(int length) {
+    const std::string alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    std::random_device seed_source;
+    std::mt19937 rng(seed_source());
+    std::uniform_int_distribution<int> pick(0, alphabet.length() - 1);
+
+    std::string result;
+    for (int remaining = length; remaining > 0; --remaining) {
+        result.push_back(alphabet[pick(rng)]);
+    }
+    return result;
+}
+
+// Instance id shared by the concurrent recycle-txn-label helpers and test below. A random
+// suffix keeps its keys distinct from those of other runs. It is read-only and file-local,
+// so the generic name cannot clash with another `doris::cloud::instance_id` at link time.
+static const std::string instance_id =
+        "concurrent_recycle_txn_label_test_" + generate_random_string(10);
+
+/**
+ * Writes, in one kv transaction, every key-value pair that belongs to a single txn:
+ * RecycleTxnKey -> RecycleTxnPB, TxnIndexKey -> TxnIndexPB, TxnInfoKey -> TxnInfoPB
+ * (carrying one sub txn id), the sub txn's TxnIndexKey -> TxnIndexPB, and
+ * TxnLabelKey -> TxnLabelPB. Any failure is reported as a fatal gtest failure.
+ * @param txn_kv Transaction key-value storage object
+ * @param i Index used to derive unique ids: db_id = i, txn_id = 1000000 + i,
+ *          sub_txn_id = 2000000 + i
+ * @param expired_kv_num Entries with i < expired_kv_num get a creation time four days in the
+ *                       past; all others get the current time
+ */
+void make_single_txn_related_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t i,
+                                 int64_t expired_kv_num) {
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    const int64_t db_id = i;
+    const int64_t txn_id = 1000000 + i;
+    const int64_t sub_txn_id = 2000000 + i;
+    const int64_t current_time = std::chrono::duration_cast<std::chrono::milliseconds>(
+                                         std::chrono::system_clock::now().time_since_epoch())
+                                         .count();
+    // Serialization failures below fail the test immediately. Silently returning would leave
+    // a half-written fixture that only surfaces later as a confusing missing-key assertion.
+
+    // RecycleTxnKeyInfo -> RecycleTxnPB
+    std::string recycle_txn_info_key;
+    std::string recycle_txn_info_val;
+    RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id};
+    recycle_txn_key(recycle_txn_key_info, &recycle_txn_info_key);
+    RecycleTxnPB recycle_txn_pb;
+    // Four days old when "expired"; concurrent_recycle_txn_label_test sets
+    // config::label_keep_max_second to 259200 s (three days).
+    if (i < expired_kv_num) {
+        recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L);
+    } else {
+        recycle_txn_pb.set_creation_time(current_time);
+    }
+    recycle_txn_pb.set_label("recycle_txn_key_info_label_" + std::to_string(i));
+    ASSERT_TRUE(recycle_txn_pb.SerializeToString(&recycle_txn_info_val))
+            << "failed to serialize recycle txn info, key=" << hex(recycle_txn_info_key)
+            << " db_id=" << db_id << " txn_id=" << txn_id;
+    VLOG_DEBUG << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id,
+                              recycle_txn_pb.label(), hex(recycle_txn_info_key));
+    txn->put(recycle_txn_info_key, recycle_txn_info_val);
+
+    // TxnIndexKey -> TxnIndexPB
+    std::string txn_idx_key = txn_index_key({instance_id, txn_id});
+    std::string txn_idx_val;
+    TxnIndexPB txn_index_pb;
+    ASSERT_TRUE(txn_index_pb.SerializeToString(&txn_idx_val))
+            << "failed to serialize txn index, key=" << hex(txn_idx_key) << " db_id=" << db_id
+            << " txn_id=" << txn_id;
+    txn->put(txn_idx_key, txn_idx_val);
+
+    // TxnInfoKey -> TxnInfoPB
+    std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    std::string info_val;
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.add_sub_txn_ids(sub_txn_id);
+    txn_info_pb.set_label("txn_info_label_" + std::to_string(i));
+    ASSERT_TRUE(txn_info_pb.SerializeToString(&info_val))
+            << "failed to serialize txn info, key=" << hex(info_key) << " db_id=" << db_id
+            << " txn_id=" << txn_id;
+    txn->put(info_key, info_val);
+
+    // SubTxnIndex -> TxnIndexPB
+    std::string idx_key = txn_index_key({instance_id, sub_txn_id});
+    std::string idx_val;
+    TxnIndexPB sub_txn_index_pb;
+    ASSERT_TRUE(sub_txn_index_pb.SerializeToString(&idx_val))
+            << "failed to serialize sub txn index, key=" << hex(idx_key) << " db_id=" << db_id
+            << " txn_id=" << txn_id;
+    txn->put(idx_key, idx_val);
+
+    // TxnLabel -> TxnLabelPB
+    std::string label_key;
+    std::string label_val;
+    txn_label_key({instance_id, db_id, txn_info_pb.label()}, &label_key);
+    TxnLabelPB txn_label_pb;
+    txn_label_pb.add_txn_ids(txn_id);
+    VLOG_DEBUG << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id,
+                              txn_info_pb.label(), hex(label_key));
+    ASSERT_TRUE(txn_label_pb.SerializeToString(&label_val))
+            << "failed to serialize txn label, key=" << hex(label_key) << " db_id=" << db_id
+            << " txn_id=" << txn_id;
+    // Append a MemTxnKv-generated versionstamp to the label value (presumably matching the
+    // layout meta-service uses for stored label values — confirm).
+    MemTxnKv::gen_version_timestamp(123456790, 0, &label_val);
+    txn->put(label_key, label_val);
+
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+/**
+ * Creates the kvs of `total_kv_num` txns by calling make_single_txn_related_kvs for every
+ * index in [0, total_kv_num). Each txn is committed in its own kv transaction. Stops at the
+ * first fatal assertion failure raised by the helper.
+ * @param txn_kv Transaction key-value storage object
+ * @param total_kv_num Number of txns to create
+ * @param expired_kv_num Indices below this value get an expired creation time
+ */
+void make_multiple_txn_info_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t total_kv_num,
+                                int64_t expired_kv_num) {
+    // Iterate exactly over [0, total_kv_num). Fixed-width batches would overshoot whenever
+    // total_kv_num is not a multiple of the batch size, and there is nothing to batch anyway.
+    for (int64_t i = 0; i < total_kv_num; ++i) {
+        make_single_txn_related_kvs(txn_kv, i, expired_kv_num);
+        if (::testing::Test::HasFatalFailure()) {
+            return;
+        }
+    }
+}
+
+/**
+ * Verifies that every kv belonging to one recycle-txn entry exists: the RecycleTxnPB value
+ * parses, and the TxnIndex, TxnInfo, each sub txn's TxnIndex, and the TxnLabel keys derived
+ * from it are present in `txn_kv`. Failures are reported as fatal gtest failures.
+ * @param k Encoded RecycleTxn key (first byte is the key space)
+ * @param v RecycleTxnPB value stored under `k`
+ * @param txn_kv Transaction key-value storage object
+ */
+void check_single_txn_info_kvs(std::string_view k, std::string_view v,
+                               std::shared_ptr<cloud::TxnKv> txn_kv) {
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    // check RecycleTxnInfo
+    RecycleTxnPB recycle_txn_pb;
+    ASSERT_TRUE(recycle_txn_pb.ParseFromArray(v.data(), v.size())) << "key=" << hex(k);
+
+    // Decode db_id and txn_id (elements 3 and 4 of the decoded key).
+    std::string_view k1 = k;
+    k1.remove_prefix(1); // Remove key space
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+    ASSERT_EQ(decode_key(&k1, &out), 0) << "key=" << hex(k);
+    // Guard the indexing below; a short decode result would otherwise be out-of-bounds UB.
+    ASSERT_GE(out.size(), 5U) << "key=" << hex(k);
+    const int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
+    const int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
+
+    // check TxnIndex
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    std::string index_value;
+    ASSERT_EQ(txn->get(index_key, &index_value), TxnErrorCode::TXN_OK);
+
+    // check TxnInfo
+    std::string info_key;
+    std::string info_val;
+    txn_info_key({instance_id, db_id, txn_id}, &info_key);
+    ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK);
+
+    // check SubTxnIndex for every sub txn recorded in the TxnInfo
+    TxnInfoPB txn_info;
+    ASSERT_TRUE(txn_info.ParseFromString(info_val));
+    std::string sub_txn_index_value;
+    for (auto sub_txn_id : txn_info.sub_txn_ids()) {
+        const std::string sub_txn_index_key = txn_index_key({instance_id, sub_txn_id});
+        ASSERT_EQ(txn->get(sub_txn_index_key, &sub_txn_index_value), TxnErrorCode::TXN_OK);
+    }
+
+    // check TxnLabel
+    std::string label_key;
+    std::string label_val;
+    txn_label_key({instance_id, db_id, txn_info.label()}, &label_key);
+    ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK)
+            << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}", instance_id, db_id,
+                           txn_info.label(), hex(label_key));
+}
+
+/**
+ * Range-scans every RecycleTxn key of `instance_id` and verifies each entry with
+ * check_single_txn_info_kvs. Asserts that exactly `size` entries were found. Stops at the
+ * first fatal failure.
+ * @param txn_kv Transaction key-value storage object
+ * @param size Expected number of RecycleTxn entries
+ */
+void check_multiple_txn_info_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t size) {
+    RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id, 0, 0};
+    RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id, INT64_MAX, INT64_MAX};
+    std::string begin = recycle_txn_key(recycle_txn_key_info0);
+    std::string end = recycle_txn_key(recycle_txn_key_info1);
+    int64_t total_kv = 0;
+
+    std::unique_ptr<RangeGetIterator> it;
+    do {
+        int get_ret = txn_get(txn_kv.get(), begin, end, it);
+        // txn kv may complain "Request for future version"
+        ASSERT_EQ(get_ret, 0) << "failed to get kv, range=[" << hex(begin) << "," << hex(end)
+                              << ") txn_get_ret=" << get_ret;
+        if (!it->has_next()) {
+            LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")";
+            break; // scan finished
+        }
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            if (!it->has_next()) {
+                // Last key of this batch: the next range read resumes right after it.
+                begin = k;
+                VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k);
+            }
+            ASSERT_NO_FATAL_FAILURE(check_single_txn_info_kvs(k, v, txn_kv));
+            total_kv++;
+        }
+        begin.push_back('\x00'); // Update to next smallest key for iteration
+    } while (it->more());
+    ASSERT_EQ(total_kv, size);
+}
+
+// Writes 10000 txns (8000 of them expired), runs recycle_expired_txn_label with a
+// 10-thread pool group, and checks that exactly the 2000 unexpired txns remain.
+TEST(RecyclerTest, concurrent_recycle_txn_label_test) {
+    // Restore the global config knobs overridden below when the test ends, including on an
+    // early ASSERT return, so later tests in this binary see their original values.
+    struct ConfigRestorer {
+        decltype(config::label_keep_max_second) label_keep_max_second =
+                config::label_keep_max_second;
+        decltype(config::recycle_pool_parallelism) recycle_pool_parallelism =
+                config::recycle_pool_parallelism;
+        ~ConfigRestorer() {
+            config::label_keep_max_second = label_keep_max_second;
+            config::recycle_pool_parallelism = recycle_pool_parallelism;
+        }
+    } config_restorer;
+
+    config::label_keep_max_second = 259200; // three days
+    config::recycle_pool_parallelism = 10;
+
+    auto recycle_txn_label_s3_producer_pool =
+            std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_s3_producer_pool->start();
+    auto recycle_txn_label_recycle_tablet_pool =
+            std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_recycle_tablet_pool->start();
+    auto recycle_txn_label_group_recycle_function_pool =
+            std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_txn_label_group_recycle_function_pool->start();
+    doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group;
+    recycle_txn_label_thread_group =
+            RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool),
+                                    std::move(recycle_txn_label_recycle_tablet_pool),
+                                    std::move(recycle_txn_label_group_recycle_function_pool));
+
+    auto mem_txn_kv = std::make_shared<MemTxnKv>();
+
+    // To run against FoundationDB instead of MemTxnKv:
+    // cloud::config::fdb_cluster_file_path = "fdb.cluster";
+    // auto fdb_txn_kv = std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>());
+    // fdb_txn_kv->init();
+
+    auto txn_kv = mem_txn_kv;
+    ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error";
+    // 10000 txns; indices [0, 8000) are expired.
+    ASSERT_NO_FATAL_FAILURE(make_multiple_txn_info_kvs(txn_kv, 10000, 8000));
+    ASSERT_NO_FATAL_FAILURE(check_multiple_txn_info_kvs(txn_kv, 10000));
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+    auto start = std::chrono::steady_clock::now();
+    ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
+    auto finish = std::chrono::steady_clock::now();
+    std::cout << "recycle expired txn label cost="
+              << std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
+              << "ms" << std::endl;
+    // Only the 2000 unexpired txns must survive.
+    check_multiple_txn_info_kvs(txn_kv, 2000);
+}
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to