This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f5e102a Support proxy and generic call of baidu protocol (#2629)
0f5e102a is described below

commit 0f5e102a219af016e00f40cacbbb193a418ab358
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Thu Jun 13 12:21:52 2024 +0800

    Support proxy and generic call of baidu protocol (#2629)
    
    * Support proxy and generic call of baidu protocol
    
    * Add example
---
 .../baidu_proxy_and_generic_call/CMakeLists.txt    | 134 ++++++++++++
 example/baidu_proxy_and_generic_call/client.cpp    |  94 +++++++++
 .../baidu_proxy_and_generic_call/echo.proto        |  23 +--
 example/baidu_proxy_and_generic_call/proxy.cpp     | 142 +++++++++++++
 example/baidu_proxy_and_generic_call/server.cpp    | 118 +++++++++++
 src/brpc/baidu_master_service.cpp                  |  52 +++++
 src/brpc/baidu_master_service.h                    |  99 +++++++++
 src/brpc/builtin/status_service.cpp                |  11 +
 src/brpc/channel.cpp                               |   2 +
 src/brpc/controller.cpp                            |   6 +
 src/brpc/controller.h                              |   6 +-
 src/brpc/policy/baidu_rpc_protocol.cpp             | 230 +++++++++++++--------
 src/brpc/proto_base.proto                          |   4 +
 src/brpc/serialized_request.cpp                    |   2 +-
 ...ialized_request.cpp => serialized_response.cpp} |  60 +++---
 src/brpc/serialized_response.h                     |  83 ++++++++
 src/brpc/server.cpp                                |  11 +
 src/brpc/server.h                                  |   5 +-
 src/butil/memory/scope_guard.h                     |   2 +-
 test/brpc_server_unittest.cpp                      | 111 ++++++++++
 test/endpoint_unittest.cpp                         |   2 +-
 21 files changed, 1066 insertions(+), 131 deletions(-)

diff --git a/example/baidu_proxy_and_generic_call/CMakeLists.txt 
b/example/baidu_proxy_and_generic_call/CMakeLists.txt
new file mode 100644
index 00000000..8cc9c0f1
--- /dev/null
+++ b/example/baidu_proxy_and_generic_call/CMakeLists.txt
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8.10)
+project(baidu_proxy_and_generic_call C CXX)
+
+option(LINK_SO "Whether examples are linked dynamically" OFF)
+
+execute_process(
+    COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex 
\".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
+    OUTPUT_VARIABLE OUTPUT_PATH
+)
+
+set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
+
+include(FindThreads)
+include(FindProtobuf)
+protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto)
+# include PROTO_HEADER
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
+if(LINK_SO)
+    find_library(BRPC_LIB NAMES brpc)
+else()
+    find_library(BRPC_LIB NAMES libbrpc.a brpc)
+endif()
+if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
+    message(FATAL_ERROR "Fail to find brpc")
+endif()
+include_directories(${BRPC_INCLUDE_PATH})
+
+find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
+find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
+if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
+    message(FATAL_ERROR "Fail to find gflags")
+endif()
+include_directories(${GFLAGS_INCLUDE_PATH})
+
+execute_process(
+    COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" 
${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | 
tr -d '\n'"
+    OUTPUT_VARIABLE GFLAGS_NS
+)
+if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE")
+    execute_process(
+        COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" 
${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | 
tr -d '\n'"
+        OUTPUT_VARIABLE GFLAGS_NS
+    )
+endif()
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    include(CheckFunctionExists)
+    CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+    if(NOT HAVE_CLOCK_GETTIME)
+        set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
+    endif()
+endif()
+
+set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}")
+set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ 
-pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
+
+if(CMAKE_VERSION VERSION_LESS "3.1.3")
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+else()
+    set(CMAKE_CXX_STANDARD 11)
+    set(CMAKE_CXX_STANDARD_REQUIRED ON)
+endif()
+
+find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
+find_library(LEVELDB_LIB NAMES leveldb)
+if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
+    message(FATAL_ERROR "Fail to find leveldb")
+endif()
+include_directories(${LEVELDB_INCLUDE_PATH})
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(OPENSSL_ROOT_DIR
+        "/usr/local/opt/openssl"    # Homebrew installed OpenSSL
+        )
+endif()
+
+find_package(OpenSSL)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+set(DYNAMIC_LIB
+    ${CMAKE_THREAD_LIBS_INIT}
+    ${GFLAGS_LIBRARY}
+    ${PROTOBUF_LIBRARIES}
+    ${LEVELDB_LIB}
+    ${OPENSSL_CRYPTO_LIBRARY}
+    ${OPENSSL_SSL_LIBRARY}
+    dl
+    )
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(DYNAMIC_LIB ${DYNAMIC_LIB}
+        pthread
+        "-framework CoreFoundation"
+        "-framework CoreGraphics"
+        "-framework CoreData"
+        "-framework CoreText"
+        "-framework Security"
+        "-framework Foundation"
+        "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
+        "-Wl,-U,_ProfilerStart"
+        "-Wl,-U,_ProfilerStop"
+        "-Wl,-U,__Z13GetStackTracePPvii")
+endif()
+
+add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
+add_executable(proxy proxy.cpp)
+add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})
+
+target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB})
+target_link_libraries(proxy ${BRPC_LIB} ${DYNAMIC_LIB})
+target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB})
diff --git a/example/baidu_proxy_and_generic_call/client.cpp 
b/example/baidu_proxy_and_generic_call/client.cpp
new file mode 100644
index 00000000..d8040740
--- /dev/null
+++ b/example/baidu_proxy_and_generic_call/client.cpp
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A client sending requests to server every 1 second.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <brpc/channel.h>
+#include "echo.pb.h"
+
+DEFINE_int32(compress_type, 2, "The compress type of request");
+DEFINE_string(attachment, "", "Carry this along with requests");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, 
pooled, short");
+DEFINE_string(proxy_address, "0.0.0.0:8000", "IP Address of proxy");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
+DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
+
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+    
+    // A Channel represents a communication line to a Server. Notice that 
+    // Channel is thread-safe and can be shared by all threads in your program.
+    brpc::Channel channel;
+    
+    // Initialize the channel, NULL means using default options.
+    brpc::ChannelOptions options;
+    options.protocol = brpc::PROTOCOL_BAIDU_STD;
+    options.connection_type = FLAGS_connection_type;
+    options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
+    options.max_retry = FLAGS_max_retry;
+    if (channel.Init(FLAGS_proxy_address.c_str(),
+                     FLAGS_load_balancer.c_str(), &options) != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return -1;
+    }
+
+    // Normally, you should not call a Channel directly, but instead construct
+    // a stub Service wrapping it. stub can be shared by all threads as well.
+    example::EchoService_Stub stub(&channel);
+
+    // Send a request and wait for the response every 1 second.
+    int log_id = 0;
+    while (!brpc::IsAskedToQuit()) {
+        // We will receive response synchronously, safe to put variables
+        // on stack.
+        example::EchoRequest request;
+        example::EchoResponse response;
+        brpc::Controller cntl;
+
+        request.set_message("hello world");
+        
cntl.set_request_compress_type((brpc::CompressType)FLAGS_compress_type);
+
+        cntl.set_log_id(log_id++);  // set by user
+        // Set attachment which is wired to network directly instead of 
+        // being serialized into protobuf messages.
+        cntl.request_attachment().append(FLAGS_attachment);
+
+        // Because `done'(last parameter) is NULL, this function waits until
+        // the response comes back or error occurs(including timedout).
+        stub.Echo(&cntl, &request, &response, NULL);
+        if (!cntl.Failed()) {
+            LOG(INFO) << "Received response from " << cntl.remote_side()
+                << " to " << cntl.local_side()
+                << ": " << response.message()
+                << ", response compress type=" << cntl.response_compress_type()
+                << ", attached=" << cntl.response_attachment()
+                << ", latency=" << cntl.latency_us() << "us";
+        } else {
+            LOG(WARNING) << cntl.ErrorText();
+        }
+        usleep(FLAGS_interval_ms * 1000L);
+    }
+
+    LOG(INFO) << "EchoClient is going to quit";
+    return 0;
+}
diff --git a/src/brpc/proto_base.proto 
b/example/baidu_proxy_and_generic_call/echo.proto
similarity index 75%
copy from src/brpc/proto_base.proto
copy to example/baidu_proxy_and_generic_call/echo.proto
index c0bbc086..2b39627f 100644
--- a/src/brpc/proto_base.proto
+++ b/example/baidu_proxy_and_generic_call/echo.proto
@@ -16,19 +16,18 @@
 // under the License.
 
 syntax="proto2";
+package example;
 
-package brpc;
+option cc_generic_services = true;
 
-message RedisRequestBase {}
-message RedisResponseBase {}
+message EchoRequest {
+      required string message = 1;
+};
 
-message EspMessageBase {}
+message EchoResponse {
+      required string message = 1;
+};
 
-message MemcacheRequestBase {}
-message MemcacheResponseBase {}
-
-message NsheadMessageBase {}
-
-message SerializedRequestBase {}
-
-message ThriftFramedMessageBase {}
+service EchoService {
+      rpc Echo(EchoRequest) returns (EchoResponse);
+};
diff --git a/example/baidu_proxy_and_generic_call/proxy.cpp 
b/example/baidu_proxy_and_generic_call/proxy.cpp
new file mode 100644
index 00000000..81e1176b
--- /dev/null
+++ b/example/baidu_proxy_and_generic_call/proxy.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// todo
+// A proxy to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <butil/strings/string_number_conversions.h>
+#include <brpc/server.h>
+#include <brpc/controller.h>
+#include <brpc/channel.h>
+#include <json2pb/pb_to_json.h>
+
+DEFINE_int32(port, 8000, "TCP Port of this server");
+DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS."
+            " If this is set, the flag port will be ignored");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, 
pooled, short");
+DEFINE_string(server_address, "0.0.0.0:8001", "IP Address of server");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
+
+// Your implementation of example::EchoService
+// Notice that implementing brpc::Describable grants the ability to put
+// additional information in /status.
+namespace example {
+class BaiduMasterServiceImpl : public brpc::BaiduMasterService {
+public:
+    void ProcessRpcRequest(brpc::Controller* cntl,
+                           const brpc::SerializedRequest* request,
+                           brpc::SerializedResponse* response,
+                           ::google::protobuf::Closure* done) override {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        // A Channel represents a communication line to a Server. Notice that
+        // Channel is thread-safe and can be shared by all threads in your 
program.
+        brpc::Channel channel;
+
+        // Initialize the channel, NULL means using default options.
+        brpc::ChannelOptions options;
+        options.protocol = brpc::PROTOCOL_BAIDU_STD;
+        options.connection_type = FLAGS_connection_type;
+        options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
+        options.max_retry = FLAGS_max_retry;
+        if (channel.Init(FLAGS_server_address.c_str(),
+            FLAGS_load_balancer.c_str(), &options) != 0) {
+            LOG(ERROR) << "Fail to initialize channel";
+            (*cntl->response_user_fields())["x-bd-proxy-error-code"] =
+                butil::IntToString(brpc::EINTERNAL);
+            (*cntl->response_user_fields())["x-bd-proxy-error-text"] =
+                "Fail to initialize channel";
+            return;
+        }
+
+        LOG(INFO) << "Received request[log_id=" << cntl->log_id()
+                  << "] from " << cntl->remote_side()
+                  << " to " << cntl->local_side()
+                  << ", serialized request size=" << 
request->serialized_data().size()
+                  << ", request compress type=" << 
cntl->request_compress_type()
+                  << " (attached=" << cntl->request_attachment() << ")";
+
+        brpc::Controller call_cntl;
+        call_cntl.set_log_id(cntl->log_id());
+        call_cntl.request_attachment().swap(cntl->request_attachment());
+        call_cntl.set_request_compress_type(cntl->request_compress_type());
+        call_cntl.reset_sampled_request(cntl->release_sampled_request());
+        // It is ok to use request and response for sync rpc.
+        channel.CallMethod(NULL, &call_cntl, request, response, NULL);
+        (*cntl->response_user_fields())["x-bd-proxy-error-code"] =
+            butil::IntToString(call_cntl.ErrorCode());
+        if (call_cntl.Failed()) {
+            (*cntl->response_user_fields())["x-bd-proxy-error-text"] =
+                call_cntl.ErrorText();
+            LOG(ERROR) << "Fail to call service=" << 
call_cntl.sampled_request()->meta.service_name()
+                       << ", method=" << 
call_cntl.sampled_request()->meta.method_name()
+                       << ", error_code=" << call_cntl.ErrorCode()
+                       << ", error_text=" << call_cntl.ErrorCode();
+            return;
+        } else {
+            LOG(INFO) << "Received response from " << call_cntl.remote_side()
+                      << " to " << call_cntl.local_side()
+                      << ", serialized response size=" << 
response->serialized_data().size()
+                      << ", response compress type=" << 
call_cntl.response_compress_type()
+                      << ", attached=" << call_cntl.response_attachment()
+                      << ", latency=" << call_cntl.latency_us() << "us";
+        }
+        cntl->response_attachment().swap(call_cntl.response_attachment());
+        cntl->set_response_compress_type(call_cntl.response_compress_type());
+    }
+};
+}  // namespace example
+
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Generally you only need one Server.
+    brpc::Server server;
+
+    butil::EndPoint point;
+    if (!FLAGS_listen_addr.empty()) {
+        if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) {
+            LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
+            return -1;
+        }
+    } else {
+        point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
+    }
+    // Start the server.
+    brpc::ServerOptions options;
+    // Add the baidu master service into server.
+    // Notice new operator, because server will delete it in dtor of Server.
+    options.baidu_master_service = new example::BaiduMasterServiceImpl();
+    options.idle_timeout_sec = FLAGS_idle_timeout_s;
+    if (server.Start(point, &options) != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
+    server.RunUntilAskedToQuit();
+    return 0;
+}
diff --git a/example/baidu_proxy_and_generic_call/server.cpp 
b/example/baidu_proxy_and_generic_call/server.cpp
new file mode 100644
index 00000000..b3f16173
--- /dev/null
+++ b/example/baidu_proxy_and_generic_call/server.cpp
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <json2pb/pb_to_json.h>
+#include "echo.pb.h"
+
+DEFINE_int32(compress_type, 2, "The compress type of response");
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8001, "TCP Port of this server");
+DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS."
+            " If this is set, the flag port will be ignored");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+
+// Your implementation of example::EchoService
+// Notice that implementing brpc::Describable grants the ability to put
+// additional information in /status.
+namespace example {
+class EchoServiceImpl : public EchoService {
+public:
+    EchoServiceImpl() = default;
+    ~EchoServiceImpl() override = default;
+    void Echo(google::protobuf::RpcController* cntl_base,
+              const EchoRequest* request,
+              EchoResponse* response,
+              google::protobuf::Closure* done) override {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        auto cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        // The purpose of following logs is to help you to understand
+        // how clients interact with servers more intuitively. You should 
+        // remove these logs in performance-sensitive servers.
+        LOG(INFO) << "Received request[log_id=" << cntl->log_id() 
+                  << "] from " << cntl->remote_side() 
+                  << " to " << cntl->local_side()
+                  << ": " << request->message()
+                  << ", request compress type=" << 
cntl->request_compress_type()
+                  << ", attached=" << cntl->request_attachment();
+
+        // Fill response.
+        response->set_message(request->message());
+        
cntl->set_response_compress_type((brpc::CompressType)FLAGS_compress_type);
+
+        // You can compress the response by setting Controller, but be aware
+        // that compression may be costly, evaluate before turning on.
+        // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
+
+        if (FLAGS_echo_attachment) {
+            // Set attachment which is wired to network directly instead of
+            // being serialized into protobuf messages.
+            cntl->response_attachment().append(cntl->request_attachment());
+        }
+    }
+};
+}  // namespace example
+
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Generally you only need one Server.
+    brpc::Server server;
+
+    // Instance of your service.
+    example::EchoServiceImpl echo_service_impl;
+
+    // Add the service into server. Notice the second parameter, because the
+    // service is put on stack, we don't want server to delete it, otherwise
+    // use brpc::SERVER_OWNS_SERVICE.
+    if (server.AddService(&echo_service_impl, 
+                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    butil::EndPoint point;
+    if (!FLAGS_listen_addr.empty()) {
+        if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) {
+            LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
+            return -1;
+        }
+    } else {
+        point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
+    }
+    // Start the server.
+    brpc::ServerOptions options;
+    options.idle_timeout_sec = FLAGS_idle_timeout_s;
+    if (server.Start(point, &options) != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
+    server.RunUntilAskedToQuit();
+    return 0;
+}
diff --git a/src/brpc/baidu_master_service.cpp 
b/src/brpc/baidu_master_service.cpp
new file mode 100644
index 00000000..0b983732
--- /dev/null
+++ b/src/brpc/baidu_master_service.cpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "brpc/baidu_master_service.h"
+#include "butil/logging.h"
+
+namespace brpc {
+
+BaiduMasterService::BaiduMasterService()
+    :_status(new(std::nothrow) MethodStatus) {
+    LOG_IF(FATAL, NULL == _status) << "Fail to new MethodStatus";
+}
+
+BaiduMasterService::~BaiduMasterService() {
+    delete _status;
+    _status = NULL;
+}
+
+void BaiduMasterService::Describe(std::ostream &os,
+                                  const DescribeOptions&) const {
+    os << butil::class_name_str(*this);
+}
+
+void BaiduMasterService::Expose(const butil::StringPiece& prefix) {
+    if (NULL == _status) {
+        return;
+    }
+    std::string s;
+    const std::string& cached_name = butil::class_name_str(*this);
+    s.reserve(prefix.size() + 1 + cached_name.size());
+    s.append(prefix.data(), prefix.size());
+    s.push_back('_');
+    s.append(cached_name);
+    _status->Expose(s);
+}
+
+}
\ No newline at end of file
diff --git a/src/brpc/baidu_master_service.h b/src/brpc/baidu_master_service.h
new file mode 100644
index 00000000..9dc7ebbf
--- /dev/null
+++ b/src/brpc/baidu_master_service.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef BRPC_BAIDU_MASTER_SERVICE_H
+#define BRPC_BAIDU_MASTER_SERVICE_H
+
+#include <google/protobuf/service.h>
+#include <google/protobuf/stubs/callback.h>
+#include "brpc/details/method_status.h"
+#include "brpc/proto_base.pb.h"
+#include "brpc/controller.h"
+#include "brpc/serialized_request.h"
+#include "brpc/serialized_response.h"
+
+namespace brpc {
+
+namespace policy {
+void ProcessRpcRequest(InputMessageBase* msg_base);
+}
+
+class BaiduMasterService : public ::google::protobuf::Service
+                         , public Describable {
+public:
+    BaiduMasterService();
+    ~BaiduMasterService() override;
+
+    DISALLOW_COPY_AND_ASSIGN(BaiduMasterService);
+
+    AdaptiveMaxConcurrency& MaxConcurrency() {
+        return _max_concurrency;
+    }
+
+    virtual void ProcessRpcRequest(Controller* cntl,
+                                   const SerializedRequest* request,
+                                   SerializedResponse* response,
+                                   ::google::protobuf::Closure* done) = 0;
+
+
+    // Put descriptions into the stream.
+    void Describe(std::ostream &os, const DescribeOptions&) const override;
+
+    // implements Service ----------------------------------------------
+
+    void CallMethod(const ::google::protobuf::MethodDescriptor*,
+                    ::google::protobuf::RpcController* controller,
+                    const ::google::protobuf::Message* request,
+                    ::google::protobuf::Message* response,
+                    ::google::protobuf::Closure* done) override {
+        ProcessRpcRequest((Controller*)controller,
+                          (const SerializedRequest*)request,
+                          (SerializedResponse*)response, done);
+    }
+
+    const ::google::protobuf::ServiceDescriptor* GetDescriptor() override {
+        return BaiduMasterServiceBase::descriptor();
+    }
+
+    const ::google::protobuf::Message& GetRequestPrototype(
+        const ::google::protobuf::MethodDescriptor*) const override {
+        return _default_request;
+    }
+
+    const ::google::protobuf::Message& GetResponsePrototype(
+        const ::google::protobuf::MethodDescriptor*) const override {
+        return _default_response;
+    }
+
+private:
+friend void policy::ProcessRpcRequest(InputMessageBase* msg_base);
+friend class StatusService;
+friend class Server;
+
+    void Expose(const butil::StringPiece& prefix);
+
+    SerializedRequest _default_request;
+    SerializedResponse _default_response;
+
+    MethodStatus* _status;
+    AdaptiveMaxConcurrency _max_concurrency;
+};
+
+}
+
+#endif //BRPC_BAIDU_MASTER_SERVICE_H
diff --git a/src/brpc/builtin/status_service.cpp 
b/src/brpc/builtin/status_service.cpp
index a6f5a4da..ea731e52 100644
--- a/src/brpc/builtin/status_service.cpp
+++ b/src/brpc/builtin/status_service.cpp
@@ -200,6 +200,17 @@ void 
StatusService::default_method(::google::protobuf::RpcController* cntl_base,
             }
         }
     }
+    const BaiduMasterService* baidu_master_service = 
server->options().baidu_master_service;
+    if (baidu_master_service && baidu_master_service->_status) {
+        DescribeOptions options;
+        options.verbose = false;
+        options.use_html = use_html;
+        os << (use_html ? "<h3>" : "[");
+        baidu_master_service->Describe(os, options);
+        os << (use_html ? "</h3>\n" : "]\n");
+        baidu_master_service->_status->Describe(os, desc_options);
+        os << '\n';
+    }
     const NsheadService* nshead_svc = server->options().nshead_service;
     if (nshead_svc && nshead_svc->_status) {
         DescribeOptions options;
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index ba0e8161..5fc66096 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -32,6 +32,8 @@
 #include "brpc/details/load_balancer_with_naming.h"
 #include "brpc/controller.h"
 #include "brpc/channel.h"
+#include "brpc/serialized_request.h"
+#include "brpc/serialized_response.h"
 #include "brpc/details/usercode_backup_pool.h"       // TooManyUserCode
 #include "brpc/rdma/rdma_helper.h"
 #include "brpc/policy/esp_authenticator.h"
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 1d9b1bb9..98e25ae2 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -1427,6 +1427,12 @@ void Controller::reset_sampled_request(SampledRequest* 
req) {
     _sampled_request = req;
 }
 
+SampledRequest* Controller::release_sampled_request() {
+    SampledRequest* saved_sampled_request = _sampled_request;
+    _sampled_request = NULL;
+    return saved_sampled_request;
+}
+
 void Controller::set_stream_creator(StreamCreator* sc) {
     if (_stream_creator) {
         LOG(FATAL) << "A StreamCreator has been set previously";
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index d3ffb99f..5b2132b4 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -46,6 +46,7 @@
 #include "brpc/progressive_reader.h"           // ProgressiveReader
 #include "brpc/grpc.h"
 #include "brpc/kvmap.h"
+#include "brpc/rpc_dump.h"
 
 // EAUTH is defined in MAC
 #ifndef EAUTH
@@ -68,7 +69,6 @@ class SharedLoadBalancer;
 class ExcludedServers;
 class RPCSender;
 class StreamSettings;
-class SampledRequest;
 class MongoContext;
 class RetryPolicy;
 class InputMessageBase;
@@ -305,7 +305,9 @@ public:
     // Get/own SampledRequest for sending dumped requests.
     // Deleted along with controller.
     void reset_sampled_request(SampledRequest* req);
-    const SampledRequest* sampled_request() { return _sampled_request; }
+    const SampledRequest* sampled_request() const { return _sampled_request; }
+    SampledRequest* release_sampled_request();
+
 
     // Attach a StreamCreator to this RPC. Notice that the ownership of sc has
     // been transferred to cntl, and sc->DestroyStreamCreator() would be called
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp 
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 6fa17d6c..6ce76467 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -135,6 +135,28 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* 
socket,
     return MakeMessage(msg);
 }
 
+static bool SerializeResponse(const google::protobuf::Message& res,
+                              Controller& cntl, CompressType compress_type,
+                              butil::IOBuf& buf) {
+    if (res.GetDescriptor() == SerializedResponse::descriptor()) {
+        buf.swap(((SerializedResponse&)res).serialized_data());
+        return true;
+    }
+
+    if (!res.IsInitialized()) {
+        cntl.SetFailed(ERESPONSE,
+                       "Missing required fields in response: %s",
+                       res.InitializationErrorString().c_str());
+        return false;
+    } else if (!SerializeAsCompressedData(res, &buf, compress_type)) {
+        cntl.SetFailed(ERESPONSE,
+                       "Fail to serialize response, CompressType=%s",
+                       CompressTypeToCStr(compress_type));
+        return false;
+    }
+    return true;
+}
+
 // Used by UT, can't be static.
 void SendRpcResponse(int64_t correlation_id,
                      Controller* cntl, 
@@ -170,18 +192,10 @@ void SendRpcResponse(int64_t correlation_id,
     // `res' can be NULL here, in which case we don't serialize it
     // If user calls `SetFailed' on Controller, we don't serialize
     // response either
-    CompressType type = cntl->response_compress_type();
+    CompressType compress_type = cntl->response_compress_type();
     if (res != NULL && !cntl->Failed()) {
-        if (!res->IsInitialized()) {
-            cntl->SetFailed(
-                ERESPONSE, "Missing required fields in response: %s", 
-                res->InitializationErrorString().c_str());
-        } else if (!SerializeAsCompressedData(*res, &res_body, type)) {
-            cntl->SetFailed(ERESPONSE, "Fail to serialize response, "
-                            "CompressType=%s", CompressTypeToCStr(type));
-        } else {
-            append_body = true;
-        }
+        append_body = SerializeResponse(
+            *res, *cntl, compress_type, res_body);
     }
 
     // Don't use res->ByteSize() since it may be compressed
@@ -207,7 +221,7 @@ void SendRpcResponse(int64_t correlation_id,
         response_meta->set_error_text(cntl->ErrorText());
     }
     meta.set_correlation_id(correlation_id);
-    meta.set_compress_type(cntl->response_compress_type());
+    meta.set_compress_type(compress_type);
     if (attached_size > 0) {
         meta.set_attachment_size(attached_size);
     }
@@ -236,7 +250,7 @@ void SendRpcResponse(int64_t correlation_id,
     SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
     if (append_body) {
         res_buf.append(res_body.movable());
-        if (attached_size) {
+        if (attached_size > 0) {
             res_buf.append(cntl->response_attachment().movable());
         }
     }
@@ -360,6 +374,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
         LOG(WARNING) << "Fail to new Controller";
         return;
     }
+
     std::unique_ptr<google::protobuf::Message> req;
     std::unique_ptr<google::protobuf::Message> res;
 
@@ -442,84 +457,129 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
             break;
         }
 
-        // NOTE(gejun): jprotobuf sends service names without packages. So the
-        // name should be changed to full when it's not.
-        butil::StringPiece svc_name(request_meta.service_name());
-        if (svc_name.find('.') == butil::StringPiece::npos) {
-            const Server::ServiceProperty* sp =
-                server_accessor.FindServicePropertyByName(svc_name);
-            if (NULL == sp) {
-                cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",
-                                request_meta.service_name().c_str());
+        const int req_size = static_cast<int>(msg->payload.size());
+        if (meta.has_attachment_size()) {
+            if (req_size < meta.attachment_size()) {
+                cntl->SetFailed(EREQUEST,
+                    "attachment_size=%d is larger than request_size=%d",
+                    meta.attachment_size(), req_size);
                 break;
             }
-            svc_name = sp->service->GetDescriptor()->full_name();
         }
-        const Server::MethodProperty* mp =
-            server_accessor.FindMethodPropertyByFullName(
-                svc_name, request_meta.method_name());
-        if (NULL == mp) {
-            cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",
-                            request_meta.service_name().c_str(),
-                            request_meta.method_name().c_str());
-            break;
-        } else if (mp->service->GetDescriptor()
-                   == BadMethodService::descriptor()) {
-            BadMethodRequest breq;
-            BadMethodResponse bres;
-            breq.set_service_name(request_meta.service_name());
-            mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, 
NULL);
-            break;
-        }
-        // Switch to service-specific error.
-        non_service_error.release();
-        method_status = mp->status;
-        if (method_status) {
-            int rejected_cc = 0;
-            if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
-                cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, 
concurrency=%d",
-                                mp->method->full_name().c_str(), rejected_cc);
+
+        google::protobuf::Service* svc = NULL;
+        google::protobuf::MethodDescriptor* method = NULL;
+        if (NULL != server->options().baidu_master_service) {
+            svc = server->options().baidu_master_service;
+            auto sampled_request = new (std::nothrow) SampledRequest;
+            if (NULL == sampled_request) {
+                cntl->SetFailed(ENOMEM, "Fail to get sampled_request");
                 break;
             }
-        }
-        google::protobuf::Service* svc = mp->service;
-        const google::protobuf::MethodDescriptor* method = mp->method;
-        accessor.set_method(method);
+            
sampled_request->meta.set_service_name(request_meta.service_name());
+            sampled_request->meta.set_method_name(request_meta.method_name());
+            cntl->reset_sampled_request(sampled_request);
+            // Switch to service-specific error.
+            non_service_error.release();
+            method_status = server->options().baidu_master_service->_status;
+            if (method_status) {
+                int rejected_cc = 0;
+                if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
+                    cntl->SetFailed(
+                        ELIMIT,
+                        "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
+                        butil::class_name<BaiduMasterService>(), rejected_cc);
+                    break;
+                }
+            }
+            if (span) {
+                span->ResetServerSpanName(sampled_request->meta.method_name());
+            }
 
+            auto serialized_request = (SerializedRequest*)
+                svc->GetRequestPrototype(NULL).New();
+            req.reset(serialized_request);
+            res.reset(svc->GetResponsePrototype(NULL).New());
 
-        if (!server->AcceptRequest(cntl.get())) {
-            break;
-        }
+            msg->payload.cutn(&serialized_request->serialized_data(),
+                              req_size - meta.attachment_size());
+            if (!msg->payload.empty()) {
+                cntl->request_attachment().swap(msg->payload);
+            }
+        } else {
+            // NOTE(gejun): jprotobuf sends service names without packages. So 
the
+            // name should be changed to full when it's not.
+            butil::StringPiece svc_name(request_meta.service_name());
+            if (svc_name.find('.') == butil::StringPiece::npos) {
+                const Server::ServiceProperty* sp =
+                    server_accessor.FindServicePropertyByName(svc_name);
+                if (NULL == sp) {
+                    cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",
+                        request_meta.service_name().c_str());
+                    break;
+                }
+                svc_name = sp->service->GetDescriptor()->full_name();
+            }
+            const Server::MethodProperty* mp =
+                server_accessor.FindMethodPropertyByFullName(
+                    svc_name, request_meta.method_name());
+            if (NULL == mp) {
+                cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",
+                                request_meta.service_name().c_str(),
+                                request_meta.method_name().c_str());
+                break;
+            } else if (mp->service->GetDescriptor() == 
BadMethodService::descriptor()) {
+                BadMethodRequest breq;
+                BadMethodResponse bres;
+                breq.set_service_name(request_meta.service_name());
+                mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, 
NULL);
+                break;
+            }
+            // Switch to service-specific error.
+            non_service_error.release();
+            method_status = mp->status;
+            if (method_status) {
+                int rejected_cc = 0;
+                if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
+                    cntl->SetFailed(
+                        ELIMIT,
+                        "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
+                        mp->method->full_name().c_str(), rejected_cc);
+                    break;
+                }
+            }
+            svc = mp->service;
+            method = 
const_cast<google::protobuf::MethodDescriptor*>(mp->method);
+            accessor.set_method(method);
 
-        if (span) {
-            span->ResetServerSpanName(method->full_name());
-        }
-        const int req_size = static_cast<int>(msg->payload.size());
-        butil::IOBuf req_buf;
-        butil::IOBuf* req_buf_ptr = &msg->payload;
-        if (meta.has_attachment_size()) {
-            if (req_size < meta.attachment_size()) {
-                cntl->SetFailed(EREQUEST,
-                    "attachment_size=%d is larger than request_size=%d",
-                     meta.attachment_size(), req_size);
+            if (span) {
+                span->ResetServerSpanName(method->full_name());
+            }
+
+            if (!server->AcceptRequest(cntl.get())) {
                 break;
             }
+
+            butil::IOBuf req_buf;
             int body_without_attachment_size = req_size - 
meta.attachment_size();
             msg->payload.cutn(&req_buf, body_without_attachment_size);
-            req_buf_ptr = &req_buf;
-            cntl->request_attachment().swap(msg->payload);
-        }
+            if (meta.attachment_size() > 0) {
+                cntl->request_attachment().swap(msg->payload);
+            }
 
-        CompressType req_cmp_type = (CompressType)meta.compress_type();
-        req.reset(svc->GetRequestPrototype(method).New());
-        if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {
-            cntl->SetFailed(EREQUEST, "Fail to parse request message, "
-                            "CompressType=%s, request_size=%d", 
-                            CompressTypeToCStr(req_cmp_type), req_size);
-            break;
+            auto req_cmp_type = 
static_cast<CompressType>(meta.compress_type());
+            req.reset(svc->GetRequestPrototype(method).New());
+            if (!ParseFromCompressedData(req_buf, req.get(), req_cmp_type)) {
+                cntl->SetFailed(EREQUEST, "Fail to parse request message, "
+                                          "CompressType=%s, request_size=%d",
+                                CompressTypeToCStr(req_cmp_type), req_size);
+                break;
+            }
+
+            res.reset(svc->GetResponsePrototype(method).New());
+            req_buf.clear();
         }
-        
-        res.reset(svc->GetResponsePrototype(method).New());
+
         // `socket' will be held until response has been sent
         google::protobuf::Closure* done = ::brpc::NewCallback<
             int64_t, Controller*, const google::protobuf::Message*,
@@ -531,7 +591,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
 
         // optional, just release resource ASAP
         msg.reset();
-        req_buf.clear();
 
         if (span) {
             span->set_start_callback_us(butil::cpuwide_time_us());
@@ -653,10 +712,13 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
             cntl->response_attachment().swap(msg->payload);
         }
 
-        const CompressType res_cmp_type = (CompressType)meta.compress_type();
+        auto res_cmp_type = (CompressType)meta.compress_type();
         cntl->set_response_compress_type(res_cmp_type);
         if (cntl->response()) {
-            if (!ParseFromCompressedData(
+            if (cntl->response()->GetDescriptor() == 
SerializedResponse::descriptor()) {
+                ((SerializedResponse*)cntl->response())->
+                    serialized_data().append(*res_buf_ptr);
+            } else if (!ParseFromCompressedData(
                     *res_buf_ptr, cntl->response(), res_cmp_type)) {
                 cntl->SetFailed(
                     ERESPONSE, "Fail to parse response message, "
@@ -692,13 +754,15 @@ void PackRpcRequest(butil::IOBuf* req_buf,
                                        method->service()->name());
         request_meta->set_method_name(method->name());
         meta.set_compress_type(cntl->request_compress_type());
-    } else if (cntl->sampled_request()) {
+    } else if (NULL != cntl->sampled_request()) {
         // Replaying. Keep service-name as the one seen by server.
         
request_meta->set_service_name(cntl->sampled_request()->meta.service_name());
         
request_meta->set_method_name(cntl->sampled_request()->meta.method_name());
-        meta.set_compress_type(cntl->sampled_request()->meta.compress_type());
+        
meta.set_compress_type(cntl->sampled_request()->meta.has_compress_type() ?
+                               cntl->sampled_request()->meta.compress_type() :
+                               cntl->request_compress_type());
     } else {
-        return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __FUNCTION__);
+        return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __func__ );
     }
     if (cntl->has_log_id()) {
         request_meta->set_log_id(cntl->log_id());
diff --git a/src/brpc/proto_base.proto b/src/brpc/proto_base.proto
index c0bbc086..30033d49 100644
--- a/src/brpc/proto_base.proto
+++ b/src/brpc/proto_base.proto
@@ -16,6 +16,7 @@
 // under the License.
 
 syntax="proto2";
+option cc_generic_services = true;
 
 package brpc;
 
@@ -30,5 +31,8 @@ message MemcacheResponseBase {}
 message NsheadMessageBase {}
 
 message SerializedRequestBase {}
+message SerializedResponseBase {}
 
 message ThriftFramedMessageBase {}
+
+service BaiduMasterServiceBase {}
diff --git a/src/brpc/serialized_request.cpp b/src/brpc/serialized_request.cpp
index 499738f6..33082883 100644
--- a/src/brpc/serialized_request.cpp
+++ b/src/brpc/serialized_request.cpp
@@ -43,7 +43,7 @@ void SerializedRequest::SharedDtor() {
 }
 
 void SerializedRequest::SetCachedSize(int /*size*/) const {
-    CHECK(false) << "You're not supposed to call " << __FUNCTION__;
+    CHECK(false) << "You're not supposed to call " << __func__;
 }
 const ::google::protobuf::Descriptor* SerializedRequest::descriptor() {
     return SerializedRequestBase::descriptor();
diff --git a/src/brpc/serialized_request.cpp b/src/brpc/serialized_response.cpp
similarity index 55%
copy from src/brpc/serialized_request.cpp
copy to src/brpc/serialized_response.cpp
index 499738f6..817ccb4c 100644
--- a/src/brpc/serialized_request.cpp
+++ b/src/brpc/serialized_response.cpp
@@ -16,112 +16,112 @@
 // under the License.
 
 
-#include "brpc/serialized_request.h"
+#include "brpc/serialized_response.h"
 #include "butil/logging.h"
 
 namespace brpc {
 
-SerializedRequest::SerializedRequest()
+SerializedResponse::SerializedResponse()
     : ::google::protobuf::Message() {
     SharedCtor();
 }
 
-SerializedRequest::SerializedRequest(const SerializedRequest& from)
+SerializedResponse::SerializedResponse(const SerializedResponse& from)
     : ::google::protobuf::Message() {
     SharedCtor();
     MergeFrom(from);
 }
 
-void SerializedRequest::SharedCtor() {
+void SerializedResponse::SharedCtor() {
 }
 
-SerializedRequest::~SerializedRequest() {
+SerializedResponse::~SerializedResponse() {
     SharedDtor();
 }
 
-void SerializedRequest::SharedDtor() {
+void SerializedResponse::SharedDtor() {
 }
 
-void SerializedRequest::SetCachedSize(int /*size*/) const {
-    CHECK(false) << "You're not supposed to call " << __FUNCTION__;
+void SerializedResponse::SetCachedSize(int /*size*/) const {
+    CHECK(false) << "You're not supposed to call " << __func__;
 }
-const ::google::protobuf::Descriptor* SerializedRequest::descriptor() {
-    return SerializedRequestBase::descriptor();
+const ::google::protobuf::Descriptor* SerializedResponse::descriptor() {
+    return SerializedResponseBase::descriptor();
 }
 
-SerializedRequest* SerializedRequest::New() const {
-    return new SerializedRequest;
+SerializedResponse* SerializedResponse::New() const {
+    return new SerializedResponse;
 }
 
 #if GOOGLE_PROTOBUF_VERSION >= 3006000
-SerializedRequest*
-SerializedRequest::New(::google::protobuf::Arena* arena) const {
-    return CreateMaybeMessage<SerializedRequest>(arena);
+SerializedResponse*
+SerializedResponse::New(::google::protobuf::Arena* arena) const {
+    return CreateMaybeMessage<SerializedResponse>(arena);
 }
 #endif
 
-void SerializedRequest::Clear() {
+void SerializedResponse::Clear() {
     _serialized.clear();
 }
 
-bool SerializedRequest::MergePartialFromCodedStream(
+bool SerializedResponse::MergePartialFromCodedStream(
     ::google::protobuf::io::CodedInputStream*) {
     CHECK(false) << "You're not supposed to call " << __FUNCTION__;
     return false;
 }
 
-void SerializedRequest::SerializeWithCachedSizes(
+void SerializedResponse::SerializeWithCachedSizes(
     ::google::protobuf::io::CodedOutputStream*) const {
     CHECK(false) << "You're not supposed to call " << __FUNCTION__;
 }
 
-::google::protobuf::uint8* SerializedRequest::SerializeWithCachedSizesToArray(
+::google::protobuf::uint8* SerializedResponse::SerializeWithCachedSizesToArray(
     ::google::protobuf::uint8* target) const {
     CHECK(false) << "You're not supposed to call " << __FUNCTION__;
     return target;
 }
 
-int SerializedRequest::ByteSize() const {
+int SerializedResponse::ByteSize() const {
     return (int)_serialized.size();
 }
 
-void SerializedRequest::MergeFrom(const ::google::protobuf::Message&) {
+void SerializedResponse::MergeFrom(const ::google::protobuf::Message&) {
     CHECK(false) << "You're not supposed to call " << __FUNCTION__;
 }
 
-void SerializedRequest::MergeFrom(const SerializedRequest&) {
+void SerializedResponse::MergeFrom(const SerializedResponse&) {
     CHECK(false) << "You're not supposed to call " << __FUNCTION__;
 }
 
-void SerializedRequest::CopyFrom(const ::google::protobuf::Message& from) {
+void SerializedResponse::CopyFrom(const ::google::protobuf::Message& from) {
     if (&from == this) return;
-    const SerializedRequest* source = dynamic_cast<const 
SerializedRequest*>(&from);
+    const SerializedResponse* source = dynamic_cast<const 
SerializedResponse*>(&from);
     if (source == NULL) {
-        CHECK(false) << "SerializedRequest can only CopyFrom 
SerializedRequest";
+        CHECK(false) << "SerializedResponse can only CopyFrom 
SerializedResponse";
     } else {
         _serialized = source->_serialized;
     }
 }
 
-void SerializedRequest::CopyFrom(const SerializedRequest& from) {
+void SerializedResponse::CopyFrom(const SerializedResponse& from) {
     if (&from == this) return;
     _serialized = from._serialized;
 }
 
-bool SerializedRequest::IsInitialized() const {
+bool SerializedResponse::IsInitialized() const {
     // Always true because it's already serialized.
     return true;
 }
 
-void SerializedRequest::Swap(SerializedRequest* other) {
+void SerializedResponse::Swap(SerializedResponse* other) {
     if (other != this) {
         _serialized.swap(other->_serialized);
     }
 }
 
-::google::protobuf::Metadata SerializedRequest::GetMetadata() const {
+::google::protobuf::Metadata SerializedResponse::GetMetadata() const {
     ::google::protobuf::Metadata metadata;
-    metadata.descriptor = SerializedRequest::descriptor();
+    metadata.descriptor = SerializedResponse::descriptor();
     metadata.reflection = NULL;
     return metadata;
 }
diff --git a/src/brpc/serialized_response.h b/src/brpc/serialized_response.h
new file mode 100644
index 00000000..4e7d86e7
--- /dev/null
+++ b/src/brpc/serialized_response.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef BRPC_SERIALIZED_RESPONSE_H
+#define BRPC_SERIALIZED_RESPONSE_H
+
+#include <google/protobuf/message.h>
+#include "butil/iobuf.h"
+#include "brpc/proto_base.pb.h"
+#include "brpc/pb_compat.h"
+
+namespace brpc {
+
+class SerializedResponse : public ::google::protobuf::Message {
+public:
+    SerializedResponse();
+    virtual ~SerializedResponse();
+  
+    SerializedResponse(const SerializedResponse& from);
+  
+    inline SerializedResponse& operator=(const SerializedResponse& from) {
+        CopyFrom(from);
+        return *this;
+    }
+  
+    static const ::google::protobuf::Descriptor* descriptor();
+  
+    void Swap(SerializedResponse* other);
+  
+    // implements Message ----------------------------------------------
+  
+    SerializedResponse* New() const PB_319_OVERRIDE;
+#if GOOGLE_PROTOBUF_VERSION >= 3006000
+    SerializedResponse* New(::google::protobuf::Arena* arena) const override;
+#endif
+    void CopyFrom(const ::google::protobuf::Message& from) PB_321_OVERRIDE;
+    void CopyFrom(const SerializedResponse& from);
+    void Clear() override;
+    bool IsInitialized() const override;
+    int ByteSize() const;
+    int GetCachedSize() const override { return (int)_serialized.size(); }
+    butil::IOBuf& serialized_data() { return _serialized; }
+    const butil::IOBuf& serialized_data() const { return _serialized; }
+
+protected:
+    ::google::protobuf::Metadata GetMetadata() const override;
+    
+private:
+    bool MergePartialFromCodedStream(
+        ::google::protobuf::io::CodedInputStream* input) PB_310_OVERRIDE;
+    void SerializeWithCachedSizes(
+        ::google::protobuf::io::CodedOutputStream* output) const 
PB_310_OVERRIDE;
+    ::google::protobuf::uint8* SerializeWithCachedSizesToArray(
+        ::google::protobuf::uint8* output) const PB_310_OVERRIDE;
+    void MergeFrom(const ::google::protobuf::Message& from) override;
+    void MergeFrom(const SerializedResponse& from);
+    void SharedCtor();
+    void SharedDtor();
+    void SetCachedSize(int size) const override;
+  
+private:
+    butil::IOBuf _serialized;
+};
+
+} // namespace brpc
+
+
+#endif  // BRPC_SERIALIZED_RESPONSE_H
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 51fb1d16..399f348d 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -77,6 +77,7 @@
 #include "brpc/builtin/common.h"               // GetProgramName
 #include "brpc/details/tcmalloc_extension.h"
 #include "brpc/rdma/rdma_helper.h"
+#include "brpc/baidu_master_service.h"
 
 inline std::ostream& operator<<(std::ostream& os, const timeval& tm) {
     const char old_fill = os.fill();
@@ -145,6 +146,7 @@ ServerOptions::ServerOptions()
     , has_builtin_services(true)
     , force_ssl(false)
     , use_rdma(false)
+    , baidu_master_service(NULL)
     , http_master_service(NULL)
     , health_reporter(NULL)
     , rtmp_service(NULL)
@@ -338,6 +340,9 @@ void* Server::UpdateDerivedVars(void* arg) {
             it->second.status->Expose(mprefix);
         }
     }
+    if (server->options().baidu_master_service) {
+        server->options().baidu_master_service->Expose(prefix);
+    }
     if (server->options().nshead_service) {
         server->options().nshead_service->Expose(prefix);
     }
@@ -2240,6 +2245,12 @@ AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const 
butil::StringPiece& full_
             return options().thrift_service->_max_concurrency;
         }
 #endif
+        if (full_method_name == butil::class_name_str<BaiduMasterService>()) {
+            if (NULL == options().baidu_master_service) {
+                break;
+            }
+            return options().baidu_master_service->_max_concurrency;
+        }
 
         MethodProperty* mp = _method_map.seek(full_method_name);
         if (mp == NULL) {
diff --git a/src/brpc/server.h b/src/brpc/server.h
index 5bc518ef..c9459c23 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -43,6 +43,7 @@
 #include "brpc/redis.h"
 #include "brpc/interceptor.h"
 #include "brpc/concurrency_limiter.h"
+#include "brpc/baidu_master_service.h"
 
 namespace brpc {
 
@@ -225,6 +226,8 @@ struct ServerOptions {
     // Default: false
     bool use_rdma;
 
+    BaiduMasterService* baidu_master_service;
+
     // [CAUTION] This option is for implementing specialized http proxies,
     // most users don't need it. Don't change this option unless you fully
     // understand the description below.
@@ -235,7 +238,7 @@ struct ServerOptions {
     // and response must have no fields.
     //
     // Owned by Server and deleted in server's destructor
-    google::protobuf::Service* http_master_service;
+    ::google::protobuf::Service* http_master_service;
 
     // If this field is on, contents on /health page is generated by calling
     // health_reporter->GenerateReport(). This object is NOT owned by server
diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h
index ec662b46..1f2da79a 100644
--- a/src/butil/memory/scope_guard.h
+++ b/src/butil/memory/scope_guard.h
@@ -78,7 +78,7 @@ ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) 
noexcept {
 }
 
 namespace internal {
-// for BAIDU_SCOPE_EXIT.
+// for BRPC_SCOPE_EXIT.
 enum class ScopeExitHelper {};
 
 template<typename Callback>
diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp
index 8a8a76d8..cc98f111 100644
--- a/test/brpc_server_unittest.cpp
+++ b/test/brpc_server_unittest.cpp
@@ -51,6 +51,7 @@
 #include "brpc/channel.h"
 #include "brpc/socket_map.h"
 #include "brpc/controller.h"
+#include "brpc/compress.h"
 #include "echo.pb.h"
 #include "v1.pb.h"
 #include "v2.pb.h"
@@ -1656,4 +1657,114 @@ TEST_F(ServerTest, user_fields) {
     ASSERT_EQ(*val, EXP_USER_FIELD_VALUE);
 }
 
+class BaiduMasterServiceImpl : public brpc::BaiduMasterService {
+public:
+    void ProcessRpcRequest(brpc::Controller* cntl,
+                           const brpc::SerializedRequest* request,
+                           brpc::SerializedResponse* response,
+                           ::google::protobuf::Closure* done) override {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+        ASSERT_NE(nullptr, cntl->sampled_request());
+        ASSERT_TRUE(cntl->sampled_request()->meta.has_service_name());
+        ASSERT_EQ(test::EchoService::descriptor()->full_name(),
+                  cntl->sampled_request()->meta.service_name());
+        ASSERT_TRUE(cntl->sampled_request()->meta.has_method_name());
+        ASSERT_EQ("Echo", cntl->sampled_request()->meta.method_name());
+        test::EchoRequest echo_request;
+        test::EchoResponse echo_response;
+        brpc::CompressType type = cntl->request_compress_type();
+        ASSERT_TRUE(brpc::ParseFromCompressedData(
+            request->serialized_data(), &echo_request, type));
+        ASSERT_EQ(EXP_REQUEST, echo_request.message());
+        ASSERT_EQ(EXP_REQUEST, cntl->request_attachment().to_string());
+
+        echo_response.set_message(EXP_RESPONSE);
+        butil::IOBuf compressed_data;
+        ASSERT_TRUE(brpc::SerializeAsCompressedData(
+            echo_response, &response->serialized_data(), type));
+        cntl->set_response_compress_type(type);
+        cntl->response_attachment().append(EXP_RESPONSE);
+    }
+};
+
+TEST_F(ServerTest, baidu_master_service) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
+    brpc::Server server;
+    EchoServiceImpl service;
+    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+    brpc::ServerOptions opt;
+    opt.baidu_master_service = new BaiduMasterServiceImpl;
+    ASSERT_EQ(0, server.Start(ep, &opt));
+
+    brpc::Channel chan;
+    brpc::ChannelOptions copt;
+    copt.protocol = "baidu_std";
+    ASSERT_EQ(0, chan.Init(ep, &copt));
+    brpc::Controller cntl;
+    test::EchoRequest req;
+    test::EchoResponse res;
+    req.set_message(EXP_REQUEST);
+    cntl.request_attachment().append(EXP_REQUEST);
+    cntl.set_request_compress_type(brpc::COMPRESS_TYPE_GZIP);
+    test::EchoService_Stub stub(&chan);
+    stub.Echo(&cntl, &req, &res, NULL);
+    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    ASSERT_EQ(EXP_RESPONSE, res.message());
+    ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string());
+
+    ASSERT_EQ(0, server.Stop(0));
+    ASSERT_EQ(0, server.Join());
+}
+
+
+TEST_F(ServerTest, generic_call) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
+    brpc::Server server;
+    EchoServiceImpl service;
+    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+    brpc::ServerOptions opt;
+    opt.baidu_master_service = new BaiduMasterServiceImpl;
+    ASSERT_EQ(0, server.Start(ep, &opt));
+
+    {
+        brpc::Channel chan;
+        brpc::ChannelOptions copt;
+        copt.protocol = "baidu_std";
+        ASSERT_EQ(0, chan.Init(ep, &copt));
+        brpc::Controller cntl;
+        test::EchoRequest req;
+        test::EchoResponse res;
+        req.set_message(EXP_REQUEST);
+
+        brpc::SerializedResponse serialized_response;
+        brpc::SerializedRequest serialized_request;
+        brpc::CompressType type = brpc::COMPRESS_TYPE_GZIP;
+        ASSERT_TRUE(brpc::SerializeAsCompressedData(
+            req, &serialized_request.serialized_data(), type));
+        cntl.request_attachment().append(EXP_REQUEST);
+        cntl.set_request_compress_type(type);
+        auto sampled_request = new (std::nothrow) brpc::SampledRequest();
+        sampled_request->meta.set_service_name(
+            test::EchoService::descriptor()->full_name());
+        sampled_request->meta.set_method_name(
+            test::EchoService::descriptor()->FindMethodByName("Echo")->name());
+        cntl.reset_sampled_request(sampled_request);
+        chan.CallMethod(NULL, &cntl, &serialized_request, 
&serialized_response, NULL);
+        ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+
+        
ASSERT_TRUE(brpc::ParseFromCompressedData(serialized_response.serialized_data(),
+                                                  &res, 
cntl.response_compress_type()))
+                                                  << 
serialized_response.serialized_data().size();
+        ASSERT_EQ(EXP_RESPONSE, res.message());
+        ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string());
+    }
+
+    ASSERT_EQ(0, server.Stop(0));
+    ASSERT_EQ(0, server.Join());
+}
+
 } //namespace
diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp
index e0da1af1..14d150a7 100644
--- a/test/endpoint_unittest.cpp
+++ b/test/endpoint_unittest.cpp
@@ -494,7 +494,7 @@ TEST(EndPointTest, tcp_connect) {
     }
     {
         butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1));
-        ASSERT_EQ(-1, sockfd);
+        ASSERT_EQ(-1, sockfd) << "errno=" << errno;
         ASSERT_EQ(ETIMEDOUT, errno);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to