This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4bc9ddcb271 branch-3.0: [Fix](recycler) Fix transaction label
recycling to prevent key cleanup failures and 'key not found' errors #50766
(#50814)
4bc9ddcb271 is described below
commit 4bc9ddcb271b6aeaa5d2f6cb0bd534cdff97163f
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 13 21:39:00 2025 +0800
branch-3.0: [Fix](recycler) Fix transaction label recycling to prevent key
cleanup failures and 'key not found' errors #50766 (#50814)
Cherry-picked from #50766
Co-authored-by: abmdocrt <[email protected]>
---
cloud/src/recycler/recycler.cpp | 8 +++++-
cloud/test/recycler_test.cpp | 55 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 62 insertions(+), 1 deletion(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 0cee75994e6..307beab63d4 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2635,6 +2635,11 @@ int InstanceRecycler::recycle_expired_txn_label() {
};
auto loop_done = [&]() -> int {
+ std::unique_ptr<int, std::function<void(int*)>> defer(
+ (int*)0x01, [&](int*) { recycle_txn_info_keys.clear(); });
+ TEST_SYNC_POINT_CALLBACK(
+
"InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys",
+ &recycle_txn_info_keys);
for (const auto& k : recycle_txn_info_keys) {
concurrent_delete_executor.add([&]() {
if (delete_recycle_txn_kv(k) != 0) {
@@ -2656,6 +2661,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
ret = finished ? ret : -1;
+
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_expired_txn_label.failure",
&ret);
+
if (ret != 0) {
LOG_WARNING("recycle txn kv ret!=0")
.tag("finished", finished)
@@ -2663,7 +2670,6 @@ int InstanceRecycler::recycle_expired_txn_label() {
.tag("instance_id", instance_id_);
return ret;
}
- recycle_txn_info_keys.clear();
return ret;
};
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 6ef782b9c5f..bcd7dd39160 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -4285,4 +4285,59 @@ TEST(RecyclerTest, concurrent_recycle_txn_label_test) {
<< "ms" << std::endl;
check_multiple_txn_info_kvs(txn_kv, 2000);
}
+
+TEST(RecyclerTest, concurrent_recycle_txn_label_failure_test) {
+ config::label_keep_max_second = 259200;
+ doris::cloud::RecyclerThreadPoolGroup recycle_txn_label_thread_group;
+ config::recycle_pool_parallelism = 40;
+ auto recycle_txn_label_s3_producer_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_s3_producer_pool->start();
+ auto recycle_txn_label_recycle_tablet_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_recycle_tablet_pool->start();
+ auto recycle_txn_label_group_recycle_function_pool =
+
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ recycle_txn_label_group_recycle_function_pool->start();
+ recycle_txn_label_thread_group =
+
RecyclerThreadPoolGroup(std::move(recycle_txn_label_s3_producer_pool),
+
std::move(recycle_txn_label_recycle_tablet_pool),
+
std::move(recycle_txn_label_group_recycle_function_pool));
+
+ auto mem_txn_kv = std::make_shared<MemTxnKv>();
+
+ auto txn_kv = mem_txn_kv;
+ ASSERT_TRUE(txn_kv.get()) << "exit get MemTxnKv error" << std::endl;
+ make_multiple_txn_info_kvs(txn_kv, 20000, 15000);
+ check_multiple_txn_info_kvs(txn_kv, 20000);
+
+ auto* sp = SyncPoint::get_instance();
+ std::unique_ptr<int, std::function<void(int*)>> defer(
+ (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys",
+ [](auto&& args) {
+ auto* recycle_txn_info_keys =
+
try_any_cast<std::vector<std::string>*>(args[0]);
+
+ ASSERT_LE(recycle_txn_info_keys->size(), 10000);
+ });
+ sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.failure",
[](auto&& args) {
+ auto* ret = try_any_cast<int*>(args[0]);
+ *ret = -1;
+ });
+ sp->enable_processing();
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ InstanceRecycler recycler(txn_kv, instance, recycle_txn_label_thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+ auto start = std::chrono::steady_clock::now();
+ ASSERT_EQ(recycler.recycle_expired_txn_label(), -1);
+ auto finish = std::chrono::steady_clock::now();
+ std::cout << "recycle expired txn label cost="
+ << std::chrono::duration_cast<std::chrono::milliseconds>(finish
- start).count()
+ << "ms" << std::endl;
+ check_multiple_txn_info_kvs(txn_kv, 5000);
+}
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]