This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 54fff1b4fe7 [Enhancement](client) Supports dynamically changing the 
rate limiter config (#59465)
54fff1b4fe7 is described below

commit 54fff1b4fe7c2ebaeda12d339dd9a7d679fc8138
Author: Yixuan Wang <[email protected]>
AuthorDate: Wed Jan 21 19:08:50 2026 +0800

    [Enhancement](client) Supports dynamically changing the rate limiter config 
(#59465)
    
    This PR adds support for dynamically updating the S3 rate limiter
    configuration at runtime. It adds getter methods to S3RateLimiter and
    S3RateLimiterHolder, and checks for changed rate limiter config values
    (resetting the affected GET/PUT limiter) whenever an S3 client is
    created.
---
 be/src/util/s3_util.cpp                   | 59 +++++++++++++++++++++++++++++++
 be/test/io/client/s3_file_system_test.cpp | 31 ++++++++++++++++
 common/cpp/s3_rate_limiter.h              | 12 +++++++
 3 files changed, 102 insertions(+)

diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 0cf608877bb..ce24ab30e60 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -30,9 +30,11 @@
 #include <aws/s3/S3Client.h>
 #include <aws/sts/STSClient.h>
 #include <bvar/reducer.h>
+#include <cpp/s3_rate_limiter.h>
 #include <util/string_util.h>
 
 #include <atomic>
+
 #ifdef USE_AZURE
 #include <azure/core/diagnostics/logger.hpp>
 #include <azure/storage/blobs/blob_container_client.hpp>
@@ -131,11 +133,66 @@ bvar::Adder<int64_t> 
get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_nu
 bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
 bvar::Adder<int64_t> 
put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
 
+static std::atomic<int64_t> last_s3_get_token_bucket_tokens {0};
+static std::atomic<int64_t> last_s3_get_token_limit {0};
+static std::atomic<int64_t> last_s3_get_token_per_second {0};
+static std::atomic<int64_t> last_s3_put_token_per_second {0};
+static std::atomic<int64_t> last_s3_put_token_bucket_tokens {0};
+static std::atomic<int64_t> last_s3_put_token_limit {0};
+
+static std::atomic<bool> updating_get_limiter {false};
+static std::atomic<bool> updating_put_limiter {false};
+
 S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
     CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << 
to_string(type);
     return _rate_limiters[static_cast<size_t>(type)].get();
 }
 
+template <S3RateLimitType LimiterType>
+void update_rate_limiter_if_changed(int64_t current_tps, int64_t 
current_bucket,
+                                    int64_t current_limit, 
std::atomic<int64_t>& last_tps,
+                                    std::atomic<int64_t>& last_bucket,
+                                    std::atomic<int64_t>& last_limit,
+                                    std::atomic<bool>& updating_flag, const 
char* limiter_name) {
+    if (last_tps.load(std::memory_order_relaxed) != current_tps ||
+        last_bucket.load(std::memory_order_relaxed) != current_bucket ||
+        last_limit.load(std::memory_order_relaxed) != current_limit) {
+        bool expected = false;
+        if (!updating_flag.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel)) {
+            return;
+        }
+        if (last_tps.load(std::memory_order_acquire) != current_tps ||
+            last_bucket.load(std::memory_order_acquire) != current_bucket ||
+            last_limit.load(std::memory_order_acquire) != current_limit) {
+            int ret =
+                    reset_s3_rate_limiter(LimiterType, current_tps, 
current_bucket, current_limit);
+
+            if (ret == 0) {
+                last_tps.store(current_tps, std::memory_order_release);
+                last_bucket.store(current_bucket, std::memory_order_release);
+                last_limit.store(current_limit, std::memory_order_release);
+            } else {
+                LOG(WARNING) << "Failed to reset S3 " << limiter_name
+                             << " rate limiter, error code: " << ret;
+            }
+        }
+
+        updating_flag.store(false, std::memory_order_release);
+    }
+}
+
+void check_s3_rate_limiter_config_changed() {
+    update_rate_limiter_if_changed<S3RateLimitType::GET>(
+            config::s3_get_token_per_second, config::s3_get_bucket_tokens,
+            config::s3_get_token_limit, last_s3_get_token_per_second,
+            last_s3_get_token_bucket_tokens, last_s3_get_token_limit, 
updating_get_limiter, "GET");
+
+    update_rate_limiter_if_changed<S3RateLimitType::PUT>(
+            config::s3_put_token_per_second, config::s3_put_bucket_tokens,
+            config::s3_put_token_limit, last_s3_put_token_per_second,
+            last_s3_put_token_bucket_tokens, last_s3_put_token_limit, 
updating_put_limiter, "PUT");
+}
+
 int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t 
max_burst, size_t limit) {
     if (type == S3RateLimitType::UNKNOWN) {
         return -1;
@@ -204,6 +261,8 @@ std::shared_ptr<io::ObjStorageClient> 
S3ClientFactory::create(const S3ClientConf
         return nullptr;
     }
 
+    check_s3_rate_limiter_config_changed();
+
 #ifdef BE_TEST
     {
         std::lock_guard l(_lock);
diff --git a/be/test/io/client/s3_file_system_test.cpp 
b/be/test/io/client/s3_file_system_test.cpp
index cc76c276759..d33240485af 100644
--- a/be/test/io/client/s3_file_system_test.cpp
+++ b/be/test/io/client/s3_file_system_test.cpp
@@ -2414,4 +2414,35 @@ TEST_F(S3FileSystemTest, 
AzureRateLimiterDeleteDirectoryExceptionHandlingTest) {
     ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining directory: " << 
status.to_string();
 }
 
+TEST_F(S3FileSystemTest, DynamicUpdateRateLimiterConfig) {
+    // Save original config values
+    int64_t original_get_bucket_tokens = config::s3_get_bucket_tokens;
+    int64_t original_get_token_per_second = config::s3_get_token_per_second;
+    int64_t original_get_token_limit = config::s3_get_token_limit;
+
+    std::cout << "Original GET config: bucket_tokens=" << 
original_get_bucket_tokens
+              << ", token_per_second=" << original_get_token_per_second
+              << ", limit=" << original_get_token_limit << std::endl;
+
+    int64_t new_s3_get_bucket_tokens_val = 50;
+    int64_t new_s3_get_token_per_second_val = 1;
+
+    auto [success1, msg7] = config::set_config(
+            "s3_get_bucket_tokens", 
std::to_string(new_s3_get_bucket_tokens_val), false, false);
+    ASSERT_EQ(success1, 0) << "Failed to set s3_get_bucket_tokens: " << msg7;
+    auto [success2, msg8] =
+            config::set_config("s3_get_token_per_second",
+                               
std::to_string(new_s3_get_token_per_second_val), false, false);
+    ASSERT_EQ(success2, 0) << "Failed to set s3_get_token_per_second: " << 
msg8;
+
+    auto st = create_client();
+    ASSERT_TRUE(st.ok());
+
+    // Verify that the GET rate limiter picked up the new config values
+    
EXPECT_EQ(S3ClientFactory::instance().rate_limiter(S3RateLimitType::GET)->get_max_burst(),
+              new_s3_get_bucket_tokens_val);
+    
EXPECT_EQ(S3ClientFactory::instance().rate_limiter(S3RateLimitType::GET)->get_max_speed(),
+              new_s3_get_token_per_second_val);
+}
+
 } // namespace doris
diff --git a/common/cpp/s3_rate_limiter.h b/common/cpp/s3_rate_limiter.h
index 07ae8006941..357b8f9fcc1 100644
--- a/common/cpp/s3_rate_limiter.h
+++ b/common/cpp/s3_rate_limiter.h
@@ -52,6 +52,12 @@ public:
     // Returns duration of sleep in nanoseconds (to distinguish sleeping on 
different kinds of S3RateLimiters for metrics)
     int64_t add(size_t amount);
 
+    size_t get_max_speed() const { return _max_speed; }
+
+    size_t get_max_burst() const { return _max_burst; }
+
+    size_t get_limit() const { return _limit; }
+
 private:
     std::pair<size_t, double> _update_remain_token(long now, size_t amount);
     size_t _count {0};
@@ -75,6 +81,12 @@ public:
 
     int reset(size_t max_speed, size_t max_burst, size_t limit);
 
+    size_t get_max_speed() const { return rate_limiter->get_max_speed(); }
+
+    size_t get_max_burst() const { return rate_limiter->get_max_burst(); }
+
+    size_t get_limit() const { return rate_limiter->get_limit(); }
+
 private:
     std::shared_mutex rate_limiter_rw_lock;
     std::unique_ptr<S3RateLimiter> rate_limiter;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to