This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e5ec4fe Batch create and accept stream (#2754)
7e5ec4fe is described below

commit 7e5ec4fe8df1df135af16a26e4d4e4a24e9a0651
Author: Jenrry You <jenrry...@gmail.com>
AuthorDate: Wed Oct 9 10:39:42 2024 +0800

    Batch create and accept stream (#2754)
    
    * feat: batch create and accept stream
    
    * fix protobuf 22.5 compilation error related to thread_local in MacOS
    
    * refine style
    
    * modify code based on the code review feedback and add more tests
    
    modify code based on the code review feedback  and add more tests
---
 .github/workflows/ci-macos.yml                     |  16 +--
 docs/cn/streaming_rpc.md                           |  17 ++-
 docs/en/streaming_rpc.md                           |  16 ++-
 example/streaming_batch_echo_c++/CMakeLists.txt    | 140 +++++++++++++++++++++
 example/streaming_batch_echo_c++/client.cpp        | 118 +++++++++++++++++
 .../streaming_batch_echo_c++/echo.proto            |  37 ++----
 example/streaming_batch_echo_c++/server.cpp        | 122 ++++++++++++++++++
 src/brpc/channel.cpp                               |   2 +-
 src/brpc/controller.cpp                            |  44 +++++--
 src/brpc/controller.h                              |   6 +-
 src/brpc/details/controller_private_accessor.h     |   4 +-
 src/brpc/policy/baidu_rpc_protocol.cpp             |  64 +++++++---
 src/brpc/stream.cpp                                | 113 ++++++++++++++---
 src/brpc/stream.h                                  |  15 +++
 src/brpc/stream_impl.h                             |   5 +-
 src/brpc/streaming_rpc_meta.proto                  |   1 +
 16 files changed, 627 insertions(+), 93 deletions(-)

diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml
index 685d4e6b..50ab2e02 100644
--- a/.github/workflows/ci-macos.yml
+++ b/.github/workflows/ci-macos.yml
@@ -57,7 +57,7 @@ jobs:
            cd build
            make -j ${{env.proc_num}}
 
-  compile-with-make-protobuf22:
+  compile-with-make-protobuf23:
     runs-on: macos-latest # https://github.com/actions/runner-images
 
     steps:
@@ -67,9 +67,9 @@ jobs:
         run: |
           brew install openssl gnu-getopt coreutils gflags leveldb
           # abseil 20230125.3
-          curl -o abseil.rb   
https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/abseil.rb
-          # protobuf 22.5
-          curl -o protobuf.rb 
https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/protobuf.rb
+          curl -o abseil.rb   
https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/abseil.rb
+          # protobuf 23.3
+          curl -o protobuf.rb 
https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/protobuf.rb
           HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 brew install --formula 
--ignore-dependencies ./abseil.rb ./protobuf.rb
 
       - name: config_brpc
@@ -82,7 +82,7 @@ jobs:
         run: |
           make -j ${{env.proc_num}}
 
-  compile-with-cmake-protobuf22:
+  compile-with-cmake-protobuf23:
     runs-on: macos-latest
 
     steps:
@@ -92,9 +92,9 @@ jobs:
         run: |
           brew install openssl gflags leveldb
           # abseil 20230125.3
-          curl -o abseil.rb   
https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/abseil.rb
-          # protobuf 22.5
-          curl -o protobuf.rb 
https://raw.githubusercontent.com/Homebrew/homebrew-core/1e04597501b3096952608efcb13301119a830b35/Formula/protobuf.rb
+          curl -o abseil.rb   
https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/abseil.rb
+          # protobuf 23.3
+          curl -o protobuf.rb 
https://raw.githubusercontent.com/Homebrew/homebrew-core/b85b8dbf23ad509f163677a88ac72268f31e9c4a/Formula/protobuf.rb
           HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 brew install --formula 
--ignore-dependencies ./abseil.rb ./protobuf.rb
 
       - name: cmake
diff --git a/docs/cn/streaming_rpc.md b/docs/cn/streaming_rpc.md
index 1480b83b..c9f9ab36 100644
--- a/docs/cn/streaming_rpc.md
+++ b/docs/cn/streaming_rpc.md
@@ -23,7 +23,7 @@ Streaming RPC保证:
 
 # 建立Stream
 
-目前Stream都由Client端建立。Client先在本地创建一个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这个Stream,
 
那在response返回Client后Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。
+目前Stream都由Client端建立。Client先在本地创建一个或者多个Stream,再通过一次RPC(必须使用baidu_std协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这批Stream,
 
那在response返回Client后这批Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。
 
 > 如果Client尝试向不支持Streaming RPC的老Server建立Stream,将总是失败。
 
@@ -58,11 +58,18 @@ struct StreamOptions
 // NULL, the Stream will be created with default options
 // Return 0 on success, -1 otherwise
 int StreamCreate(StreamId* request_stream, Controller &cntl, const 
StreamOptions* options);
+
+// [Called at the client side for creating multiple streams]
+// Create streams at client-side along with the |cntl|, which will be connected
+// when receiving the response with streams from server-side. If |options| is
+// NULL, the stream will be created with default options
+// Return 0 on success, -1 otherwise
+int StreamCreate(StreamIds& request_streams, int request_stream_size, 
Controller& cntl, const StreamOptions* options);
 ```
 
 # 接受Stream
 
-如果client在RPC上附带了一个Stream, 
service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。
+如果client在RPC上附带了一个或者多个Stream, 
service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。
 
 ```c++
 // [Called at the server side]
@@ -70,6 +77,12 @@ int StreamCreate(StreamId* request_stream, Controller &cntl, 
const StreamOptions
 // (cntl.has_remote_stream() returns false), this method would fail.
 // Return 0 on success, -1 otherwise.
 int StreamAccept(StreamId* response_stream, Controller &cntl, const 
StreamOptions* options);
+
+// [Called at the server side for accepting multiple streams]
+// Accept the streams. If client didn't create streams with the request
+// (cntl.has_remote_stream() returns false), this method would fail.
+// Return 0 on success, -1 otherwise.
+int StreamAccept(StreamIds& response_stream, Controller& cntl, const 
StreamOptions* options);
 ```
 
 # 读取Stream
diff --git a/docs/en/streaming_rpc.md b/docs/en/streaming_rpc.md
index 80302015..8e067148 100644
--- a/docs/en/streaming_rpc.md
+++ b/docs/en/streaming_rpc.md
@@ -23,7 +23,7 @@ For examples please refer to 
[example/streaming_echo_c++](https://github.com/apa
 
 # Create a Stream
 
-Currently stream is established by the client only. A new Stream object is 
created in client and then is used to issue an RPC (through baidu_std protocol) 
to the specified service. The service could accept this stream by responding to 
the request without error, thus a Stream is created once the client receives 
the response successfully. Any error during this process fails the RPC and thus 
fails the Stream creation. Take the Linux environment as an example, the client 
creates a [socket](h [...]
+Currently streams are established by the client only. The new Stream objects 
are created in client and then are used to issue an RPC (through baidu_std 
protocol) to the specified service. The service could accept these streams by 
responding to the request without error, thus the Streams are created once the 
client receives the response successfully. Any error during this process fails 
the RPC and thus fails the Stream creation. Take the Linux environment as an 
example, the client creates [...]
 
 > If the client tries to establish a stream to a server that doesn't support 
 > streaming RPC, it will always return failure.
 
@@ -58,6 +58,14 @@ struct StreamOptions
 // NULL, the Stream will be created with default options
 // Return 0 on success, -1 otherwise
 int StreamCreate(StreamId* request_stream, Controller &cntl, const 
StreamOptions* options);
+
+// [Called at the client side for creating multiple streams]
+// Create streams at client-side along with the |cntl|, which will be connected
+// when receiving the response with streams from server-side. If |options| is
+// NULL, the stream will be created with default options
+// Return 0 on success, -1 otherwise
+int StreamCreate(StreamIds& request_streams, int request_stream_size, 
Controller& cntl,
+                 const StreamOptions* options);
 ```
 
 # Accept a Stream
@@ -70,6 +78,12 @@ If a Stream is attached inside the request of an RPC, the 
service can accept the
 // (cntl.has_remote_stream() returns false), this method would fail.
 // Return 0 on success, -1 otherwise.
 int StreamAccept(StreamId* response_stream, Controller &cntl, const 
StreamOptions* options);
+
+// [Called at the server side for accepting multiple streams]
+// Accept the streams. If client didn't create streams with the request
+// (cntl.has_remote_stream() returns false), this method would fail.
+// Return 0 on success, -1 otherwise.
+int StreamAccept(StreamIds& response_stream, Controller& cntl, const 
StreamOptions* options);
 ```
 
 # Read from a Stream
diff --git a/example/streaming_batch_echo_c++/CMakeLists.txt 
b/example/streaming_batch_echo_c++/CMakeLists.txt
new file mode 100644
index 00000000..1d152ef9
--- /dev/null
+++ b/example/streaming_batch_echo_c++/CMakeLists.txt
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8.10)
+project(streaming_batch_echo_c++ C CXX)
+
+option(LINK_SO "Whether examples are linked dynamically" OFF)
+
+execute_process(
+        COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex 
\".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
+        OUTPUT_VARIABLE OUTPUT_PATH
+)
+
+set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
+
+include(FindThreads)
+include(FindProtobuf)
+protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto)
+# include PROTO_HEADER
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+# Search for libthrift* by best effort. If it is not found and brpc is
+# compiled with thrift protocol enabled, a link error would be reported.
+find_library(THRIFT_LIB NAMES thrift)
+if (NOT THRIFT_LIB)
+    set(THRIFT_LIB "")
+endif()
+
+find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
+if(LINK_SO)
+    find_library(BRPC_LIB NAMES brpc)
+else()
+    find_library(BRPC_LIB NAMES libbrpc.a brpc)
+endif()
+if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
+    message(FATAL_ERROR "Fail to find brpc")
+endif()
+include_directories(${BRPC_INCLUDE_PATH})
+
+find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
+find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
+if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
+    message(FATAL_ERROR "Fail to find gflags")
+endif()
+include_directories(${GFLAGS_INCLUDE_PATH})
+
+execute_process(
+        COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" 
${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | 
tr -d '\n'"
+        OUTPUT_VARIABLE GFLAGS_NS
+)
+if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE")
+    execute_process(
+            COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" 
${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | 
tr -d '\n'"
+            OUTPUT_VARIABLE GFLAGS_NS
+    )
+endif()
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    include(CheckFunctionExists)
+    CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+    if(NOT HAVE_CLOCK_GETTIME)
+        set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
+    endif()
+endif()
+
+set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}")
+set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ 
-pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
+
+if(CMAKE_VERSION VERSION_LESS "3.1.3")
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+else()
+    set(CMAKE_CXX_STANDARD 11)
+    set(CMAKE_CXX_STANDARD_REQUIRED ON)
+endif()
+
+find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
+find_library(LEVELDB_LIB NAMES leveldb)
+if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
+    message(FATAL_ERROR "Fail to find leveldb")
+endif()
+include_directories(${LEVELDB_INCLUDE_PATH})
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(OPENSSL_ROOT_DIR
+            "/usr/local/opt/openssl"    # Homebrew installed OpenSSL
+    )
+endif()
+
+find_package(OpenSSL)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+set(DYNAMIC_LIB
+        ${CMAKE_THREAD_LIBS_INIT}
+        ${GFLAGS_LIBRARY}
+        ${PROTOBUF_LIBRARIES}
+        ${LEVELDB_LIB}
+        ${OPENSSL_CRYPTO_LIBRARY}
+        ${OPENSSL_SSL_LIBRARY}
+        ${THRIFT_LIB}
+        dl
+)
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(DYNAMIC_LIB ${DYNAMIC_LIB}
+            pthread
+            "-framework CoreFoundation"
+            "-framework CoreGraphics"
+            "-framework CoreData"
+            "-framework CoreText"
+            "-framework Security"
+            "-framework Foundation"
+            "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
+            "-Wl,-U,_ProfilerStart"
+            "-Wl,-U,_ProfilerStop"
+            "-Wl,-U,__Z13GetStackTracePPvii")
+endif()
+
+add_executable(streaming_batch_echo_client client.cpp ${PROTO_SRC} 
${PROTO_HEADER})
+add_executable(streaming_batch_echo_server server.cpp ${PROTO_SRC} 
${PROTO_HEADER})
+
+target_link_libraries(streaming_batch_echo_client ${BRPC_LIB} ${DYNAMIC_LIB})
+target_link_libraries(streaming_batch_echo_server ${BRPC_LIB} ${DYNAMIC_LIB})
diff --git a/example/streaming_batch_echo_c++/client.cpp 
b/example/streaming_batch_echo_c++/client.cpp
new file mode 100644
index 00000000..ea58a191
--- /dev/null
+++ b/example/streaming_batch_echo_c++/client.cpp
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A client sending requests to server in batch every 1 second.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/channel.h>
+#include <brpc/stream.h>
+#include "echo.pb.h"
+
+DEFINE_bool(send_attachment, true, "Carry attachment along with requests");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, 
pooled, short");
+DEFINE_string(server, "0.0.0.0:8001", "IP Address of server");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
+
+class StreamClientReceiver : public brpc::StreamInputHandler {
+public:
+    virtual int on_received_messages(brpc::StreamId id,
+                                     butil::IOBuf *const messages[],
+                                     size_t size) {
+        std::ostringstream os;
+        for (size_t i = 0; i < size; ++i) {
+            os << "msg[" << i << "]=" << *messages[i];
+        }
+        LOG(INFO) << "Received from Stream=" << id << ": " << os.str();
+        return 0;
+    }
+    virtual void on_idle_timeout(brpc::StreamId id) {
+        LOG(INFO) << "Stream=" << id << " has no data transmission for a 
while";
+    }
+    virtual void on_closed(brpc::StreamId id) {
+        LOG(INFO) << "Stream=" << id << " is closed";
+    }
+
+    virtual void on_finished(brpc::StreamId id, int32_t finish_code) {
+        LOG(INFO) << "Stream=" << id << " is finished, code " << finish_code;
+    }
+};
+
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // A Channel represents a communication line to a Server. Notice that 
+    // Channel is thread-safe and can be shared by all threads in your program.
+    brpc::Channel channel;
+        
+    // Initialize the channel, NULL means using default options. 
+    brpc::ChannelOptions options;
+    options.protocol = brpc::PROTOCOL_BAIDU_STD;
+    options.connection_type = FLAGS_connection_type;
+    options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
+    options.max_retry = FLAGS_max_retry;
+    if (channel.Init(FLAGS_server.c_str(), NULL) != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return -1;
+    }
+
+    // Normally, you should not call a Channel directly, but instead construct
+    // a stub Service wrapping it. stub can be shared by all threads as well.
+    example::EchoService_Stub stub(&channel);
+    StreamClientReceiver receiver;
+    brpc::Controller cntl;
+    brpc::StreamIds streams;
+    brpc::StreamOptions stream_options;
+    stream_options.handler = &receiver;
+    if (brpc::StreamCreate(streams, 3, cntl, &stream_options) != 0) {
+        LOG(ERROR) << "Fail to create stream";
+        return -1;
+    }
+    for(size_t i = 0; i < streams.size(); ++i) {
+        LOG(INFO) << "Created Stream=" << streams[i];
+    }
+    example::EchoRequest request;
+    example::EchoResponse response;
+    request.set_message("I'm a RPC to connect stream");
+    stub.Echo(&cntl, &request, &response, NULL);
+    if (cntl.Failed()) {
+        LOG(ERROR) << "Fail to connect stream, " << cntl.ErrorText();
+        return -1;
+    }
+    
+    while (!brpc::IsAskedToQuit()) {
+        butil::IOBuf msg1;
+        msg1.append("abcdefghijklmnopqrstuvwxyz");
+        CHECK_EQ(0, brpc::StreamWrite(streams[0], msg1));
+        butil::IOBuf msg2;
+        msg2.append("0123456789");
+        CHECK_EQ(0, brpc::StreamWrite(streams[1], msg2));
+        sleep(1);
+        butil::IOBuf msg3;
+        msg3.append("hello world");
+        CHECK_EQ(0, brpc::StreamWrite(streams[2], msg3));
+        sleep(1);
+    }
+
+    CHECK_EQ(0, brpc::StreamClose(streams[0]));
+    CHECK_EQ(0, brpc::StreamClose(streams[1]));
+    CHECK_EQ(0, brpc::StreamClose(streams[2]));
+    LOG(INFO) << "EchoClient is going to quit";
+    return 0;
+}
diff --git a/src/brpc/streaming_rpc_meta.proto 
b/example/streaming_batch_echo_c++/echo.proto
similarity index 54%
copy from src/brpc/streaming_rpc_meta.proto
copy to example/streaming_batch_echo_c++/echo.proto
index 05d83217..2b39627f 100644
--- a/src/brpc/streaming_rpc_meta.proto
+++ b/example/streaming_batch_echo_c++/echo.proto
@@ -16,33 +16,18 @@
 // under the License.
 
 syntax="proto2";
+package example;
 
-package brpc;
-option java_package="com.brpc";
-option java_outer_classname="StreamingRpcProto";
+option cc_generic_services = true;
 
-message StreamSettings {
-    required int64 stream_id = 1;
-    optional bool need_feedback = 2 [default = false];
-    optional bool writable = 3 [default = false];
-}
+message EchoRequest {
+      required string message = 1;
+};
 
-enum FrameType {
-    FRAME_TYPE_UNKNOWN = 0;
-    FRAME_TYPE_RST = 1;
-    FRAME_TYPE_CLOSE = 2;
-    FRAME_TYPE_DATA = 3;
-    FRAME_TYPE_FEEDBACK= 4;
-}
+message EchoResponse {
+      required string message = 1;
+};
 
-message StreamFrameMeta {
-    required int64 stream_id = 1;
-    optional int64 source_stream_id = 2;
-    optional FrameType frame_type = 3;
-    optional bool has_continuation = 4;
-    optional Feedback feedback = 5;
-}
-
-message Feedback {
-    optional int64 consumed_size = 1;
-}
+service EchoService {
+      rpc Echo(EchoRequest) returns (EchoResponse);
+};
diff --git a/example/streaming_batch_echo_c++/server.cpp 
b/example/streaming_batch_echo_c++/server.cpp
new file mode 100644
index 00000000..3690344a
--- /dev/null
+++ b/example/streaming_batch_echo_c++/server.cpp
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include "echo.pb.h"
+#include <brpc/stream.h>
+
+DEFINE_bool(send_attachment, true, "Carry attachment along with response");
+DEFINE_int32(port, 8001, "TCP Port of this server");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+
+class StreamReceiver : public brpc::StreamInputHandler {
+public:
+    virtual int on_received_messages(brpc::StreamId id, 
+                                     butil::IOBuf *const messages[], 
+                                     size_t size) {
+        std::ostringstream os;
+        for (size_t i = 0; i < size; ++i) {
+            os << "msg[" << i << "]=" << *messages[i];
+        }
+        auto res = brpc::StreamWrite(id, *messages[0]);
+        LOG(INFO) << "Received from Stream=" << id << ": " << os.str() << " 
and write back result: " << res;
+        return 0;
+    }
+    virtual void on_idle_timeout(brpc::StreamId id) {
+        LOG(INFO) << "Stream=" << id << " has no data transmission for a 
while";
+    }
+    virtual void on_closed(brpc::StreamId id) {
+        LOG(INFO) << "Stream=" << id << " is closed";
+    }
+
+    virtual void on_finished(brpc::StreamId id, int32_t finish_code) {
+        LOG(INFO) << "Stream=" << id << " is finished, code " << finish_code;
+    }
+};
+
+// Your implementation of example::EchoService
+class StreamingBatchEchoService : public example::EchoService {
+public:
+    virtual ~StreamingBatchEchoService() {
+        closeStreams();
+    };
+    virtual void Echo(google::protobuf::RpcController* controller,
+                      const example::EchoRequest* /*request*/,
+                      example::EchoResponse* response,
+                      google::protobuf::Closure* done) {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+        closeStreams();
+        brpc::Controller* cntl =
+            static_cast<brpc::Controller*>(controller);
+        brpc::StreamOptions stream_options;
+        stream_options.handler = &_receiver;
+        if (brpc::StreamAccept(_sds, *cntl, &stream_options) != 0) {
+            cntl->SetFailed("Fail to accept stream");
+            return;
+        }
+        response->set_message("Accepted stream");
+    }
+
+private:
+    void closeStreams() {
+        for(auto i = 0; i < _sds.size(); ++i) {
+            brpc::StreamClose(_sds[i]);
+        }
+        _sds.clear();
+    }
+    StreamReceiver _receiver;
+    brpc::StreamIds _sds;
+};
+
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Generally you only need one Server.
+    brpc::Server server;
+
+    // Instance of your service.
+    StreamingBatchEchoService echo_service_impl;
+
+    // Add the service into server. Notice the second parameter, because the
+    // service is put on stack, we don't want server to delete it, otherwise
+    // use brpc::SERVER_OWNS_SERVICE.
+    if (server.AddService(&echo_service_impl, 
+                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // Start the server. 
+    brpc::ServerOptions options;
+    options.idle_timeout_sec = FLAGS_idle_timeout_s;
+    if (server.Start(FLAGS_port, &options) != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
+    server.RunUntilAskedToQuit();
+    return 0;
+}
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index c15611ea..ab07ad3b 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -535,7 +535,7 @@ void Channel::CallMethod(const 
google::protobuf::MethodDescriptor* method,
         return cntl->HandleSendFailed();
     }
 
-    if (cntl->_request_stream != INVALID_STREAM_ID) {
+    if (!cntl->_request_streams.empty()) {
         // Currently we cannot handle retry and backup request correctly
         cntl->set_max_retry(0);
         cntl->set_backup_request_ms(-1);
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index afebb3c2..271ccfa4 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -290,8 +290,8 @@ void Controller::ResetPods() {
     _http_response = NULL;
     _request_user_fields = NULL;
     _response_user_fields = NULL;
-    _request_stream = INVALID_STREAM_ID;
-    _response_stream = INVALID_STREAM_ID;
+    _request_streams.clear();
+    _response_streams.clear();
     _remote_stream_settings = NULL;
     _auth_flags = 0;
 }
@@ -1398,34 +1398,54 @@ void* Controller::session_local_data() {
 }
 
 void Controller::HandleStreamConnection(Socket *host_socket) {
-    if (_request_stream == INVALID_STREAM_ID) {
+    if (_request_streams.empty()) {
         CHECK(!has_remote_stream());
         return;
     }
-    SocketUniquePtr ptr;
+    size_t stream_num = _request_streams.size();
+    std::vector<SocketUniquePtr> ptrs(stream_num);
     if (!FailedInline()) {
-        if (Socket::Address(_request_stream, &ptr) != 0) {
-            if (!FailedInline()) {
-                SetFailed(EREQUEST, "Request stream=%" PRIu64 " was closed 
before responded",
-                                     _request_stream);
-            }
-        } else if (_remote_stream_settings == NULL) {
+        if (_remote_stream_settings == NULL) {
             if (!FailedInline()) {
                 SetFailed(EREQUEST, "The server didn't accept the stream");
             }
+        } else {
+            for (size_t i = 0; i < stream_num; ++i) {
+                if (Socket::Address(_request_streams[i], &ptrs[i]) != 0) {
+                    if (!FailedInline()) {
+                        SetFailed(EREQUEST, "Request stream=%" PRIu64 " was 
closed before responded",
+                                  _request_streams[i]);
+                        break;
+                    }
+                }
+            }
         }
     }
     if (FailedInline()) {
-        Stream::SetFailed(_request_stream, _error_code,
+        Stream::SetFailed(_request_streams, _error_code,
                           "%s", _error_text.c_str());
         if (_remote_stream_settings != NULL) {
             policy::SendStreamRst(host_socket,
                                   _remote_stream_settings->stream_id());
+            for (int i = 0; i < 
_remote_stream_settings->extra_stream_ids().size(); ++i) {
+                policy::SendStreamRst(host_socket,
+                                      
_remote_stream_settings->extra_stream_ids()[i]);
+            }
         }
         return;
     }
-    Stream* s = (Stream*)ptr->conn();
+    Stream* s = (Stream*)ptrs[0]->conn();
     s->SetConnected(_remote_stream_settings);
+    if (stream_num > 1) {
+        const auto& extra_stream_ids = 
_remote_stream_settings->extra_stream_ids();
+        _remote_stream_settings->clear_extra_stream_ids();
+        for (size_t i = 1; i < stream_num; ++i) {
+            Stream* extra_stream = (Stream *) ptrs[i]->conn();
+            _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]);
+            s->ShareHostSocket(*extra_stream);
+            extra_stream->SetConnected(_remote_stream_settings);
+        }
+    }
 }
 
 // TODO: Need more security advices from professionals.
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 9b3c0201..2a4ec6b5 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -123,7 +123,9 @@ friend class schan::Sender;
 friend class schan::SubDone;
 friend class policy::OnServerStreamCreated;
 friend int StreamCreate(StreamId*, Controller&, const StreamOptions*);
+friend int StreamCreate(StreamIds&, int, Controller&, const StreamOptions*);
 friend int StreamAccept(StreamId*, Controller&, const StreamOptions*);
+friend int StreamAccept(StreamIds&, Controller&, const StreamOptions*);
 friend void policy::ProcessMongoRequest(InputMessageBase*);
 friend void policy::ProcessThriftRequest(InputMessageBase*);
     // << Flags >>
@@ -866,9 +868,9 @@ private:
 
     // TODO: Replace following fields with StreamCreator
     // Defined at client side
-    StreamId _request_stream;
+    StreamIds _request_streams;
     // Defined at server side
-    StreamId _response_stream;
+    StreamIds _response_streams;
     // Defined at both sides
     StreamSettings *_remote_stream_settings;
 
diff --git a/src/brpc/details/controller_private_accessor.h 
b/src/brpc/details/controller_private_accessor.h
index 1be7df8b..db40ca15 100644
--- a/src/brpc/details/controller_private_accessor.h
+++ b/src/brpc/details/controller_private_accessor.h
@@ -119,8 +119,8 @@ public:
         return _cntl->_remote_stream_settings;
     }
 
-    StreamId request_stream() { return _cntl->_request_stream; }
-    StreamId response_stream() { return _cntl->_response_stream; }
+    StreamIds request_streams() { return _cntl->_request_streams; }
+    StreamIds response_streams() { return _cntl->_response_streams; }
 
     void set_method(const google::protobuf::MethodDescriptor* method) 
     { _cntl->_method = method; }
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp 
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 504895b1..cc2dcbd2 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -216,10 +216,12 @@ void SendRpcResponse(int64_t correlation_id,
     ClosureGuard guard(brpc::NewCallback(
         cntl, &Controller::CallAfterRpcResp, req, res));
     
-    StreamId response_stream_id = accessor.response_stream();
+    StreamIds response_stream_ids = accessor.response_streams();
 
     if (cntl->IsCloseConnection()) {
-        StreamClose(response_stream_id);
+        for(size_t i = 0; i < response_stream_ids.size(); ++i) {
+            StreamClose(response_stream_ids[i]);
+        }
         sock->SetFailed();
         return;
     }
@@ -261,12 +263,18 @@ void SendRpcResponse(int64_t correlation_id,
     if (attached_size > 0) {
         meta.set_attachment_size(attached_size);
     }
+    StreamId response_stream_id = INVALID_STREAM_ID;
     SocketUniquePtr stream_ptr;
-    if (response_stream_id != INVALID_STREAM_ID) {
+    if (!response_stream_ids.empty()) {
+        response_stream_id = response_stream_ids[0];
         if (Socket::Address(response_stream_id, &stream_ptr) == 0) {
-            Stream* s = (Stream*)stream_ptr->conn();
-            s->FillSettings(meta.mutable_stream_settings());
+            Stream* s = (Stream *) stream_ptr->conn();
+            StreamSettings *stream_settings = meta.mutable_stream_settings();
+            s->FillSettings(stream_settings);
             s->SetHostSocket(sock);
+            for (size_t i = 1; i < response_stream_ids.size(); ++i) {
+                
stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]);
+            }
         } else {
             LOG(WARNING) << "Stream=" << response_stream_id 
                          << " was closed before sending response";
@@ -302,24 +310,35 @@ void SendRpcResponse(int64_t correlation_id,
         // Response_stream can be INVALID_STREAM_ID when error occurs.
         if (SendStreamData(sock, &res_buf,
                            accessor.remote_stream_settings()->stream_id(),
-                           accessor.response_stream()) != 0) {
+                           response_stream_id) != 0) {
             const int errcode = errno;
             std::string error_text = butil::string_printf(64, "Fail to write 
into %s",
                                                           
sock->description().c_str());
             PLOG_IF(WARNING, errcode != EPIPE) << error_text;
             cntl->SetFailed(errcode,  "%s", error_text.c_str());
-            if(stream_ptr) {
-                ((Stream*)stream_ptr->conn())->Close(errcode, "%s",
-                                                     error_text.c_str());
-            }
+            Stream::SetFailed(response_stream_ids, errcode, "%s",
+                              error_text.c_str());
             return;
         }
 
+        // Now it's ok the mark these server-side streams as connected as all 
the
+        // written user data would follower the RPC response.
+        // Reuse stream_ptr to avoid address first stream id again
         if(stream_ptr) {
-            // Now it's ok the mark this server-side stream as connected as 
all the
-            // written user data would follower the RPC response.
             ((Stream*)stream_ptr->conn())->SetConnected();
         }
+        for (size_t i = 1; i < response_stream_ids.size(); ++i) {
+            StreamId extra_stream_id = response_stream_ids[i];
+            SocketUniquePtr extra_stream_ptr;
+            if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) {
+                Stream* extra_stream = (Stream *) extra_stream_ptr->conn();
+                extra_stream->SetHostSocket(sock);
+                extra_stream->SetConnected();
+            } else {
+                LOG(WARNING) << "Stream=" << extra_stream_id
+                             << " was closed before sending response";
+            }
+        }
     } else{
         // Have the risk of unlimited pending responses, in which case, tell
         // users to set max_concurrency.
@@ -715,7 +734,11 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
         LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
             << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
         if (remote_stream_id != INVALID_STREAM_ID) {
-            SendStreamRst(msg->socket(), meta.stream_settings().stream_id());
+            SendStreamRst(msg->socket(), remote_stream_id);
+            const auto & extra_stream_ids = 
meta.stream_settings().extra_stream_ids();
+            for (int i = 0; i < extra_stream_ids.size(); ++i) {
+                policy::SendStreamRst(msg->socket(), extra_stream_ids[i]);
+            }
         }
         return;
     }
@@ -825,15 +848,20 @@ void PackRpcRequest(butil::IOBuf* req_buf,
         request_meta->set_request_id(cntl->request_id());
     }
     meta.set_correlation_id(correlation_id);
-    StreamId request_stream_id = accessor.request_stream();
-    if (request_stream_id != INVALID_STREAM_ID) {
+    StreamIds request_stream_ids = accessor.request_streams();
+    if (!request_stream_ids.empty()) {
+        StreamSettings* stream_settings = meta.mutable_stream_settings();
+        StreamId request_stream_id = request_stream_ids[0];
         SocketUniquePtr ptr;
         if (Socket::Address(request_stream_id, &ptr) != 0) {
-            return cntl->SetFailed(EREQUEST, "Stream=%" PRIu64 " was closed", 
+            return cntl->SetFailed(EREQUEST, "Stream=%" PRIu64 " was closed",
                                    request_stream_id);
         }
-        Stream *s = (Stream*)ptr->conn();
-        s->FillSettings(meta.mutable_stream_settings());
+        Stream* s = (Stream*) ptr->conn();
+        s->FillSettings(stream_settings);
+        for (size_t i = 1; i < request_stream_ids.size(); ++i) {
+            
stream_settings->mutable_extra_stream_ids()->Add(request_stream_ids[i]);
+        }
     }
 
     if (cntl->has_request_user_fields() && 
!cntl->request_user_fields()->empty()) {
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
index 27f87c6f..73d64051 100644
--- a/src/brpc/stream.cpp
+++ b/src/brpc/stream.cpp
@@ -68,7 +68,7 @@ Stream::~Stream() {
 
 int Stream::Create(const StreamOptions &options, 
                    const StreamSettings *remote_settings,
-                   StreamId *id) {
+                   StreamId *id, bool parse_rpc_response) {
     Stream* s = new Stream();
     s->_host_socket = NULL;
     s->_fake_socket_weak_ref = NULL;
@@ -88,10 +88,8 @@ int Stream::Create(const StreamOptions &options,
 
     if (remote_settings != NULL) {
         s->_remote_settings.MergeFrom(*remote_settings);
-        s->_parse_rpc_response = false;
-    } else {
-        s->_parse_rpc_response = true;
     }
+    s->_parse_rpc_response = parse_rpc_response;
     if (bthread_id_list_init(&s->_writable_wait_list, 8, 8/*FIXME*/)) {
         delete s;
         return -1;
@@ -668,6 +666,10 @@ void Stream::Close(int error_code, const char* reason_fmt, 
...) {
     return TriggerOnConnectIfNeed();
 }
 
+int Stream::ShareHostSocket(Stream& other_stream) {
+    return other_stream.SetHostSocket(_host_socket);
+}
+
 int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, 
...) {
     SocketUniquePtr ptr;
     if (Socket::AddressFailedAsWell(id, &ptr) == -1) {
@@ -682,6 +684,16 @@ int Stream::SetFailed(StreamId id, int error_code, const 
char* reason_fmt, ...)
     return 0;
 }
 
+int Stream::SetFailed(const StreamIds& ids, int error_code, const char* 
reason_fmt, ...) {
+    va_list ap;
+    va_start(ap, reason_fmt);
+    for(size_t i = 0; i< ids.size(); ++i) {
+        Stream::SetFailed(ids[i], error_code, reason_fmt, ap);
+    }
+    va_end(ap);
+    return 0;
+}
+
 void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) {
     CHECK(!_remote_settings.IsInitialized());
     CHECK(_host_socket != NULL);
@@ -759,37 +771,76 @@ int StreamClose(StreamId stream_id) {
 
 int StreamCreate(StreamId *request_stream, Controller &cntl,
                  const StreamOptions* options) {
-    if (cntl._request_stream != INVALID_STREAM_ID) {
+    if (request_stream == NULL) {
+        LOG(ERROR) << "request_stream is NULL";
+        return -1;
+    }
+    StreamIds request_streams;
+    StreamCreate(request_streams, 1, cntl, options);
+    *request_stream = request_streams[0];
+    return 0;
+}
+
+int StreamCreate(StreamIds& request_streams, int request_stream_size, 
Controller & cntl,
+                 const StreamOptions* options) {
+    if (!cntl._request_streams.empty()) {
         LOG(ERROR) << "Can't create request stream more than once";
         return -1;
     }
-    if (request_stream == NULL) {
-        LOG(ERROR) << "request_stream is NULL";
+    if (!request_streams.empty()) {
+        LOG(ERROR) << "request_streams should be empty";
         return -1;
     }
-    StreamId stream_id;
     StreamOptions opt;
     if (options != NULL) {
         opt = *options;
     }
-    if (Stream::Create(opt, NULL, &stream_id) != 0) {
-        LOG(ERROR) << "Fail to create stream";
-        return -1;
+    for (auto i = 0; i < request_stream_size; ++i) {
+        StreamId stream_id;
+        bool parse_rpc_response = (i == 0); // Only the first stream need 
parse rpc
+        if (Stream::Create(opt, NULL, &stream_id, parse_rpc_response) != 0) {
+            // Close already created streams
+            Stream::SetFailed(request_streams, 0 , "Fail to create stream at 
%d index", i);
+            LOG(ERROR) << "Fail to create stream";
+            return -1;
+        }
+        cntl._request_streams.push_back(stream_id);
+        request_streams.push_back(stream_id);
     }
-    cntl._request_stream = stream_id;
-    *request_stream = stream_id;
     return 0;
 }
 
 int StreamAccept(StreamId* response_stream, Controller &cntl,
                  const StreamOptions* options) {
+    if (response_stream == NULL) {
+        LOG(ERROR) << "response_stream is NULL";
+        return -1;
+    }
+    StreamIds response_streams;
+    int res = StreamAccept(response_streams, cntl, options);
+    if(res != 0) {
+        return res;
+    }
+    if(response_streams.size() != 1) {
+        Stream::SetFailed(response_streams, EINVAL,
+                          "misusing StreamAccept for single stream to accept 
multiple streams");
+        cntl._response_streams.clear();
+        LOG(ERROR) << "misusing StreamAccept for single stream to accept 
multiple streams";
+        return -1;
+    }
+    *response_stream = response_streams[0];
+    return 0;
+}
 
-    if (cntl._response_stream != INVALID_STREAM_ID) {
+int StreamAccept(StreamIds& response_streams, Controller& cntl,
+                 const StreamOptions* options) {
+    if (!cntl._response_streams.empty()) {
         LOG(ERROR) << "Can't create response stream more than once";
         return -1;
     }
-    if (response_stream == NULL) {
-        LOG(ERROR) << "response_stream is NULL";
+
+    if (!response_streams.empty()) {
+        LOG(ERROR) << "response_streams should be empty";
         return -1;
     }
     if (!cntl.has_remote_stream()) {
@@ -801,12 +852,34 @@ int StreamAccept(StreamId* response_stream, Controller 
&cntl,
         opt = *options;
     }
     StreamId stream_id;
-    if (Stream::Create(opt, cntl._remote_stream_settings, &stream_id) != 0) {
-        LOG(ERROR) << "Fail to create stream";
+    if (Stream::Create(opt, cntl._remote_stream_settings, &stream_id, false) 
!= 0) {
+        Stream::SetFailed(response_streams, 0, "Fail to accept stream");
+        LOG(ERROR) << "Fail to accept stream";
         return -1;
     }
-    cntl._response_stream = stream_id;
-    *response_stream = stream_id;
+
+    cntl._response_streams.push_back(stream_id);
+    response_streams.push_back(stream_id);
+    if(!cntl._remote_stream_settings->extra_stream_ids().empty()) {
+        StreamSettings stream_remote_settings;
+        stream_remote_settings.MergeFrom(*cntl._remote_stream_settings);
+        //Only the first stream needs extra_stream_ids settings
+        stream_remote_settings.clear_extra_stream_ids();
+        for (auto i = 0; i < 
cntl._remote_stream_settings->extra_stream_ids_size(); ++i) {
+            
stream_remote_settings.set_stream_id(cntl._remote_stream_settings->extra_stream_ids()[i]);
+            StreamId extra_stream_id;
+            if (Stream::Create(opt, &stream_remote_settings, &extra_stream_id, 
false) != 0) {
+                Stream::SetFailed(response_streams, 0, "Fail to accept stream 
at %d index", i);
+                cntl._response_streams.clear();
+                response_streams.clear();
+                LOG(ERROR) << "Fail to accept stream";
+                return -1;
+            }
+            cntl._response_streams.push_back(extra_stream_id);
+            response_streams.push_back(extra_stream_id);
+        }
+    }
+
     return 0;
 }
 
diff --git a/src/brpc/stream.h b/src/brpc/stream.h
index f222ba09..36c0def7 100644
--- a/src/brpc/stream.h
+++ b/src/brpc/stream.h
@@ -28,6 +28,7 @@ namespace brpc {
 class Controller;
 
 typedef SocketId StreamId;
+using StreamIds = std::vector<StreamId>;
 const StreamId INVALID_STREAM_ID = (StreamId)-1L;
 
 namespace detail {
@@ -105,6 +106,14 @@ struct StreamWriteOptions {
 int StreamCreate(StreamId* request_stream, Controller &cntl,
                  const StreamOptions* options);
 
+// [Called at the client side for creating multiple streams]
+// Create streams at client-side along with the |cntl|, which will be connected
+// when receiving the response with streams from server-side. If |options| is
+// NULL, the stream will be created with default options
+// Return 0 on success, -1 otherwise
+int StreamCreate(StreamIds& request_streams, int request_stream_size, 
Controller& cntl,
+                 const StreamOptions* options);
+
 // [Called at the server side]
 // Accept the stream. If client didn't create a stream with the request 
 // (cntl.has_remote_stream() returns false), this method would fail.
@@ -112,6 +121,12 @@ int StreamCreate(StreamId* request_stream, Controller 
&cntl,
 int StreamAccept(StreamId* response_stream, Controller &cntl,
                  const StreamOptions* options);
 
+// [Called at the server side for accepting multiple streams]
+// Accept the streams. If client didn't create streams with the request
+// (cntl.has_remote_stream() returns false), this method would fail.
+// Return 0 on success, -1 otherwise.
+int StreamAccept(StreamIds& response_stream, Controller& cntl,
+                 const StreamOptions* options);
 // Write |message| into |stream_id|. The remote-side handler will received the 
 // message by the written order
 // Returns 0 on success, errno otherwise
diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h
index db92dd63..66e0d719 100644
--- a/src/brpc/stream_impl.h
+++ b/src/brpc/stream_impl.h
@@ -46,7 +46,7 @@ public:
                         const StreamWriteOptions* options = NULL);
     static int Create(const StreamOptions& options,
                       const StreamSettings *remote_settings,
-                      StreamId *id);
+                      StreamId *id, bool parse_rpc_response = true);
     StreamId id() { return _id; }
 
     int OnReceived(const StreamFrameMeta& fm, butil::IOBuf *buf, Socket* sock);
@@ -63,8 +63,11 @@ public:
     void FillSettings(StreamSettings *settings);
     static int SetFailed(StreamId id, int error_code, const char* reason_fmt, 
...)
         __attribute__ ((__format__ (__printf__, 3, 4)));
+    static int SetFailed(const StreamIds& ids, int error_code, const char* 
reason_fmt, ...)
+    __attribute__ ((__format__ (__printf__, 3, 4)));
     void Close(int error_code, const char* reason_fmt, ...)
         __attribute__ ((__format__ (__printf__, 3, 4)));
+    int ShareHostSocket(Stream& other_stream);
 
 private:
 friend void StreamWait(StreamId stream_id, const timespec *due_time,
diff --git a/src/brpc/streaming_rpc_meta.proto 
b/src/brpc/streaming_rpc_meta.proto
index 05d83217..5474583d 100644
--- a/src/brpc/streaming_rpc_meta.proto
+++ b/src/brpc/streaming_rpc_meta.proto
@@ -25,6 +25,7 @@ message StreamSettings {
     required int64 stream_id = 1;
     optional bool need_feedback = 2 [default = false];
     optional bool writable = 3 [default = false];
+    repeated int64 extra_stream_ids = 4;
 }
 
 enum FrameType {


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to