This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1a888e5d98c branch-3.0: [chore](cloud) Implement idempotent injection 
framework for meta-service #51905 (#52102)
1a888e5d98c is described below

commit 1a888e5d98c3e45de4d347af2d73c7fc04cecc40
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 23 21:54:21 2025 +0800

    branch-3.0: [chore](cloud) Implement idempotent injection framework for 
meta-service #51905 (#52102)
    
    Cherry-picked from #51905
    
    Co-authored-by: Gavin Chou <[email protected]>
---
 cloud/src/common/config.h                        |  8 ++++
 cloud/src/meta-service/meta_service.h            | 52 +++++++++++++++++++++++-
 cloud/src/meta-service/meta_service_resource.cpp |  7 ----
 3 files changed, 59 insertions(+), 8 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 23bb4f74b6a..7dc63683688 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -35,6 +35,14 @@ CONF_Bool(use_mem_kv, "false");
 CONF_Int32(meta_server_register_interval_ms, "20000");
 CONF_Int32(meta_server_lease_ms, "60000");
 
+// for chaos testing
+CONF_mBool(enable_idempotent_request_injection, "false");
+// idempotent_request_replay_delay_ms = 
idempotent_request_replay_delay_base_ms + 
random(-idempotent_request_replay_delay_range_ms, 
idempotent_request_replay_delay_range_ms)
+CONF_mInt64(idempotent_request_replay_delay_base_ms, "10000");
+CONF_mInt64(idempotent_request_replay_delay_range_ms, "5000");
+// exclude some request that are meaningless to replay, comma separated list. 
e.g. GetTabletStatsRequest,GetVersionRequest
+CONF_mString(idempotent_request_replay_exclusion, 
"GetTabletStatsRequest,GetVersionRequest");
+
 CONF_Int64(fdb_txn_timeout_ms, "10000");
 CONF_Int64(brpc_max_body_size, "3147483648");
 CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index 57f88d51dfe..4be017edc9a 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -46,6 +46,14 @@ void internal_get_rowset(Transaction* txn, int64_t start, 
int64_t end,
                          const std::string& instance_id, int64_t tablet_id, 
MetaServiceCode& code,
                          std::string& msg, GetRowsetResponse* response);
 
+// for wrapping stateful lambda to run in bthread
+static void* run_bthread_work(void* arg) {
+    auto f = reinterpret_cast<std::function<void()>*>(arg);
+    (*f)();
+    delete f;
+    return nullptr;
+}
+
 class MetaServiceImpl : public cloud::MetaService {
 public:
     MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv, 
std::shared_ptr<ResourceManager> resource_mgr,
@@ -720,8 +728,12 @@ private:
         static_assert(std::is_base_of_v<::google::protobuf::Message, 
Response>);
 
         using namespace std::chrono;
-
         brpc::ClosureGuard done_guard(done);
+
+        // life span of this defer MUST be longer than `done`
+        std::unique_ptr<int, std::function<void(int*)>> defer_injection(
+                (int*)(0x01), [&, this](int*) { idempotent_injection(method, 
req, resp); });
+
         if (!config::enable_txn_store_retry) {
             (impl_.get()->*method)(ctrl, req, resp, brpc::DoNothing());
             if (DCHECK_IS_ON()) {
@@ -791,6 +803,44 @@ private:
         }
     }
 
+    template <typename Request, typename Response>
+    void idempotent_injection(MetaServiceMethod<Request, Response> method, 
const Request* requ,
+                              Response* resp) {
+        if (!config::enable_idempotent_request_injection) return;
+
+        using namespace std::chrono;
+        auto s = system_clock::now();
+        static std::mt19937_64 
rng(duration_cast<milliseconds>(s.time_since_epoch()).count());
+        // clang-format off
+        // FIXME(gavin): make idempotent_request_replay_exclusion configurable 
via HTTP
+        static auto exclusion = []{ std::istringstream 
iss(config::idempotent_request_replay_exclusion); std::string e; 
std::unordered_set<std::string> r;
+            while (std::getline(iss, e, ',')) { r.insert(e); } return r;
+        }();
+        auto f = new std::function<void()>([s, req = *requ, res = *resp, 
method, this]() mutable { // copy and capture
+            auto dist = 
std::uniform_int_distribution(-config::idempotent_request_replay_delay_range_ms,
+                                                      
config::idempotent_request_replay_delay_range_ms);
+            int64_t sleep_ms = config::idempotent_request_replay_delay_base_ms 
+ dist(rng);
+            LOG(INFO) << " request_name=" << req.GetDescriptor()->name()
+                      << " response_name=" << res.GetDescriptor()->name()
+                      << " queue_ts=" << 
duration_cast<milliseconds>(s.time_since_epoch()).count()
+                      << " now_ts=" << 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()
+                      << " idempotent_request_replay_delay_base_ms=" << 
config::idempotent_request_replay_delay_base_ms
+                      << " idempotent_request_replay_delay_range_ms=" << 
config::idempotent_request_replay_delay_range_ms
+                      << " idempotent_request_replay_delay_ms=" << sleep_ms
+                      << " request=" << req.ShortDebugString();
+            if (sleep_ms < 0 || exclusion.count(req.GetDescriptor()->name())) 
return;
+            brpc::Controller ctrl;
+            bthread_usleep(sleep_ms * 1000);
+            (impl_.get()->*method)(&ctrl, &req, &res, brpc::DoNothing());
+        });
+        // clang-format on
+        bthread_t bid;
+        if (bthread_start_background(&bid, nullptr, run_bthread_work, f) != 0) 
{
+            LOG(WARNING) << "failed to bthread_start_background, run in 
current thread";
+            run_bthread_work(f);
+        }
+    }
+
     std::unique_ptr<MetaServiceImpl> impl_;
 };
 
diff --git a/cloud/src/meta-service/meta_service_resource.cpp 
b/cloud/src/meta-service/meta_service_resource.cpp
index bbd94b577b1..8c8b0646c94 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -51,13 +51,6 @@ bool is_valid_storage_vault_name(const std::string& str) {
 
 namespace doris::cloud {
 
-static void* run_bthread_work(void* arg) {
-    auto f = reinterpret_cast<std::function<void()>*>(arg);
-    (*f)();
-    delete f;
-    return nullptr;
-}
-
 static std::string_view print_cluster_status(const ClusterStatus& status) {
     switch (status) {
     case ClusterStatus::UNKNOWN:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to