This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 0f5e102a Support proxy and generic call of baidu protocol (#2629) 0f5e102a is described below commit 0f5e102a219af016e00f40cacbbb193a418ab358 Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Thu Jun 13 12:21:52 2024 +0800 Support proxy and generic call of baidu protocol (#2629) * Support proxy and generic call of baidu protocol * Add example --- .../baidu_proxy_and_generic_call/CMakeLists.txt | 134 ++++++++++++ example/baidu_proxy_and_generic_call/client.cpp | 94 +++++++++ .../baidu_proxy_and_generic_call/echo.proto | 23 +-- example/baidu_proxy_and_generic_call/proxy.cpp | 142 +++++++++++++ example/baidu_proxy_and_generic_call/server.cpp | 118 +++++++++++ src/brpc/baidu_master_service.cpp | 52 +++++ src/brpc/baidu_master_service.h | 99 +++++++++ src/brpc/builtin/status_service.cpp | 11 + src/brpc/channel.cpp | 2 + src/brpc/controller.cpp | 6 + src/brpc/controller.h | 6 +- src/brpc/policy/baidu_rpc_protocol.cpp | 230 +++++++++++++-------- src/brpc/proto_base.proto | 4 + src/brpc/serialized_request.cpp | 2 +- ...ialized_request.cpp => serialized_response.cpp} | 60 +++--- src/brpc/serialized_response.h | 83 ++++++++ src/brpc/server.cpp | 11 + src/brpc/server.h | 5 +- src/butil/memory/scope_guard.h | 2 +- test/brpc_server_unittest.cpp | 111 ++++++++++ test/endpoint_unittest.cpp | 2 +- 21 files changed, 1066 insertions(+), 131 deletions(-) diff --git a/example/baidu_proxy_and_generic_call/CMakeLists.txt b/example/baidu_proxy_and_generic_call/CMakeLists.txt new file mode 100644 index 00000000..8cc9c0f1 --- /dev/null +++ b/example/baidu_proxy_and_generic_call/CMakeLists.txt @@ -0,0 +1,134 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(baidu_proxy_and_generic_call C CXX) + +option(LINK_SO "Whether examples are linked dynamically" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include(FindThreads) +include(FindProtobuf) +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto) +# include PROTO_HEADER +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if(LINK_SO) + find_library(BRPC_LIB NAMES brpc) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) +endif() +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +execute_process( + COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS +) +if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE") + execute_process( + COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS + ) +endif() +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + include(CheckFunctionExists) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + if(NOT HAVE_CLOCK_GETTIME) + set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") + endif() +endif() + +set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}") +set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(OPENSSL_ROOT_DIR + "/usr/local/opt/openssl" # Homebrew installed OpenSSL + ) +endif() + +find_package(OpenSSL) +include_directories(${OPENSSL_INCLUDE_DIR}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${OPENSSL_CRYPTO_LIBRARY} + ${OPENSSL_SSL_LIBRARY} + dl + ) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + pthread + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop" + "-Wl,-U,__Z13GetStackTracePPvii") +endif() + +add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) +add_executable(proxy proxy.cpp) +add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) + +target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(proxy ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB}) diff --git a/example/baidu_proxy_and_generic_call/client.cpp b/example/baidu_proxy_and_generic_call/client.cpp new file mode 100644 index 00000000..d8040740 --- /dev/null +++ b/example/baidu_proxy_and_generic_call/client.cpp @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A client sending requests to server every 1 second. + +#include <gflags/gflags.h> +#include <butil/logging.h> +#include <butil/time.h> +#include <brpc/channel.h> +#include "echo.pb.h" + +DEFINE_int32(compress_type, 2, "The compress type of request"); +DEFINE_string(attachment, "", "Carry this along with requests"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(proxy_address, "0.0.0.0:8000", "IP Address of proxy"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests"); + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_BAIDU_STD; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_proxy_address.c_str(), + FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. + example::EchoService_Stub stub(&channel); + + // Send a request and wait for the response every 1 second. + int log_id = 0; + while (!brpc::IsAskedToQuit()) { + // We will receive response synchronously, safe to put variables + // on stack. + example::EchoRequest request; + example::EchoResponse response; + brpc::Controller cntl; + + request.set_message("hello world"); + cntl.set_request_compress_type((brpc::CompressType)FLAGS_compress_type); + + cntl.set_log_id(log_id++); // set by user + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl.request_attachment().append(FLAGS_attachment); + + // Because `done'(last parameter) is NULL, this function waits until + // the response comes back or error occurs(including timedout). + stub.Echo(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + LOG(INFO) << "Received response from " << cntl.remote_side() + << " to " << cntl.local_side() + << ": " << response.message() + << ", response compress type=" << cntl.response_compress_type() + << ", attached=" << cntl.response_attachment() + << ", latency=" << cntl.latency_us() << "us"; + } else { + LOG(WARNING) << cntl.ErrorText(); + } + usleep(FLAGS_interval_ms * 1000L); + } + + LOG(INFO) << "EchoClient is going to quit"; + return 0; +} diff --git a/src/brpc/proto_base.proto b/example/baidu_proxy_and_generic_call/echo.proto similarity index 75% copy from src/brpc/proto_base.proto copy to example/baidu_proxy_and_generic_call/echo.proto index c0bbc086..2b39627f 100644 --- a/src/brpc/proto_base.proto +++ b/example/baidu_proxy_and_generic_call/echo.proto @@ -16,19 +16,18 @@ // under the License. syntax="proto2"; +package example; -package brpc; +option cc_generic_services = true; -message RedisRequestBase {} -message RedisResponseBase {} +message EchoRequest { + required string message = 1; +}; -message EspMessageBase {} +message EchoResponse { + required string message = 1; +}; -message MemcacheRequestBase {} -message MemcacheResponseBase {} - -message NsheadMessageBase {} - -message SerializedRequestBase {} - -message ThriftFramedMessageBase {} +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); +}; diff --git a/example/baidu_proxy_and_generic_call/proxy.cpp b/example/baidu_proxy_and_generic_call/proxy.cpp new file mode 100644 index 00000000..81e1176b --- /dev/null +++ b/example/baidu_proxy_and_generic_call/proxy.cpp @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// todo +// A proxy to receive EchoRequest and send back EchoResponse. + +#include <gflags/gflags.h> +#include <butil/logging.h> +#include <butil/strings/string_number_conversions.h> +#include <brpc/server.h> +#include <brpc/controller.h> +#include <brpc/channel.h> +#include <json2pb/pb_to_json.h> + +DEFINE_int32(port, 8000, "TCP Port of this server"); +DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS." + " If this is set, the flag port will be ignored"); +DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server_address, "0.0.0.0:8001", "IP Address of server"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests"); + +// Your implementation of example::EchoService +// Notice that implementing brpc::Describable grants the ability to put +// additional information in /status. +namespace example { +class BaiduMasterServiceImpl : public brpc::BaiduMasterService { +public: + void ProcessRpcRequest(brpc::Controller* cntl, + const brpc::SerializedRequest* request, + brpc::SerializedResponse* response, + ::google::protobuf::Closure* done) override { + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_BAIDU_STD; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_server_address.c_str(), + FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + (*cntl->response_user_fields())["x-bd-proxy-error-code"] = + butil::IntToString(brpc::EINTERNAL); + (*cntl->response_user_fields())["x-bd-proxy-error-text"] = + "Fail to initialize channel"; + return; + } + + LOG(INFO) << "Received request[log_id=" << cntl->log_id() + << "] from " << cntl->remote_side() + << " to " << cntl->local_side() + << ", serialized request size=" << request->serialized_data().size() + << ", request compress type=" << cntl->request_compress_type() + << " (attached=" << cntl->request_attachment() << ")"; + + brpc::Controller call_cntl; + call_cntl.set_log_id(cntl->log_id()); + call_cntl.request_attachment().swap(cntl->request_attachment()); + call_cntl.set_request_compress_type(cntl->request_compress_type()); + call_cntl.reset_sampled_request(cntl->release_sampled_request()); + // It is ok to use request and response for sync rpc. + channel.CallMethod(NULL, &call_cntl, request, response, NULL); + (*cntl->response_user_fields())["x-bd-proxy-error-code"] = + butil::IntToString(call_cntl.ErrorCode()); + if (call_cntl.Failed()) { + (*cntl->response_user_fields())["x-bd-proxy-error-text"] = + call_cntl.ErrorText(); + LOG(ERROR) << "Fail to call service=" << call_cntl.sampled_request()->meta.service_name() + << ", method=" << call_cntl.sampled_request()->meta.method_name() + << ", error_code=" << call_cntl.ErrorCode() + << ", error_text=" << call_cntl.ErrorCode(); + return; + } else { + LOG(INFO) << "Received response from " << call_cntl.remote_side() + << " to " << call_cntl.local_side() + << ", serialized response size=" << response->serialized_data().size() + << ", response compress type=" << call_cntl.response_compress_type() + << ", attached=" << call_cntl.response_attachment() + << ", latency=" << call_cntl.latency_us() << "us"; + } + cntl->response_attachment().swap(call_cntl.response_attachment()); + cntl->set_response_compress_type(call_cntl.response_compress_type()); + } +}; +} // namespace example + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // Generally you only need one Server. + brpc::Server server; + + butil::EndPoint point; + if (!FLAGS_listen_addr.empty()) { + if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) { + LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr; + return -1; + } + } else { + point = butil::EndPoint(butil::IP_ANY, FLAGS_port); + } + // Start the server. + brpc::ServerOptions options; + // Add the baidu master service into server. + // Notice new operator, because server will delete it in dtor of Server. + options.baidu_master_service = new example::BaiduMasterServiceImpl(); + options.idle_timeout_sec = FLAGS_idle_timeout_s; + if (server.Start(point, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} diff --git a/example/baidu_proxy_and_generic_call/server.cpp b/example/baidu_proxy_and_generic_call/server.cpp new file mode 100644 index 00000000..b3f16173 --- /dev/null +++ b/example/baidu_proxy_and_generic_call/server.cpp @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A server to receive EchoRequest and send back EchoResponse. + +#include <gflags/gflags.h> +#include <butil/logging.h> +#include <brpc/server.h> +#include <json2pb/pb_to_json.h> +#include "echo.pb.h" + +DEFINE_int32(compress_type, 2, "The compress type of response"); +DEFINE_bool(echo_attachment, true, "Echo attachment as well"); +DEFINE_int32(port, 8001, "TCP Port of this server"); +DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS." + " If this is set, the flag port will be ignored"); +DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); + +// Your implementation of example::EchoService +// Notice that implementing brpc::Describable grants the ability to put +// additional information in /status. +namespace example { +class EchoServiceImpl : public EchoService { +public: + EchoServiceImpl() = default; + ~EchoServiceImpl() override = default; + void Echo(google::protobuf::RpcController* cntl_base, + const EchoRequest* request, + EchoResponse* response, + google::protobuf::Closure* done) override { + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + + auto cntl = static_cast<brpc::Controller*>(cntl_base); + + // The purpose of following logs is to help you to understand + // how clients interact with servers more intuitively. You should + // remove these logs in performance-sensitive servers. + LOG(INFO) << "Received request[log_id=" << cntl->log_id() + << "] from " << cntl->remote_side() + << " to " << cntl->local_side() + << ": " << request->message() + << ", request compress type=" << cntl->request_compress_type() + << ", attached=" << cntl->request_attachment(); + + // Fill response. + response->set_message(request->message()); + cntl->set_response_compress_type((brpc::CompressType)FLAGS_compress_type); + + // You can compress the response by setting Controller, but be aware + // that compression may be costly, evaluate before turning on. + // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP); + + if (FLAGS_echo_attachment) { + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl->response_attachment().append(cntl->request_attachment()); + } + } +}; +} // namespace example + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // Generally you only need one Server. + brpc::Server server; + + // Instance of your service. + example::EchoServiceImpl echo_service_impl; + + // Add the service into server. Notice the second parameter, because the + // service is put on stack, we don't want server to delete it, otherwise + // use brpc::SERVER_OWNS_SERVICE. + if (server.AddService(&echo_service_impl, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + butil::EndPoint point; + if (!FLAGS_listen_addr.empty()) { + if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) { + LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr; + return -1; + } + } else { + point = butil::EndPoint(butil::IP_ANY, FLAGS_port); + } + // Start the server. + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + if (server.Start(point, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} diff --git a/src/brpc/baidu_master_service.cpp b/src/brpc/baidu_master_service.cpp new file mode 100644 index 00000000..0b983732 --- /dev/null +++ b/src/brpc/baidu_master_service.cpp @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "brpc/baidu_master_service.h" +#include "butil/logging.h" + +namespace brpc { + +BaiduMasterService::BaiduMasterService() + :_status(new(std::nothrow) MethodStatus) { + LOG_IF(FATAL, NULL == _status) << "Fail to new MethodStatus"; +} + +BaiduMasterService::~BaiduMasterService() { + delete _status; + _status = NULL; +} + +void BaiduMasterService::Describe(std::ostream &os, + const DescribeOptions&) const { + os << butil::class_name_str(*this); +} + +void BaiduMasterService::Expose(const butil::StringPiece& prefix) { + if (NULL == _status) { + return; + } + std::string s; + const std::string& cached_name = butil::class_name_str(*this); + s.reserve(prefix.size() + 1 + cached_name.size()); + s.append(prefix.data(), prefix.size()); + s.push_back('_'); + s.append(cached_name); + _status->Expose(s); +} + +} \ No newline at end of file diff --git a/src/brpc/baidu_master_service.h b/src/brpc/baidu_master_service.h new file mode 100644 index 00000000..9dc7ebbf --- /dev/null +++ b/src/brpc/baidu_master_service.h @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef BRPC_BAIDU_MASTER_SERVICE_H +#define BRPC_BAIDU_MASTER_SERVICE_H + +#include <google/protobuf/service.h> +#include <google/protobuf/stubs/callback.h> +#include "brpc/details/method_status.h" +#include "brpc/proto_base.pb.h" +#include "brpc/controller.h" +#include "brpc/serialized_request.h" +#include "brpc/serialized_response.h" + +namespace brpc { + +namespace policy { +void ProcessRpcRequest(InputMessageBase* msg_base); +} + +class BaiduMasterService : public ::google::protobuf::Service + , public Describable { +public: + BaiduMasterService(); + ~BaiduMasterService() override; + + DISALLOW_COPY_AND_ASSIGN(BaiduMasterService); + + AdaptiveMaxConcurrency& MaxConcurrency() { + return _max_concurrency; + } + + virtual void ProcessRpcRequest(Controller* cntl, + const SerializedRequest* request, + SerializedResponse* response, + ::google::protobuf::Closure* done) = 0; + + + // Put descriptions into the stream. + void Describe(std::ostream &os, const DescribeOptions&) const override; + + // implements Service ---------------------------------------------- + + void CallMethod(const ::google::protobuf::MethodDescriptor*, + ::google::protobuf::RpcController* controller, + const ::google::protobuf::Message* request, + ::google::protobuf::Message* response, + ::google::protobuf::Closure* done) override { + ProcessRpcRequest((Controller*)controller, + (const SerializedRequest*)request, + (SerializedResponse*)response, done); + } + + const ::google::protobuf::ServiceDescriptor* GetDescriptor() override { + return BaiduMasterServiceBase::descriptor(); + } + + const ::google::protobuf::Message& GetRequestPrototype( + const ::google::protobuf::MethodDescriptor*) const override { + return _default_request; + } + + const ::google::protobuf::Message& GetResponsePrototype( + const ::google::protobuf::MethodDescriptor*) const override { + return _default_response; + } + +private: +friend void policy::ProcessRpcRequest(InputMessageBase* msg_base); +friend class StatusService; +friend class Server; + + void Expose(const butil::StringPiece& prefix); + + SerializedRequest _default_request; + SerializedResponse _default_response; + + MethodStatus* _status; + AdaptiveMaxConcurrency _max_concurrency; +}; + +} + +#endif //BRPC_BAIDU_MASTER_SERVICE_H diff --git a/src/brpc/builtin/status_service.cpp b/src/brpc/builtin/status_service.cpp index a6f5a4da..ea731e52 100644 --- a/src/brpc/builtin/status_service.cpp +++ b/src/brpc/builtin/status_service.cpp @@ -200,6 +200,17 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base, } } } + const BaiduMasterService* baidu_master_service = server->options().baidu_master_service; + if (baidu_master_service && baidu_master_service->_status) { + DescribeOptions options; + options.verbose = false; + options.use_html = use_html; + os << (use_html ? "<h3>" : "["); + baidu_master_service->Describe(os, options); + os << (use_html ? "</h3>\n" : "]\n"); + baidu_master_service->_status->Describe(os, desc_options); + os << '\n'; + } const NsheadService* nshead_svc = server->options().nshead_service; if (nshead_svc && nshead_svc->_status) { DescribeOptions options; diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index ba0e8161..5fc66096 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -32,6 +32,8 @@ #include "brpc/details/load_balancer_with_naming.h" #include "brpc/controller.h" #include "brpc/channel.h" +#include "brpc/serialized_request.h" +#include "brpc/serialized_response.h" #include "brpc/details/usercode_backup_pool.h" // TooManyUserCode #include "brpc/rdma/rdma_helper.h" #include "brpc/policy/esp_authenticator.h" diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 1d9b1bb9..98e25ae2 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1427,6 +1427,12 @@ void Controller::reset_sampled_request(SampledRequest* req) { _sampled_request = req; } +SampledRequest* Controller::release_sampled_request() { + SampledRequest* saved_sampled_request = _sampled_request; + _sampled_request = NULL; + return saved_sampled_request; +} + void Controller::set_stream_creator(StreamCreator* sc) { if (_stream_creator) { LOG(FATAL) << "A StreamCreator has been set previously"; diff --git a/src/brpc/controller.h b/src/brpc/controller.h index d3ffb99f..5b2132b4 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -46,6 +46,7 @@ #include "brpc/progressive_reader.h" // ProgressiveReader #include "brpc/grpc.h" #include "brpc/kvmap.h" +#include "brpc/rpc_dump.h" // EAUTH is defined in MAC #ifndef EAUTH @@ -68,7 +69,6 @@ class SharedLoadBalancer; class ExcludedServers; class RPCSender; class StreamSettings; -class SampledRequest; class MongoContext; class RetryPolicy; class InputMessageBase; @@ -305,7 +305,9 @@ public: // Get/own SampledRequest for sending dumped requests. // Deleted along with controller. void reset_sampled_request(SampledRequest* req); - const SampledRequest* sampled_request() { return _sampled_request; } + const SampledRequest* sampled_request() const { return _sampled_request; } + SampledRequest* release_sampled_request(); + // Attach a StreamCreator to this RPC. Notice that the ownership of sc has // been transferred to cntl, and sc->DestroyStreamCreator() would be called diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 6fa17d6c..6ce76467 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -135,6 +135,28 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, return MakeMessage(msg); } +static bool SerializeResponse(const google::protobuf::Message& res, + Controller& cntl, CompressType compress_type, + butil::IOBuf& buf) { + if (res.GetDescriptor() == SerializedResponse::descriptor()) { + buf.swap(((SerializedResponse&)res).serialized_data()); + return true; + } + + if (!res.IsInitialized()) { + cntl.SetFailed(ERESPONSE, + "Missing required fields in response: %s", + res.InitializationErrorString().c_str()); + return false; + } else if (!SerializeAsCompressedData(res, &buf, compress_type)) { + cntl.SetFailed(ERESPONSE, + "Fail to serialize response, CompressType=%s", + CompressTypeToCStr(compress_type)); + return false; + } + return true; +} + // Used by UT, can't be static. void SendRpcResponse(int64_t correlation_id, Controller* cntl, @@ -170,18 +192,10 @@ void SendRpcResponse(int64_t correlation_id, // `res' can be NULL here, in which case we don't serialize it // If user calls `SetFailed' on Controller, we don't serialize // response either - CompressType type = cntl->response_compress_type(); + CompressType compress_type = cntl->response_compress_type(); if (res != NULL && !cntl->Failed()) { - if (!res->IsInitialized()) { - cntl->SetFailed( - ERESPONSE, "Missing required fields in response: %s", - res->InitializationErrorString().c_str()); - } else if (!SerializeAsCompressedData(*res, &res_body, type)) { - cntl->SetFailed(ERESPONSE, "Fail to serialize response, " - "CompressType=%s", CompressTypeToCStr(type)); - } else { - append_body = true; - } + append_body = SerializeResponse( + *res, *cntl, compress_type, res_body); } // Don't use res->ByteSize() since it may be compressed @@ -207,7 +221,7 @@ void SendRpcResponse(int64_t correlation_id, response_meta->set_error_text(cntl->ErrorText()); } meta.set_correlation_id(correlation_id); - meta.set_compress_type(cntl->response_compress_type()); + meta.set_compress_type(compress_type); if (attached_size > 0) { meta.set_attachment_size(attached_size); } @@ -236,7 +250,7 @@ void SendRpcResponse(int64_t correlation_id, SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size); if (append_body) { res_buf.append(res_body.movable()); - if (attached_size) { + if (attached_size > 0) { res_buf.append(cntl->response_attachment().movable()); } } @@ -360,6 +374,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { LOG(WARNING) << "Fail to new Controller"; return; } + std::unique_ptr<google::protobuf::Message> req; std::unique_ptr<google::protobuf::Message> res; @@ -442,84 +457,129 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { break; } - // NOTE(gejun): jprotobuf sends service names without packages. So the - // name should be changed to full when it's not. - butil::StringPiece svc_name(request_meta.service_name()); - if (svc_name.find('.') == butil::StringPiece::npos) { - const Server::ServiceProperty* sp = - server_accessor.FindServicePropertyByName(svc_name); - if (NULL == sp) { - cntl->SetFailed(ENOSERVICE, "Fail to find service=%s", - request_meta.service_name().c_str()); + const int req_size = static_cast<int>(msg->payload.size()); + if (meta.has_attachment_size()) { + if (req_size < meta.attachment_size()) { + cntl->SetFailed(EREQUEST, + "attachment_size=%d is larger than request_size=%d", + meta.attachment_size(), req_size); break; } - svc_name = sp->service->GetDescriptor()->full_name(); } - const Server::MethodProperty* mp = - server_accessor.FindMethodPropertyByFullName( - svc_name, request_meta.method_name()); - if (NULL == mp) { - cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s", - request_meta.service_name().c_str(), - request_meta.method_name().c_str()); - break; - } else if (mp->service->GetDescriptor() - == BadMethodService::descriptor()) { - BadMethodRequest breq; - BadMethodResponse bres; - breq.set_service_name(request_meta.service_name()); - mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL); - break; - } - // Switch to service-specific error. - non_service_error.release(); - method_status = mp->status; - if (method_status) { - int rejected_cc = 0; - if (!method_status->OnRequested(&rejected_cc, cntl.get())) { - cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d", - mp->method->full_name().c_str(), rejected_cc); + + google::protobuf::Service* svc = NULL; + google::protobuf::MethodDescriptor* method = NULL; + if (NULL != server->options().baidu_master_service) { + svc = server->options().baidu_master_service; + auto sampled_request = new (std::nothrow) SampledRequest; + if (NULL == sampled_request) { + cntl->SetFailed(ENOMEM, "Fail to get sampled_request"); break; } - } - google::protobuf::Service* svc = mp->service; - const google::protobuf::MethodDescriptor* method = mp->method; - accessor.set_method(method); + sampled_request->meta.set_service_name(request_meta.service_name()); + sampled_request->meta.set_method_name(request_meta.method_name()); + cntl->reset_sampled_request(sampled_request); + // Switch to service-specific error. + non_service_error.release(); + method_status = server->options().baidu_master_service->_status; + if (method_status) { + int rejected_cc = 0; + if (!method_status->OnRequested(&rejected_cc, cntl.get())) { + cntl->SetFailed( + ELIMIT, + "Rejected by %s's ConcurrencyLimiter, concurrency=%d", + butil::class_name<BaiduMasterService>(), rejected_cc); + break; + } + } + if (span) { + span->ResetServerSpanName(sampled_request->meta.method_name()); + } + auto serialized_request = (SerializedRequest*) + svc->GetRequestPrototype(NULL).New(); + req.reset(serialized_request); + res.reset(svc->GetResponsePrototype(NULL).New()); - if (!server->AcceptRequest(cntl.get())) { - break; - } + msg->payload.cutn(&serialized_request->serialized_data(), + req_size - meta.attachment_size()); + if (!msg->payload.empty()) { + cntl->request_attachment().swap(msg->payload); + } + } else { + // NOTE(gejun): jprotobuf sends service names without packages. So the + // name should be changed to full when it's not. + butil::StringPiece svc_name(request_meta.service_name()); + if (svc_name.find('.') == butil::StringPiece::npos) { + const Server::ServiceProperty* sp = + server_accessor.FindServicePropertyByName(svc_name); + if (NULL == sp) { + cntl->SetFailed(ENOSERVICE, "Fail to find service=%s", + request_meta.service_name().c_str()); + break; + } + svc_name = sp->service->GetDescriptor()->full_name(); + } + const Server::MethodProperty* mp = + server_accessor.FindMethodPropertyByFullName( + svc_name, request_meta.method_name()); + if (NULL == mp) { + cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s", + request_meta.service_name().c_str(), + request_meta.method_name().c_str()); + break; + } else if (mp->service->GetDescriptor() == BadMethodService::descriptor()) { + BadMethodRequest breq; + BadMethodResponse bres; + breq.set_service_name(request_meta.service_name()); + mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL); + break; + } + // Switch to service-specific error. + non_service_error.release(); + method_status = mp->status; + if (method_status) { + int rejected_cc = 0; + if (!method_status->OnRequested(&rejected_cc, cntl.get())) { + cntl->SetFailed( + ELIMIT, + "Rejected by %s's ConcurrencyLimiter, concurrency=%d", + mp->method->full_name().c_str(), rejected_cc); + break; + } + } + svc = mp->service; + method = const_cast<google::protobuf::MethodDescriptor*>(mp->method); + accessor.set_method(method); - if (span) { - span->ResetServerSpanName(method->full_name()); - } - const int req_size = static_cast<int>(msg->payload.size()); - butil::IOBuf req_buf; - butil::IOBuf* req_buf_ptr = &msg->payload; - if (meta.has_attachment_size()) { - if (req_size < meta.attachment_size()) { - cntl->SetFailed(EREQUEST, - "attachment_size=%d is larger than request_size=%d", - meta.attachment_size(), req_size); + if (span) { + span->ResetServerSpanName(method->full_name()); + } + + if (!server->AcceptRequest(cntl.get())) { break; } + + butil::IOBuf req_buf; int body_without_attachment_size = req_size - meta.attachment_size(); msg->payload.cutn(&req_buf, body_without_attachment_size); - req_buf_ptr = &req_buf; - cntl->request_attachment().swap(msg->payload); - } + if (meta.attachment_size() > 0) { + cntl->request_attachment().swap(msg->payload); + } - CompressType req_cmp_type = (CompressType)meta.compress_type(); - req.reset(svc->GetRequestPrototype(method).New()); - if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) { - cntl->SetFailed(EREQUEST, "Fail to parse request message, " - "CompressType=%s, request_size=%d", - CompressTypeToCStr(req_cmp_type), req_size); - break; + auto req_cmp_type = static_cast<CompressType>(meta.compress_type()); + req.reset(svc->GetRequestPrototype(method).New()); + if (!ParseFromCompressedData(req_buf, req.get(), req_cmp_type)) { + cntl->SetFailed(EREQUEST, "Fail to parse request message, " + "CompressType=%s, request_size=%d", + CompressTypeToCStr(req_cmp_type), req_size); + break; + } + + res.reset(svc->GetResponsePrototype(method).New()); + req_buf.clear(); } - - res.reset(svc->GetResponsePrototype(method).New()); + // `socket' will be held until response has been sent google::protobuf::Closure* done = ::brpc::NewCallback< int64_t, Controller*, const google::protobuf::Message*, @@ -531,7 +591,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { // optional, just release resource ASAP msg.reset(); - req_buf.clear(); if (span) { span->set_start_callback_us(butil::cpuwide_time_us()); @@ -653,10 +712,13 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { cntl->response_attachment().swap(msg->payload); } - const CompressType res_cmp_type = (CompressType)meta.compress_type(); + auto res_cmp_type = (CompressType)meta.compress_type(); cntl->set_response_compress_type(res_cmp_type); if (cntl->response()) { - if (!ParseFromCompressedData( + if (cntl->response()->GetDescriptor() == SerializedResponse::descriptor()) { + ((SerializedResponse*)cntl->response())-> + serialized_data().append(*res_buf_ptr); + } else if (!ParseFromCompressedData( *res_buf_ptr, cntl->response(), res_cmp_type)) { cntl->SetFailed( ERESPONSE, "Fail to parse response message, " @@ -692,13 +754,15 @@ void PackRpcRequest(butil::IOBuf* req_buf, method->service()->name()); request_meta->set_method_name(method->name()); meta.set_compress_type(cntl->request_compress_type()); - } else if (cntl->sampled_request()) { + } else if (NULL != cntl->sampled_request()) { // Replaying. Keep service-name as the one seen by server. request_meta->set_service_name(cntl->sampled_request()->meta.service_name()); request_meta->set_method_name(cntl->sampled_request()->meta.method_name()); - meta.set_compress_type(cntl->sampled_request()->meta.compress_type()); + meta.set_compress_type(cntl->sampled_request()->meta.has_compress_type() ? + cntl->sampled_request()->meta.compress_type() : + cntl->request_compress_type()); } else { - return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __FUNCTION__); + return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __func__ ); } if (cntl->has_log_id()) { request_meta->set_log_id(cntl->log_id()); diff --git a/src/brpc/proto_base.proto b/src/brpc/proto_base.proto index c0bbc086..30033d49 100644 --- a/src/brpc/proto_base.proto +++ b/src/brpc/proto_base.proto @@ -16,6 +16,7 @@ // under the License. syntax="proto2"; +option cc_generic_services = true; package brpc; @@ -30,5 +31,8 @@ message MemcacheResponseBase {} message NsheadMessageBase {} message SerializedRequestBase {} +message SerializedResponseBase {} message ThriftFramedMessageBase {} + +service BaiduMasterServiceBase {} diff --git a/src/brpc/serialized_request.cpp b/src/brpc/serialized_request.cpp index 499738f6..33082883 100644 --- a/src/brpc/serialized_request.cpp +++ b/src/brpc/serialized_request.cpp @@ -43,7 +43,7 @@ void SerializedRequest::SharedDtor() { } void SerializedRequest::SetCachedSize(int /*size*/) const { - CHECK(false) << "You're not supposed to call " << __FUNCTION__; + CHECK(false) << "You're not supposed to call " << __func__; } const ::google::protobuf::Descriptor* SerializedRequest::descriptor() { return SerializedRequestBase::descriptor(); diff --git a/src/brpc/serialized_request.cpp b/src/brpc/serialized_response.cpp similarity index 55% copy from src/brpc/serialized_request.cpp copy to src/brpc/serialized_response.cpp index 499738f6..817ccb4c 100644 --- a/src/brpc/serialized_request.cpp +++ b/src/brpc/serialized_response.cpp @@ -16,112 +16,112 @@ // under the License. -#include "brpc/serialized_request.h" +#include "brpc/serialized_response.h" #include "butil/logging.h" namespace brpc { -SerializedRequest::SerializedRequest() +SerializedResponse::SerializedResponse() : ::google::protobuf::Message() { SharedCtor(); } -SerializedRequest::SerializedRequest(const SerializedRequest& from) +SerializedResponse::SerializedResponse(const SerializedResponse& from) : ::google::protobuf::Message() { SharedCtor(); MergeFrom(from); } -void SerializedRequest::SharedCtor() { +void SerializedResponse::SharedCtor() { } -SerializedRequest::~SerializedRequest() { +SerializedResponse::~SerializedResponse() { SharedDtor(); } -void SerializedRequest::SharedDtor() { +void SerializedResponse::SharedDtor() { } -void SerializedRequest::SetCachedSize(int /*size*/) const { - CHECK(false) << "You're not supposed to call " << __FUNCTION__; +void SerializedResponse::SetCachedSize(int /*size*/) const { + CHECK(false) << "You're not supposed to call " << __func__; } -const ::google::protobuf::Descriptor* SerializedRequest::descriptor() { - return SerializedRequestBase::descriptor(); +const ::google::protobuf::Descriptor* SerializedResponse::descriptor() { + return SerializedResponseBase::descriptor(); } -SerializedRequest* SerializedRequest::New() const { - return new SerializedRequest; +SerializedResponse* SerializedResponse::New() const { + return new SerializedResponse; } #if GOOGLE_PROTOBUF_VERSION >= 3006000 -SerializedRequest* -SerializedRequest::New(::google::protobuf::Arena* arena) const { - return CreateMaybeMessage<SerializedRequest>(arena); +SerializedResponse* +SerializedResponse::New(::google::protobuf::Arena* arena) const { + return CreateMaybeMessage<SerializedResponse>(arena); } #endif -void SerializedRequest::Clear() { +void SerializedResponse::Clear() { _serialized.clear(); } -bool SerializedRequest::MergePartialFromCodedStream( +bool SerializedResponse::MergePartialFromCodedStream( ::google::protobuf::io::CodedInputStream*) { CHECK(false) << "You're not supposed to call " << __FUNCTION__; return false; } -void SerializedRequest::SerializeWithCachedSizes( +void SerializedResponse::SerializeWithCachedSizes( ::google::protobuf::io::CodedOutputStream*) const { CHECK(false) << "You're not supposed to call " << __FUNCTION__; } -::google::protobuf::uint8* SerializedRequest::SerializeWithCachedSizesToArray( +::google::protobuf::uint8* SerializedResponse::SerializeWithCachedSizesToArray( ::google::protobuf::uint8* target) const { CHECK(false) << "You're not supposed to call " << __FUNCTION__; return target; } -int SerializedRequest::ByteSize() const { +int SerializedResponse::ByteSize() const { return (int)_serialized.size(); } -void SerializedRequest::MergeFrom(const ::google::protobuf::Message&) { +void SerializedResponse::MergeFrom(const ::google::protobuf::Message&) { CHECK(false) << "You're not supposed to call " << __FUNCTION__; } -void SerializedRequest::MergeFrom(const SerializedRequest&) { +void SerializedResponse::MergeFrom(const SerializedResponse&) { CHECK(false) << "You're not supposed to call " << __FUNCTION__; } -void SerializedRequest::CopyFrom(const ::google::protobuf::Message& from) { +void SerializedResponse::CopyFrom(const ::google::protobuf::Message& from) { if (&from == this) return; - const SerializedRequest* source = dynamic_cast<const SerializedRequest*>(&from); + const SerializedResponse* source = dynamic_cast<const SerializedResponse*>(&from); if (source == NULL) { - CHECK(false) << "SerializedRequest can only CopyFrom SerializedRequest"; + CHECK(false) << "SerializedResponse can only CopyFrom SerializedResponse"; } else { _serialized = source->_serialized; } } -void SerializedRequest::CopyFrom(const SerializedRequest& from) { +void SerializedResponse::CopyFrom(const SerializedResponse& from) { if (&from == this) return; _serialized = from._serialized; } -bool SerializedRequest::IsInitialized() const { +bool SerializedResponse::IsInitialized() const { // Always true because it's already serialized. return true; } -void SerializedRequest::Swap(SerializedRequest* other) { +void SerializedResponse::Swap(SerializedResponse* other) { if (other != this) { _serialized.swap(other->_serialized); } } -::google::protobuf::Metadata SerializedRequest::GetMetadata() const { +::google::protobuf::Metadata SerializedResponse::GetMetadata() const { ::google::protobuf::Metadata metadata; - metadata.descriptor = SerializedRequest::descriptor(); + metadata.descriptor = SerializedResponse::descriptor(); metadata.reflection = NULL; return metadata; } diff --git a/src/brpc/serialized_response.h b/src/brpc/serialized_response.h new file mode 100644 index 00000000..4e7d86e7 --- /dev/null +++ b/src/brpc/serialized_response.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef BRPC_SERIALIZED_RESPONSE_H +#define BRPC_SERIALIZED_RESPONSE_H + +#include <google/protobuf/message.h> +#include "butil/iobuf.h" +#include "brpc/proto_base.pb.h" +#include "brpc/pb_compat.h" + +namespace brpc { + +class SerializedResponse : public ::google::protobuf::Message { +public: + SerializedResponse(); + virtual ~SerializedResponse(); + + SerializedResponse(const SerializedResponse& from); + + inline SerializedResponse& operator=(const SerializedResponse& from) { + CopyFrom(from); + return *this; + } + + static const ::google::protobuf::Descriptor* descriptor(); + + void Swap(SerializedResponse* other); + + // implements Message ---------------------------------------------- + + SerializedResponse* New() const PB_319_OVERRIDE; +#if GOOGLE_PROTOBUF_VERSION >= 3006000 + SerializedResponse* New(::google::protobuf::Arena* arena) const override; +#endif + void CopyFrom(const ::google::protobuf::Message& from) PB_321_OVERRIDE; + void CopyFrom(const SerializedResponse& from); + void Clear() override; + bool IsInitialized() const override; + int ByteSize() const; + int GetCachedSize() const override { return (int)_serialized.size(); } + butil::IOBuf& serialized_data() { return _serialized; } + const butil::IOBuf& serialized_data() const { return _serialized; } + +protected: + ::google::protobuf::Metadata GetMetadata() const override; + +private: + bool MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input) PB_310_OVERRIDE; + void SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const PB_310_OVERRIDE; + ::google::protobuf::uint8* SerializeWithCachedSizesToArray( + ::google::protobuf::uint8* output) const PB_310_OVERRIDE; + void MergeFrom(const ::google::protobuf::Message& from) override; + void MergeFrom(const SerializedResponse& from); + void SharedCtor(); + void SharedDtor(); + void SetCachedSize(int size) const override; + +private: + butil::IOBuf _serialized; +}; + +} // namespace brpc + + +#endif // BRPC_SERIALIZED_RESPONSE_H diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 51fb1d16..399f348d 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -77,6 +77,7 @@ #include "brpc/builtin/common.h" // GetProgramName #include "brpc/details/tcmalloc_extension.h" #include "brpc/rdma/rdma_helper.h" +#include "brpc/baidu_master_service.h" inline std::ostream& operator<<(std::ostream& os, const timeval& tm) { const char old_fill = os.fill(); @@ -145,6 +146,7 @@ ServerOptions::ServerOptions() , has_builtin_services(true) , force_ssl(false) , use_rdma(false) + , baidu_master_service(NULL) , http_master_service(NULL) , health_reporter(NULL) , rtmp_service(NULL) @@ -338,6 +340,9 @@ void* Server::UpdateDerivedVars(void* arg) { it->second.status->Expose(mprefix); } } + if (server->options().baidu_master_service) { + server->options().baidu_master_service->Expose(prefix); + } if (server->options().nshead_service) { server->options().nshead_service->Expose(prefix); } @@ -2240,6 +2245,12 @@ AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_ return options().thrift_service->_max_concurrency; } #endif + if (full_method_name == butil::class_name_str<BaiduMasterService>()) { + if (NULL == options().baidu_master_service) { + break; + } + return options().baidu_master_service->_max_concurrency; + } MethodProperty* mp = _method_map.seek(full_method_name); if (mp == NULL) { diff --git a/src/brpc/server.h b/src/brpc/server.h index 5bc518ef..c9459c23 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -43,6 +43,7 @@ #include "brpc/redis.h" #include "brpc/interceptor.h" #include "brpc/concurrency_limiter.h" +#include "brpc/baidu_master_service.h" namespace brpc { @@ -225,6 +226,8 @@ struct ServerOptions { // Default: false bool use_rdma; + BaiduMasterService* baidu_master_service; + // [CAUTION] This option is for implementing specialized http proxies, // most users don't need it. Don't change this option unless you fully // understand the description below. @@ -235,7 +238,7 @@ struct ServerOptions { // and response must have no fields. // // Owned by Server and deleted in server's destructor - google::protobuf::Service* http_master_service; + ::google::protobuf::Service* http_master_service; // If this field is on, contents on /health page is generated by calling // health_reporter->GenerateReport(). This object is NOT owned by server diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h index ec662b46..1f2da79a 100644 --- a/src/butil/memory/scope_guard.h +++ b/src/butil/memory/scope_guard.h @@ -78,7 +78,7 @@ ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) noexcept { } namespace internal { -// for BAIDU_SCOPE_EXIT. +// for BRPC_SCOPE_EXIT. enum class ScopeExitHelper {}; template<typename Callback> diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index 8a8a76d8..cc98f111 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -51,6 +51,7 @@ #include "brpc/channel.h" #include "brpc/socket_map.h" #include "brpc/controller.h" +#include "brpc/compress.h" #include "echo.pb.h" #include "v1.pb.h" #include "v2.pb.h" @@ -1656,4 +1657,114 @@ TEST_F(ServerTest, user_fields) { ASSERT_EQ(*val, EXP_USER_FIELD_VALUE); } +class BaiduMasterServiceImpl : public brpc::BaiduMasterService { +public: + void ProcessRpcRequest(brpc::Controller* cntl, + const brpc::SerializedRequest* request, + brpc::SerializedResponse* response, + ::google::protobuf::Closure* done) override { + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + ASSERT_NE(nullptr, cntl->sampled_request()); + ASSERT_TRUE(cntl->sampled_request()->meta.has_service_name()); + ASSERT_EQ(test::EchoService::descriptor()->full_name(), + cntl->sampled_request()->meta.service_name()); + ASSERT_TRUE(cntl->sampled_request()->meta.has_method_name()); + ASSERT_EQ("Echo", cntl->sampled_request()->meta.method_name()); + test::EchoRequest echo_request; + test::EchoResponse echo_response; + brpc::CompressType type = cntl->request_compress_type(); + ASSERT_TRUE(brpc::ParseFromCompressedData( + request->serialized_data(), &echo_request, type)); + ASSERT_EQ(EXP_REQUEST, echo_request.message()); + ASSERT_EQ(EXP_REQUEST, cntl->request_attachment().to_string()); + + echo_response.set_message(EXP_RESPONSE); + butil::IOBuf compressed_data; + ASSERT_TRUE(brpc::SerializeAsCompressedData( + echo_response, &response->serialized_data(), type)); + cntl->set_response_compress_type(type); + cntl->response_attachment().append(EXP_RESPONSE); + } +}; + +TEST_F(ServerTest, baidu_master_service) { + butil::EndPoint ep; + ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); + brpc::Server server; + EchoServiceImpl service; + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + brpc::ServerOptions opt; + opt.baidu_master_service = new BaiduMasterServiceImpl; + ASSERT_EQ(0, server.Start(ep, &opt)); + + brpc::Channel chan; + brpc::ChannelOptions copt; + copt.protocol = "baidu_std"; + ASSERT_EQ(0, chan.Init(ep, &copt)); + brpc::Controller cntl; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(EXP_REQUEST); + cntl.request_attachment().append(EXP_REQUEST); + cntl.set_request_compress_type(brpc::COMPRESS_TYPE_GZIP); + test::EchoService_Stub stub(&chan); + stub.Echo(&cntl, &req, &res, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(EXP_RESPONSE, res.message()); + ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string()); + + ASSERT_EQ(0, server.Stop(0)); + ASSERT_EQ(0, server.Join()); +} + + +TEST_F(ServerTest, generic_call) { + butil::EndPoint ep; + ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); + brpc::Server server; + EchoServiceImpl service; + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + brpc::ServerOptions opt; + opt.baidu_master_service = new BaiduMasterServiceImpl; + ASSERT_EQ(0, server.Start(ep, &opt)); + + { + brpc::Channel chan; + brpc::ChannelOptions copt; + copt.protocol = "baidu_std"; + ASSERT_EQ(0, chan.Init(ep, &copt)); + brpc::Controller cntl; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(EXP_REQUEST); + + brpc::SerializedResponse serialized_response; + brpc::SerializedRequest serialized_request; + brpc::CompressType type = brpc::COMPRESS_TYPE_GZIP; + ASSERT_TRUE(brpc::SerializeAsCompressedData( + req, &serialized_request.serialized_data(), type)); + cntl.request_attachment().append(EXP_REQUEST); + cntl.set_request_compress_type(type); + auto sampled_request = new (std::nothrow) brpc::SampledRequest(); + sampled_request->meta.set_service_name( + test::EchoService::descriptor()->full_name()); + sampled_request->meta.set_method_name( + test::EchoService::descriptor()->FindMethodByName("Echo")->name()); + cntl.reset_sampled_request(sampled_request); + chan.CallMethod(NULL, &cntl, &serialized_request, &serialized_response, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + + ASSERT_TRUE(brpc::ParseFromCompressedData(serialized_response.serialized_data(), + &res, cntl.response_compress_type())) + << serialized_response.serialized_data().size(); + ASSERT_EQ(EXP_RESPONSE, res.message()); + ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string()); + } + + ASSERT_EQ(0, server.Stop(0)); + ASSERT_EQ(0, server.Join()); +} + } //namespace diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp index e0da1af1..14d150a7 100644 --- a/test/endpoint_unittest.cpp +++ b/test/endpoint_unittest.cpp @@ -494,7 +494,7 @@ TEST(EndPointTest, tcp_connect) { } { butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1)); - ASSERT_EQ(-1, sockfd); + ASSERT_EQ(-1, sockfd) << "errno=" << errno; ASSERT_EQ(ETIMEDOUT, errno); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org