This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 59a68dd6 Fix max concurrency of thrift protocol and nshead protocol (#2613)
59a68dd6 is described below

commit 59a68dd65e250c96a81b9d162018797bdbf3e9be
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Mon Jun 3 16:50:09 2024 +0800

    Fix max concurrency of thrift protocol and nshead protocol (#2613)
---
 src/brpc/nshead_service.h | 13 +++++++------
 src/brpc/server.cpp       | 47 ++++++++++++++++++++++++++++++++++++++---------
 src/brpc/server.h         | 21 +++++++++++++++++++++
 src/brpc/thrift_service.h |  6 ++++--
 4 files changed, 70 insertions(+), 17 deletions(-)

diff --git a/src/brpc/nshead_service.h b/src/brpc/nshead_service.h
index c90c7873..49ff9d79 100644
--- a/src/brpc/nshead_service.h
+++ b/src/brpc/nshead_service.h
@@ -22,7 +22,7 @@
 #include "brpc/controller.h"                 // Controller
 #include "brpc/nshead_message.h"             // NsheadMessage
 #include "brpc/describable.h"
-
+#include "brpc/adaptive_max_concurrency.h"
 
 namespace brpc {
 
@@ -40,7 +40,7 @@ public:
     explicit NsheadClosure(void* additional_space);
 
     // [Required] Call this to send response back to the client.
-    void Run();
+    void Run() override;
 
     // [Optional] Set the full method name. If unset, use name of the service.
     void SetMethodName(const std::string& full_method_name);
@@ -59,7 +59,7 @@ private:
 friend void policy::ProcessNsheadRequest(InputMessageBase* msg_base);
 friend class DeleteNsheadClosure;
     // Only callable by Run().
-    ~NsheadClosure();
+    ~NsheadClosure() override;
 
     const Server* _server;
     int64_t _received_us;
@@ -84,8 +84,8 @@ struct NsheadServiceOptions {
 class NsheadService : public Describable {
 public:
     NsheadService();
-    NsheadService(const NsheadServiceOptions&);
-    virtual ~NsheadService();
+    explicit NsheadService(const NsheadServiceOptions&);
+    ~NsheadService() override;
 
     // Implement this method to handle nshead requests. Notice that this
     // method can be called with a failed Controller(something wrong with the
@@ -104,7 +104,7 @@ public:
                                       NsheadClosure* done) = 0;
 
     // Put descriptions into the stream.
-    void Describe(std::ostream &os, const DescribeOptions&) const;
+    void Describe(std::ostream &os, const DescribeOptions&) const override;
 
 private:
 DISALLOW_COPY_AND_ASSIGN(NsheadService);
@@ -118,6 +118,7 @@ private:
     
     // Tracking status of non NsheadPbService
     MethodStatus* _status;
+    AdaptiveMaxConcurrency _max_concurrency;
     size_t _additional_space;
     std::string _cached_name;
 };
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 93228289..51fb1d16 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -738,8 +738,8 @@ static int get_port_from_fd(int fd) {
     return ntohs(addr.sin_port);
 }
 
-static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
-                                     ConcurrencyLimiter** out) {
+bool Server::CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
+                                      ConcurrencyLimiter** out) {
     if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
         *out = NULL;
         return true;
@@ -1055,6 +1055,15 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
             it->second.status->SetConcurrencyLimiter(cl);
         }
     }
+    if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
+        return -1;
+    }
+#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
+    if (0 != SetServiceMaxConcurrency(_options.thrift_service)) {
+        return -1;
+    }
+#endif
+
 
     // Create listening ports
     if (port_range.min_port > port_range.max_port) {
@@ -2216,13 +2225,33 @@ int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
 }
 
 AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
-    MethodProperty* mp = _method_map.seek(full_method_name);
-    if (mp == NULL) {
-        LOG(ERROR) << "Fail to find method=" << full_method_name;
-        _failed_to_set_max_concurrency_of_method = true;
-        return g_default_max_concurrency_of_method;
-    }
-    return MaxConcurrencyOf(mp);
+    do {
+        if (full_method_name == butil::class_name_str<NsheadService>()) {
+            if (NULL == options().nshead_service) {
+                break;
+            }
+            return options().nshead_service->_max_concurrency;
+        }
+#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
+        if (full_method_name == butil::class_name_str<ThriftService>()) {
+            if (NULL == options().thrift_service) {
+                break;
+            }
+            return options().thrift_service->_max_concurrency;
+        }
+#endif
+
+        MethodProperty* mp = _method_map.seek(full_method_name);
+        if (mp == NULL) {
+            break;
+        }
+        return MaxConcurrencyOf(mp);
+
+    } while (false);
+
+    LOG(ERROR) << "Fail to find method=" << full_method_name;
+    _failed_to_set_max_concurrency_of_method = true;
+    return g_default_max_concurrency_of_method;
 }
 
 int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const {
diff --git a/src/brpc/server.h b/src/brpc/server.h
index 4843d0d0..5bc518ef 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -42,6 +42,7 @@
 #include "brpc/http2.h"
 #include "brpc/redis.h"
 #include "brpc/interceptor.h"
+#include "brpc/concurrency_limiter.h"
 
 namespace brpc {
 
@@ -674,6 +675,26 @@ friend class Controller;
     AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
     int MaxConcurrencyOf(const MethodProperty*) const;
 
+    static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
+                                         ConcurrencyLimiter** out);
+
+    template <typename T>
+    int SetServiceMaxConcurrency(T* service) {
+        if (NULL != service) {
+            const AdaptiveMaxConcurrency* amc = &service->_max_concurrency;
+            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
+                amc = &_options.method_max_concurrency;
+            }
+            ConcurrencyLimiter* cl = NULL;
+            if (!CreateConcurrencyLimiter(*amc, &cl)) {
+                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
+                return -1;
+            }
+            service->_status->SetConcurrencyLimiter(cl);
+        }
+        return 0;
+    }
+
     DISALLOW_COPY_AND_ASSIGN(Server);
 
     // Put frequently-accessed data pool at first.
diff --git a/src/brpc/thrift_service.h b/src/brpc/thrift_service.h
index c3d341e0..bd4ca44a 100644
--- a/src/brpc/thrift_service.h
+++ b/src/brpc/thrift_service.h
@@ -22,6 +22,7 @@
 #include "brpc/controller.h"                        // Controller
 #include "brpc/thrift_message.h"                    // ThriftFramedMessage
 #include "brpc/describable.h"
+#include "brpc/adaptive_max_concurrency.h"
 
 namespace brpc {
 
@@ -38,7 +39,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base);
 class ThriftService : public Describable {
 public:
     ThriftService();
-    virtual ~ThriftService();
+    ~ThriftService() override;
 
     // Implement this method to handle thrift_binary requests.
     // Parameters:
@@ -53,7 +54,7 @@ public:
         ::google::protobuf::Closure* done) = 0;
 
     // Put descriptions into the stream.
-    void Describe(std::ostream &os, const DescribeOptions&) const;
+    void Describe(std::ostream &os, const DescribeOptions&) const override;
 
 private:
 DISALLOW_COPY_AND_ASSIGN(ThriftService);
@@ -66,6 +67,7 @@ private:
     void Expose(const butil::StringPiece& prefix);
     
     MethodStatus* _status;
+    AdaptiveMaxConcurrency _max_concurrency;
 };
 
 } // namespace brpc


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to