This is an automated email from the ASF dual-hosted git repository.
leander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new bb081bef feat(auto_cl): add error rate threshold for punishment attenuation (#3219)
bb081bef is described below
commit bb081befc0479b2d99b39d865fa000780da9cd55
Author: yanfeng <[email protected]>
AuthorDate: Thu Feb 12 14:02:26 2026 +0800
feat(auto_cl): add error rate threshold for punishment attenuation (#3219)
* feat(auto_cl): add error rate threshold for punishment attenuation
Add new GFlag `auto_cl_error_rate_punish_threshold` to enable
error-rate-based punishment attenuation in AutoConcurrencyLimiter.
Problem: Low error rates (e.g., 1.3% sporadic timeouts) cause
disproportionate avg_latency inflation (+31%), leading the limiter
to mistakenly shrink max_concurrency and trigger ELIMIT rejections.
Solution: Inspired by Alibaba Sentinel's threshold-based approach:
- threshold=0 (default): Original behavior preserved (backward compat)
- 0 < threshold < 1 (e.g., 0.1): Error rates at or below the threshold produce
zero punishment; above it, punishment scales linearly from 0 to full strength
Example: With threshold=0.1, a 5% error rate produces no punishment,
while a 50% error rate produces 44% of the original punishment.
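Written as a formula, the per-window average latency after this change works out as below. The symbols are introduced here only for illustration and do not appear in the code: t is auto_cl_error_rate_punish_threshold, e is the window's error rate, r is auto_cl_fail_punish_ratio, T_fail and T_succ are the summed latencies of failed and successful samples, and N_succ is the success count.

$$
f(e)=\begin{cases}
1, & t \le 0 \ \text{or}\ t \ge 1\\
0, & 0 < t < 1,\ e \le t\\
\dfrac{e-t}{1-t}, & 0 < t < 1,\ e > t
\end{cases}
\qquad
\text{avg\_latency}=\left\lceil \frac{f(e)\, r\, T_{\text{fail}} + T_{\text{succ}}}{N_{\text{succ}}} \right\rceil
$$

With t = 0.1, e = 0.05 gives f = 0, and e = 0.5 gives f = 0.4 / 0.9 = 4/9, about 0.444 (the 44% quoted above).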
---------
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
docs/cn/auto_concurrency_limiter.md | 18 +++
src/brpc/policy/auto_concurrency_limiter.cpp | 31 ++++-
test/BUILD.bazel | 13 ++
test/brpc_auto_concurrency_limiter_unittest.cpp | 168 ++++++++++++++++++++++++
4 files changed, 229 insertions(+), 1 deletion(-)
diff --git a/docs/cn/auto_concurrency_limiter.md b/docs/cn/auto_concurrency_limiter.md
index 17ef5d7e..342e9ba6 100644
--- a/docs/cn/auto_concurrency_limiter.md
+++ b/docs/cn/auto_concurrency_limiter.md
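@@ -154,3 +154,21 @@ The gradient formula in Netflix's algorithm is: max_concurrency = min_latency / latency *
* In the gradient algorithm, max_concurrency / latency is conceptually related to qps (by Little's law), but the two can diverge badly. For example, if every latency is below min_latency before min_latency is re-measured, max_concurrency keeps shrinking, possibly down to 0; under this algorithm, by contrast, max_qps and min_latency stay stable, and so does the max_concurrency computed from them. At root, when the gradient algorithm iterates on max_concurrency, latency does not represent the actual latency at a concurrency of max_concurrency; the two are decoupled, so max_concurrency / latency has no clear physical meaning, can differ greatly from qps, and ultimately produces large deviations.
* The gradient algorithm recommends queue_size = sqrt(max_concurrency), which is unreasonable. Netflix appears to treat queue_size as the buffering in various uncontrollable stages (for example, inside sockets), so a positive relationship with max_concurrency is understandable. In our view, however, that part of queue_size has negligible effect; omitting it or using a constant is enough. The queue_size we care about is the room left for concurrency to explore upward:
max_concurrency is updated with a delay, and while concurrency grows from low to high, queue_size keeps qps from being capped before max_concurrency catches up. When concurrency is high, the service may already be overloaded, so queue_size should be smaller to avoid worsening latency further. Here queue_size is inversely related to concurrency.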
+
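+## Error Rate Punishment Threshold
+
+`auto_cl_error_rate_punish_threshold` defines an error-rate "dead zone": error rates at or below the threshold produce no punishment, so a small number of failed requests cannot drive max_concurrency down disproportionately. It only takes effect when `auto_cl_enable_error_punish` is true.
+
+| GFlag | Default | Valid range | Description |
+|-------|---------|-------------|-------------|
+| auto_cl_error_rate_punish_threshold | 0 | [0, 1) | Error rate punishment threshold; 0 disables the feature |
+
+- **Default 0**: the feature is disabled and the original behavior is preserved
+- **Valid value (e.g. 0.1)**: when error rate ≤ threshold, punishment is 0; when error rate > threshold, punishment grows linearly
+- **Invalid values**: negative values and values ≥ 1 are ignored and treated as 0
+
+**Example**:
+```
+# No punishment at or below a 10% error rate; above 10%, punishment grows linearly
+--auto_cl_error_rate_punish_threshold=0.1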
+```
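The dead-zone rules in the table and bullets above can be sketched as a small pure function. This is a hedged illustration, not brpc's API: `PunishFactor` is a hypothetical name, and it assumes the threshold value has already been read from the flag. It returns the fraction of the original failure punishment that is kept.

```cpp
#include <cassert>
#include <cmath>

// Hypothetical helper (not part of brpc) mirroring the documented rules for
// auto_cl_error_rate_punish_threshold. Returns the fraction of the original
// failure punishment that is kept, in [0, 1].
double PunishFactor(double error_rate, double threshold) {
    // Thresholds outside (0, 1), including the default 0, disable the
    // feature: the full punishment is kept, as before this change.
    if (!(threshold > 0 && threshold < 1.0)) {
        return 1.0;
    }
    // Dead zone: an error rate at or below the threshold is not punished.
    if (error_rate <= threshold) {
        return 0.0;
    }
    // Linear ramp from 0 at the threshold to 1 at a 100% error rate.
    return (error_rate - threshold) / (1.0 - threshold);
}
```

With a threshold of 0.1, `PunishFactor(0.05, 0.1)` is 0 and `PunishFactor(0.5, 0.1)` is 4/9 (about 0.444), which matches the example in the commit message.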
diff --git a/src/brpc/policy/auto_concurrency_limiter.cpp b/src/brpc/policy/auto_concurrency_limiter.cpp
index dd5a02ec..51ea56d7 100644
--- a/src/brpc/policy/auto_concurrency_limiter.cpp
+++ b/src/brpc/policy/auto_concurrency_limiter.cpp
@@ -77,6 +77,13 @@ DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1,
"the value, the higher the tolerance for the fluctuation of the "
"latency. If the value is too large, the latency will be higher "
"when the server is overloaded.");
+DEFINE_double(auto_cl_error_rate_punish_threshold, 0,
+ "Threshold for error-rate-based punishment attenuation. "
+ "Valid range: [0, 1). 0 (default) disables the feature. "
+ "Values >= 1 are ignored and treated as 0. "
+ "e.g. 0.1: error rates below 10%% produce zero punishment; "
+ "above it the punishment scales linearly from 0 to full strength. "
+ "Only effective when auto_cl_enable_error_punish is true.");
AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
@@ -236,7 +243,29 @@ void AutoConcurrencyLimiter::AdjustMaxConcurrency(int next_max_concurrency) {
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
double failed_punish = _sw.total_failed_us *
FLAGS_auto_cl_fail_punish_ratio;
- int64_t avg_latency =
+
+ // Threshold-based attenuation: when 0 < threshold < 1, attenuate punishment
+ // based on error rate. Inspired by Sentinel's threshold-based circuit breaker:
+ // low error rates should not inflate avg_latency. Above the threshold,
+ // punishment scales linearly from 0 to full strength.
+ // Invalid values (<=0 or >=1) skip this block entirely, preserving original behavior.
+ if (FLAGS_auto_cl_error_rate_punish_threshold > 0 &&
+ FLAGS_auto_cl_error_rate_punish_threshold < 1.0 &&
+ _sw.failed_count > 0) {
+ double threshold = FLAGS_auto_cl_error_rate_punish_threshold;
+ double error_rate = static_cast<double>(_sw.failed_count) /
+ (_sw.succ_count + _sw.failed_count);
+ if (error_rate <= threshold) {
+ // Error rate within dead zone, cancel punishment.
+ failed_punish = 0;
+ } else {
+ // Linear ramp: 0 at threshold, 1.0 at 100% error rate.
+ double punish_factor = (error_rate - threshold) / (1.0 - threshold);
+ failed_punish *= punish_factor;
+ }
+ }
+
+ int64_t avg_latency =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
double qps = 1000000.0 * total_succ_req / (sampling_time_us -
_sw.start_time_us);
UpdateMinLatency(avg_latency);
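To check the expected values quoted in the unit tests below, the avg_latency computation from UpdateMaxConcurrency can be reproduced outside the limiter. This is a sketch under stated assumptions: `SampleWindowSketch`, `MakeWindow` and `AvgLatencySketch` are hypothetical names, the struct only mirrors the `_sw` fields the patch reads, and the min_latency and qps updates that follow in the real code are omitted.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Hypothetical stand-in for the limiter's sample window; the field names follow
// the _sw members used in UpdateMaxConcurrency, but this is not brpc's type.
struct SampleWindowSketch {
    int32_t succ_count = 0;
    int32_t failed_count = 0;
    int64_t total_succ_us = 0;
    int64_t total_failed_us = 0;
};

// Sketch of the patched avg_latency formula. Assumes succ_count > 0, which
// the limiter guarantees before calling UpdateMaxConcurrency.
int64_t AvgLatencySketch(const SampleWindowSketch& sw,
                         double fail_punish_ratio, double threshold) {
    double failed_punish = sw.total_failed_us * fail_punish_ratio;
    if (threshold > 0 && threshold < 1.0 && sw.failed_count > 0) {
        double error_rate = static_cast<double>(sw.failed_count) /
                            (sw.succ_count + sw.failed_count);
        if (error_rate <= threshold) {
            failed_punish = 0;  // dead zone: no punishment
        } else {
            // Linear ramp: 0 at the threshold, full strength at 100% errors.
            failed_punish *= (error_rate - threshold) / (1.0 - threshold);
        }
    }
    return static_cast<int64_t>(
        std::ceil((failed_punish + sw.total_succ_us) / sw.succ_count));
}

// Builds a window of `succ` successes at `succ_us` each and `fail` failures
// at `fail_us` each, like the inputs passed to AddSamplesAndTriggerWindow.
SampleWindowSketch MakeWindow(int32_t succ, int64_t succ_us,
                              int32_t fail, int64_t fail_us) {
    SampleWindowSketch sw;
    sw.succ_count = succ;
    sw.failed_count = fail;
    sw.total_succ_us = succ * succ_us;
    sw.total_failed_us = fail * fail_us;
    return sw;
}
```

With fail_punish_ratio = 1.0 and the test inputs, this yields 212 (10% errors, threshold 0), 100 (10% errors with threshold 0.2 or 0.1), 545 (50% errors, threshold 0.1) and 8100 (90% errors, threshold 0.1), the values the test comments derive by hand.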
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index 05420ae3..66aef425 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -269,6 +269,19 @@ cc_test(
],
)
+cc_test(
+ name = "brpc_auto_concurrency_limiter_test",
+ srcs = [
+ "brpc_auto_concurrency_limiter_unittest.cpp",
+ ],
+ copts = COPTS,
+ deps = [
+ "//:brpc",
+ "@com_google_googletest//:gtest",
+ "@com_google_googletest//:gtest_main",
+ ],
+)
+
refresh_compile_commands(
name = "brpc_test_compdb",
# Specify the targets of interest.
diff --git a/test/brpc_auto_concurrency_limiter_unittest.cpp
b/test/brpc_auto_concurrency_limiter_unittest.cpp
new file mode 100644
index 00000000..77163e2f
--- /dev/null
+++ b/test/brpc_auto_concurrency_limiter_unittest.cpp
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/auto_concurrency_limiter.h"
+#include "butil/time.h"
+#include <gtest/gtest.h>
+
+namespace brpc {
+namespace policy {
+
+DECLARE_int32(auto_cl_sample_window_size_ms);
+DECLARE_int32(auto_cl_min_sample_count);
+DECLARE_int32(auto_cl_max_sample_count);
+DECLARE_bool(auto_cl_enable_error_punish);
+DECLARE_double(auto_cl_fail_punish_ratio);
+DECLARE_double(auto_cl_error_rate_punish_threshold);
+
+} // namespace policy
+} // namespace brpc
+
+class AutoConcurrencyLimiterTest : public ::testing::Test {
+protected:
+ void SetUp() override {
+ // Save original values
+ orig_sample_window_size_ms_ = brpc::policy::FLAGS_auto_cl_sample_window_size_ms;
+ orig_min_sample_count_ = brpc::policy::FLAGS_auto_cl_min_sample_count;
+ orig_max_sample_count_ = brpc::policy::FLAGS_auto_cl_max_sample_count;
+ orig_enable_error_punish_ = brpc::policy::FLAGS_auto_cl_enable_error_punish;
+ orig_fail_punish_ratio_ = brpc::policy::FLAGS_auto_cl_fail_punish_ratio;
+ orig_error_rate_threshold_ = brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold;
+
+ // Set test-friendly values
+ brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 1000;
+ brpc::policy::FLAGS_auto_cl_min_sample_count = 5;
+ brpc::policy::FLAGS_auto_cl_max_sample_count = 200;
+ brpc::policy::FLAGS_auto_cl_enable_error_punish = true;
+ brpc::policy::FLAGS_auto_cl_fail_punish_ratio = 1.0;
+ }
+
+ void TearDown() override {
+ // Restore original values
+ brpc::policy::FLAGS_auto_cl_sample_window_size_ms = orig_sample_window_size_ms_;
+ brpc::policy::FLAGS_auto_cl_min_sample_count = orig_min_sample_count_;
+ brpc::policy::FLAGS_auto_cl_max_sample_count = orig_max_sample_count_;
+ brpc::policy::FLAGS_auto_cl_enable_error_punish = orig_enable_error_punish_;
+ brpc::policy::FLAGS_auto_cl_fail_punish_ratio = orig_fail_punish_ratio_;
+ brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = orig_error_rate_threshold_;
+ }
+
+private:
+ int32_t orig_sample_window_size_ms_;
+ int32_t orig_min_sample_count_;
+ int32_t orig_max_sample_count_;
+ bool orig_enable_error_punish_;
+ double orig_fail_punish_ratio_;
+ double orig_error_rate_threshold_;
+};
+
+// Helper function to add samples and trigger window completion
+// Uses synthetic timestamps instead of sleeping for faster, deterministic tests.
+// The final successful sample is used as the trigger, so actual counts match
+// succ_count/fail_count exactly (preserving intended error rates).
+void AddSamplesAndTriggerWindow(brpc::policy::AutoConcurrencyLimiter& limiter,
+ int succ_count, int64_t succ_latency,
+ int fail_count, int64_t fail_latency) {
+ ASSERT_GT(succ_count, 0) << "Need at least 1 success to trigger window";
+ int64_t now = butil::gettimeofday_us();
+
+ // Add successful samples (reserve one for the trigger)
+ for (int i = 0; i < succ_count - 1; ++i) {
+ limiter.AddSample(0, succ_latency, now);
+ }
+ // Add failed samples
+ for (int i = 0; i < fail_count; ++i) {
+ limiter.AddSample(1, fail_latency, now);
+ }
+
+ // Advance timestamp past window expiry instead of sleeping
+ int64_t after_window = now + brpc::policy::FLAGS_auto_cl_sample_window_size_ms * 1000 + 1000;
+
+ // Use the final success sample to trigger window submission
+ limiter.AddSample(0, succ_latency, after_window);
+}
+
+// Test 1: Backward compatibility - threshold=0 preserves original punishment behavior
+TEST_F(AutoConcurrencyLimiterTest, ThresholdZeroPreservesOriginalBehavior) {
+ brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0;
+ brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+ brpc::policy::AutoConcurrencyLimiter limiter;
+ AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+ // 10% error rate, threshold=0 means full punishment applied
+ // avg_latency = ceil((10*1000 + 90*100) / 90) = ceil(211.1) = 212us
+ ASSERT_GT(limiter._min_latency_us, 180);
+ ASSERT_LT(limiter._min_latency_us, 250);
+}
+
+// Test 2: Dead zone - error rate below threshold produces zero punishment
+TEST_F(AutoConcurrencyLimiterTest, BelowThresholdZeroPunishment) {
+ brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.2; // 20% threshold
+ brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+ brpc::policy::AutoConcurrencyLimiter limiter;
+ AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+ // 10% error rate < 20% threshold, punishment should be zero
+ // avg_latency = 90*100 / 90 = 100us (no inflation)
+ ASSERT_GT(limiter._min_latency_us, 80);
+ ASSERT_LT(limiter._min_latency_us, 130);
+}
+
+// Test 3: Boundary - error rate exactly at threshold produces zero punishment
+TEST_F(AutoConcurrencyLimiterTest, ExactlyAtThresholdZeroPunishment) {
+ brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1; // 10% threshold
+ brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+ brpc::policy::AutoConcurrencyLimiter limiter;
+ AddSamplesAndTriggerWindow(limiter, 90, 100, 10, 1000);
+
+ // 10% error rate == 10% threshold, punishment should be zero
+ // avg_latency = 90*100 / 90 = 100us
+ ASSERT_GT(limiter._min_latency_us, 80);
+ ASSERT_LT(limiter._min_latency_us, 130);
+}
+
+// Test 4: Linear scaling - above threshold, punishment scales proportionally
+TEST_F(AutoConcurrencyLimiterTest, AboveThresholdLinearScaling) {
+ brpc::policy::FLAGS_auto_cl_error_rate_punish_threshold = 0.1; // 10% threshold
+ brpc::policy::FLAGS_auto_cl_sample_window_size_ms = 10;
+
+ // Case A: 50% error rate
+ // punish_factor = (0.5 - 0.1) / (1.0 - 0.1) = 4/9 ≈ 0.444
+ // failed_punish = 50 * 1000 * (4/9) = 22222.2us
+ // avg_latency = ceil((22222.2 + 50*100) / 50) = ceil(544.4) = 545us
+ {
+ brpc::policy::AutoConcurrencyLimiter limiter;
+ AddSamplesAndTriggerWindow(limiter, 50, 100, 50, 1000);
+ ASSERT_GT(limiter._min_latency_us, 450);
+ ASSERT_LT(limiter._min_latency_us, 650);
+ }
+
+ // Case B: 90% error rate (near full punishment)
+ // punish_factor = (0.9 - 0.1) / (1.0 - 0.1) = 8/9 ≈ 0.889
+ // failed_punish = 90 * 1000 * (8/9) = 80000us
+ // avg_latency = ceil((80000 + 10*100) / 10) = ceil(8100) = 8100us
+ {
+ brpc::policy::AutoConcurrencyLimiter limiter;
+ AddSamplesAndTriggerWindow(limiter, 10, 100, 90, 1000);
+ ASSERT_GT(limiter._min_latency_us, 7000);
+ ASSERT_LT(limiter._min_latency_us, 9000);
+ }
+}
+