This is an automated email from the ASF dual-hosted git repository. guangmingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 7e5ec4fe Batch create and accept stream (#2754) 7e5ec4fe is described below commit 7e5ec4fe8df1df135af16a26e4d4e4a24e9a0651 Author: Jenrry You <jenrry...@gmail.com> AuthorDate: Wed Oct 9 10:39:42 2024 +0800 Batch create and accept stream (#2754) * feat: batch create and accept stream * fix protobuf 22.5 compilation error related to thread_local in MacOS * refine style * modify code based on the code review feedback and add more tests modify code based on the code review feedback and add more tests --- .github/workflows/ci-macos.yml | 16 +-- docs/cn/streaming_rpc.md | 17 ++- docs/en/streaming_rpc.md | 16 ++- example/streaming_batch_echo_c++/CMakeLists.txt | 140 +++++++++++++++++++++ example/streaming_batch_echo_c++/client.cpp | 118 +++++++++++++++++ .../streaming_batch_echo_c++/echo.proto | 37 ++---- example/streaming_batch_echo_c++/server.cpp | 122 ++++++++++++++++++ src/brpc/channel.cpp | 2 +- src/brpc/controller.cpp | 44 +++++-- src/brpc/controller.h | 6 +- src/brpc/details/controller_private_accessor.h | 4 +- src/brpc/policy/baidu_rpc_protocol.cpp | 64 +++++++--- src/brpc/stream.cpp | 113 ++++++++++++++--- src/brpc/stream.h | 15 +++ src/brpc/stream_impl.h | 5 +- src/brpc/streaming_rpc_meta.proto | 1 + 16 files changed, 627 insertions(+), 93 deletions(-) diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml index 685d4e6b..50ab2e02 100644 --- a/.github/workflows/ci-macos.yml +++ b/.github/workflows/ci-macos.yml @@ -57,7 +57,7 @@ jobs: cd build make -j ${{env.proc_num}} - compile-with-make-protobuf22: + compile-with-make-protobuf23: runs-on: macos-latest # https://github.com/actions/runner-images steps: @@ -67,9 +67,9 @@ jobs: run: | brew install openssl gnu-getopt coreutils gflags leveldb # abseil 20230125.3 - curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/abseil.rb - # protobuf 22.5 - curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/protobuf.rb + curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/abseil.rb + # protobuf 23.3 + curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/protobuf.rb HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 brew install --formula --ignore-dependencies ./abseil.rb ./protobuf.rb - name: config_brpc @@ -82,7 +82,7 @@ jobs: run: | make -j ${{env.proc_num}} - compile-with-cmake-protobuf22: + compile-with-cmake-protobuf23: runs-on: macos-latest steps: @@ -92,9 +92,9 @@ jobs: run: | brew install openssl gflags leveldb # abseil 20230125.3 - curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/abseil.rb - # protobuf 22.5 - curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/protobuf.rb + curl -o abseil.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/abseil.rb + # protobuf 23.3 + curl -o protobuf.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/protobuf.rb HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 brew install --formula --ignore-dependencies ./abseil.rb ./protobuf.rb - name: cmake diff --git a/docs/cn/streaming_rpc.md b/docs/cn/streaming_rpc.md index 1480b83b..c9f9ab36 100644 --- a/docs/cn/streaming_rpc.md +++ b/docs/cn/streaming_rpc.md @@ -23,7 +23,7 @@ Streaming RPC保证: # 建立Stream -目前Stream都由Client端建立。Client先在本地创建一个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这个Stream, 那在response返回Client后Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。 +目前Stream都由Client端建立。Client先在本地创建一个或者多个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这批Stream, 那在response返回Client后这批Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。 > 如果Client尝试向不支持Streaming RPC的老Server建立Stream,将总是失败。 @@ -58,11 +58,18 @@ struct StreamOptions // NULL, the Stream will be created with default options // Return 0 on success, -1 otherwise int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options); + +// [Called at the client side for creating multiple streams] +// Create streams at client-side along with the |cntl|, which will be connected +// when receiving the response with streams from server-side. If |options| is +// NULL, the stream will be created with default options +// Return 0 on success, -1 otherwise +int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller& cntl, const StreamOptions* options); ``` # 接受Stream -如果client在RPC上附带了一个Stream, service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。 +如果client在RPC上附带了一个或者多个Stream, service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。 ```c++ // [Called at the server side] @@ -70,6 +77,12 @@ int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions // (cntl.has_remote_stream() returns false), this method would fail. // Return 0 on success, -1 otherwise. int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options); + +// [Called at the server side for accepting multiple streams] +// Accept the streams. If client didn't create streams with the request +// (cntl.has_remote_stream() returns false), this method would fail. +// Return 0 on success, -1 otherwise. +int StreamAccept(StreamIds& response_stream, Controller& cntl, const StreamOptions* options); ``` # 读取Stream diff --git a/docs/en/streaming_rpc.md b/docs/en/streaming_rpc.md index 80302015..8e067148 100644 --- a/docs/en/streaming_rpc.md +++ b/docs/en/streaming_rpc.md @@ -23,7 +23,7 @@ For examples please refer to [example/streaming_echo_c++](https://github.com/apa # Create a Stream -Currently stream is established by the client only. A new Stream object is created in client and then is used to issue an RPC (through baidu_std protocol) to the specified service. The service could accept this stream by responding to the request without error, thus a Stream is created once the client receives the response successfully. Any error during this process fails the RPC and thus fails the Stream creation. Take the Linux environment as an example, the client creates a [socket](h [...] +Currently streams are established by the client only. The new Stream objects are created in client and then are used to issue an RPC (through baidu_std protocol) to the specified service. The service could accept these streams by responding to the request without error, thus the Streams are created once the client receives the response successfully. Any error during this process fails the RPC and thus fails the Stream creation. Take the Linux environment as an example, the client creates [...] > If the client tries to establish a stream to a server that doesn't support > streaming RPC, it will always return failure. @@ -58,6 +58,14 @@ struct StreamOptions // NULL, the Stream will be created with default options // Return 0 on success, -1 otherwise int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options); + +// [Called at the client side for creating multiple streams] +// Create streams at client-side along with the |cntl|, which will be connected +// when receiving the response with streams from server-side. If |options| is +// NULL, the stream will be created with default options +// Return 0 on success, -1 otherwise +int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller& cntl, + const StreamOptions* options); ``` # Accept a Stream @@ -70,6 +78,12 @@ If a Stream is attached inside the request of an RPC, the service can accept the // (cntl.has_remote_stream() returns false), this method would fail. // Return 0 on success, -1 otherwise. int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options); + +// [Called at the server side for accepting multiple streams] +// Accept the streams. If client didn't create streams with the request +// (cntl.has_remote_stream() returns false), this method would fail. +// Return 0 on success, -1 otherwise. +int StreamAccept(StreamIds& response_stream, Controller& cntl, const StreamOptions* options); ``` # Read from a Stream diff --git a/example/streaming_batch_echo_c++/CMakeLists.txt b/example/streaming_batch_echo_c++/CMakeLists.txt new file mode 100644 index 00000000..1d152ef9 --- /dev/null +++ b/example/streaming_batch_echo_c++/CMakeLists.txt @@ -0,0 +1,140 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(streaming_batch_echo_c++ C CXX) + +option(LINK_SO "Whether examples are linked dynamically" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include(FindThreads) +include(FindProtobuf) +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto) +# include PROTO_HEADER +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +# Search for libthrift* by best effort. If it is not found and brpc is +# compiled with thrift protocol enabled, a link error would be reported. +find_library(THRIFT_LIB NAMES thrift) +if (NOT THRIFT_LIB) + set(THRIFT_LIB "") +endif() + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if(LINK_SO) + find_library(BRPC_LIB NAMES brpc) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) +endif() +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +execute_process( + COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS +) +if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE") + execute_process( + COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS + ) +endif() +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + include(CheckFunctionExists) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + if(NOT HAVE_CLOCK_GETTIME) + set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") + endif() +endif() + +set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}") +set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(OPENSSL_ROOT_DIR + "/usr/local/opt/openssl" # Homebrew installed OpenSSL + ) +endif() + +find_package(OpenSSL) +include_directories(${OPENSSL_INCLUDE_DIR}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${OPENSSL_CRYPTO_LIBRARY} + ${OPENSSL_SSL_LIBRARY} + ${THRIFT_LIB} + dl +) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + pthread + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop" + "-Wl,-U,__Z13GetStackTracePPvii") +endif() + +add_executable(streaming_batch_echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) +add_executable(streaming_batch_echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) + +target_link_libraries(streaming_batch_echo_client ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(streaming_batch_echo_server ${BRPC_LIB} ${DYNAMIC_LIB}) diff --git a/example/streaming_batch_echo_c++/client.cpp b/example/streaming_batch_echo_c++/client.cpp new file mode 100644 index 00000000..ea58a191 --- /dev/null +++ b/example/streaming_batch_echo_c++/client.cpp @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A client sending requests to server in batch every 1 second. + +#include <gflags/gflags.h> +#include <butil/logging.h> +#include <brpc/channel.h> +#include <brpc/stream.h> +#include "echo.pb.h" + +DEFINE_bool(send_attachment, true, "Carry attachment along with requests"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server, "0.0.0.0:8001", "IP Address of server"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); + +class StreamClientReceiver : public brpc::StreamInputHandler { +public: + virtual int on_received_messages(brpc::StreamId id, + butil::IOBuf *const messages[], + size_t size) { + std::ostringstream os; + for (size_t i = 0; i < size; ++i) { + os << "msg[" << i << "]=" << *messages[i]; + } + LOG(INFO) << "Received from Stream=" << id << ": " << os.str(); + return 0; + } + virtual void on_idle_timeout(brpc::StreamId id) { + LOG(INFO) << "Stream=" << id << " has no data transmission for a while"; + } + virtual void on_closed(brpc::StreamId id) { + LOG(INFO) << "Stream=" << id << " is closed"; + } + + virtual void on_finished(brpc::StreamId id, int32_t finish_code) { + LOG(INFO) << "Stream=" << id << " is finished, code " << finish_code; + } +}; + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_BAIDU_STD; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_server.c_str(), NULL) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. + example::EchoService_Stub stub(&channel); + StreamClientReceiver receiver; + brpc::Controller cntl; + brpc::StreamIds streams; + brpc::StreamOptions stream_options; + stream_options.handler = &receiver; + if (brpc::StreamCreate(streams, 3, cntl, &stream_options) != 0) { + LOG(ERROR) << "Fail to create stream"; + return -1; + } + for(size_t i = 0; i < streams.size(); ++i) { + LOG(INFO) << "Created Stream=" << streams[i]; + } + example::EchoRequest request; + example::EchoResponse response; + request.set_message("I'm a RPC to connect stream"); + stub.Echo(&cntl, &request, &response, NULL); + if (cntl.Failed()) { + LOG(ERROR) << "Fail to connect stream, " << cntl.ErrorText(); + return -1; + } + + while (!brpc::IsAskedToQuit()) { + butil::IOBuf msg1; + msg1.append("abcdefghijklmnopqrstuvwxyz"); + CHECK_EQ(0, brpc::StreamWrite(streams[0], msg1)); + butil::IOBuf msg2; + msg2.append("0123456789"); + CHECK_EQ(0, brpc::StreamWrite(streams[1], msg2)); + sleep(1); + butil::IOBuf msg3; + msg3.append("hello world"); + CHECK_EQ(0, brpc::StreamWrite(streams[2], msg3)); + sleep(1); + } + + CHECK_EQ(0, brpc::StreamClose(streams[0])); + CHECK_EQ(0, brpc::StreamClose(streams[1])); + CHECK_EQ(0, brpc::StreamClose(streams[2])); + LOG(INFO) << "EchoClient is going to quit"; + return 0; +} diff --git a/src/brpc/streaming_rpc_meta.proto b/example/streaming_batch_echo_c++/echo.proto similarity index 54% copy from src/brpc/streaming_rpc_meta.proto copy to example/streaming_batch_echo_c++/echo.proto index 05d83217..2b39627f 100644 --- a/src/brpc/streaming_rpc_meta.proto +++ b/example/streaming_batch_echo_c++/echo.proto @@ -16,33 +16,18 @@ // under the License. syntax="proto2"; +package example; -package brpc; -option java_package="com.brpc"; -option java_outer_classname="StreamingRpcProto"; +option cc_generic_services = true; -message StreamSettings { - required int64 stream_id = 1; - optional bool need_feedback = 2 [default = false]; - optional bool writable = 3 [default = false]; -} +message EchoRequest { + required string message = 1; +}; -enum FrameType { - FRAME_TYPE_UNKNOWN = 0; - FRAME_TYPE_RST = 1; - FRAME_TYPE_CLOSE = 2; - FRAME_TYPE_DATA = 3; - FRAME_TYPE_FEEDBACK= 4; -} +message EchoResponse { + required string message = 1; +}; -message StreamFrameMeta { - required int64 stream_id = 1; - optional int64 source_stream_id = 2; - optional FrameType frame_type = 3; - optional bool has_continuation = 4; - optional Feedback feedback = 5; -} - -message Feedback { - optional int64 consumed_size = 1; -} +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); +}; diff --git a/example/streaming_batch_echo_c++/server.cpp b/example/streaming_batch_echo_c++/server.cpp new file mode 100644 index 00000000..3690344a --- /dev/null +++ b/example/streaming_batch_echo_c++/server.cpp @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A server to receive EchoRequest and send back EchoResponse. + +#include <gflags/gflags.h> +#include <butil/logging.h> +#include <brpc/server.h> +#include "echo.pb.h" +#include <brpc/stream.h> + +DEFINE_bool(send_attachment, true, "Carry attachment along with response"); +DEFINE_int32(port, 8001, "TCP Port of this server"); +DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); + +class StreamReceiver : public brpc::StreamInputHandler { +public: + virtual int on_received_messages(brpc::StreamId id, + butil::IOBuf *const messages[], + size_t size) { + std::ostringstream os; + for (size_t i = 0; i < size; ++i) { + os << "msg[" << i << "]=" << *messages[i]; + } + auto res = brpc::StreamWrite(id, *messages[0]); + LOG(INFO) << "Received from Stream=" << id << ": " << os.str() << " and write back result: " << res; + return 0; + } + virtual void on_idle_timeout(brpc::StreamId id) { + LOG(INFO) << "Stream=" << id << " has no data transmission for a while"; + } + virtual void on_closed(brpc::StreamId id) { + LOG(INFO) << "Stream=" << id << " is closed"; + } + + virtual void on_finished(brpc::StreamId id, int32_t finish_code) { + LOG(INFO) << "Stream=" << id << " is finished, code " << finish_code; + } +}; + +// Your implementation of example::EchoService +class StreamingBatchEchoService : public example::EchoService { +public: + virtual ~StreamingBatchEchoService() { + closeStreams(); + }; + virtual void Echo(google::protobuf::RpcController* controller, + const example::EchoRequest* /*request*/, + example::EchoResponse* response, + google::protobuf::Closure* done) { + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + closeStreams(); + brpc::Controller* cntl = + static_cast<brpc::Controller*>(controller); + brpc::StreamOptions stream_options; + stream_options.handler = &_receiver; + if (brpc::StreamAccept(_sds, *cntl, &stream_options) != 0) { + cntl->SetFailed("Fail to accept stream"); + return; + } + response->set_message("Accepted stream"); + } + +private: + void closeStreams() { + for(auto i = 0; i < _sds.size(); ++i) { + brpc::StreamClose(_sds[i]); + } + _sds.clear(); + } + StreamReceiver _receiver; + brpc::StreamIds _sds; +}; + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // Generally you only need one Server. + brpc::Server server; + + // Instance of your service. + StreamingBatchEchoService echo_service_impl; + + // Add the service into server. Notice the second parameter, because the + // service is put on stack, we don't want server to delete it, otherwise + // use brpc::SERVER_OWNS_SERVICE. + if (server.AddService(&echo_service_impl, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + // Start the server. + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + if (server.Start(FLAGS_port, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index c15611ea..ab07ad3b 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -535,7 +535,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, return cntl->HandleSendFailed(); } - if (cntl->_request_stream != INVALID_STREAM_ID) { + if (!cntl->_request_streams.empty()) { // Currently we cannot handle retry and backup request correctly cntl->set_max_retry(0); cntl->set_backup_request_ms(-1); diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index afebb3c2..271ccfa4 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -290,8 +290,8 @@ void Controller::ResetPods() { _http_response = NULL; _request_user_fields = NULL; _response_user_fields = NULL; - _request_stream = INVALID_STREAM_ID; - _response_stream = INVALID_STREAM_ID; + _request_streams.clear(); + _response_streams.clear(); _remote_stream_settings = NULL; _auth_flags = 0; } @@ -1398,34 +1398,54 @@ void* Controller::session_local_data() { } void Controller::HandleStreamConnection(Socket *host_socket) { - if (_request_stream == INVALID_STREAM_ID) { + if (_request_streams.empty()) { CHECK(!has_remote_stream()); return; } - SocketUniquePtr ptr; + size_t stream_num = _request_streams.size(); + std::vector<SocketUniquePtr> ptrs(stream_num); if (!FailedInline()) { - if (Socket::Address(_request_stream, &ptr) != 0) { - if (!FailedInline()) { - SetFailed(EREQUEST, "Request stream=%" PRIu64 " was closed before responded", - _request_stream); - } - } else if (_remote_stream_settings == NULL) { + if (_remote_stream_settings == NULL) { if (!FailedInline()) { SetFailed(EREQUEST, "The server didn't accept the stream"); } + } else { + for (size_t i = 0; i < stream_num; ++i) { + if (Socket::Address(_request_streams[i], &ptrs[i]) != 0) { + if (!FailedInline()) { + SetFailed(EREQUEST, "Request stream=%" PRIu64 " was closed before responded", + _request_streams[i]); + break; + } + } + } } } if (FailedInline()) { - Stream::SetFailed(_request_stream, _error_code, + Stream::SetFailed(_request_streams, _error_code, "%s", _error_text.c_str()); if (_remote_stream_settings != NULL) { policy::SendStreamRst(host_socket, _remote_stream_settings->stream_id()); + for (int i = 0; i < _remote_stream_settings->extra_stream_ids().size(); ++i) { + policy::SendStreamRst(host_socket, + _remote_stream_settings->extra_stream_ids()[i]); + } } return; } - Stream* s = (Stream*)ptr->conn(); + Stream* s = (Stream*)ptrs[0]->conn(); s->SetConnected(_remote_stream_settings); + if (stream_num > 1) { + const auto& extra_stream_ids = _remote_stream_settings->extra_stream_ids(); + _remote_stream_settings->clear_extra_stream_ids(); + for (size_t i = 1; i < stream_num; ++i) { + Stream* extra_stream = (Stream *) ptrs[i]->conn(); + _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]); + s->ShareHostSocket(*extra_stream); + extra_stream->SetConnected(_remote_stream_settings); + } + } } // TODO: Need more security advices from professionals. diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 9b3c0201..2a4ec6b5 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -123,7 +123,9 @@ friend class schan::Sender; friend class schan::SubDone; friend class policy::OnServerStreamCreated; friend int StreamCreate(StreamId*, Controller&, const StreamOptions*); +friend int StreamCreate(StreamIds&, int, Controller&, const StreamOptions*); friend int StreamAccept(StreamId*, Controller&, const StreamOptions*); +friend int StreamAccept(StreamIds&, Controller&, const StreamOptions*); friend void policy::ProcessMongoRequest(InputMessageBase*); friend void policy::ProcessThriftRequest(InputMessageBase*); // << Flags >> @@ -866,9 +868,9 @@ private: // TODO: Replace following fields with StreamCreator // Defined at client side - StreamId _request_stream; + StreamIds _request_streams; // Defined at server side - StreamId _response_stream; + StreamIds _response_streams; // Defined at both sides StreamSettings *_remote_stream_settings; diff --git a/src/brpc/details/controller_private_accessor.h b/src/brpc/details/controller_private_accessor.h index 1be7df8b..db40ca15 100644 --- a/src/brpc/details/controller_private_accessor.h +++ b/src/brpc/details/controller_private_accessor.h @@ -119,8 +119,8 @@ public: return _cntl->_remote_stream_settings; } - StreamId request_stream() { return _cntl->_request_stream; } - StreamId response_stream() { return _cntl->_response_stream; } + StreamIds request_streams() { return _cntl->_request_streams; } + StreamIds response_streams() { return _cntl->_response_streams; } void set_method(const google::protobuf::MethodDescriptor* method) { _cntl->_method = method; } diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 504895b1..cc2dcbd2 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -216,10 +216,12 @@ void SendRpcResponse(int64_t correlation_id, ClosureGuard guard(brpc::NewCallback( cntl, &Controller::CallAfterRpcResp, req, res)); - StreamId response_stream_id = accessor.response_stream(); + StreamIds response_stream_ids = accessor.response_streams(); if (cntl->IsCloseConnection()) { - StreamClose(response_stream_id); + for(size_t i = 0; i < response_stream_ids.size(); ++i) { + StreamClose(response_stream_ids[i]); + } sock->SetFailed(); return; } @@ -261,12 +263,18 @@ void SendRpcResponse(int64_t correlation_id, if (attached_size > 0) { meta.set_attachment_size(attached_size); } + StreamId response_stream_id = INVALID_STREAM_ID; SocketUniquePtr stream_ptr; - if (response_stream_id != INVALID_STREAM_ID) { + if (!response_stream_ids.empty()) { + response_stream_id = response_stream_ids[0]; if (Socket::Address(response_stream_id, &stream_ptr) == 0) { - Stream* s = (Stream*)stream_ptr->conn(); - s->FillSettings(meta.mutable_stream_settings()); + Stream* s = (Stream *) stream_ptr->conn(); + StreamSettings *stream_settings = meta.mutable_stream_settings(); + s->FillSettings(stream_settings); s->SetHostSocket(sock); + for (size_t i = 1; i < response_stream_ids.size(); ++i) { + stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]); + } } else { LOG(WARNING) << "Stream=" << response_stream_id << " was closed before sending response"; @@ -302,24 +310,35 @@ void SendRpcResponse(int64_t correlation_id, // Response_stream can be INVALID_STREAM_ID when error occurs. if (SendStreamData(sock, &res_buf, accessor.remote_stream_settings()->stream_id(), - accessor.response_stream()) != 0) { + response_stream_id) != 0) { const int errcode = errno; std::string error_text = butil::string_printf(64, "Fail to write into %s", sock->description().c_str()); PLOG_IF(WARNING, errcode != EPIPE) << error_text; cntl->SetFailed(errcode, "%s", error_text.c_str()); - if(stream_ptr) { - ((Stream*)stream_ptr->conn())->Close(errcode, "%s", - error_text.c_str()); - } + Stream::SetFailed(response_stream_ids, errcode, "%s", + error_text.c_str()); return; } + // Now it's ok the mark these server-side streams as connected as all the + // written user data would follower the RPC response. + // Reuse stream_ptr to avoid address first stream id again if(stream_ptr) { - // Now it's ok the mark this server-side stream as connected as all the - // written user data would follower the RPC response. ((Stream*)stream_ptr->conn())->SetConnected(); } + for (size_t i = 1; i < response_stream_ids.size(); ++i) { + StreamId extra_stream_id = response_stream_ids[i]; + SocketUniquePtr extra_stream_ptr; + if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) { + Stream* extra_stream = (Stream *) extra_stream_ptr->conn(); + extra_stream->SetHostSocket(sock); + extra_stream->SetConnected(); + } else { + LOG(WARNING) << "Stream=" << extra_stream_id + << " was closed before sending response"; + } + } } else{ // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. @@ -715,7 +734,11 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { LOG_IF(ERROR, rc != EINVAL && rc != EPERM) << "Fail to lock correlation_id=" << cid << ": " << berror(rc); if (remote_stream_id != INVALID_STREAM_ID) { - SendStreamRst(msg->socket(), meta.stream_settings().stream_id()); + SendStreamRst(msg->socket(), remote_stream_id); + const auto & extra_stream_ids = meta.stream_settings().extra_stream_ids(); + for (int i = 0; i < extra_stream_ids.size(); ++i) { + policy::SendStreamRst(msg->socket(), extra_stream_ids[i]); + } } return; } @@ -825,15 +848,20 @@ void PackRpcRequest(butil::IOBuf* req_buf, request_meta->set_request_id(cntl->request_id()); } meta.set_correlation_id(correlation_id); - StreamId request_stream_id = accessor.request_stream(); - if (request_stream_id != INVALID_STREAM_ID) { + StreamIds request_stream_ids = accessor.request_streams(); + if (!request_stream_ids.empty()) { + StreamSettings* stream_settings = meta.mutable_stream_settings(); + StreamId request_stream_id = request_stream_ids[0]; SocketUniquePtr ptr; if (Socket::Address(request_stream_id, &ptr) != 0) { - return cntl->SetFailed(EREQUEST, "Stream=%" PRIu64 " was closed", + return cntl->SetFailed(EREQUEST, "Stream=%" PRIu64 " was closed", request_stream_id); } - Stream *s = (Stream*)ptr->conn(); - s->FillSettings(meta.mutable_stream_settings()); + Stream* s = (Stream*) ptr->conn(); + s->FillSettings(stream_settings); + for (size_t i = 1; i < request_stream_ids.size(); ++i) { + stream_settings->mutable_extra_stream_ids()->Add(request_stream_ids[i]); + } } if (cntl->has_request_user_fields() && !cntl->request_user_fields()->empty()) { diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 27f87c6f..73d64051 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -68,7 +68,7 @@ Stream::~Stream() { int Stream::Create(const StreamOptions &options, const StreamSettings *remote_settings, - StreamId *id) { + StreamId *id, bool parse_rpc_response) { Stream* s = new Stream(); s->_host_socket = NULL; s->_fake_socket_weak_ref = NULL; @@ -88,10 +88,8 @@ int Stream::Create(const StreamOptions &options, if (remote_settings != NULL) { s->_remote_settings.MergeFrom(*remote_settings); - s->_parse_rpc_response = false; - } else { - s->_parse_rpc_response = true; } + s->_parse_rpc_response = parse_rpc_response; if (bthread_id_list_init(&s->_writable_wait_list, 8, 8/*FIXME*/)) { delete s; return -1; @@ -668,6 +666,10 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) { return TriggerOnConnectIfNeed(); } +int Stream::ShareHostSocket(Stream& other_stream) { + return other_stream.SetHostSocket(_host_socket); +} + int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { SocketUniquePtr ptr; if (Socket::AddressFailedAsWell(id, &ptr) == -1) { @@ -682,6 +684,16 @@ int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) return 0; } +int Stream::SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) { + va_list ap; + va_start(ap, reason_fmt); + for(size_t i = 0; i< ids.size(); ++i) { + Stream::SetFailed(ids[i], error_code, reason_fmt, ap); + } + va_end(ap); + return 0; +} + void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { CHECK(!_remote_settings.IsInitialized()); CHECK(_host_socket != NULL); @@ -759,37 +771,76 @@ int StreamClose(StreamId stream_id) { int StreamCreate(StreamId *request_stream, Controller &cntl, const StreamOptions* options) { - if (cntl._request_stream != INVALID_STREAM_ID) { + if (request_stream == NULL) { + LOG(ERROR) << "request_stream is NULL"; + return -1; + } + StreamIds request_streams; + StreamCreate(request_streams, 1, cntl, options); + *request_stream = request_streams[0]; + return 0; +} + +int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller & cntl, + const StreamOptions* options) { + if (!cntl._request_streams.empty()) { LOG(ERROR) << "Can't create request stream more than once"; return -1; } - if (request_stream == NULL) { - LOG(ERROR) << "request_stream is NULL"; + if (!request_streams.empty()) { + LOG(ERROR) << "request_streams should be empty"; return -1; } - StreamId stream_id; StreamOptions opt; if (options != NULL) { opt = *options; } - if (Stream::Create(opt, NULL, &stream_id) != 0) { - LOG(ERROR) << "Fail to create stream"; - return -1; + for (auto i = 0; i < request_stream_size; ++i) { + StreamId stream_id; + bool parse_rpc_response = (i == 0); // Only the first stream need parse rpc + if (Stream::Create(opt, NULL, &stream_id, parse_rpc_response) != 0) { + // Close already created streams + Stream::SetFailed(request_streams, 0 , "Fail to create stream at %d index", i); + LOG(ERROR) << "Fail to create stream"; + return -1; + } + cntl._request_streams.push_back(stream_id); + request_streams.push_back(stream_id); } - cntl._request_stream = stream_id; - *request_stream = stream_id; return 0; } int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options) { + if (response_stream == NULL) { + LOG(ERROR) << "response_stream is NULL"; + return -1; + } + StreamIds response_streams; + int res = StreamAccept(response_streams, cntl, options); + if(res != 0) { + return res; + } + if(response_streams.size() != 1) { + Stream::SetFailed(response_streams, EINVAL, + "misusing StreamAccept for single stream to accept multiple streams"); + cntl._response_streams.clear(); + LOG(ERROR) << "misusing StreamAccept for single stream to accept multiple streams"; + return -1; + } + *response_stream = response_streams[0]; + return 0; +} - if (cntl._response_stream != INVALID_STREAM_ID) { +int StreamAccept(StreamIds& response_streams, Controller& cntl, + const StreamOptions* options) { + if (!cntl._response_streams.empty()) { LOG(ERROR) << "Can't create response stream more than once"; return -1; } - if (response_stream == NULL) { - LOG(ERROR) << "response_stream is NULL"; + + if (!response_streams.empty()) { + LOG(ERROR) << "response_streams should be empty"; return -1; } if (!cntl.has_remote_stream()) { @@ -801,12 +852,34 @@ int StreamAccept(StreamId* response_stream, Controller &cntl, opt = *options; } StreamId stream_id; - if (Stream::Create(opt, cntl._remote_stream_settings, &stream_id) != 0) { - LOG(ERROR) << "Fail to create stream"; + if (Stream::Create(opt, cntl._remote_stream_settings, &stream_id, false) != 0) { + Stream::SetFailed(response_streams, 0, "Fail to accept stream"); + LOG(ERROR) << "Fail to accept stream"; return -1; } - cntl._response_stream = stream_id; - *response_stream = stream_id; + + cntl._response_streams.push_back(stream_id); + response_streams.push_back(stream_id); + if(!cntl._remote_stream_settings->extra_stream_ids().empty()) { + StreamSettings stream_remote_settings; + stream_remote_settings.MergeFrom(*cntl._remote_stream_settings); + //Only the first stream needs extra_stream_ids settings + stream_remote_settings.clear_extra_stream_ids(); + for (auto i = 0; i < cntl._remote_stream_settings->extra_stream_ids_size(); ++i) { + stream_remote_settings.set_stream_id(cntl._remote_stream_settings->extra_stream_ids()[i]); + StreamId extra_stream_id; + if (Stream::Create(opt, &stream_remote_settings, &extra_stream_id, false) != 0) { + Stream::SetFailed(response_streams, 0, "Fail to accept stream at %d index", i); + cntl._response_streams.clear(); + response_streams.clear(); + LOG(ERROR) << "Fail to accept stream"; + return -1; + } + cntl._response_streams.push_back(extra_stream_id); + response_streams.push_back(extra_stream_id); + } + } + return 0; } diff --git a/src/brpc/stream.h b/src/brpc/stream.h index f222ba09..36c0def7 100644 --- a/src/brpc/stream.h +++ b/src/brpc/stream.h @@ -28,6 +28,7 @@ namespace brpc { class Controller; typedef SocketId StreamId; +using StreamIds = std::vector<StreamId>; const StreamId INVALID_STREAM_ID = (StreamId)-1L; namespace detail { @@ -105,6 +106,14 @@ struct StreamWriteOptions { int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options); +// [Called at the client side for creating multiple streams] +// Create streams at client-side along with the |cntl|, which will be connected +// when receiving the response with streams from server-side. If |options| is +// NULL, the stream will be created with default options +// Return 0 on success, -1 otherwise +int StreamCreate(StreamIds& request_streams, int request_stream_size, Controller& cntl, + const StreamOptions* options); + // [Called at the server side] // Accept the stream. If client didn't create a stream with the request // (cntl.has_remote_stream() returns false), this method would fail. @@ -112,6 +121,12 @@ int StreamCreate(StreamId* request_stream, Controller &cntl, int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options); +// [Called at the server side for accepting multiple streams] +// Accept the streams. If client didn't create streams with the request +// (cntl.has_remote_stream() returns false), this method would fail. +// Return 0 on success, -1 otherwise. +int StreamAccept(StreamIds& response_stream, Controller& cntl, + const StreamOptions* options); // Write |message| into |stream_id|. The remote-side handler will received the // message by the written order // Returns 0 on success, errno otherwise diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index db92dd63..66e0d719 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -46,7 +46,7 @@ public: const StreamWriteOptions* options = NULL); static int Create(const StreamOptions& options, const StreamSettings *remote_settings, - StreamId *id); + StreamId *id, bool parse_rpc_response = true); StreamId id() { return _id; } int OnReceived(const StreamFrameMeta& fm, butil::IOBuf *buf, Socket* sock); @@ -63,8 +63,11 @@ public: void FillSettings(StreamSettings *settings); static int SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) __attribute__ ((__format__ (__printf__, 3, 4))); + static int SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) + __attribute__ ((__format__ (__printf__, 3, 4))); void Close(int error_code, const char* reason_fmt, ...) __attribute__ ((__format__ (__printf__, 3, 4))); + int ShareHostSocket(Stream& other_stream); private: friend void StreamWait(StreamId stream_id, const timespec *due_time, diff --git a/src/brpc/streaming_rpc_meta.proto b/src/brpc/streaming_rpc_meta.proto index 05d83217..5474583d 100644 --- a/src/brpc/streaming_rpc_meta.proto +++ b/src/brpc/streaming_rpc_meta.proto @@ -25,6 +25,7 @@ message StreamSettings { required int64 stream_id = 1; optional bool need_feedback = 2 [default = false]; optional bool writable = 3 [default = false]; + repeated int64 extra_stream_ids = 4; } enum FrameType { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org