This is an automated email from the ASF dual-hosted git repository.

leander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new bb081bef feat(auto_cl): add error rate threshold for punishment attenuation (#3219)
bb081bef is described below

commit bb081befc0479b2d99b39d865fa000780da9cd55
Author: yanfeng <[email protected]>
AuthorDate: Thu Feb 12 14:02:26 2026 +0800

    feat(auto_cl): add error rate threshold for punishment attenuation (#3219)
    
    * feat(auto_cl): add error rate threshold for punishment attenuation
    
    Add new GFlag `auto_cl_error_rate_punish_threshold` to enable
    error-rate-based punishment attenuation in AutoConcurrencyLimiter.
    
    Problem: Low error rates (e.g., 1.3% sporadic timeouts) cause
    disproportionate avg_latency inflation (+31%), leading the limiter
    to mistakenly shrink max_concurrency and trigger ELIMIT rejections.
    
    Solution: Inspired by Alibaba Sentinel's threshold-based approach:
    - threshold=0 (default): Original behavior preserved (backward compat)
    - threshold>0 (e.g., 0.1): Error rates at or below the threshold produce
      zero punishment; above it, punishment scales linearly from 0 at the
      threshold to full strength at a 100% error rate
    
    Example: With threshold=0.1, a 5% error rate produces no punishment,
    while a 50% error rate produces 44% of the original punishment.
    
    ---------
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 docs/cn/auto_concurrency_limiter.md             |  18 +++
 src/brpc/policy/auto_concurrency_limiter.cpp    |  31 ++++-
 test/BUILD.bazel                                |  13 ++
 test/brpc_auto_concurrency_limiter_unittest.cpp | 168 ++++++++++++++++++++++++
 4 files changed, 229 insertions(+), 1 deletion(-)
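
For reference, the attenuation rule described in the commit message can be sketched as a standalone function. This is an illustrative sketch, not the code from this commit: the name `PunishFactor` and its signature are hypothetical, and it only mirrors the rule as stated (no punishment at or below the threshold, a linear ramp above it, thresholds outside (0, 1) leaving the punishment unchanged).

```cpp
#include <cassert>

// Hypothetical helper (not part of brpc): returns the multiplier applied to
// the failure punishment for a given error rate and threshold.
double PunishFactor(double error_rate, double threshold) {
    assert(error_rate >= 0.0 && error_rate <= 1.0);
    // Thresholds outside (0, 1) disable attenuation: full punishment.
    if (threshold <= 0.0 || threshold >= 1.0) {
        return 1.0;
    }
    // Dead zone: error rates at or below the threshold are not punished.
    if (error_rate <= threshold) {
        return 0.0;
    }
    // Linear ramp: 0 at the threshold, 1 at a 100% error rate.
    return (error_rate - threshold) / (1.0 - threshold);
}
```

With a threshold of 0.1, `PunishFactor(0.05, 0.1)` is 0 and `PunishFactor(0.5, 0.1)` is 4/9, about 0.444, which matches the 44% figure in the commit message.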

diff --git a/docs/cn/auto_concurrency_limiter.md b/docs/cn/auto_concurrency_limiter.md
index 17ef5d7e..342e9ba6 100644
--- a/docs/cn/auto_concurrency_limiter.md
+++ b/docs/cn/auto_concurrency_limiter.md
@@ -154,3 +154,21 @@ The formula of netflix's gradient algorithm is: max_concurrency = min_latency / latency *
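 * In the gradient algorithm, max_concurrency / latency is conceptually related to qps (by Little's law), but the two can diverge badly. For example, before min_latency is re-measured, if every latency is smaller than min_latency, max_concurrency keeps dropping, possibly down to 0; under this algorithm, max_qps and min_latency stay stable, so the max_concurrency computed from them does not change drastically either. Fundamentally, when the gradient algorithm iterates max_concurrency, latency does not represent the latency at an actual concurrency of max_concurrency. The two are decoupled, so the physical meaning of max_concurrency / latency is unclear, it may differ greatly from qps, and the result is a large deviation.
 * The gradient algorithm recommends queue_size = sqrt(max_concurrency), which is unreasonable. Netflix probably understands queue_size as buffering in various uncontrollable stages, such as inside sockets, so a positive correlation with max_concurrency is understandable. In our understanding, that part of queue_size has a negligible effect; omitting it or using a constant is enough. The queue_size we care about is the exploration room left for concurrency to rise:
 updates to max_concurrency are delayed, and while concurrency grows from low to high, the role of queue_size is to avoid limiting the rise of qps before max_concurrency is updated. When concurrency is high, the service may already be overloaded, so queue_size should be smaller to avoid worsening latency further. Here queue_size and concurrency are inversely related.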
+
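+## Error rate punishment threshold
+
+`auto_cl_error_rate_punish_threshold` sets an error-rate "dead zone": error rates below the threshold produce no punishment, which keeps a small number of failed requests from having an outsized effect on max_concurrency.
+
+| GFlag | Default | Valid range | Description |
+|-------|---------|-------------|-------------|
+| auto_cl_error_rate_punish_threshold | 0 | [0, 1) | Error rate punishment threshold; 0 disables it |
+
+- **Default is 0**: the feature is disabled and the original behavior is kept
+- **Set to a valid value (e.g. 0.1)**: punishment is 0 when error rate ≤ threshold; punishment grows linearly when error rate > threshold
+- **Invalid values**: values ≥ 1 are ignored and treated as 0
+
+**Example**:
+```
+# No punishment while the error rate is below 10%; punishment increases linearly above 10%
+--auto_cl_error_rate_punish_threshold=0.1
+```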
diff --git a/src/brpc/policy/auto_concurrency_limiter.cpp b/src/brpc/policy/auto_concurrency_limiter.cpp
index dd5a02ec..51ea56d7 100644
--- a/src/brpc/policy/auto_concurrency_limiter.cpp
+++ b/src/brpc/policy/auto_concurrency_limiter.cpp
@@ -77,6 +77,13 @@ DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1,
              "the value, the higher the tolerance for the fluctuation of the "
              "latency. If the value is too large, the latency will be higher "
              "when the server is overloaded.");
+DEFINE_double(auto_cl_error_rate_punish_threshold, 0,
+              "Threshold for error-rate-based punishment attenuation. "
+              "Valid range: [0, 1). 0 (default) disables the feature. "
+              "Values >= 1 are ignored and treated as 0. "
+              "e.g. 0.1: error rates below 10%% produce zero punishment; "
+              "above it the punishment scales linearly from 0 to full strength. "
+              "Only effective when auto_cl_enable_error_punish is true.");
 
 AutoConcurrencyLimiter::AutoConcurrencyLimiter()
     : _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
@@ -236,7 +243,29 @@ void AutoConcurrencyLimiter::AdjustMaxConcurrency(int next_max_concurrency) {
 void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
     int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
     double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
-    int64_t avg_latency = 
+
+    // Threshold-based attenuation: when 0 < threshold < 1, attenuate punishment
+    // based on error rate. Inspired by Sentinel's threshold-based circuit breaker:
+    // low error rates should not inflate avg_latency. Above threshold, punishment
+    // scales linearly from 0 to full strength.
+    // Invalid values (<=0 or >=1) skip this block entirely, preserving original behavior.
+    if (FLAGS_auto_cl_error_rate_punish_threshold > 0 &&
+        FLAGS_auto_cl_error_rate_punish_threshold < 1.0 &&
+        _sw.failed_count > 0) {
+        double threshold = FLAGS_auto_cl_error_rate_punish_threshold;
+        double error_rate = static_cast<double>(_sw.failed_count) /
+            (_sw.succ_count + _sw.failed_count);
+        if (error_rate <= threshold) {
+            // Error rate within dead zone, cancel punishment.
+            failed_punish = 0;
+        } else {
+            // Linear ramp: 0 at threshold, 1.0 at 100% error rate.
+            double punish_factor = (error_rate - threshold) / (1.0 - threshold);
+            failed_punish *= punish_factor;
+        }
+    }
+
+    int64_t avg_latency =
         std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
     double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
     UpdateMinLatency(avg_latency);
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index 05420ae3..66aef425 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -269,6 +269,19 @@ cc_test(
     ],
 )
 
+cc_test(
+    name = "brpc_auto_concurrency_limiter_test",
+    srcs = [
+        "brpc_auto_concurrency_limiter_unittest.cpp",
+    ],
+    copts = COPTS,
+    deps = [
+        "//:brpc",
+        "@com_google_googletest//:gtest",
+        "@com_google_googletest//:gtest_main",
+    ],
+)
+
 refresh_compile_commands(
     name = "brpc_test_compdb",
     # Specify the targets of interest.
diff --git a/test/brpc_auto_concurrency_limiter_unittest.cpp b/test/brpc_auto_concurrency_limiter_unittest.cpp
new file mode 100644
index 00000000..77163e2f
--- /dev/null
+++ b/test/brpc_auto_concurrency_limiter_unittest.cpp
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/auto_concurrency_limiter.h"
+#include "butil/time.h"
+#include <gtest/gtest.h>
+
+namespace brpc {
+namespace policy {
+
+DECLARE_int32(auto_cl_sample_window_size_ms);
+DECLARE_int32(auto_cl_min_sample_count);
+DECLARE_int32(auto_cl_max_sample_count);
+DECLARE_bool(auto_cl_enable_error_punish);
+DECLARE_double(auto_cl_fail_punish_ratio);
+DECLARE_double(auto_cl_error_rate_punish_threshold);
+
+}  // namespace policy
+}  // namespace brpc
+
+class AutoConcurrencyLimiterTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        // Save original values
+        orig_sample_window_size_ms_ = brpc::policy::FLAGS_auto_cl_sample_window_size_ms;
+        orig_min_sample_count_ = brpc::policy::FLAGS_auto_cl_min_sample_count;
+        orig_max_sample_count_ = brpc::policy::FLAGS_auto_cl_max_sample_count;
+        orig_enable_error_punish_ = brpc::policy::FLAGS_auto_cl_enable_error_punish;
+        orig_fail_punish_ratio_ = brpc::policy::FLAGS_auto_cl_fail_punish_ratio;
+        orig_error_rate_threshold_ = brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold;
+
+        // Set test-friendly values
+        brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 1000;
+        brpc::policy::FLAGS_auto_cl_min_sample_count = 5;
+        brpc::policy::FLAGS_auto_cl_max_sample_count = 200;
+        brpc::policy::FLAGS_auto_cl_enable_error_punish = true;
+        brpc::policy::FLAGS_auto_cl_fail_punish_ratio = 1.0;
+    }
+
+    void TearDown() override {
+        // Restore original values
+        brpc::policy::FLAGS_auto_cl_sample_window_size_ms = orig_sample_window_size_ms_;
+        brpc::policy::FLAGS_auto_cl_min_sample_count = orig_min_sample_count_;
+        brpc::policy::FLAGS_auto_cl_max_sample_count = orig_max_sample_count_;
+        brpc::policy::FLAGS_auto_cl_enable_error_punish = orig_enable_error_punish_;
+        brpc::policy::FLAGS_auto_cl_fail_punish_ratio = orig_fail_punish_ratio_;
+        brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = orig_error_rate_threshold_;
+    }
+
+private:
+    int32_t orig_sample_window_size_ms_;
+    int32_t orig_min_sample_count_;
+    int32_t orig_max_sample_count_;
+    bool orig_enable_error_punish_;
+    double orig_fail_punish_ratio_;
+    double orig_error_rate_threshold_;
+};
+
+// Helper function to add samples and trigger window completion
+// Uses synthetic timestamps instead of sleeping for faster, deterministic tests.
+// The final successful sample is used as the trigger, so actual counts match
+// succ_count/fail_count exactly (preserving intended error rates).
+void AddSamplesAndTriggerWindow(brpc::policy::AutoConcurrencyLimiter& limiter,
+                                 int succ_count, int64_t succ_latency,
+                                 int fail_count, int64_t fail_latency) {
+    ASSERT_GT(succ_count, 0) << "Need at least 1 success to trigger window";
+    int64_t now = butil::gettimeofday_us();
+
+    // Add successful samples (reserve one for the trigger)
+    for (int i = 0; i < succ_count - 1; ++i) {
+        limiter.AddSample(0, succ_latency, now);
+    }
+    // Add failed samples
+    for (int i = 0; i < fail_count; ++i) {
+        limiter.AddSample(1, fail_latency, now);
+    }
+
+    // Advance timestamp past window expiry instead of sleeping
+    int64_t after_window = now + brpc::policy::FLAGS_auto_cl_sample_window_size_ms * 1000 + 1000;
+
+    // Use the final success sample to trigger window submission
+    limiter.AddSample(0, succ_latency, after_window);
+}
+
+// Test 1: Backward compatibility - threshold=0 preserves original punishment behavior
+TEST_F(AutoConcurrencyLimiterTest, ThresholdZeroPreservesOriginalBehavior) {
+    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0;
+    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+    brpc::policy::AutoConcurrencyLimiter limiter;
+    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+    // 10% error rate, threshold=0 means full punishment applied
+    // avg_latency = ceil((10*1000 + 90*100) / 90) = ceil(211.1) = 212us
+    ASSERT_GT(limiter._min_latency_us, 180);
+    ASSERT_LT(limiter._min_latency_us, 250);
+}
+
+// Test 2: Dead zone - error rate below threshold produces zero punishment
+TEST_F(AutoConcurrencyLimiterTest, BelowThresholdZeroPunishment) {
+    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.2;  // 20% threshold
+    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+    brpc::policy::AutoConcurrencyLimiter limiter;
+    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+    // 10% error rate < 20% threshold, punishment should be zero
+    // avg_latency = 90*100 / 90 = 100us (no inflation)
+    ASSERT_GT(limiter._min_latency_us, 80);
+    ASSERT_LT(limiter._min_latency_us, 130);
+}
+
+// Test 3: Boundary - error rate exactly at threshold produces zero punishment
+TEST_F(AutoConcurrencyLimiterTest, ExactlyAtThresholdZeroPunishment) {
+    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1;  // 10% threshold
+    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+    brpc::policy::AutoConcurrencyLimiter limiter;
+    AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+    // 10% error rate == 10% threshold, punishment should be zero
+    // avg_latency = 90*100 / 90 = 100us
+    ASSERT_GT(limiter._min_latency_us, 80);
+    ASSERT_LT(limiter._min_latency_us, 130);
+}
+
+// Test 4: Linear scaling - above threshold, punishment scales proportionally
+TEST_F(AutoConcurrencyLimiterTest, AboveThresholdLinearScaling) {
+    brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1;  // 10% threshold
+    brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+    // Case A: 50% error rate
+    // punish_factor = (0.5 - 0.1) / (1.0 - 0.1) = 4/9 ≈ 0.444
+    // failed_punish = 50 * 1000 * (4/9) = 22222.2us
+    // avg_latency = ceil((22222.2 + 50*100) / 50) = ceil(544.4) = 545us
+    {
+        brpc::policy::AutoConcurrencyLimiter limiter;
+        AddSamplesAndTriggerWindow(limiter, 50, 100, 50, 1000);
+        ASSERT_GT(limiter._min_latency_us, 450);
+        ASSERT_LT(limiter._min_latency_us, 650);
+    }
+
+    // Case B: 90% error rate (near full punishment)
+    // punish_factor = (0.9 - 0.1) / (1.0 - 0.1) = 8/9 ≈ 0.889
+    // failed_punish = 90 * 1000 * (8/9) = 80000us
+    // avg_latency = ceil((80000 + 10*100) / 10) = ceil(8100) = 8100us
+    {
+        brpc::policy::AutoConcurrencyLimiter limiter;
+        AddSamplesAndTriggerWindow(limiter, 10, 100, 90, 1000);
+        ASSERT_GT(limiter._min_latency_us, 7000);
+        ASSERT_LT(limiter._min_latency_us, 9000);
+    }
+}
+
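
The expected values quoted in the test comments can be reproduced without a limiter instance. The following is a rough standalone sketch, assuming the averaging shown in the `UpdateMaxConcurrency` hunk of this diff; `ExpectedAvgLatencyUs` and its parameters are hypothetical names introduced for illustration, not brpc APIs.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Hypothetical helper (not part of brpc): recomputes avg_latency for one
// sampling window from request counts and per-request latencies (microseconds).
int64_t ExpectedAvgLatencyUs(int succ_count, int64_t succ_latency_us,
                             int fail_count, int64_t fail_latency_us,
                             double fail_punish_ratio, double threshold) {
    assert(succ_count > 0);  // the average is taken over successful requests
    double failed_punish =
        static_cast<double>(fail_count) * fail_latency_us * fail_punish_ratio;
    // Attenuate only for thresholds inside (0, 1), as in the diff.
    if (threshold > 0.0 && threshold < 1.0 && fail_count > 0) {
        double error_rate =
            static_cast<double>(fail_count) / (succ_count + fail_count);
        if (error_rate <= threshold) {
            failed_punish = 0.0;  // dead zone: punishment cancelled
        } else {
            // Linear ramp: 0 at the threshold, full strength at 100% errors.
            failed_punish *= (error_rate - threshold) / (1.0 - threshold);
        }
    }
    double total_succ_us = static_cast<double>(succ_count) * succ_latency_us;
    return static_cast<int64_t>(
        std::ceil((failed_punish + total_succ_us) / succ_count));
}
```

For instance, `ExpectedAvgLatencyUs(90, 100, 10, 1000, 1.0, 0.0)` gives 212, and the same window with a threshold of 0.2 gives 100, in line with the comments in Tests 1 and 2.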

