This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 976c588f circuit breaker with half open state (#2634) 976c588f is described below commit 976c588f011e28222cc667b0c3694dc88d684ab6 Author: jiangyt-git <64257436+jiangyt-...@users.noreply.github.com> AuthorDate: Mon Jun 3 16:25:35 2024 +0800 circuit breaker with half open state (#2634) * circuit breaker with half open state * add switch for half open state * add half open ut * add some description * record all lats of the half open window * fix some typo --------- Co-authored-by: jiangyuting <jiangyutingwan...@163.com> --- src/brpc/circuit_breaker.cpp | 32 +++++++++++++++++-- src/brpc/circuit_breaker.h | 2 ++ test/brpc_circuit_breaker_unittest.cpp | 57 ++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/src/brpc/circuit_breaker.cpp b/src/brpc/circuit_breaker.cpp index 889fe65f..785ec77a 100644 --- a/src/brpc/circuit_breaker.cpp +++ b/src/brpc/circuit_breaker.cpp @@ -21,6 +21,7 @@ #include <gflags/gflags.h> #include "brpc/errno.pb.h" +#include "brpc/reloadable_flags.h" #include "butil/time.h" namespace brpc { @@ -45,6 +46,12 @@ DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000, "Maximum isolation duration in milliseconds"); DEFINE_double(circuit_breaker_epsilon_value, 0.02, "ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)"); +DEFINE_int32(circuit_breaker_half_open_window_size, 0, + "The limited number of requests allowed to pass through by the half-open " + "window. Only if all of them are successful, the circuit breaker will " + "go to the closed state. Otherwise, it goes back to the open state. " + "Values == 0 disables this feature"); +BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, NonNegativeInteger); namespace { // EPSILON is used to generate the smoothing coefficient when calculating EMA. 
@@ -132,7 +139,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost, if (ema_latency != 0) { error_cost = std::min(ema_latency * max_mutiple, error_cost); } - //Errorous response + // Errorous response if (error_cost != 0) { int64_t ema_error_cost = _ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed); @@ -142,7 +149,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost, return ema_error_cost <= max_error_cost; } - //Ordinary response + // Ordinary response int64_t ema_error_cost = _ema_error_cost.load(butil::memory_order_relaxed); do { if (ema_error_cost == 0) { @@ -171,7 +178,9 @@ CircuitBreaker::CircuitBreaker() , _last_reset_time_ms(0) , _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms) , _isolated_times(0) - , _broken(false) { + , _broken(false) + , _half_open(false) + , _half_open_success_count(0) { } bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { @@ -188,6 +197,19 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { if (_broken.load(butil::memory_order_relaxed)) { return false; } + if (FLAGS_circuit_breaker_half_open_window_size > 0 + && _half_open.load(butil::memory_order_relaxed)) { + if (error_code != 0) { + MarkAsBroken(); + return false; + } + if (_half_open_success_count.fetch_add(1, butil::memory_order_relaxed) + + 1 == FLAGS_circuit_breaker_half_open_window_size) { + _half_open.store(false, butil::memory_order_relaxed); + _half_open_success_count.store(0, butil::memory_order_relaxed); + } + } + if (_long_window.OnCallEnd(error_code, latency) && _short_window.OnCallEnd(error_code, latency)) { return true; @@ -201,6 +223,10 @@ void CircuitBreaker::Reset() { _short_window.Reset(); _last_reset_time_ms = butil::cpuwide_time_ms(); _broken.store(false, butil::memory_order_release); + if (FLAGS_circuit_breaker_half_open_window_size > 0) { + _half_open.store(true, butil::memory_order_relaxed); + _half_open_success_count.store(0, 
butil::memory_order_relaxed); + } } void CircuitBreaker::MarkAsBroken() { diff --git a/src/brpc/circuit_breaker.h b/src/brpc/circuit_breaker.h index 826e6914..b16a4299 100644 --- a/src/brpc/circuit_breaker.h +++ b/src/brpc/circuit_breaker.h @@ -87,6 +87,8 @@ private: butil::atomic<int> _isolation_duration_ms; butil::atomic<int> _isolated_times; butil::atomic<bool> _broken; + butil::atomic<bool> _half_open; + butil::atomic<int32_t> _half_open_success_count; }; } // namespace brpc diff --git a/test/brpc_circuit_breaker_unittest.cpp b/test/brpc_circuit_breaker_unittest.cpp index ef09cd94..e8f55153 100644 --- a/test/brpc_circuit_breaker_unittest.cpp +++ b/test/brpc_circuit_breaker_unittest.cpp @@ -45,6 +45,7 @@ const int kErrorCodeForSucc = 0; const int kErrorCost = 1000; const int kLatency = 1000; const int kThreadNum = 3; +const int kHalfWindowSize = 0; } // namespace namespace brpc { @@ -54,6 +55,7 @@ DECLARE_int32(circuit_breaker_short_window_error_percent); DECLARE_int32(circuit_breaker_long_window_error_percent); DECLARE_int32(circuit_breaker_min_isolation_duration_ms); DECLARE_int32(circuit_breaker_max_isolation_duration_ms); +DECLARE_int32(circuit_breaker_half_open_window_size); } // namespace brpc int main(int argc, char* argv[]) { @@ -63,6 +65,7 @@ int main(int argc, char* argv[]) { brpc::FLAGS_circuit_breaker_long_window_error_percent = kLongWindowErrorPercent; brpc::FLAGS_circuit_breaker_min_isolation_duration_ms = kMinIsolationDurationMs; brpc::FLAGS_circuit_breaker_max_isolation_duration_ms = kMaxIsolationDurationMs; + brpc::FLAGS_circuit_breaker_half_open_window_size = kHalfWindowSize; testing::InitGoogleTest(&argc, argv); GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); return RUN_ALL_TESTS(); @@ -160,6 +163,60 @@ TEST_F(CircuitBreakerTest, should_isolate) { } } +TEST_F(CircuitBreakerTest, should_isolate_with_half_open) { + std::vector<pthread_t> thread_list; + std::vector<std::unique_ptr<FeedbackControl>> fc_list; + 
StartFeedbackThread(&thread_list, &fc_list, 100); + int total_failed = 0; + for (int i = 0; i < kThreadNum; ++i) { + void* ret_data = nullptr; + ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0); + FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data); + EXPECT_GT(fc->_unhealthy_cnt, 0); + EXPECT_FALSE(fc->_healthy); + total_failed += fc->_unhealthy_cnt; + } + _circuit_breaker.Reset(); + + int total_failed1 = 0; + StartFeedbackThread(&thread_list, &fc_list, 100); + for (int i = 0; i < kThreadNum; ++i) { + void* ret_data = nullptr; + ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0); + FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data); + EXPECT_FALSE(fc->_healthy); + EXPECT_LE(fc->_healthy_cnt, kShortWindowSize); + EXPECT_GT(fc->_unhealthy_cnt, 0); + total_failed1 += fc->_unhealthy_cnt; + } + + // Enable the half-open state. + // The first request cause _broken = true immediately. + brpc::FLAGS_circuit_breaker_half_open_window_size = 10; + _circuit_breaker.Reset(); + int total_failed2 = 0; + StartFeedbackThread(&thread_list, &fc_list, 100); + for (int i = 0; i < kThreadNum; ++i) { + void* ret_data = nullptr; + ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0); + FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data); + EXPECT_FALSE(fc->_healthy); + EXPECT_LE(fc->_healthy_cnt, kShortWindowSize); + EXPECT_GT(fc->_unhealthy_cnt, 0); + total_failed2 += fc->_unhealthy_cnt; + } + brpc::FLAGS_circuit_breaker_half_open_window_size = 0; + + EXPECT_EQ(kLongWindowSize * 2 * kThreadNum - + kShortWindowSize * + brpc::FLAGS_circuit_breaker_short_window_error_percent / + 100, + total_failed); + + EXPECT_EQ(total_failed1, total_failed); + EXPECT_EQ(kLongWindowSize * 2 * kThreadNum, total_failed2); +} + TEST_F(CircuitBreakerTest, isolation_duration_grow_and_reset) { std::vector<pthread_t> thread_list; std::vector<std::unique_ptr<FeedbackControl>> fc_list; --------------------------------------------------------------------- To unsubscribe, 
e-mail: dev-unsubscribe@brpc.apache.org For additional commands, e-mail: dev-help@brpc.apache.org