This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3e100936a2a [opt](recycler) Speed up recycling txn info by introducing
parallelism (#50037)
3e100936a2a is described below
commit 3e100936a2acd55f7e4dc0d1c8372261c39b60b3
Author: abmdocrt <[email protected]>
AuthorDate: Wed Apr 23 15:06:56 2025 +0800
[opt](recycler) Speed up recycling txn info by introducing parallelism
(#50037)
---
cloud/src/recycler/recycler.cpp | 52 +++++++-
cloud/test/recycler_test.cpp | 278 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 324 insertions(+), 6 deletions(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 581ee4067d4..6f11455ce00 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2500,6 +2500,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
int64_t num_scanned = 0;
int64_t num_expired = 0;
int64_t num_recycled = 0;
+ int ret = 0;
RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0};
RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX,
INT64_MAX};
@@ -2507,6 +2508,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
std::string end_recycle_txn_key;
recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key);
recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key);
+ std::vector<std::string> recycle_txn_info_keys;
LOG_INFO("begin to recycle expired txn").tag("instance_id", instance_id_);
@@ -2534,12 +2536,15 @@ int InstanceRecycler::recycle_expired_txn_label() {
return final_expiration;
};
+ SyncExecutor<int> concurrent_delete_executor(
+ _thread_pool_group.s3_producer_pool,
+ fmt::format("recycle expired txn label, instance id {}",
instance_id_),
+ [](const int& ret) { return ret != 0; });
+
int64_t current_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled,
¤t_time_ms,
- &calc_expiration,
- this](std::string_view k, std::string_view
v) -> int {
+ auto handle_recycle_txn_kv = [&](std::string_view k, std::string_view v)
-> int {
++num_scanned;
RecycleTxnPB recycle_txn_pb;
if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
@@ -2551,10 +2556,12 @@ int InstanceRecycler::recycle_expired_txn_label() {
(calc_expiration(recycle_txn_pb) <= current_time_ms)) {
VLOG_DEBUG << "found recycle txn, key=" << hex(k);
num_expired++;
- } else {
- return 0;
+ recycle_txn_info_keys.emplace_back(k);
}
+ return 0;
+ };
+ auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
std::string_view k1 = k;
//RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id
k1.remove_prefix(1); // Remove key space
@@ -2638,8 +2645,41 @@ int InstanceRecycler::recycle_expired_txn_label() {
return 0;
};
+ auto loop_done = [&]() -> int {
+ for (const auto& k : recycle_txn_info_keys) {
+ concurrent_delete_executor.add([&]() {
+ if (delete_recycle_txn_kv(k) != 0) {
+ LOG_WARNING("failed to delete recycle txn kv")
+ .tag("instance id", instance_id_)
+ .tag("key", hex(k));
+ return -1;
+ }
+ return 0;
+ });
+ }
+ bool finished = true;
+ std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
+ for (int r : rets) {
+ if (r != 0) {
+ ret = -1;
+ }
+ }
+
+ ret = finished ? ret : -1;
+
+ if (ret != 0) {
+ LOG_WARNING("recycle txn kv ret!=0")
+ .tag("finished", finished)
+ .tag("ret", ret)
+ .tag("instance_id", instance_id_);
+ return ret;
+ }
+ recycle_txn_info_keys.clear();
+ return ret;
+ };
+
return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
- std::move(handle_recycle_txn_kv));
+ std::move(handle_recycle_txn_kv),
std::move(loop_done));
}
struct CopyJobIdTuple {
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 5b9203b1d4e..2383b71b210 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -17,11 +17,13 @@
#include "recycler/recycler.h"
+#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest.h>
#include <chrono>
+#include <cstdint>
#include <memory>
#include <random>
#include <string>
@@ -35,11 +37,13 @@
#include "meta-service/keys.h"
#include "meta-service/mem_txn_kv.h"
#include "meta-service/meta_service.h"
+#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "mock_accessor.h"
#include "mock_resource_manager.h"
#include "rate-limiter/rate_limiter.h"
#include "recycler/checker.h"
+#include "recycler/recycler.cpp"
#include "recycler/storage_vault_accessor.h"
#include "recycler/util.h"
#include "recycler/white_black_list.h"
@@ -4051,4 +4055,278 @@ TEST(RecyclerTest,
delete_tmp_rowset_without_resource_id) {
}
}
+static std::string generate_random_string(int length) { // returns `length` chars, each picked uniformly from char_set
+ std::string char_set =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+ std::random_device rd;
+ std::mt19937 generator(rd()); // re-seeded from std::random_device on every call
+ std::uniform_int_distribution<int> distribution(0, char_set.length() - 1);
+
+ std::string randomString;
+ for (int i = 0; i < length; ++i) {
+ randomString += char_set[distribution(generator)];
+ }
+ return randomString;
+}
+
+std::string instance_id = "concurrent_recycle_txn_label_test_" +
generate_random_string(10); // instance id used by the helpers and test below; random 10-char suffix
+
+/**
+ * Writes the KVs of one transaction for `instance_id` in a single committed transaction:
+ * RecycleTxn, TxnIndex, TxnInfo, SubTxnIndex and TxnLabel. Setup failures fail the test.
+ * @param txn_kv Transaction key-value storage object
+ * @param i Index used to derive db_id (i), txn_id (1000000 + i) and sub_txn_id (2000000 + i)
+ * @param expired_kv_num Indices below this value get a creation_time 4 days in the past
+ */
+void make_single_txn_related_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t
i,
+ int64_t expired_kv_num) {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ std::string recycle_txn_info_key;
+ std::string recycle_txn_info_val;
+ int64_t db_id = i;
+ int64_t txn_id = 1000000 + i;
+ int64_t sub_txn_id = 2000000 + i;
+ int64_t current_time = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ // RecycleTxnKeyInfo -> RecycleTxnPB
+ RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id};
+ recycle_txn_key(recycle_txn_key_info, &recycle_txn_info_key);
+ RecycleTxnPB recycle_txn_pb;
+ if (i < expired_kv_num) {
+ recycle_txn_pb.set_creation_time(current_time - 4 * 24 * 3600 * 1000L); // 4 days ago, in ms
+ } else {
+ recycle_txn_pb.set_creation_time(current_time);
+ }
+ recycle_txn_pb.set_label("recycle_txn_key_info_label_" +
std::to_string(i));
+ if (!recycle_txn_pb.SerializeToString(&recycle_txn_info_val)) {
+ LOG_WARNING("failed to serialize recycle txn info")
+ .tag("key", hex(recycle_txn_info_key))
+ .tag("db_id", db_id)
+ .tag("txn_id", txn_id);
+ FAIL() << "failed to serialize recycle txn info"; // was a silent return
+ }
+ LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}",
instance_id, db_id,
+ recycle_txn_pb.label(),
hex(recycle_txn_info_key));
+ txn->put(recycle_txn_info_key, recycle_txn_info_val);
+
+ // TxnIndexKey -> TxnIndexPB
+ std::string txn_idx_key = txn_index_key({instance_id, txn_id});
+ std::string txn_idx_val;
+ TxnIndexPB txn_index_pb;
+ if (!txn_index_pb.SerializeToString(&txn_idx_val)) {
+ LOG_WARNING("failed to serialize txn index")
+ .tag("key", hex(txn_idx_key))
+ .tag("db_id", db_id)
+ .tag("txn_id", txn_id);
+ FAIL() << "failed to serialize txn index";
+ }
+ txn->put(txn_idx_key, txn_idx_val);
+
+ // TxnInfoKey -> TxnInfoPB (lists sub_txn_id, used for the SubTxnIndex below)
+ std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+ std::string info_val;
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.add_sub_txn_ids(sub_txn_id);
+ txn_info_pb.set_label("txn_info_label_" + std::to_string(i));
+ if (!txn_info_pb.SerializeToString(&info_val)) {
+ LOG_WARNING("failed to serialize txn info")
+ .tag("key", hex(info_key))
+ .tag("db_id", db_id)
+ .tag("txn_id", txn_id);
+ FAIL() << "failed to serialize txn info";
+ }
+ txn->put(info_key, info_val);
+
+ // SubTxnIndex -> TxnIndexPB
+ std::string idx_key = txn_index_key({instance_id, sub_txn_id});
+ std::string idx_val;
+ TxnIndexPB sub_txn_index_pb;
+ if (!sub_txn_index_pb.SerializeToString(&idx_val)) {
+ LOG_WARNING("failed to serialize sub txn index")
+ .tag("key", hex(idx_key))
+ .tag("db_id", db_id)
+ .tag("txn_id", txn_id);
+ FAIL() << "failed to serialize sub txn index";
+ }
+ txn->put(idx_key, idx_val);
+
+ // TxnLabel -> TxnLabelPB (keyed by the TxnInfoPB label)
+ std::string label_key;
+ std::string label_val;
+ txn_label_key({instance_id, db_id, txn_info_pb.label()}, &label_key);
+ TxnLabelPB txn_label_pb;
+ txn_label_pb.add_txn_ids(txn_id);
+ LOG(INFO) << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}",
instance_id, db_id,
+ txn_info_pb.label(), hex(label_key));
+ if (!txn_label_pb.SerializeToString(&label_val)) {
+ LOG_WARNING("failed to serialize txn label")
+ .tag("key", hex(label_key))
+ .tag("db_id", db_id)
+ .tag("txn_id", txn_id);
+ FAIL() << "failed to serialize txn label";
+ }
+ MemTxnKv::gen_version_timestamp(123456790, 0, &label_val);
+ txn->put(label_key, label_val);
+
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+/**
+ * Creates the transaction KVs for indices [0, total_kv_num), walking them in batches of 2000;
+ * each index is still written by its own make_single_txn_related_kvs call (one txn per index).
+ * @param txn_kv Transaction key-value storage object
+ * @param total_kv_num Total number of transactions to create; need not be a multiple of 2000
+ * @param expired_kv_num Indices below this value get a creation_time 4 days in the past
+ */
+void make_multiple_txn_info_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t
total_kv_num,
+ int64_t expired_kv_num) {
+ using namespace doris::cloud;
+ for (int64_t i = 0; i < total_kv_num; i += 2000) {
+ for (int64_t j = i; j < i + 2000 && j < total_kv_num; j++) { // last batch may be partial
+ make_single_txn_related_kvs(txn_kv, j, expired_kv_num);
+ }
+ }
+}
+
+/**
+ * Checks one RecycleTxn entry of `instance_id` and the KVs it refers to: TxnIndex, TxnInfo, every
+ * SubTxnIndex listed in the TxnInfo, and TxnLabel. A failed ASSERT_* returns from this helper only.
+ * @param k Encoded RecycleTxn key (one key-space byte followed by the encoded key tuple)
+ * @param v Serialized RecycleTxnPB value
+ * @param txn_kv Transaction key-value storage object
+ */
+void check_single_txn_info_kvs(const std::string_view& k, const
std::string_view& v,
+ std::shared_ptr<cloud::TxnKv> txn_kv) {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ // check the RecycleTxnPB value parses
+ RecycleTxnPB recycle_txn_pb;
+ ASSERT_TRUE(recycle_txn_pb.ParseFromArray(v.data(), v.size()));
+ std::string_view k1 = k;
+
+ // decode db_id / txn_id from the RecycleTxn key, then check TxnIndex
+ std::string index_key, index_value;
+ k1.remove_prefix(1); // Remove key space
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ ASSERT_EQ(decode_key(&k1, &out), 0);
+ int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
+ int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
+ index_key = txn_index_key({instance_id, txn_id});
+ ASSERT_EQ(txn->get(index_key, &index_value), TxnErrorCode::TXN_OK);
+
+ // check TxnInfo
+ std::string info_key, info_val;
+ txn_info_key({instance_id, db_id, txn_id}, &info_key);
+ ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK);
+
+ // check the SubTxnIndex of every sub txn listed in TxnInfo
+ TxnInfoPB txn_info;
+ ASSERT_TRUE(txn_info.ParseFromString(info_val));
+ std::vector<std::string> sub_txn_index_keys;
+ std::string sub_txn_index_value;
+ for (auto sub_txn_id : txn_info.sub_txn_ids()) {
+ auto sub_txn_index_key = txn_index_key({instance_id, sub_txn_id});
+ sub_txn_index_keys.push_back(sub_txn_index_key);
+ }
+ for (auto& sub_txn_index_key : sub_txn_index_keys) {
+ ASSERT_EQ(txn->get(sub_txn_index_key, &sub_txn_index_value),
TxnErrorCode::TXN_OK);
+ }
+
+ // check TxnLabel, keyed by the label stored in TxnInfo
+ std::string label_key, label_val;
+ txn_label_key({instance_id, db_id, txn_info.label()}, &label_key);
+ ASSERT_EQ(txn->get(label_key, &label_val), TxnErrorCode::TXN_OK)
+ << fmt::format("instance_id: {}, db_id: {}, label: {}, k: {}",
instance_id, db_id,
+ txn_info.label(), hex(label_key));
+}
+
+/**
+ * Scans the RecycleTxn key range of `instance_id` page by page, validates each entry with
+ * check_single_txn_info_kvs, and asserts how many entries were seen.
+ * @param txn_kv Transaction key-value storage object
+ * @param size Expected number of RecycleTxn keys found in the range
+ */
+void check_multiple_txn_info_kvs(std::shared_ptr<cloud::TxnKv> txn_kv, int64_t
size) {
+ using namespace doris::cloud;
+ std::unique_ptr<Transaction> txn; // NOTE(review): unused below; txn_get reads through txn_kv
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id, 0, 0};
+ RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id, INT64_MAX,
INT64_MAX};
+ std::string begin = recycle_txn_key(recycle_txn_key_info0);
+ std::string end = recycle_txn_key(recycle_txn_key_info1);
+ int64_t total_kv = 0;
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ int get_ret = txn_get(txn_kv.get(), begin, end, it);
+ if (get_ret != 0) { // txn kv may complain "Request for future version"
+ LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << ","
<< hex(end)
+ << ") txn_get_ret=" << get_ret;
+ ASSERT_TRUE(false);
+ }
+ if (!it->has_next()) {
+ LOG(INFO) << "no keys in the given range=[" << hex(begin) << ","
<< hex(end) << ")";
+ break; // scan finished
+ }
+ while (it->has_next()) {
+ // validate this entry and the kvs it refers to (nothing is recycled here)
+ auto [k, v] = it->next();
+ if (!it->has_next()) {
+ begin = k;
+ VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k);
+ }
+ check_single_txn_info_kvs(k, v, txn_kv);
+ total_kv++;
+ }
+ begin.push_back('\x00'); // Update to next smallest key for iteration
+ } while (it->more());
+ ASSERT_EQ(total_kv, size);
+}
+
+TEST(RecyclerTest, concurrent_recycle_txn_label_test) {
+ config::label_keep_max_second = 259200; // 3 days
+ doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group;
+ config::recycle_pool_parallelism = 10; // NOTE(review): config values set here are not restored after the test
+ auto recycle_txn_label_s3_producer_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_s3_producer_pool->start();
+ auto recycle_txn_label_recycle_tablet_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_recycle_tablet_pool->start();
+ auto recycle_txn_label_group_recycle_function_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_group_recycle_function_pool->start();
+ recycle_txn_label_thread_group =
+
RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool),
+
std::move(recycle_txn_label_recycle_tablet_pool),
+
std::move(recycle_txn_label_group_recycle_function_pool));
+
+ auto mem_txn_kv = std::make_shared<MemTxnKv>();
+ // Disabled alternative: uncomment the lines below to use an FdbTxnKv instead of MemTxnKv.
+ // cloud::config::fdb_cluster_file_path = "fdb.cluster";
+ // auto fdb_txn_kv =
std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>());
+ // fdb_txn_kv->init();
+
+ auto txn_kv = mem_txn_kv;
+ ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl;
+ make_multiple_txn_info_kvs(txn_kv, 10000, 8000); // indices < 8000 get a 4-day-old creation time
+ check_multiple_txn_info_kvs(txn_kv, 10000); // all 10000 RecycleTxn keys exist before recycling
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+ auto start = std::chrono::steady_clock::now();
+ ASSERT_EQ(recycler.recycle_expired_txn_label(), 0);
+ auto finish = std::chrono::steady_clock::now();
+ std::cout << "recycle expired txn label cost="
+ << std::chrono::duration_cast<std::chrono::milliseconds>(finish
- start).count()
+ << "ms" << std::endl;
+ check_multiple_txn_info_kvs(txn_kv, 2000); // only the 2000 non-expired entries should remain
+}
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]