This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 59a68dd6 Fix max concurrency of thrift protocol and nshead protocol (#2613) 59a68dd6 is described below commit 59a68dd65e250c96a81b9d162018797bdbf3e9be Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Mon Jun 3 16:50:09 2024 +0800 Fix max concurrency of thrift protocol and nshead protocol (#2613) --- src/brpc/nshead_service.h | 13 +++++++------ src/brpc/server.cpp | 47 ++++++++++++++++++++++++++++++++++++++--------- src/brpc/server.h | 21 +++++++++++++++++++++ src/brpc/thrift_service.h | 6 ++++-- 4 files changed, 70 insertions(+), 17 deletions(-) diff --git a/src/brpc/nshead_service.h b/src/brpc/nshead_service.h index c90c7873..49ff9d79 100644 --- a/src/brpc/nshead_service.h +++ b/src/brpc/nshead_service.h @@ -22,7 +22,7 @@ #include "brpc/controller.h" // Controller #include "brpc/nshead_message.h" // NsheadMessage #include "brpc/describable.h" - +#include "brpc/adaptive_max_concurrency.h" namespace brpc { @@ -40,7 +40,7 @@ public: explicit NsheadClosure(void* additional_space); // [Required] Call this to send response back to the client. - void Run(); + void Run() override; // [Optional] Set the full method name. If unset, use name of the service. void SetMethodName(const std::string& full_method_name); @@ -59,7 +59,7 @@ private: friend void policy::ProcessNsheadRequest(InputMessageBase* msg_base); friend class DeleteNsheadClosure; // Only callable by Run(). - ~NsheadClosure(); + ~NsheadClosure() override; const Server* _server; int64_t _received_us; @@ -84,8 +84,8 @@ struct NsheadServiceOptions { class NsheadService : public Describable { public: NsheadService(); - NsheadService(const NsheadServiceOptions&); - virtual ~NsheadService(); + explicit NsheadService(const NsheadServiceOptions&); + ~NsheadService() override; // Implement this method to handle nshead requests. Notice that this // method can be called with a failed Controller(something wrong with the @@ -104,7 +104,7 @@ public: NsheadClosure* done) = 0; // Put descriptions into the stream. - void Describe(std::ostream &os, const DescribeOptions&) const; + void Describe(std::ostream &os, const DescribeOptions&) const override; private: DISALLOW_COPY_AND_ASSIGN(NsheadService); @@ -118,6 +118,7 @@ private: // Tracking status of non NsheadPbService MethodStatus* _status; + AdaptiveMaxConcurrency _max_concurrency; size_t _additional_space; std::string _cached_name; }; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 93228289..51fb1d16 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -738,8 +738,8 @@ static int get_port_from_fd(int fd) { return ntohs(addr.sin_port); } -static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc, - ConcurrencyLimiter** out) { +bool Server::CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc, + ConcurrencyLimiter** out) { if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) { *out = NULL; return true; @@ -1055,6 +1055,15 @@ int Server::StartInternal(const butil::EndPoint& endpoint, it->second.status->SetConcurrencyLimiter(cl); } } + if (0 != SetServiceMaxConcurrency(_options.nshead_service)) { + return -1; + } +#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL + if (0 != SetServiceMaxConcurrency(_options.thrift_service)) { + return -1; + } +#endif + // Create listening ports if (port_range.min_port > port_range.max_port) { @@ -2216,13 +2225,33 @@ int Server::MaxConcurrencyOf(const MethodProperty* mp) const { } AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) { - MethodProperty* mp = _method_map.seek(full_method_name); - if (mp == NULL) { - LOG(ERROR) << "Fail to find method=" << full_method_name; - _failed_to_set_max_concurrency_of_method = true; - return g_default_max_concurrency_of_method; - } - return MaxConcurrencyOf(mp); + do { + if (full_method_name == butil::class_name_str<NsheadService>()) { + if (NULL == options().nshead_service) { + break; + } + return options().nshead_service->_max_concurrency; + } +#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL + if (full_method_name == butil::class_name_str<ThriftService>()) { + if (NULL == options().thrift_service) { + break; + } + return options().thrift_service->_max_concurrency; + } +#endif + + MethodProperty* mp = _method_map.seek(full_method_name); + if (mp == NULL) { + break; + } + return MaxConcurrencyOf(mp); + + } while (false); + + LOG(ERROR) << "Fail to find method=" << full_method_name; + _failed_to_set_max_concurrency_of_method = true; + return g_default_max_concurrency_of_method; } int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const { diff --git a/src/brpc/server.h b/src/brpc/server.h index 4843d0d0..5bc518ef 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -42,6 +42,7 @@ #include "brpc/http2.h" #include "brpc/redis.h" #include "brpc/interceptor.h" +#include "brpc/concurrency_limiter.h" namespace brpc { @@ -674,6 +675,26 @@ friend class Controller; AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*); int MaxConcurrencyOf(const MethodProperty*) const; + static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc, + ConcurrencyLimiter** out); + + template <typename T> + int SetServiceMaxConcurrency(T* service) { + if (NULL != service) { + const AdaptiveMaxConcurrency* amc = &service->_max_concurrency; + if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) { + amc = &_options.method_max_concurrency; + } + ConcurrencyLimiter* cl = NULL; + if (!CreateConcurrencyLimiter(*amc, &cl)) { + LOG(ERROR) << "Fail to create ConcurrencyLimiter for method"; + return -1; + } + service->_status->SetConcurrencyLimiter(cl); + } + return 0; + } + DISALLOW_COPY_AND_ASSIGN(Server); // Put frequently-accessed data pool at first. diff --git a/src/brpc/thrift_service.h b/src/brpc/thrift_service.h index c3d341e0..bd4ca44a 100644 --- a/src/brpc/thrift_service.h +++ b/src/brpc/thrift_service.h @@ -22,6 +22,7 @@ #include "brpc/controller.h" // Controller #include "brpc/thrift_message.h" // ThriftFramedMessage #include "brpc/describable.h" +#include "brpc/adaptive_max_concurrency.h" namespace brpc { @@ -38,7 +39,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base); class ThriftService : public Describable { public: ThriftService(); - virtual ~ThriftService(); + ~ThriftService() override; // Implement this method to handle thrift_binary requests. // Parameters: @@ -53,7 +54,7 @@ public: ::google::protobuf::Closure* done) = 0; // Put descriptions into the stream. - void Describe(std::ostream &os, const DescribeOptions&) const; + void Describe(std::ostream &os, const DescribeOptions&) const override; private: DISALLOW_COPY_AND_ASSIGN(ThriftService); @@ -66,6 +67,7 @@ private: void Expose(const butil::StringPiece& prefix); MethodStatus* _status; + AdaptiveMaxConcurrency _max_concurrency; }; } // namespace brpc --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org