This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 976c588f circuit breaker with half open state (#2634)
976c588f is described below

commit 976c588f011e28222cc667b0c3694dc88d684ab6
Author: jiangyt-git <64257436+jiangyt-...@users.noreply.github.com>
AuthorDate: Mon Jun 3 16:25:35 2024 +0800

    circuit breaker with half open state (#2634)
    
    * circuit breaker with half open state
    
    * add switch for half open state
    
    * add half open ut
    
    * add some description
    
    * record all lats of the half open window
    
    * fix some typo
    
    ---------
    
    Co-authored-by: jiangyuting <jiangyutingwan...@163.com>
---
 src/brpc/circuit_breaker.cpp           | 32 +++++++++++++++++--
 src/brpc/circuit_breaker.h             |  2 ++
 test/brpc_circuit_breaker_unittest.cpp | 57 ++++++++++++++++++++++++++++++++++
 3 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/src/brpc/circuit_breaker.cpp b/src/brpc/circuit_breaker.cpp
index 889fe65f..785ec77a 100644
--- a/src/brpc/circuit_breaker.cpp
+++ b/src/brpc/circuit_breaker.cpp
@@ -21,6 +21,7 @@
 #include <gflags/gflags.h>
 
 #include "brpc/errno.pb.h"
+#include "brpc/reloadable_flags.h"
 #include "butil/time.h"
 
 namespace brpc {
@@ -45,6 +46,12 @@ DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000,
     "Maximum isolation duration in milliseconds");
 DEFINE_double(circuit_breaker_epsilon_value, 0.02,
     "ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)");
+DEFINE_int32(circuit_breaker_half_open_window_size, 0,
+    "The limited number of requests allowed to pass through by the half-open "
+    "window. Only if all of them are successful, the circuit breaker will "
+    "go to the closed state. Otherwise, it goes back to the open state. "
+    "Values == 0 disables this feature");
+BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, NonNegativeInteger);
 
 namespace {
 // EPSILON is used to generate the smoothing coefficient when calculating EMA.
@@ -132,7 +139,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
     if (ema_latency != 0) {
         error_cost = std::min(ema_latency * max_mutiple, error_cost);
     }
-    //Errorous response
+    // Errorous response
     if (error_cost != 0) {
         int64_t ema_error_cost =
             _ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
@@ -142,7 +149,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
         return ema_error_cost <= max_error_cost;
     }
 
-    //Ordinary response
+    // Ordinary response
     int64_t ema_error_cost = _ema_error_cost.load(butil::memory_order_relaxed);
     do {
         if (ema_error_cost == 0) {
@@ -171,7 +178,9 @@ CircuitBreaker::CircuitBreaker()
     , _last_reset_time_ms(0)
     , _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms)
     , _isolated_times(0)
-    , _broken(false) {
+    , _broken(false)
+    , _half_open(false)
+    , _half_open_success_count(0) {
 }
 
 bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
@@ -188,6 +197,19 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
     if (_broken.load(butil::memory_order_relaxed)) {
         return false;
     }
+    if (FLAGS_circuit_breaker_half_open_window_size > 0
+        && _half_open.load(butil::memory_order_relaxed)) {
+        if (error_code != 0) {
+            MarkAsBroken();
+            return false;
+        }
+        if (_half_open_success_count.fetch_add(1, butil::memory_order_relaxed)
+                + 1 == FLAGS_circuit_breaker_half_open_window_size) {
+            _half_open.store(false, butil::memory_order_relaxed);
+            _half_open_success_count.store(0, butil::memory_order_relaxed);
+        }
+    }
+
     if (_long_window.OnCallEnd(error_code, latency) &&
         _short_window.OnCallEnd(error_code, latency)) {
         return true;
@@ -201,6 +223,10 @@ void CircuitBreaker::Reset() {
     _short_window.Reset();
     _last_reset_time_ms = butil::cpuwide_time_ms();
     _broken.store(false, butil::memory_order_release);
+    if (FLAGS_circuit_breaker_half_open_window_size > 0) {
+        _half_open.store(true, butil::memory_order_relaxed);
+        _half_open_success_count.store(0, butil::memory_order_relaxed);
+    }
 }
 
 void CircuitBreaker::MarkAsBroken() {
diff --git a/src/brpc/circuit_breaker.h b/src/brpc/circuit_breaker.h
index 826e6914..b16a4299 100644
--- a/src/brpc/circuit_breaker.h
+++ b/src/brpc/circuit_breaker.h
@@ -87,6 +87,8 @@ private:
     butil::atomic<int> _isolation_duration_ms;
     butil::atomic<int> _isolated_times;
     butil::atomic<bool> _broken;
+    butil::atomic<bool> _half_open;
+    butil::atomic<int32_t> _half_open_success_count;
 };
 
 }  // namespace brpc
diff --git a/test/brpc_circuit_breaker_unittest.cpp b/test/brpc_circuit_breaker_unittest.cpp
index ef09cd94..e8f55153 100644
--- a/test/brpc_circuit_breaker_unittest.cpp
+++ b/test/brpc_circuit_breaker_unittest.cpp
@@ -45,6 +45,7 @@ const int kErrorCodeForSucc = 0;
 const int kErrorCost = 1000;
 const int kLatency = 1000;
 const int kThreadNum = 3;
+const int kHalfWindowSize = 0;
 } // namespace
 
 namespace brpc {
@@ -54,6 +55,7 @@ DECLARE_int32(circuit_breaker_short_window_error_percent);
 DECLARE_int32(circuit_breaker_long_window_error_percent);
 DECLARE_int32(circuit_breaker_min_isolation_duration_ms);
 DECLARE_int32(circuit_breaker_max_isolation_duration_ms);
+DECLARE_int32(circuit_breaker_half_open_window_size);
 } // namespace brpc
 
 int main(int argc, char* argv[]) {
@@ -63,6 +65,7 @@ int main(int argc, char* argv[]) {
     brpc::FLAGS_circuit_breaker_long_window_error_percent = kLongWindowErrorPercent;
     brpc::FLAGS_circuit_breaker_min_isolation_duration_ms = kMinIsolationDurationMs;
     brpc::FLAGS_circuit_breaker_max_isolation_duration_ms = kMaxIsolationDurationMs;
+    brpc::FLAGS_circuit_breaker_half_open_window_size = kHalfWindowSize;
     testing::InitGoogleTest(&argc, argv);
     GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
     return RUN_ALL_TESTS();
@@ -160,6 +163,60 @@ TEST_F(CircuitBreakerTest, should_isolate) {
     }
 }
 
+TEST_F(CircuitBreakerTest, should_isolate_with_half_open) {
+    std::vector<pthread_t> thread_list;
+    std::vector<std::unique_ptr<FeedbackControl>> fc_list;
+    StartFeedbackThread(&thread_list, &fc_list, 100);
+    int total_failed = 0;
+    for (int  i = 0; i < kThreadNum; ++i) {
+        void* ret_data = nullptr;
+        ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
+        FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
+        EXPECT_GT(fc->_unhealthy_cnt, 0);
+        EXPECT_FALSE(fc->_healthy);
+        total_failed += fc->_unhealthy_cnt;
+    }
+    _circuit_breaker.Reset();
+
+    int total_failed1 = 0;
+    StartFeedbackThread(&thread_list, &fc_list, 100);
+    for (int  i = 0; i < kThreadNum; ++i) {
+        void* ret_data = nullptr;
+        ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
+        FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
+        EXPECT_FALSE(fc->_healthy);
+        EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
+        EXPECT_GT(fc->_unhealthy_cnt, 0);
+        total_failed1 += fc->_unhealthy_cnt;
+    }
+
+    // Enable the half-open state (window of 10 requests) before calling Reset().
+    // The first failed request in the half-open window calls MarkAsBroken() at once.
+    brpc::FLAGS_circuit_breaker_half_open_window_size = 10;
+    _circuit_breaker.Reset();
+    int total_failed2 = 0;
+    StartFeedbackThread(&thread_list, &fc_list, 100);
+    for (int  i = 0; i < kThreadNum; ++i) {
+        void* ret_data = nullptr;
+        ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
+        FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
+        EXPECT_FALSE(fc->_healthy);
+        EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
+        EXPECT_GT(fc->_unhealthy_cnt, 0);
+        total_failed2 += fc->_unhealthy_cnt;
+    }
+    brpc::FLAGS_circuit_breaker_half_open_window_size = 0;
+
+    EXPECT_EQ(kLongWindowSize * 2 * kThreadNum -
+                  kShortWindowSize *
+                      brpc::FLAGS_circuit_breaker_short_window_error_percent /
+                      100,
+              total_failed);
+
+    EXPECT_EQ(total_failed1, total_failed);
+    EXPECT_EQ(kLongWindowSize * 2 * kThreadNum, total_failed2);
+}
+
 TEST_F(CircuitBreakerTest, isolation_duration_grow_and_reset) {
     std::vector<pthread_t> thread_list;
     std::vector<std::unique_ptr<FeedbackControl>> fc_list;


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to